cutlass.pipeline#
- class cutlass.pipeline.Agent(value)#
Bases: Enum
Agent indicates what is participating in the pipeline synchronization.
- Thread = 1#
- ThreadBlock = 2#
- ThreadBlockCluster = 3#
- class cutlass.pipeline.CooperativeGroup(
- agent: Agent,
- size: int = 1,
- alignment=None,
Bases: object
CooperativeGroup contains size and alignment restrictions for an Agent.
- class cutlass.pipeline.PipelineOp(value)#
Bases: Enum
PipelineOp assigns an operation to an agent corresponding to a specific hardware feature.
- AsyncThread = 1#
- TCGen05Mma = 2#
- TmaLoad = 3#
- ClcLoad = 4#
- TmaStore = 5#
- Composite = 6#
- AsyncLoad = 7#
- class cutlass.pipeline.SyncObject#
Bases: ABC
Abstract base class for hardware synchronization primitives.
This class defines the interface for different types of hardware synchronization mechanisms including shared memory barriers, named barriers, and fences.
- abstract arrive() None#
- abstract wait() None#
- abstract arrive_and_wait() None#
- abstract arrive_and_drop() None#
- abstract get_barrier() cutlass.cute.typing.Pointer | int | None#
- abstract max() int | None#
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.MbarrierArray#
Bases: SyncObject
MbarrierArray implements an abstraction for an array of shared memory (smem) barriers.
- __init__(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: int,
- agent: tuple[PipelineOp, CooperativeGroup],
- tx_count: int = 0,
- *,
- loc=None,
- ip=None,
- recast_to_new_op_type(
- new_op_type: PipelineOp,
Creates a copy of this MbarrierArray with a different op_type, without re-initializing the barriers.
- mbarrier_init(*, loc=None, ip=None) None#
Initializes an array of mbarriers using warp 0.
- arrive(
- index: int,
- dst: int,
- cta_group: CtaGroup | None = None,
- *,
- loc=None,
- ip=None,
Selects the arrive operation corresponding to this MbarrierArray’s PipelineOp.
- Parameters:
index (int) – Index of the mbarrier in the array to arrive on
dst (int | None) – Destination parameter for selective arrival, which can be either a mask or a destination CTA rank. When None, both TCGen05Mma and AsyncThread arrive on their local mbarrier.
  - For TCGen05Mma, dst serves as a multicast mask (e.g., 0b1011 multicasts the arrive signal to the CTAs in the cluster with rank = 0, 1, and 3).
  - For AsyncThread, dst serves as a destination CTA rank (e.g., 3 means threads arrive on the mbarrier of the CTA with rank = 3 in the cluster).
cta_group (cute.nvgpu.tcgen05.CtaGroup, optional) – CTA group for TCGen05Mma, defaults to None for other op types
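To make the mask form of dst concrete, the following pure-Python sketch decodes a multicast mask into the CTA ranks it selects. The helper name ranks_from_mcast_mask is hypothetical and is not part of cutlass.pipeline; it only illustrates the bit convention described above, assuming bit i of the mask corresponds to CTA rank i.

```python
def ranks_from_mcast_mask(mask: int) -> list:
    """Return the CTA ranks selected by a multicast mask (bit i -> rank i).

    Hypothetical helper for illustration only; not a cutlass.pipeline API.
    """
    if mask < 0:
        raise ValueError("mask must be non-negative")
    ranks = []
    rank = 0
    while mask:
        if mask & 1:          # bit set -> this rank receives the arrive signal
            ranks.append(rank)
        mask >>= 1
        rank += 1
    return ranks


print(ranks_from_mcast_mask(0b1011))  # [0, 1, 3]
```

For an AsyncThread agent no decoding is needed, since dst is then a single rank rather than a bit mask.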
- arrive_mbarrier(
- index: int,
- dst_rank: int | None = None,
- *,
- loc=None,
- ip=None,
- arrive_cp_async_mbarrier(
- index: int,
- *,
- loc=None,
- ip=None,
- arrive_and_expect_tx(
- index: int,
- tx_count: int,
- *,
- loc=None,
- ip=None,
- arrive_and_expect_tx_with_dst(
- index: int,
- tx_count: int,
- dst: int | None = None,
- *,
- loc=None,
- ip=None,
- try_wait(
- index: int,
- phase: int,
- *,
- loc=None,
- ip=None,
- wait(
- index: int,
- phase: int,
- *,
- loc=None,
- ip=None,
- arrive_and_wait(
- index: int,
- phase: int,
- dst: int,
- cta_group: CtaGroup | None = None,
- *,
- loc=None,
- ip=None,
- arrive_and_drop(*, loc=None, ip=None) None#
- get_barrier(
- index: int,
- *,
- loc=None,
- ip=None,
- max() int#
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.NamedBarrier(barrier_id: int, num_threads: int)#
Bases: SyncObject
NamedBarrier is an abstraction for named barriers managed by hardware. There are 16 named barriers available, with barrier_ids 0-15.
See the PTX documentation.
- barrier_id: int#
- num_threads: int#
- arrive(*, loc=None, ip=None) None#
The aligned flavor of arrive is used when all threads in the CTA will execute the same instruction. See PTX documentation.
- arrive_unaligned(*, loc=None, ip=None) None#
The unaligned flavor of arrive can be used with an arbitrary number of threads in the CTA.
- wait(*, loc=None, ip=None) None#
NamedBarriers do not have a standalone wait like mbarriers, only an arrive_and_wait. If synchronizing two warps in a producer/consumer pairing, the arrive count would be 32 using mbarriers but 64 using NamedBarriers. Only threads from either the producer or consumer are counted for mbarriers, while all threads participating in the sync are counted for NamedBarriers.
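The arrive-count difference described above can be worked through with a small pure-Python sketch. Both function names are hypothetical and only restate the arithmetic from the text, assuming 32 threads per warp.

```python
WARP_SIZE = 32  # threads per warp on NVIDIA GPUs


def mbarrier_arrive_count(signalling_warps: int) -> int:
    """Arrive count for an mbarrier: only the signalling side is counted."""
    return signalling_warps * WARP_SIZE


def named_barrier_num_threads(producer_warps: int, consumer_warps: int) -> int:
    """Thread count for a named barrier: every participating thread is counted."""
    return (producer_warps + consumer_warps) * WARP_SIZE


# One producer warp synchronizing with one consumer warp:
print(mbarrier_arrive_count(1))         # 32
print(named_barrier_num_threads(1, 1))  # 64
```

The second value is what would be passed as num_threads when constructing a NamedBarrier for such a pairing.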
- wait_unaligned(*, loc=None, ip=None) None#
- arrive_and_wait(*, loc=None, ip=None) None#
- arrive_and_drop(*, loc=None, ip=None) None#
- sync(*, loc=None, ip=None) None#
- get_barrier(*, loc=None, ip=None) int#
- max() int#
- __init__(barrier_id: int, num_threads: int) None#
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.PipelineOrder(
- sync_object_full: SyncObject,
- depth: int,
- length: int,
- group_id: int,
- state: PipelineState,
Bases: object
PipelineOrder is used for managing ordered pipeline execution with multiple groups.
This class implements a pipeline ordering mechanism where work is divided into groups and stages, allowing for controlled progression through pipeline stages with proper synchronization between different groups.
The pipeline ordering works as follows:
- The pipeline is divided into ‘length’ number of groups
- Each group has ‘depth’ number of stages
- Groups execute in a specific order with synchronization barriers
- Each group waits for the previous group to complete before proceeding
Example:
    # Create pipeline order with 3 groups, each with 2 stages
    pipeline_order = PipelineOrder.create(
        barrier_storage=smem_ptr,      # shared memory pointer for barriers
        depth=2,                       # 2 stages per group
        length=3,                      # 3 groups total
        group_id=0,                    # current group ID (0, 1, or 2)
        producer_group=producer_warp   # cooperative group for producers
    )

    # In the pipeline loop
    for stage in range(num_stages):
        pipeline_order.wait()    # Wait for previous group to complete
        # Process current stage
        pipeline_order.arrive()  # Signal completion to next group
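The wait/arrive hand-off between groups can be modelled on the host with ordinary Python threads. This is only a sketch of the ordering idea, not the PipelineOrder implementation: run_ordered_groups is a hypothetical name, semaphores stand in for the hardware barriers, and the per-group depth stages are not modelled.

```python
import threading


def run_ordered_groups(length: int, iterations: int) -> list:
    """Run `length` groups in round-robin order; return the (group_id, step) log."""
    # One semaphore per group; only group 0 is allowed to start immediately.
    turn = [threading.Semaphore(1 if g == 0 else 0) for g in range(length)]
    log = []

    def worker(group_id: int) -> None:
        for step in range(iterations):
            turn[group_id].acquire()                 # like wait(): previous group is done
            log.append((group_id, step))             # process the current stage
            turn[(group_id + 1) % length].release()  # like arrive(): let the next group run

    threads = [threading.Thread(target=worker, args=(g,)) for g in range(length)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


print(run_ordered_groups(length=3, iterations=2))
# [(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)]
```

Because each group can only proceed after its predecessor signals, the log order is deterministic even though the threads run concurrently.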
- sync_object_full: SyncObject#
- depth: int#
- length: int#
- group_id: int#
- state: PipelineState#
- static create(
- barrier_storage: cutlass.cute.typing.Pointer,
- depth: int,
- length: int,
- group_id: int,
- producer_group: CooperativeGroup,
- defer_sync: bool = False,
- get_barrier_for_current_stage_idx(group_id)#
- arrive(*, loc=None, ip=None)#
- wait(*, loc=None, ip=None)#
- __init__(
- sync_object_full: SyncObject,
- depth: int,
- length: int,
- group_id: int,
- state: PipelineState,
- class cutlass.pipeline.TmaStoreFence(num_stages: int = 0)#
Bases: SyncObject
TmaStoreFence is used for a multi-stage epilogue buffer.
- __init__(num_stages: int = 0) None#
- arrive(*, loc=None, ip=None) None#
- wait(*, loc=None, ip=None) None#
- arrive_and_wait(*, loc=None, ip=None) None#
- arrive_and_drop(*, loc=None, ip=None) None#
- get_barrier(*, loc=None, ip=None) None#
- max() None#
- tail(*, loc=None, ip=None) None#
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.PipelineUserType(value)#
Bases: Enum
An enumeration.
- Producer = 1#
- Consumer = 2#
- ProducerConsumer = 3#
- class cutlass.pipeline.PipelineState(stages: int, count, index, phase)#
Bases: object
Pipeline state contains an index and phase bit corresponding to the current position in the circular buffer.
- __init__(stages: int, count, index, phase)#
- clone() PipelineState#
- property index: cutlass.cutlass_dsl.Int32#
- property count: cutlass.cutlass_dsl.Int32#
- property stages: int#
- property phase: cutlass.cutlass_dsl.Int32#
- reset_count(*, loc=None, ip=None)#
- advance(*, loc=None, ip=None) None#
- reverse(*, loc=None, ip=None)#
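The relationship between index, count, and the phase bit can be illustrated with a small pure-Python model. RingState is a hypothetical stand-in, not the PipelineState class: it uses plain ints instead of Int32 DSL values, and it assumes that the phase bit flips each time the index wraps around the circular buffer.

```python
from dataclasses import dataclass


@dataclass
class RingState:
    """Host-side model of a circular-buffer position (illustrative only)."""
    stages: int
    count: int = 0   # total number of advances so far
    index: int = 0   # current stage in [0, stages)
    phase: int = 0   # flips on every wrap-around (assumption)

    def advance(self) -> None:
        self.index += 1
        self.count += 1
        if self.index == self.stages:
            self.index = 0
            self.phase ^= 1


state = RingState(stages=3)
trace = []
for _ in range(7):
    trace.append((state.index, state.phase))
    state.advance()
print(trace)
# [(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1), (0, 0)]
```

The phase bit is what lets a waiter distinguish the current pass over a stage from the previous one, since the index alone repeats every stages advances.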
- class cutlass.pipeline.PipelineAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
Bases: object
PipelineAsync is a generic pipeline class where both the producer and consumer are AsyncThreads. It also serves as a base class for specialized pipeline classes.
This class implements a producer-consumer pipeline pattern where both sides operate asynchronously. The pipeline maintains synchronization state using barrier objects to coordinate between producer and consumer threads.
The pipeline state transitions of one pipeline entry (mbarrier) can be represented as:
Table 2 Pipeline State Transitions#
Barrier   | State | p.acquire | p.commit | c.wait   | c.release
empty_bar | empty | <Return>  | n/a      | n/a      | -
empty_bar | wait  | <Block>   | n/a      | n/a      | -> empty
full_bar  | wait  | n/a       | -> full  | <Block>  | n/a
full_bar  | full  | n/a       | -        | <Return> | n/a
Where:
p: producer
c: consumer
<Block>: This action blocks until the other side transitions the barrier to a state that allows it to proceed, e.g. p.acquire() blocks until c.release() transitions empty_bar to the empty state.
Array of mbarriers as circular buffer:

         Advance Direction
       <-------------------

        Producer   Consumer
            |         ^
            V         |
       +-----------------+
     --|X|X|W|D|D|D|D|R|X|<-.
    /  +-----------------+   \
    |                        |
    `------------------------'

Where:
X: Empty buffer (initial state)
W: Producer writing (producer is waiting for buffer to be empty)
D: Data ready (producer has written data to buffer)
R: Consumer reading (consumer is consuming data from buffer)
Example:
    # Create pipeline with 5 stages
    pipeline = PipelineAsync.create(
        num_stages=5,                   # number of pipeline stages
        producer_group=producer_warp,
        consumer_group=consumer_warp,
        barrier_storage=smem_ptr,       # smem pointer for array of mbarriers in shared memory
    )

    producer, consumer = pipeline.make_participants()

    # Producer side
    for i in range(num_iterations):
        handle = producer.acquire_and_advance()  # Wait for buffer to be empty & Move index to next stage
        # Write data to pipeline buffer
        handle.commit()                          # Signal buffer is full

    # Consumer side
    for i in range(num_iterations):
        handle = consumer.wait_and_advance()     # Wait for buffer to be full & Move index to next stage
        # Read data from pipeline buffer
        handle.release()                         # Signal buffer is empty
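The acquire/commit/wait/release protocol can also be exercised on the host with a thread-based model. The sketch below is an analogy rather than the PipelineAsync implementation: run_pipeline is a hypothetical name, and a pair of semaphores per stage plays the role of the empty and full mbarriers.

```python
import threading


def run_pipeline(num_stages: int, num_iterations: int) -> list:
    """Pass num_iterations items through a num_stages-deep ring buffer."""
    buffers = [None] * num_stages
    empty = [threading.Semaphore(1) for _ in range(num_stages)]  # stage is free
    full = [threading.Semaphore(0) for _ in range(num_stages)]   # stage holds data
    consumed = []

    def producer() -> None:
        for i in range(num_iterations):
            stage = i % num_stages
            empty[stage].acquire()    # p.acquire: block until the stage is empty
            buffers[stage] = i * i    # write data to the pipeline buffer
            full[stage].release()     # p.commit: signal the stage is full

    def consumer() -> None:
        for i in range(num_iterations):
            stage = i % num_stages
            full[stage].acquire()     # c.wait: block until the stage is full
            consumed.append(buffers[stage])
            empty[stage].release()    # c.release: signal the stage is empty

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed


print(run_pipeline(num_stages=5, num_iterations=8))
# [0, 1, 4, 9, 16, 25, 36, 49]
```

With five stages the producer can run up to five items ahead of the consumer before p.acquire blocks, which is the buffering the circular-buffer diagram depicts.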
- sync_object_full: SyncObject#
- sync_object_empty: SyncObject#
- num_stages: int#
- producer_mask: cutlass.cutlass_dsl.Int32 | None#
- consumer_mask: cutlass.cutlass_dsl.Int32 | None#
- static _make_sync_object(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: int,
- agent: tuple[PipelineOp, CooperativeGroup],
- tx_count: int = 0,
Returns a SyncObject corresponding to an agent’s PipelineOp.
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- producer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- defer_sync: bool = False,
Creates and initializes a new PipelineAsync instance.
This helper function computes necessary attributes and returns an instance of PipelineAsync with the specified configuration for producer and consumer synchronization.
- Parameters:
barrier_storage (cute.Pointer) – Pointer to the shared memory address for this pipeline’s mbarriers
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
producer_mask (Int32, optional) – Mask for signaling arrives for the producer agent
consumer_mask (Int32, optional) – Mask for signaling arrives for the consumer agent
- Raises:
ValueError – If barrier_storage is not a cute.Pointer instance
- Returns:
A new PipelineAsync instance
- Return type:
PipelineAsync
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
- producer_try_acquire(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
- producer_commit(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
- consumer_wait(
- state: PipelineState,
- try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
- consumer_try_wait(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
- consumer_release(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
- producer_get_barrier(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
- producer_tail(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
Make sure the last used buffer empty signal is visible to producer. Producer tail is usually executed by producer before exit, to avoid dangling mbarrier arrive signals after kernel exit.
- Parameters:
state (PipelineState) – The pipeline state that points to next useful buffer
- make_producer(*, loc=None, ip=None)#
- make_consumer(*, loc=None, ip=None)#
- make_participants(*, loc=None, ip=None)#
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- class cutlass.pipeline.PipelineCpAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
Bases: PipelineAsync
PipelineCpAsync is used for CpAsync producers and AsyncThread consumers.
- static create(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: cutlass.cutlass_dsl.Int32,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- producer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- defer_sync: bool = False,
Helper function that computes necessary attributes and returns a PipelineCpAsync instance.
- Parameters:
barrier_storage (cute.Pointer) – Pointer to the shared memory address for this pipeline’s mbarriers
num_stages (Int32) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
producer_mask (Int32, optional) – Mask for signaling arrives for the producer agent, defaults to None
consumer_mask (Int32, optional) – Mask for signaling arrives for the consumer agent, defaults to None
- Returns:
A new PipelineCpAsync instance configured with the provided parameters
- Return type:
PipelineCpAsync
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- class cutlass.pipeline.PipelineTmaAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_signalling_thread: cutlass.cutlass_dsl.Boolean,
Bases: PipelineAsync
PipelineTmaAsync is used for TMA producers and AsyncThread consumers (e.g. Hopper mainloops).
- is_signalling_thread: cutlass.cutlass_dsl.Boolean#
- static init_empty_barrier_arrive_signal(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- tidx: cutlass.cutlass_dsl.Int32,
- mcast_mode_mn: tuple[int, int] = (1, 1),
Initialize the empty barrier arrive signal.
This function determines which threads should signal empty barrier arrives based on the cluster layout and multicast modes. It returns the destination CTA rank and whether the current thread should signal.
- Parameters:
cta_layout_vmnk (cute.Layout) – Layout describing the cluster shape and CTA arrangement
tidx (Int32) – Thread index within the warp
mcast_mode_mn (tuple[int, int]) – Tuple specifying multicast modes for m and n dimensions (each 0 or 1), defaults to (1,1)
- Raises:
AssertionError – If both multicast modes are disabled (0,0)
- Returns:
Tuple containing destination CTA rank and boolean indicating if current thread signals
- Return type:
tuple[Int32, Boolean]
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- tx_count: int,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- tidx: cutlass.cutlass_dsl.Int32 | None = None,
- mcast_mode_mn: tuple[int, int] = (1, 1),
- defer_sync: bool = False,
Create a new PipelineTmaAsync instance.
- Parameters:
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
tx_count (int) – Number of bytes expected to be written to the transaction barrier for one stage
barrier_storage (cute.Pointer, optional) – Pointer to the shared memory address for this pipeline’s mbarriers, defaults to None
cta_layout_vmnk (cute.Layout, optional) – Layout of the cluster shape, defaults to None
tidx (Int32, optional) – Thread index to consumer async threads, defaults to None
mcast_mode_mn (tuple[int, int], optional) – Tuple specifying multicast modes for m and n dimensions (each 0 or 1), defaults to (1,1)
- Raises:
ValueError – If barrier_storage is not a cute.Pointer instance
- Returns:
New PipelineTmaAsync instance
- Return type:
PipelineTmaAsync
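Since tx_count is a byte count per stage, it can help to see how such a number might be derived. The sketch below is an assumption-laden example, not a CUTLASS helper: stage_tx_bytes is a hypothetical function, and it assumes each stage receives exactly one A tile of shape (tile_m, tile_k) and one B tile of shape (tile_n, tile_k) via TMA. Kernels that load other operands per stage would need a different sum.

```python
def stage_tx_bytes(tile_m: int, tile_n: int, tile_k: int,
                   a_bits: int, b_bits: int) -> int:
    """Bytes written into one pipeline stage, assuming one A tile and one B tile."""
    a_bytes = tile_m * tile_k * a_bits // 8  # A tile: tile_m x tile_k elements
    b_bytes = tile_n * tile_k * b_bits // 8  # B tile: tile_n x tile_k elements
    return a_bytes + b_bytes


# 128x64 and 128x64 tiles of 16-bit elements:
print(stage_tx_bytes(128, 128, 64, a_bits=16, b_bits=16))  # 32768
```

The resulting value is the kind of number that would be passed as tx_count, so the transaction barrier completes once all expected bytes for the stage have landed.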
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
TMA producer acquire conditionally waits on buffer empty and sets the transaction barrier.
- producer_commit(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
TMA producer commit is a no-op, since the TMA instruction itself updates the transaction count.
- consumer_release(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
TMA consumer release conditionally signals the empty buffer to the producer.
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_signalling_thread: cutlass.cutlass_dsl.Boolean,
- class cutlass.pipeline.PipelineTmaUmma(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- cta_group: CtaGroup,
Bases: PipelineAsync
PipelineTmaUmma is used for TMA producers and UMMA consumers (e.g. Blackwell mainloops).
- is_leader_cta: bool#
- _make_sync_object(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: int,
- agent: tuple[PipelineOp, CooperativeGroup],
- tx_count: int = 0,
- *,
- loc=None,
- ip=None,
Returns a SyncObject corresponding to an agent’s PipelineOp.
- _compute_mcast_arrival_mask(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- mcast_mode_mn: tuple[int, int],
- *,
- loc=None,
- ip=None,
Computes a mask for signaling arrivals to multicasting threadblocks.
- _compute_is_leader_cta(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- *,
- loc=None,
- ip=None,
Computes leader threadblocks for 2CTA kernels. For 1CTA, all threadblocks are leaders.
- create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- tx_count: int,
- barrier_storage: cutlass.cute.typing.Pointer = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- mcast_mode_mn: tuple[int, int] = (1, 1),
- defer_sync: bool = False,
- loc=None,
- ip=None,
Creates and initializes a new PipelineTmaUmma instance.
- Parameters:
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
tx_count (int) – Number of bytes expected to be written to the transaction barrier for one stage
barrier_storage (cute.Pointer, optional) – Pointer to the shared memory address for this pipeline’s mbarriers
cta_layout_vmnk (cute.Layout, optional) – Layout of the cluster shape
mcast_mode_mn (tuple[int, int], optional) – Tuple specifying multicast modes for m and n dimensions (each 0 or 1)
- Raises:
ValueError – If barrier_storage is not a cute.Pointer instance
- Returns:
A new PipelineTmaUmma instance configured with the provided parameters
- Return type:
PipelineTmaUmma
- consumer_release(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
UMMA consumer release signals that the buffer is empty; cta_group needs to be provided.
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
TMA producer acquire conditionally waits on buffer empty and sets the transaction barrier for leader threadblocks.
- producer_commit(
- state: PipelineState,
TMA producer commit is a no-op, since the TMA instruction itself updates the transaction count.
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineAsyncUmma(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
Bases: PipelineAsync
PipelineAsyncUmma is used for AsyncThread producers and UMMA consumers (e.g. Blackwell input fusion pipelines).
- _compute_leading_cta_rank(
- cta_v_size,
- *,
- loc=None,
- ip=None,
Computes the leading CTA rank.
- _compute_is_leader_cta(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- *,
- loc=None,
- ip=None,
Computes leader threadblocks for 2CTA kernels. For 1CTA, all threadblocks are leaders.
- _compute_peer_cta_mask(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- *,
- loc=None,
- ip=None,
Computes a mask for signaling arrivals to multicasting threadblocks.
- create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- barrier_storage: cutlass.cute.typing.Pointer = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- defer_sync: bool = False,
- loc=None,
- ip=None,
Creates and initializes a new PipelineAsyncUmma instance.
- Parameters:
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
barrier_storage (cute.Pointer, optional) – Pointer to the shared memory address for this pipeline’s mbarriers
cta_layout_vmnk (cute.Layout, optional) – Layout of the cluster shape
- Raises:
ValueError – If barrier_storage is not a cute.Pointer instance
- Returns:
A new PipelineAsyncUmma instance configured with the provided parameters
- Return type:
PipelineAsyncUmma
- consumer_release(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
UMMA consumer release signals that the buffer is empty; cta_group needs to be provided.
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineUmmaAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
Bases: PipelineAsync
PipelineUmmaAsync is used for UMMA producers and AsyncThread consumers (e.g. Blackwell accumulator pipelines).
- _compute_tmem_sync_mask(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- *,
- loc=None,
- ip=None,
Computes a mask to signal completion of tmem buffers for 2CTA kernels.
- _compute_peer_cta_rank(*, loc=None, ip=None)#
Computes a mask to signal release of tmem buffers for 2CTA kernels.
- create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- barrier_storage: cutlass.cute.typing.Pointer = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- defer_sync: bool = False,
- loc=None,
- ip=None,
Creates an instance of PipelineUmmaAsync with computed attributes.
- Parameters:
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
barrier_storage (cute.Pointer, optional) – Pointer to the shared memory address for this pipeline’s mbarriers
cta_layout_vmnk (cute.Layout, optional) – Layout of the cluster shape
- Raises:
ValueError – If barrier_storage is not a cute.Pointer instance
- Returns:
New instance of PipelineUmmaAsync
- Return type:
PipelineUmmaAsync
- producer_commit(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
UMMA producer commit signals that the buffer is full; cta_group needs to be provided.
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineClcFetchAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_signalling_thread: cutlass.cutlass_dsl.Boolean,
Bases: object
PipelineClcFetchAsync implements a producer-consumer pipeline for Cluster Launch Control based dynamic scheduling. Both producer and consumer operate asynchronously using barrier synchronization to coordinate across pipeline stages and cluster CTAs.
Producer: waits for an empty buffer and signals the full barrier with transaction bytes across all CTAs in the cluster; hardware auto-signals each CTA’s mbarrier when the transaction bytes are written, then the state advances to the next buffer slot.
Consumer: waits for the full barrier, loads the response from local SMEM, then signals CTA 0’s empty barrier to allow buffer reuse.
- sync_object_full: SyncObject#
- sync_object_empty: SyncObject#
- num_stages: int#
- producer_mask: cutlass.cutlass_dsl.Int32 | None#
- consumer_mask: cutlass.cutlass_dsl.Int32 | None#
- is_signalling_thread: cutlass.cutlass_dsl.Boolean#
- static _init_full_barrier_arrive_signal(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- tidx: cutlass.cutlass_dsl.Int32,
Computes producer barrier signaling parameters, returns destination CTA rank (0 to cluster_size-1) based on thread ID, and a boolean flag indicating if this thread participates in signaling.
- Parameters:
cta_layout_vmnk – Cluster layout defining CTA count
tidx – Thread ID within the CTA
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- tx_count: int,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- producer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- defer_sync: bool = False,
This helper function computes any necessary attributes and returns an instance of PipelineClcFetchAsync.
- Parameters:
barrier_storage (cute.Pointer) – Pointer to the shared memory address for this pipeline’s mbarriers
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
tx_count (int) – Number of bytes expected to be written to the transaction barrier for one stage
producer_mask (Int32, optional) – Mask for signaling arrives for the producer agent, defaults to None
consumer_mask (Int32, optional) – Mask for signaling arrives for the consumer agent, defaults to None
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
Producer acquire waits for empty buffer and sets transaction expectation on full barrier.
- Parameters:
state – Pipeline state pointing to the current buffer stage
try_acquire_token – Optional token to skip the empty barrier wait
- consumer_wait(
- state: PipelineState,
- try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
Consumer waits for full barrier to be signaled by hardware multicast.
- Parameters:
state – Pipeline state pointing to the current buffer stage
try_wait_token – Optional token to skip the full barrier wait
- consumer_release(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
- producer_get_barrier(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
- producer_tail(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
Ensures all in-flight buffers are released before producer exits.
- Parameters:
state – Pipeline state with current position in the buffer
try_acquire_token – Optional token to skip the empty barrier waits
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_signalling_thread: cutlass.cutlass_dsl.Boolean,
- class cutlass.pipeline.PipelineTmaMultiConsumersAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- sync_object_empty_umma: SyncObject,
- sync_object_empty_async: SyncObject,
- cta_group: CtaGroup,
Bases: PipelineAsync
PipelineTmaMultiConsumersAsync is used for TMA producers and UMMA+Async consumers.
- is_leader_cta: bool#
- sync_object_empty_umma: SyncObject#
- sync_object_empty_async: SyncObject#
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group_umma: CooperativeGroup,
- consumer_group_async: CooperativeGroup,
- tx_count: int,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- defer_sync: bool = False,
This helper function computes any necessary attributes and returns an instance of PipelineTmaMultiConsumersAsync.
- Parameters:
barrier_storage (cute.Pointer) – Pointer to the smem address for this pipeline’s mbarriers
num_stages (Int32) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group_umma (CooperativeGroup) – CooperativeGroup for the UMMA consumer agent
consumer_group_async (CooperativeGroup) – CooperativeGroup for the AsyncThread consumer agent
tx_count (int) – Number of bytes expected to be written to the transaction barrier for one stage
cta_layout_vmnk (cute.Layout | None) – Layout of the cluster shape
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
TMA producer acquire waits on buffer empty and sets the transaction barrier for leader threadblocks.
- producer_commit(
- state: PipelineState,
- *,
- loc=None,
- ip=None,
TMA producer commit is a no-op, since the TMA instruction itself updates the transaction count.
- consumer_release(
- state: PipelineState,
- op_type: PipelineOp,
- *,
- loc=None,
- ip=None,
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- sync_object_empty_umma: SyncObject,
- sync_object_empty_async: SyncObject,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineTmaStore(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
Bases: PipelineAsync
PipelineTmaStore is used for synchronizing TMA stores in the epilogue. It does not use mbarriers.
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
This helper function computes any necessary attributes and returns an instance of PipelineTmaStore.
- Parameters:
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
- Returns:
A new PipelineTmaStore instance
- Return type:
PipelineTmaStore
- producer_acquire(*, loc=None, ip=None)#
- producer_commit(*, loc=None, ip=None)#
- consumer_wait(*, loc=None, ip=None)#
- consumer_release(*, loc=None, ip=None)#
- producer_tail(*, loc=None, ip=None)#
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- class cutlass.pipeline.PipelineProducer(
- pipeline,
- state,
- group: CooperativeGroup,
Bases:
object
A class representing a producer in an asynchronous pipeline.
This class manages the producer side of an asynchronous pipeline, handling synchronization and state management for producing data. It provides methods for acquiring, committing, and advancing through pipeline stages.
- Variables:
__pipeline – The asynchronous pipeline this producer belongs to
__state – The current state of the producer in the pipeline
__group – The cooperative group this producer operates in
Examples:
pipeline = PipelineAsync.create(...)
producer, consumer = pipeline.make_participants()
for i in range(iterations):
    # Try to acquire the current buffer without blocking
    try_acquire_token = producer.try_acquire()
    # Do something else independently
    ...
    # Wait for current buffer to be empty & Move index to next stage
    # If try_acquire_token is True, return immediately
    # If try_acquire_token is False, block until buffer is empty
    handle = producer.acquire_and_advance(try_acquire_token)
    # Produce data
    handle.commit()
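The token flow in the example above can be modelled on the host in plain Python. This is a minimal single-threaded sketch, not the cutlass.pipeline implementation. `ToyRing` and all of its methods are hypothetical stand-ins, and where a real producer would block, the toy raises instead.

```python
class ToyRing:
    """Single-threaded stand-in for a multi-stage buffer (not cutlass.pipeline)."""

    def __init__(self, num_stages):
        self.full = [False] * num_stages  # True once a stage holds data
        self.head = 0                     # next stage the producer will fill
        self.tail = 0                     # next stage the consumer will drain

    def try_acquire(self):
        # Non-blocking probe: is the producer's current stage empty?
        return not self.full[self.head]

    def acquire_and_advance(self, token=None):
        # Without a True token the stage must be checked; a real pipeline
        # would block on a full stage, this toy model raises instead.
        if not token and self.full[self.head]:
            raise RuntimeError("stage still full; a real producer would block")
        stage = self.head
        self.head = (self.head + 1) % len(self.full)
        return stage

    def commit(self, stage):
        self.full[stage] = True  # data is now visible to the consumer

    def release(self):
        self.full[self.tail] = False  # consumer frees its oldest stage
        self.tail = (self.tail + 1) % len(self.full)


ring = ToyRing(num_stages=2)
log = []
for i in range(3):
    token = ring.try_acquire()
    log.append(token)
    if not token:
        ring.release()  # pretend the consumer drained one stage
    stage = ring.acquire_and_advance(ring.try_acquire())
    ring.commit(stage)
print(log)  # [True, True, False]
```

With two stages, the first two probes succeed and the third fails until a stage is released. This is the case where a real producer uses the time between try_acquire() and acquire_and_advance() for independent work.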
- class ImmutableResourceHandle(
- _ImmutableResourceHandle__origin: cutlass.pipeline.sm90.PipelineAsync,
- _ImmutableResourceHandle__immutable_state: cutlass.pipeline.helpers.PipelineState,
Bases:
ImmutableResourceHandle
- property barrier#
Get the barrier pointer for the current pipeline stage.
- Returns:
Pointer to the barrier for the current stage
- Return type:
cute.Pointer
- commit(*, loc=None, ip=None)#
Signal that data production is complete for the current stage.
This allows consumers to start processing the data.
- __init__(
- _ImmutableResourceHandle__origin: PipelineAsync,
- _ImmutableResourceHandle__immutable_state: PipelineState,
- __init__(
- pipeline,
- state,
- group: CooperativeGroup,
Initialize a new Producer instance.
- Parameters:
pipeline (PipelineAsync) – The pipeline this producer belongs to
state (PipelineState) – Initial pipeline state
group (CooperativeGroup) – The cooperative group for synchronization
- __pipeline: PipelineAsync#
- __state: PipelineState#
- __group: CooperativeGroup#
- clone()#
Create a new Producer instance with the same state.
- reset(*, loc=None, ip=None)#
Reset the count of how many handles this producer has committed.
- acquire(
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
Wait for the current buffer to be empty before producing data. This is a blocking operation.
- Parameters:
try_acquire_token (Optional[Boolean]) – Optional token returned by try_acquire(). If True, returns immediately without waiting. If False or None, blocks until the buffer is empty.
- Returns:
A handle to the producer for committing the data
- Return type:
- advance(*, loc=None, ip=None)#
Move to the next pipeline stage.
- acquire_and_advance(
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
Acquire the current buffer and advance to the next pipeline stage.
This method combines the acquire() and advance() operations into a single call. It first waits for the current buffer to be empty before producing data, then advances the pipeline to the next stage.
- Parameters:
try_acquire_token (Optional[Boolean]) – Token indicating whether to try non-blocking acquire. If True, returns immediately without waiting. If False or None, blocks until buffer is empty.
- Returns:
A handle to the producer that can be used to commit data to the acquired buffer stage
- Return type:
- try_acquire(
- *,
- loc=None,
- ip=None,
Attempt to acquire the current buffer without blocking.
This method tries to acquire the current buffer stage for producing data without waiting. It can be used to check buffer availability before committing to a blocking acquire operation.
- Returns:
A boolean token indicating whether the buffer was successfully acquired
- Return type:
Boolean
- commit(
- handle: ImmutableResourceHandle | None = None,
- *,
- loc=None,
- ip=None,
Signal that data production is complete for the current stage.
This allows consumers to start processing the data.
- Parameters:
handle (Optional[ImmutableResourceHandle]) – Optional handle to commit, defaults to None
- Raises:
AssertionError – If provided handle does not belong to this producer
- tail(*, loc=None, ip=None)#
Ensure all used buffers are properly synchronized before producer exit.
This should be called before the producer finishes to avoid dangling signals.
- class cutlass.pipeline.PipelineConsumer(
- pipeline,
- state: PipelineState,
- group: CooperativeGroup,
Bases:
object
A class representing a consumer in an asynchronous pipeline.
The Consumer class manages the consumer side of an asynchronous pipeline, handling synchronization and state management for consuming data. It provides methods for waiting, releasing, and advancing through pipeline stages.
- Variables:
__pipeline – The asynchronous pipeline this consumer belongs to
__state – The current state of the consumer in the pipeline
__group – The cooperative group this consumer operates in
Examples:
pipeline = PipelineAsync.create(...)
producer, consumer = pipeline.make_participants()
for i in range(iterations):
    # Try to wait for buffer to be full
    try_wait_token = consumer.try_wait()
    # Do something else independently
    ...
    # Wait for buffer to be full & Move index to next stage
    # If try_wait_token is True, return immediately
    # If try_wait_token is False, block until buffer is full
    handle = consumer.wait_and_advance(try_wait_token)
    # Consume data
    handle.release()  # Signal buffer is empty
    # Alternative way to do this is:
    # handle.release()  # Signal buffer is empty
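The full/empty handshake behind wait and release can be sketched on the host with two counting semaphores. This is only an analogy, assuming one producer and one consumer thread. It uses Python's threading module rather than mbarriers, and none of the names below come from cutlass.pipeline.

```python
import threading

NUM_STAGES = 2
ITERATIONS = 5

buffers = [None] * NUM_STAGES
empty = threading.Semaphore(NUM_STAGES)  # stages free for the producer
full = threading.Semaphore(0)            # stages ready for the consumer
consumed = []


def producer():
    for i in range(ITERATIONS):
        empty.acquire()                   # analogue of producer acquire
        buffers[i % NUM_STAGES] = i * i   # produce data into the stage
        full.release()                    # analogue of producer commit


def consumer():
    for i in range(ITERATIONS):
        full.acquire()                    # analogue of consumer wait
        consumed.append(buffers[i % NUM_STAGES])
        empty.release()                   # analogue of consumer release


threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed)  # [0, 1, 4, 9, 16]
```

The producer can run at most NUM_STAGES items ahead, and a stage is never overwritten before the consumer has released it. That is the guarantee the wait/release pair provides.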
- class ImmutableResourceHandle(
- _ImmutableResourceHandle__origin: cutlass.pipeline.sm90.PipelineAsync,
- _ImmutableResourceHandle__immutable_state: cutlass.pipeline.helpers.PipelineState,
Bases:
ImmutableResourceHandle
- release(*, loc=None, ip=None)#
Signal that data consumption is complete for the current stage. This allows producers to start producing new data.
- __init__(
- _ImmutableResourceHandle__origin: PipelineAsync,
- _ImmutableResourceHandle__immutable_state: PipelineState,
- __init__(
- pipeline,
- state: PipelineState,
- group: CooperativeGroup,
Initialize a new Consumer instance.
- Parameters:
pipeline (PipelineAsync) – The pipeline this consumer belongs to
state (PipelineState) – Initial pipeline state
group (CooperativeGroup) – The cooperative group for synchronization
- __pipeline: PipelineAsync#
- __group: CooperativeGroup#
- __state: PipelineState#
- clone()#
Create a new Consumer instance with the same state.
- reset(*, loc=None, ip=None)#
Reset the count of how many handles this consumer has consumed.
- wait(
- try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
Wait for data to be ready in the current buffer. This is a blocking operation that will not return until data is available.
- Parameters:
try_wait_token (Optional[Boolean]) – Optional token returned by try_wait(). If True, the buffer is already known to be full and the call returns immediately. If False, the call blocks until the buffer is full.
- Returns:
An immutable handle to the consumer that can be used to release the buffer once data consumption is complete
- Return type:
- advance(*, loc=None, ip=None)#
Advance the consumer to the next pipeline stage.
This updates the internal state to point to the next buffer in the pipeline. Should be called after consuming data from the current buffer.
- wait_and_advance(
- try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
- *,
- loc=None,
- ip=None,
Atomically wait for data and advance to next pipeline stage.
This is a convenience method that combines wait() and advance() into a single atomic operation. It will block until data is available in the current buffer, then automatically advance to the next stage.
- Parameters:
try_wait_token (Optional[Boolean]) – Optional token returned by try_wait(). If True, the buffer is already known to be full and the call returns immediately. If False, the call blocks until the buffer is full.
- Returns:
An immutable handle to the consumer that can be used to release the buffer once data consumption is complete
- Return type:
- try_wait(
- *,
- loc=None,
- ip=None,
Non-blocking check if data is ready in the current buffer.
This method provides a way to test if data is available without blocking. Unlike wait(), this will return immediately regardless of buffer state.
- Returns:
True if data is ready to be consumed, False if the buffer is not yet ready
- Return type:
Boolean
- release(
- handle: ImmutableResourceHandle | None = None,
- *,
- loc=None,
- ip=None,
Signal that data consumption is complete for the current stage. This allows producers to start producing new data.
- cutlass.pipeline.make_pipeline_state(
- type: PipelineUserType,
- stages: int,
- *,
- loc=None,
- ip=None,
Creates a pipeline state. Producers are assumed to start with an empty buffer and have a flipped phase bit of 1.
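The role of the initial phase bit can be shown with a small host-side model. This is a sketch under assumptions, not the library's PipelineState. The class `ToyPipelineState`, its fields, and the helper `make_toy_state` are hypothetical, and the wrap-and-flip rule in `advance` is assumed behaviour for a circular buffer of stages.

```python
from dataclasses import dataclass


@dataclass
class ToyPipelineState:
    stages: int
    index: int = 0   # current stage in the circular buffer
    phase: int = 0   # parity bit, flipped on every wrap-around
    count: int = 0   # total number of advances so far

    def advance(self):
        self.index += 1
        self.count += 1
        if self.index == self.stages:
            self.index = 0
            self.phase ^= 1


def make_toy_state(is_producer, stages):
    # Producers start with the flipped phase (1) because their buffers
    # are assumed to be empty at the start; consumers start at phase 0.
    return ToyPipelineState(stages=stages, phase=1 if is_producer else 0)


producer = make_toy_state(True, 3)
consumer = make_toy_state(False, 3)
trace = []
for _ in range(4):
    trace.append((producer.index, producer.phase))
    producer.advance()
print(trace)  # [(0, 1), (1, 1), (2, 1), (0, 0)]
```

The producer's phase flips from 1 to 0 only after it has passed through all three stages once, while the consumer starts at index 0 with phase 0.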
- cutlass.pipeline.pipeline_init_arrive(
- cluster_shape_mn: cutlass.cute.typing.Layout | None = None,
- is_relaxed: bool = False,
- *,
- loc=None,
- ip=None,
Fences the mbarrier_init and sends an arrive if using clusters.
- cutlass.pipeline.pipeline_init_wait(
- cluster_shape_mn: cutlass.cute.typing.Layout | None = None,
- *,
- loc=None,
- ip=None,
Syncs the threadblock or cluster.
- cutlass.pipeline.agent_sync(
- group: Agent,
- is_relaxed: bool = False,
- *,
- loc=None,
- ip=None,
Syncs all threads within an agent.
- cutlass.pipeline.arrive(barrier_id: int, num_threads: int, *, loc=None, ip=None)#
The aligned flavor of arrive is used when all threads in the CTA will execute the same instruction. See PTX documentation.
- cutlass.pipeline.arrive_unaligned(
- barrier_id: int,
- num_threads: int,
- *,
- loc=None,
- ip=None,
The unaligned flavor of arrive can be used with an arbitrary number of threads in the CTA.
- cutlass.pipeline.wait(*, loc=None, ip=None)#
NamedBarriers do not have a standalone wait like mbarriers, only an arrive_and_wait. If synchronizing two warps in a producer/consumer pairing, the arrive count would be 32 using mbarriers but 64 using NamedBarriers. Only threads from either the producer or consumer are counted for mbarriers, while all threads participating in the sync are counted for NamedBarriers.
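The counts in the paragraph above follow from simple arithmetic, sketched here with two hypothetical helper functions. The only assumption is a warp size of 32 threads.

```python
WARP_SIZE = 32  # threads per warp


def mbarrier_arrive_count(signalling_warps):
    # Only the threads on the signalling side (producer or consumer) count.
    return signalling_warps * WARP_SIZE


def named_barrier_thread_count(producer_warps, consumer_warps):
    # Every thread participating in the sync counts, on both sides.
    return (producer_warps + consumer_warps) * WARP_SIZE


print(mbarrier_arrive_count(1))          # 32
print(named_barrier_thread_count(1, 1))  # 64
```

For one producer warp paired with one consumer warp, this gives the 32 versus 64 stated above.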
- cutlass.pipeline.wait_unaligned(
- barrier_id: int,
- num_threads: int,
- *,
- loc=None,
- ip=None,
- cutlass.pipeline.arrive_and_wait(
- barrier_id: int,
- num_threads: int,
- *,
- loc=None,
- ip=None,
- cutlass.pipeline.sync(barrier_id: int = 0, *, loc=None, ip=None)#