cutlass.pipeline#
- class cutlass.pipeline.Agent(value)#
Bases:
Enum
Agent indicates what is participating in the pipeline synchronization.
- Thread = 1#
- ThreadBlock = 2#
- ThreadBlockCluster = 3#
- class cutlass.pipeline.CooperativeGroup(
- agent: Agent,
- size: int = 1,
- alignment=None,
Bases:
object
CooperativeGroup contains size and alignment restrictions for an Agent.
- class cutlass.pipeline.PipelineOp(value)#
Bases:
Enum
PipelineOp assigns an operation to an agent corresponding to a specific hardware feature.
- AsyncThread = 1#
- TCGen05Mma = 2#
- TmaLoad = 3#
- TmaStore = 4#
- Composite = 5#
- AsyncLoad = 6#
- class cutlass.pipeline.SyncObject#
Bases:
ABC
Abstract base class for hardware synchronization primitives.
This class defines the interface for different types of hardware synchronization mechanisms including shared memory barriers, named barriers, and fences.
- abstract arrive() None #
- abstract wait() None #
- abstract arrive_and_wait() None #
- abstract arrive_and_drop() None #
- abstract get_barrier() cutlass.cute.typing.Pointer | int | None #
- abstract max() int | None #
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.MbarrierArray(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: int,
- agent: tuple[PipelineOp, CooperativeGroup],
- tx_count: int = 0,
Bases:
SyncObject
MbarrierArray implements an abstraction for an array of smem barriers.
- __init__(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: int,
- agent: tuple[PipelineOp, CooperativeGroup],
- tx_count: int = 0,
- recast_to_new_op_type(
- new_op_type: PipelineOp,
Creates a copy of MbarrierArray with a different op_type without re-initializing barriers
- mbarrier_init() None #
Initializes an array of mbarriers using warp 0.
- arrive(
- index: int,
- dst: int,
- cta_group: CtaGroup | None = None,
Select the arrive corresponding to this MbarrierArray’s PipelineOp.
- Parameters:
index (int) – Index of the mbarrier in the array to arrive on
dst (int | None) – Destination parameter for selective arrival, which can be either a mask or destination cta rank. When None, both
TCGen05Mma
andAsyncThread
will arrive on their local mbarrier. - ForTCGen05Mma
,dst
serves as a multicast mask (e.g., 0b1011 allows arrive signal to be multicast to CTAs in the cluster with rank = 0, 1, and 3). - ForAsyncThread
,dst
serves as a destination cta rank (e.g., 3 means threads will arrive on the mbarrier with rank = 3 in the cluster).cta_group (
cute.nvgpu.tcgen05.CtaGroup
, optional) – CTA group forTCGen05Mma
, defaults to None for other op types
- arrive_mbarrier(
- index: int,
- dst_rank: int | None = None,
- arrive_cp_async_mbarrier(index: int)#
- arrive_and_expect_tx(index: int, tx_count: int) None #
- try_wait(
- index: int,
- phase: int,
- wait(index: int, phase: int) None #
- arrive_and_drop() None #
- get_barrier(index: int) cutlass.cute.typing.Pointer #
- max() int #
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.NamedBarrier(barrier_id: int, num_threads: int)#
Bases:
SyncObject
NamedBarrier is an abstraction for named barriers managed by hardware. There are 16 named barriers available, with barrier_ids 0-15.
See the PTX documentation.
- barrier_id: int#
- num_threads: int#
- arrive() None #
The aligned flavor of arrive is used when all threads in the CTA will execute the same instruction. See PTX documentation.
- arrive_unaligned() None #
The unaligned flavor of arrive can be used with an arbitrary number of threads in the CTA.
- wait() None #
NamedBarriers do not have a standalone wait like mbarriers, only an arrive_and_wait. If synchronizing two warps in a producer/consumer pairing, the arrive count would be 32 using mbarriers but 64 using NamedBarriers. Only threads from either the producer or consumer are counted for mbarriers, while all threads participating in the sync are counted for NamedBarriers.
- wait_unaligned() None #
- arrive_and_wait() None #
- arrive_and_drop() None #
- sync() None #
- get_barrier() int #
- max() int #
- __init__(barrier_id: int, num_threads: int) None #
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.PipelineOrder(
- sync_object_full: SyncObject,
- depth: int,
- length: int,
- group_id: int,
- state: PipelineState,
Bases:
object
PipelineOrder is used for managing ordered pipeline execution with multiple groups.
This class implements a pipeline ordering mechanism where work is divided into groups and stages, allowing for controlled progression through pipeline stages with proper synchronization between different groups.
The pipeline ordering works as follows: - The pipeline is divided into ‘length’ number of groups - Each group has ‘depth’ number of stages - Groups execute in a specific order with synchronization barriers - Each group waits for the previous group to complete before proceeding
Example:
# Create pipeline order with 3 groups, each with 2 stages pipeline_order = PipelineOrder.create( barrier_storage=smem_ptr, # shared memory pointer for barriers depth=2, # 2 stages per group length=3, # 3 groups total group_id=0, # current group ID (0, 1, or 2) producer_group=producer_warp # cooperative group for producers ) # In the pipeline loop for stage in range(num_stages): pipeline_order.wait() # Wait for previous group to complete # Process current stage pipeline_order.arrive() # Signal completion to next group
- sync_object_full: SyncObject#
- depth: int#
- length: int#
- group_id: int#
- state: PipelineState#
- static create(
- barrier_storage: cutlass.cute.typing.Pointer,
- depth: int,
- length: int,
- group_id: int,
- producer_group: CooperativeGroup,
- get_barrier_for_current_stage_idx(group_id)#
- arrive()#
- wait()#
- __init__(
- sync_object_full: SyncObject,
- depth: int,
- length: int,
- group_id: int,
- state: PipelineState,
- class cutlass.pipeline.TmaStoreFence(num_stages: int = 0)#
Bases:
SyncObject
TmaStoreFence is used for a multi-stage epilogue buffer.
- __init__(num_stages: int = 0) None #
- arrive() None #
- wait() None #
- arrive_and_wait() None #
- arrive_and_drop() None #
- get_barrier() None #
- max() None #
- tail() None #
- _abc_impl = <_abc._abc_data object>#
- class cutlass.pipeline.PipelineUserType(value)#
Bases:
Enum
An enumeration.
- Producer = 1#
- Consumer = 2#
- class cutlass.pipeline.PipelineState(stages: int, count, index, phase)#
Bases:
object
Pipeline state contains an index and phase bit corresponding to the current position in the circular buffer.
- __init__(stages: int, count, index, phase)#
- clone() PipelineState #
- property index: cutlass.cutlass_dsl.Int32#
- property count: cutlass.cutlass_dsl.Int32#
- property stages: int#
- property phase: cutlass.cutlass_dsl.Int32#
- reset_count()#
- advance()#
- reverse()#
- class cutlass.pipeline.PipelineAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
Bases:
object
PipelineAsync is a generic pipeline class where both the producer and consumer are AsyncThreads. It also serves as a base class for specialized pipeline classes.
This class implements a producer-consumer pipeline pattern where both sides operate asynchronously. The pipeline maintains synchronization state using barrier objects to coordinate between producer and consumer threads.
The pipeline state transitions of one pipeline entry(mbarrier) can be represented as:
Table 2 Pipeline State Transitions# Barrier
State
p.acquire
p.commit
c.wait
c.release
empty_bar
empty
<Return>
n/a
n/a
empty_bar
wait
<Block>
n/a
n/a
-> empty
full_bar
wait
n/a
-> full
<Block >
n/a
full_bar
full
n/a
<Return>
n/a
Where:
p: producer
c: consumer
<Block>: This action is blocked until transition to a state allow it to proceed by other side - e.g.
p.acquire()
is blocked untilempty_bar
transition toempty
state byc.release()
Array of mbarriers as circular buffer: Advance Direction <------------------- Producer Consumer | ^ V | +-----------------+ --|X|X|W|D|D|D|D|R|X|<-. / +-----------------+ \ | | `------------------------'
Where:
X: Empty buffer (initial state)
W: Producer writing (producer is waiting for buffer to be empty)
D: Data ready (producer has written data to buffer)
R: Consumer reading (consumer is consuming data from buffer)
Example:
# Create pipeline with 5 stages pipeline = PipelineAsync.create( num_stages=5, # number of pipeline stages producer_group=producer_warp, consumer_group=consumer_warp barrier_storage=smem_ptr, # smem pointer for array of mbarriers in shared memory ) producer, consumer = pipeline.make_participants() # Producer side for i in range(num_iterations): handle = producer.acquire_and_advance() # Wait for buffer to be empty & Move index to next stage # Write data to pipeline buffer handle.commit() # Signal buffer is full # Consumer side for i in range(num_iterations): handle = consumer.wait_and_advance() # Wait for buffer to be full & Move index to next stage # Read data from pipeline buffer handle.release() # Signal buffer is empty
- sync_object_full: SyncObject#
- sync_object_empty: SyncObject#
- num_stages: int#
- producer_mask: cutlass.cutlass_dsl.Int32 | None#
- consumer_mask: cutlass.cutlass_dsl.Int32 | None#
- static _make_sync_object(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: int,
- agent: tuple[PipelineOp, CooperativeGroup],
- tx_count: int = 0,
Returns a SyncObject corresponding to an agent’s PipelineOp.
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- producer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None = None,
Creates and initializes a new PipelineAsync instance.
This helper function computes necessary attributes and returns an instance of PipelineAsync with the specified configuration for producer and consumer synchronization.
- Parameters:
barrier_storage (cute.Pointer) – Pointer to the shared memory address for this pipeline’s mbarriers
num_stages (int) – Number of buffer stages for this pipeline
producer_group (CooperativeGroup) – CooperativeGroup for the producer agent
consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent
producer_mask (Int32, optional) – Mask for signaling arrives for the producer agent, defaults to
None
consumer_mask (Int32, optional) – Mask for signaling arrives for the consumer agent, defaults to
None
- Returns:
A new PipelineAsync instance
- Return type:
- Raises:
ValueError – If barrier_storage is not a cute.Pointer instance
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
- producer_try_acquire(
- state: PipelineState,
- producer_commit(
- state: PipelineState,
- consumer_wait(
- state: PipelineState,
- try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
- consumer_try_wait(
- state: PipelineState,
- consumer_release(
- state: PipelineState,
- producer_get_barrier(
- state: PipelineState,
- producer_tail(
- state: PipelineState,
Make sure the last used buffer empty signal is visible to producer. Producer tail is usually executed by producer before exit, to avoid dangling mbarrier arrive signals after kernel exit.
- Parameters:
state (PipelineState) – The pipeline state that points to next useful buffer
- make_producer()#
- make_consumer()#
- make_participants()#
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- class cutlass.pipeline.PipelineCpAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
Bases:
PipelineAsync
PipelineCpAsync is used for CpAsync producers and AsyncThread consumers (e.g. Hopper non-TMA mainloops).
- static create(
- barrier_storage: cutlass.cute.typing.Pointer,
- num_stages: cutlass.cutlass_dsl.Int32,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- producer_mask: cutlass.cutlass_dsl.Int32 | None = None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None = None,
This helper function computes any necessary attributes and returns an instance of PipelineAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param producer_mask: Mask for signaling arrives for the producer agent :type producer_mask: Int32 | None :param consumer_mask: Mask for signaling arrives for the consumer agent :type consumer_mask: Int32 | None
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- class cutlass.pipeline.PipelineTmaAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_signalling_thread: cutlass.cutlass_dsl.Boolean,
Bases:
PipelineAsync
PipelineTmaAsync is used for TMA producers and AsyncThread consumers (e.g. Hopper mainloops).
- is_signalling_thread: cutlass.cutlass_dsl.Boolean#
- static init_empty_barrier_arrive_signal(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- tidx: cutlass.cutlass_dsl.Int32,
- mcast_mode_mn: tuple[int, int],
Initialize the empty barrier arrive signal This function returns the destination cta rank and a boolean indicating if the signalling thread is the same as the current thread
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- tx_count: int,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- tidx: cutlass.cutlass_dsl.Int32 | None = None,
- mcast_mode_mn: tuple[int, int] = (1, 1),
This helper function computes any necessary attributes and returns an instance of PipelineTmaAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage :type tx_count: int :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None :param tidx: thread index to consumer async threads :type tidx: Int32 | None :param mcast_mode_mn: Tuple of two integers, specifying whether mcast is enabled for the m and n modes. At least one of the two integers must be 1. :type mcast_mode_mn: tuple[int, int]
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
TMA producer commit conditionally waits on buffer empty and sets the transaction barrier.
- producer_commit(
- state: PipelineState,
TMA producer commit is a noop since TMA instruction itself updates the transaction count.
- consumer_release(
- state: PipelineState,
TMA consumer release conditionally signals the empty buffer to the producer.
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_signalling_thread: cutlass.cutlass_dsl.Boolean,
- class cutlass.pipeline.PipelineTmaUmma(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- cta_group: CtaGroup,
Bases:
PipelineAsync
PipelineTmaUmma is used for TMA producers and UMMA consumers (e.g. Blackwell mainloops).
- is_leader_cta: bool#
- static _compute_mcast_arrival_mask(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
- mcast_mode_mn: tuple[int, int],
Computes a mask for signaling arrivals to multicasting threadblocks.
- static _compute_is_leader_cta(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
Computes leader threadblocks for 2CTA kernels. For 1CTA, all threadblocks are leaders.
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- tx_count: int,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
- mcast_mode_mn: tuple[int, int] = (1, 1),
This helper function computes any necessary attributes and returns an instance of PipelineTmaUmma. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage :type tx_count: int :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None :param mcast_mode_mn: Tuple of two integers, specifying whether mcast is enabled for the m and n modes. At least one of the two integers must be 1. :type mcast_mode_mn: tuple[int, int]
- consumer_release(
- state: PipelineState,
UMMA consumer release buffer empty, cta_group needs to be provided.
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
TMA producer commit conditionally waits on buffer empty and sets the transaction barrier for leader threadblocks.
- producer_commit(
- state: PipelineState,
TMA producer commit is a noop since TMA instruction itself updates the transaction count.
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineTmaMultiConsumersAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- sync_object_empty_umma: SyncObject,
- sync_object_empty_async: SyncObject,
- cta_group: CtaGroup,
Bases:
PipelineAsync
PipelineTmaMultiConsumersAsync is used for TMA producers and UMMA+Async consumers.
- is_leader_cta: bool#
- sync_object_empty_umma: SyncObject#
- sync_object_empty_async: SyncObject#
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group_umma: CooperativeGroup,
- consumer_group_async: CooperativeGroup,
- tx_count: int,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
This helper function computes any necessary attributes and returns an instance of PipelineTmaMultiConsumersAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group_umma: CooperativeGroup for the UMMA consumer agent :type consumer_group_umma: CooperativeGroup :param consumer_group_async: CooperativeGroup for the AsyncThread consumer agent :type consumer_group_async: CooperativeGroup :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage :type tx_count: int :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None
- producer_acquire(
- state: PipelineState,
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
TMA producer acquire waits on buffer empty and sets the transaction barrier for leader threadblocks.
- producer_commit(
- state: PipelineState,
TMA producer commit is a noop since TMA instruction itself updates the transaction count.
- consumer_release(
- state: PipelineState,
- op_type: PipelineOp,
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- is_leader_cta: bool,
- sync_object_empty_umma: SyncObject,
- sync_object_empty_async: SyncObject,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineAsyncUmma(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
Bases:
PipelineAsync
PipelineAsyncUmma is used for AsyncThread producers and UMMA consumers (e.g. Blackwell input fusion pipelines).
- static _compute_leading_cta_rank(cta_v_size)#
Computes the leading CTA rank.
- static _compute_is_leader_cta(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
Computes leader threadblocks for 2CTA kernels. For 1CTA, all threadblocks are leaders.
- static _compute_peer_cta_mask(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
Computes a mask for signaling arrivals to multicasting threadblocks.
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
This helper function computes any necessary attributes and returns an instance of PipelineAsyncUmma. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None
- consumer_release(
- state: PipelineState,
UMMA consumer release buffer empty, cta_group needs to be provided.
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineUmmaAsync(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
Bases:
PipelineAsync
PipelineUmmaAsync is used for UMMA producers and AsyncThread consumers (e.g. Blackwell accumulator pipelines).
- static _compute_tmem_sync_mask(
- cta_layout_vmnk: cutlass.cute.typing.Layout,
Computes a mask to signal completion of tmem buffers for 2CTA kernels.
- static _compute_peer_cta_rank()#
Computes a mask to signal release of tmem buffers for 2CTA kernels.
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
- consumer_group: CooperativeGroup,
- barrier_storage: cutlass.cute.typing.Pointer | None = None,
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
This helper function computes any necessary attributes and returns an instance of PipelineUmmaAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None
- producer_commit(
- state: PipelineState,
UMMA producer commit buffer full, cta_group needs to be provided.
- producer_tail(
- state: PipelineState,
Make sure the last used buffer empty signal is visible to producer. Producer tail is usually executed by producer before exit, to avoid dangling mbarrier arrive signals after kernel exit.
- Parameters:
state (PipelineState) – The pipeline state that points to next useful buffer
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- cta_group: CtaGroup,
- class cutlass.pipeline.PipelineTmaStore(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
Bases:
PipelineAsync
PipelineTmaStore is used for synchronizing TMA stores in the epilogue. It does not use mbarriers.
- static create(
- *,
- num_stages: int,
- producer_group: CooperativeGroup,
This helper function computes any necessary attributes and returns an instance of PipelineTmaStore. :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup
- producer_acquire()#
- producer_commit()#
- consumer_wait()#
- consumer_release()#
- producer_tail()#
Make sure the last used buffer empty signal is visible to producer. Producer tail is usually executed by producer before exit, to avoid dangling mbarrier arrive signals after kernel exit.
- Parameters:
state (PipelineState) – The pipeline state that points to next useful buffer
- __init__(
- sync_object_full: SyncObject,
- sync_object_empty: SyncObject,
- num_stages: int,
- producer_mask: cutlass.cutlass_dsl.Int32 | None,
- consumer_mask: cutlass.cutlass_dsl.Int32 | None,
- class cutlass.pipeline.PipelineProducer(
- pipeline,
- state,
- group: CooperativeGroup,
Bases:
object
A class representing a producer in an asynchronous pipeline.
This class manages the producer side of an asynchronous pipeline, handling synchronization and state management for producing data. It provides methods for acquiring, committing, and advancing through pipeline stages.
- Variables:
__pipeline – The asynchronous pipeline this producer belongs to
__state – The current state of the producer in the pipeline
__group – The cooperative group this producer operates in
Examples:
pipeline = PipelineAsync.create(...) producer, consumer = pipeline.make_participants() for i in range(iterations): # Try to acquire the current buffer without blocking try_acquire_token = producer.try_acquire() # Do something else independently ... # Wait for current buffer to be empty & Move index to next stage # If try_acquire_token is True, return immediately # If try_acquire_token is False, block until buffer is empty handle = producer.acquire_and_advance(try_acquire_token) # Produce data handle.commit()
- class ImmutableResourceHandle(
- _ImmutableResourceHandle__origin: cutlass.pipeline.sm90.PipelineAsync,
- _ImmutableResourceHandle__immutable_state: cutlass.pipeline.helpers.PipelineState,
Bases:
ImmutableResourceHandle
- property barrier#
Get the barrier pointer for the current pipeline stage.
- Returns:
Pointer to the barrier for the current stage
- Return type:
cute.Pointer
- commit()#
Signal that data production is complete for the current stage.
This allows consumers to start processing the data.
- __init__(
- _ImmutableResourceHandle__origin: PipelineAsync,
- _ImmutableResourceHandle__immutable_state: PipelineState,
- __init__(
- pipeline,
- state,
- group: CooperativeGroup,
Initialize a new Producer instance.
- Parameters:
pipeline (PipelineAsync) – The pipeline this producer belongs to
state (PipelineState) – Initial pipeline state
group (CooperativeGroup) – The cooperative group for synchronization
- __pipeline: PipelineAsync#
- __state: PipelineState#
- __group: CooperativeGroup#
- reset()#
Reset the count of how many handles this producer has committed.
- acquire(
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
Wait for the current buffer to be empty before producing data. This is a blocking operation.
- Parameters:
try_acquire_token (Optional[Boolean]) – Optional token to try to acquire the buffer
- Returns:
A handle to the producer for committing the data
- Return type:
- advance()#
Move to the next pipeline stage.
- acquire_and_advance(
- try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
Acquire the current buffer and advance to the next pipeline stage.
This method combines the acquire() and advance() operations into a single call. It first waits for the current buffer to be empty before producing data, then advances the pipeline to the next stage.
- Parameters:
try_acquire_token (Optional[Boolean]) – Token indicating whether to try non-blocking acquire. If True, returns immediately without waiting. If False or None, blocks until buffer is empty.
- Returns:
A handle to the producer that can be used to commit data to the acquired buffer stage
- Return type:
- try_acquire() cutlass.cutlass_dsl.Boolean #
Attempt to acquire the current buffer without blocking.
This method tries to acquire the current buffer stage for producing data without waiting. It can be used to check buffer availability before committing to a blocking acquire operation.
- Returns:
A boolean token indicating whether the buffer was successfully acquired
- Return type:
Boolean
- commit(
- handle: ImmutableResourceHandle | None = None,
Signal that data production is complete for the current stage.
This allows consumers to start processing the data.
- Parameters:
handle (Optional[ImmutableResourceHandle]) – Optional handle to commit, defaults to None
- Raises:
AssertionError – If provided handle does not belong to this producer
- tail()#
Ensure all used buffers are properly synchronized before producer exit.
This should be called before the producer finishes to avoid dangling signals.
- class cutlass.pipeline.PipelineConsumer(
- pipeline,
- state: PipelineState,
- group: CooperativeGroup,
Bases:
object
A class representing a consumer in an asynchronous pipeline.
The Consumer class manages the consumer side of an asynchronous pipeline, handling synchronization and state management for consuming data. It provides methods for waiting, releasing, and advancing through pipeline stages.
- Variables:
__pipeline – The asynchronous pipeline this consumer belongs to
__state – The current state of the consumer in the pipeline
__group – The cooperative group this consumer operates in
Examples:
pipeline = PipelineAsync.create(...) producer, consumer = pipeline.make_participants() for i in range(iterations): # Try to wait for buffer to be full try_wait_token = consumer.try_wait() # Do something else independently ... # Wait for buffer to be full & Move index to next stage # If try_wait_token is True, return immediately # If try_wait_token is False, block until buffer is full handle = consumer.wait_and_advance(try_wait_token) # Consume data handle.release( ) # Signal buffer is empty # Alternative way to do this is: # handle.release() # Signal buffer is empty
- class ImmutableResourceHandle(
- _ImmutableResourceHandle__origin: cutlass.pipeline.sm90.PipelineAsync,
- _ImmutableResourceHandle__immutable_state: cutlass.pipeline.helpers.PipelineState,
Bases:
ImmutableResourceHandle
- release()#
Signal that data production is complete for the current stage. This allows consumers to start processing the data.
- __init__(
- _ImmutableResourceHandle__origin: PipelineAsync,
- _ImmutableResourceHandle__immutable_state: PipelineState,
- __init__(
- pipeline,
- state: PipelineState,
- group: CooperativeGroup,
Initialize a new Consumer instance.
- Parameters:
pipeline (PipelineAsync) – The pipeline this consumer belongs to
state (PipelineState) – Initial pipeline state
group (CooperativeGroup) – The cooperative group for synchronization
- __pipeline: PipelineAsync#
- __group: CooperativeGroup#
- __state: PipelineState#
- reset()#
Reset the count of how many handles this consumer has consumed.
- wait(
- try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
Wait for data to be ready in the current buffer. This is a blocking operation that will not return until data is available.
- Parameters:
try_wait_token (Optional[Boolean]) – Token used to attempt a non-blocking wait for the buffer. If provided and True, returns immediately if buffer is not ready.
- Returns:
An immutable handle to the consumer that can be used to release the buffer once data consumption is complete
- Return type:
- advance()#
Advance the consumer to the next pipeline stage.
This updates the internal state to point to the next buffer in the pipeline. Should be called after consuming data from the current buffer.
- wait_and_advance(
- try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
Atomically wait for data and advance to next pipeline stage.
This is a convenience method that combines wait() and advance() into a single atomic operation. It will block until data is available in the current buffer, then automatically advance to the next stage.
- Parameters:
try_wait_token (Optional[Boolean]) – Token used to attempt a non-blocking wait for the buffer. If provided and True, returns immediately if buffer is not ready.
- Returns:
An immutable handle to the consumer that can be used to release the buffer once data consumption is complete
- Return type:
- try_wait() cutlass.cutlass_dsl.Boolean #
Non-blocking check if data is ready in the current buffer.
This method provides a way to test if data is available without blocking. Unlike wait(), this will return immediately regardless of buffer state.
- Returns:
True if data is ready to be consumed, False if the buffer is not yet ready
- Return type:
Boolean
- release(
- handle: ImmutableResourceHandle | None = None,
Signal that data consumption is complete for the current stage. This allows producers to start producing new data.
- cutlass.pipeline.make_pipeline_state(
- type: PipelineUserType,
- stages: int,
Creates a pipeline state. Producers are assumed to start with an empty buffer and have a flipped phase bit of 1.
- cutlass.pipeline.pipeline_init_wait(
- cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
Fences the mbarrier init and syncs the threadblock or cluster
- cutlass.pipeline.arrive(barrier_id: int, num_threads: int)#
The aligned flavor of arrive is used when all threads in the CTA will execute the same instruction. See PTX documentation.
- cutlass.pipeline.arrive_unaligned(barrier_id: int, num_threads: int)#
The unaligned flavor of arrive can be used with an arbitrary number of threads in the CTA.
- cutlass.pipeline.wait(barrier_id: int, num_threads: int)#
NamedBarriers do not have a standalone wait like mbarriers, only an arrive_and_wait. If synchronizing two warps in a producer/consumer pairing, the arrive count would be 32 using mbarriers but 64 using NamedBarriers. Only threads from either the producer or consumer are counted for mbarriers, while all threads participating in the sync are counted for NamedBarriers.
- cutlass.pipeline.wait_unaligned(barrier_id: int, num_threads: int)#
- cutlass.pipeline.arrive_and_wait(barrier_id: int, num_threads: int)#
- cutlass.pipeline.sync(barrier_id: int = 0)#