cutlass.pipeline#

class cutlass.pipeline.Agent(value)#

Bases: Enum

Agent indicates what is participating in the pipeline synchronization.

Thread = 1#
ThreadBlock = 2#
ThreadBlockCluster = 3#
class cutlass.pipeline.CooperativeGroup(
agent: Agent,
size: int = 1,
alignment=None,
)#

Bases: object

CooperativeGroup contains size and alignment restrictions for an Agent.

__init__(
agent: Agent,
size: int = 1,
alignment=None,
)#
class cutlass.pipeline.PipelineOp(value)#

Bases: Enum

PipelineOp assigns an operation to an agent corresponding to a specific hardware feature.

AsyncThread = 1#
TCGen05Mma = 2#
TmaLoad = 3#
TmaStore = 4#
Composite = 5#
AsyncLoad = 6#
class cutlass.pipeline.SyncObject#

Bases: ABC

Abstract base class for hardware synchronization primitives.

This class defines the interface for different types of hardware synchronization mechanisms including shared memory barriers, named barriers, and fences.

abstract arrive() None#
abstract wait() None#
abstract arrive_and_wait() None#
abstract arrive_and_drop() None#
abstract get_barrier() cutlass.cute.typing.Pointer | int | None#
abstract max() int | None#
_abc_impl = <_abc._abc_data object>#
class cutlass.pipeline.MbarrierArray(
barrier_storage: cutlass.cute.typing.Pointer,
num_stages: int,
agent: tuple[PipelineOp, CooperativeGroup],
tx_count: int = 0,
)#

Bases: SyncObject

MbarrierArray implements an abstraction for an array of smem barriers.

__init__(
barrier_storage: cutlass.cute.typing.Pointer,
num_stages: int,
agent: tuple[PipelineOp, CooperativeGroup],
tx_count: int = 0,
) None#
recast_to_new_op_type(
new_op_type: PipelineOp,
) MbarrierArray#

Creates a copy of MbarrierArray with a different op_type without re-initializing barriers

mbarrier_init() None#

Initializes an array of mbarriers using warp 0.

arrive(
index: int,
dst: int,
cta_group: CtaGroup | None = None,
) None#

Select the arrive corresponding to this MbarrierArray’s PipelineOp.

Parameters:
  • index (int) – Index of the mbarrier in the array to arrive on

  • dst (int | None) – Destination parameter for selective arrival, which can be either a mask or destination cta rank. When None, both TCGen05Mma and AsyncThread will arrive on their local mbarrier. - For TCGen05Mma, dst serves as a multicast mask (e.g., 0b1011 allows arrive signal to be multicast to CTAs in the cluster with rank = 0, 1, and 3). - For AsyncThread, dst serves as a destination cta rank (e.g., 3 means threads will arrive on the mbarrier with rank = 3 in the cluster).

  • cta_group (cute.nvgpu.tcgen05.CtaGroup, optional) – CTA group for TCGen05Mma, defaults to None for other op types

arrive_mbarrier(
index: int,
dst_rank: int | None = None,
) None#
arrive_cp_async_mbarrier(index: int)#
arrive_tcgen05mma(
index: int,
mask: int | None,
cta_group: CtaGroup,
) None#
arrive_and_expect_tx(index: int, tx_count: int) None#
try_wait(
index: int,
phase: int,
) cutlass.cutlass_dsl.Boolean#
wait(index: int, phase: int) None#
arrive_and_wait(
index: int,
phase: int,
dst: int,
cta_group: CtaGroup | None = None,
) None#
arrive_and_drop() None#
get_barrier(index: int) cutlass.cute.typing.Pointer#
max() int#
_abc_impl = <_abc._abc_data object>#
class cutlass.pipeline.NamedBarrier(barrier_id: int, num_threads: int)#

Bases: SyncObject

NamedBarrier is an abstraction for named barriers managed by hardware. There are 16 named barriers available, with barrier_ids 0-15.

See the PTX documentation.

barrier_id: int#
num_threads: int#
arrive() None#

The aligned flavor of arrive is used when all threads in the CTA will execute the same instruction. See PTX documentation.

arrive_unaligned() None#

The unaligned flavor of arrive can be used with an arbitrary number of threads in the CTA.

wait() None#

NamedBarriers do not have a standalone wait like mbarriers, only an arrive_and_wait. If synchronizing two warps in a producer/consumer pairing, the arrive count would be 32 using mbarriers but 64 using NamedBarriers. Only threads from either the producer or consumer are counted for mbarriers, while all threads participating in the sync are counted for NamedBarriers.

wait_unaligned() None#
arrive_and_wait() None#
arrive_and_drop() None#
sync() None#
get_barrier() int#
max() int#
__init__(barrier_id: int, num_threads: int) None#
_abc_impl = <_abc._abc_data object>#
class cutlass.pipeline.PipelineOrder(
sync_object_full: SyncObject,
depth: int,
length: int,
group_id: int,
state: PipelineState,
)#

Bases: object

PipelineOrder is used for managing ordered pipeline execution with multiple groups.

This class implements a pipeline ordering mechanism where work is divided into groups and stages, allowing for controlled progression through pipeline stages with proper synchronization between different groups.

The pipeline ordering works as follows: - The pipeline is divided into ‘length’ number of groups - Each group has ‘depth’ number of stages - Groups execute in a specific order with synchronization barriers - Each group waits for the previous group to complete before proceeding

Example:

# Create pipeline order with 3 groups, each with 2 stages
pipeline_order = PipelineOrder.create(
    barrier_storage=smem_ptr,      # shared memory pointer for barriers
    depth=2,                       # 2 stages per group
    length=3,                      # 3 groups total
    group_id=0,                    # current group ID (0, 1, or 2)
    producer_group=producer_warp   # cooperative group for producers
)

# In the pipeline loop
for stage in range(num_stages):
    pipeline_order.wait()          # Wait for previous group to complete
    # Process current stage
    pipeline_order.arrive()        # Signal completion to next group
sync_object_full: SyncObject#
depth: int#
length: int#
group_id: int#
state: PipelineState#
static create(
barrier_storage: cutlass.cute.typing.Pointer,
depth: int,
length: int,
group_id: int,
producer_group: CooperativeGroup,
)#
get_barrier_for_current_stage_idx(group_id)#
arrive()#
wait()#
__init__(
sync_object_full: SyncObject,
depth: int,
length: int,
group_id: int,
state: PipelineState,
) None#
class cutlass.pipeline.TmaStoreFence(num_stages: int = 0)#

Bases: SyncObject

TmaStoreFence is used for a multi-stage epilogue buffer.

__init__(num_stages: int = 0) None#
arrive() None#
wait() None#
arrive_and_wait() None#
arrive_and_drop() None#
get_barrier() None#
max() None#
tail() None#
_abc_impl = <_abc._abc_data object>#
class cutlass.pipeline.PipelineUserType(value)#

Bases: Enum

An enumeration.

Producer = 1#
Consumer = 2#
class cutlass.pipeline.PipelineState(stages: int, count, index, phase)#

Bases: object

Pipeline state contains an index and phase bit corresponding to the current position in the circular buffer.

__init__(stages: int, count, index, phase)#
clone() PipelineState#
property index: cutlass.cutlass_dsl.Int32#
property count: cutlass.cutlass_dsl.Int32#
property stages: int#
property phase: cutlass.cutlass_dsl.Int32#
reset_count()#
advance()#
reverse()#
class cutlass.pipeline.PipelineAsync(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
)#

Bases: object

PipelineAsync is a generic pipeline class where both the producer and consumer are AsyncThreads. It also serves as a base class for specialized pipeline classes.

This class implements a producer-consumer pipeline pattern where both sides operate asynchronously. The pipeline maintains synchronization state using barrier objects to coordinate between producer and consumer threads.

The pipeline state transitions of one pipeline entry(mbarrier) can be represented as:

Table 2 Pipeline State Transitions#

Barrier

State

p.acquire

p.commit

c.wait

c.release

empty_bar

empty

<Return>

n/a

n/a

empty_bar

wait

<Block>

n/a

n/a

-> empty

full_bar

wait

n/a

-> full

<Block >

n/a

full_bar

full

n/a

<Return>

n/a

Where:

  • p: producer

  • c: consumer

  • <Block>: This action is blocked until transition to a state allow it to proceed by other side - e.g. p.acquire() is blocked until empty_bar transition to empty state by c.release()

Array of mbarriers as circular buffer:

     Advance Direction
   <-------------------

    Producer   Consumer
        |         ^
        V         |
   +-----------------+
 --|X|X|W|D|D|D|D|R|X|<-.
/  +-----------------+   \
|                        |
`------------------------'

Where:

  • X: Empty buffer (initial state)

  • W: Producer writing (producer is waiting for buffer to be empty)

  • D: Data ready (producer has written data to buffer)

  • R: Consumer reading (consumer is consuming data from buffer)

Example:

# Create pipeline with 5 stages
pipeline = PipelineAsync.create(
    num_stages=5,                   # number of pipeline stages
    producer_group=producer_warp,
    consumer_group=consumer_warp
    barrier_storage=smem_ptr,       # smem pointer for array of mbarriers in shared memory
)

producer, consumer = pipeline.make_participants()
# Producer side
for i in range(num_iterations):
    handle = producer.acquire_and_advance()  # Wait for buffer to be empty & Move index to next stage
    # Write data to pipeline buffer
    handle.commit()   # Signal buffer is full

# Consumer side
for i in range(num_iterations):
    handle = consumer.wait_and_advance()     # Wait for buffer to be full & Move index to next stage
    # Read data from pipeline buffer
    handle.release()  # Signal buffer is empty
sync_object_full: SyncObject#
sync_object_empty: SyncObject#
num_stages: int#
producer_mask: cutlass.cutlass_dsl.Int32 | None#
consumer_mask: cutlass.cutlass_dsl.Int32 | None#
static _make_sync_object(
barrier_storage: cutlass.cute.typing.Pointer,
num_stages: int,
agent: tuple[PipelineOp, CooperativeGroup],
tx_count: int = 0,
) SyncObject#

Returns a SyncObject corresponding to an agent’s PipelineOp.

static create(
*,
num_stages: int,
producer_group: CooperativeGroup,
consumer_group: CooperativeGroup,
barrier_storage: cutlass.cute.typing.Pointer | None = None,
producer_mask: cutlass.cutlass_dsl.Int32 | None = None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None = None,
)#

Creates and initializes a new PipelineAsync instance.

This helper function computes necessary attributes and returns an instance of PipelineAsync with the specified configuration for producer and consumer synchronization.

Parameters:
  • barrier_storage (cute.Pointer) – Pointer to the shared memory address for this pipeline’s mbarriers

  • num_stages (int) – Number of buffer stages for this pipeline

  • producer_group (CooperativeGroup) – CooperativeGroup for the producer agent

  • consumer_group (CooperativeGroup) – CooperativeGroup for the consumer agent

  • producer_mask (Int32, optional) – Mask for signaling arrives for the producer agent, defaults to None

  • consumer_mask (Int32, optional) – Mask for signaling arrives for the consumer agent, defaults to None

Returns:

A new PipelineAsync instance

Return type:

PipelineAsync

Raises:

ValueError – If barrier_storage is not a cute.Pointer instance

producer_acquire(
state: PipelineState,
try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
)#
producer_try_acquire(
state: PipelineState,
)#
producer_commit(
state: PipelineState,
)#
consumer_wait(
state: PipelineState,
try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
)#
consumer_try_wait(
state: PipelineState,
)#
consumer_release(
state: PipelineState,
)#
producer_get_barrier(
state: PipelineState,
) cutlass.cute.typing.Pointer#
producer_tail(
state: PipelineState,
)#

Make sure the last used buffer empty signal is visible to producer. Producer tail is usually executed by producer before exit, to avoid dangling mbarrier arrive signals after kernel exit.

Parameters:

state (PipelineState) – The pipeline state that points to next useful buffer

make_producer()#
make_consumer()#
make_participants()#
__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
) None#
class cutlass.pipeline.PipelineCpAsync(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
)#

Bases: PipelineAsync

PipelineCpAsync is used for CpAsync producers and AsyncThread consumers (e.g. Hopper non-TMA mainloops).

static create(
barrier_storage: cutlass.cute.typing.Pointer,
num_stages: cutlass.cutlass_dsl.Int32,
producer_group: CooperativeGroup,
consumer_group: CooperativeGroup,
producer_mask: cutlass.cutlass_dsl.Int32 | None = None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None = None,
)#

This helper function computes any necessary attributes and returns an instance of PipelineAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param producer_mask: Mask for signaling arrives for the producer agent :type producer_mask: Int32 | None :param consumer_mask: Mask for signaling arrives for the consumer agent :type consumer_mask: Int32 | None

__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
) None#
class cutlass.pipeline.PipelineTmaAsync(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
is_signalling_thread: cutlass.cutlass_dsl.Boolean,
)#

Bases: PipelineAsync

PipelineTmaAsync is used for TMA producers and AsyncThread consumers (e.g. Hopper mainloops).

is_signalling_thread: cutlass.cutlass_dsl.Boolean#
static init_empty_barrier_arrive_signal(
cta_layout_vmnk: cutlass.cute.typing.Layout,
tidx: cutlass.cutlass_dsl.Int32,
mcast_mode_mn: tuple[int, int],
)#

Initialize the empty barrier arrive signal This function returns the destination cta rank and a boolean indicating if the signalling thread is the same as the current thread

static create(
*,
num_stages: int,
producer_group: CooperativeGroup,
consumer_group: CooperativeGroup,
tx_count: int,
barrier_storage: cutlass.cute.typing.Pointer | None = None,
cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
tidx: cutlass.cutlass_dsl.Int32 | None = None,
mcast_mode_mn: tuple[int, int] = (1, 1),
)#

This helper function computes any necessary attributes and returns an instance of PipelineTmaAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage :type tx_count: int :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None :param tidx: thread index to consumer async threads :type tidx: Int32 | None :param mcast_mode_mn: Tuple of two integers, specifying whether mcast is enabled for the m and n modes. At least one of the two integers must be 1. :type mcast_mode_mn: tuple[int, int]

producer_acquire(
state: PipelineState,
try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
)#

TMA producer commit conditionally waits on buffer empty and sets the transaction barrier.

producer_commit(
state: PipelineState,
)#

TMA producer commit is a noop since TMA instruction itself updates the transaction count.

consumer_release(
state: PipelineState,
)#

TMA consumer release conditionally signals the empty buffer to the producer.

__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
is_signalling_thread: cutlass.cutlass_dsl.Boolean,
) None#
class cutlass.pipeline.PipelineTmaUmma(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
is_leader_cta: bool,
cta_group: CtaGroup,
)#

Bases: PipelineAsync

PipelineTmaUmma is used for TMA producers and UMMA consumers (e.g. Blackwell mainloops).

is_leader_cta: bool#
cta_group: CtaGroup#
static _compute_mcast_arrival_mask(
cta_layout_vmnk: cutlass.cute.typing.Layout,
mcast_mode_mn: tuple[int, int],
)#

Computes a mask for signaling arrivals to multicasting threadblocks.

static _compute_is_leader_cta(
cta_layout_vmnk: cutlass.cute.typing.Layout,
)#

Computes leader threadblocks for 2CTA kernels. For 1CTA, all threadblocks are leaders.

static create(
*,
num_stages: int,
producer_group: CooperativeGroup,
consumer_group: CooperativeGroup,
tx_count: int,
barrier_storage: cutlass.cute.typing.Pointer | None = None,
cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
mcast_mode_mn: tuple[int, int] = (1, 1),
)#

This helper function computes any necessary attributes and returns an instance of PipelineTmaUmma. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage :type tx_count: int :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None :param mcast_mode_mn: Tuple of two integers, specifying whether mcast is enabled for the m and n modes. At least one of the two integers must be 1. :type mcast_mode_mn: tuple[int, int]

consumer_release(
state: PipelineState,
)#

UMMA consumer release buffer empty, cta_group needs to be provided.

producer_acquire(
state: PipelineState,
try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
)#

TMA producer commit conditionally waits on buffer empty and sets the transaction barrier for leader threadblocks.

producer_commit(
state: PipelineState,
)#

TMA producer commit is a noop since TMA instruction itself updates the transaction count.

__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
is_leader_cta: bool,
cta_group: CtaGroup,
) None#
class cutlass.pipeline.PipelineTmaMultiConsumersAsync(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
is_leader_cta: bool,
sync_object_empty_umma: SyncObject,
sync_object_empty_async: SyncObject,
cta_group: CtaGroup,
)#

Bases: PipelineAsync

PipelineTmaMultiConsumersAsync is used for TMA producers and UMMA+Async consumers.

is_leader_cta: bool#
sync_object_empty_umma: SyncObject#
sync_object_empty_async: SyncObject#
cta_group: CtaGroup#
static create(
*,
num_stages: int,
producer_group: CooperativeGroup,
consumer_group_umma: CooperativeGroup,
consumer_group_async: CooperativeGroup,
tx_count: int,
barrier_storage: cutlass.cute.typing.Pointer | None = None,
cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
)#

This helper function computes any necessary attributes and returns an instance of PipelineTmaMultiConsumersAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group_umma: CooperativeGroup for the UMMA consumer agent :type consumer_group_umma: CooperativeGroup :param consumer_group_async: CooperativeGroup for the AsyncThread consumer agent :type consumer_group_async: CooperativeGroup :param tx_count: Number of bytes expected to be written to the transaction barrier for one stage :type tx_count: int :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None

producer_acquire(
state: PipelineState,
try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
)#

TMA producer acquire waits on buffer empty and sets the transaction barrier for leader threadblocks.

producer_commit(
state: PipelineState,
)#

TMA producer commit is a noop since TMA instruction itself updates the transaction count.

consumer_release(
state: PipelineState,
op_type: PipelineOp,
)#
__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
is_leader_cta: bool,
sync_object_empty_umma: SyncObject,
sync_object_empty_async: SyncObject,
cta_group: CtaGroup,
) None#
class cutlass.pipeline.PipelineAsyncUmma(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
cta_group: CtaGroup,
)#

Bases: PipelineAsync

PipelineAsyncUmma is used for AsyncThread producers and UMMA consumers (e.g. Blackwell input fusion pipelines).

cta_group: CtaGroup#
static _compute_leading_cta_rank(cta_v_size)#

Computes the leading CTA rank.

static _compute_is_leader_cta(
cta_layout_vmnk: cutlass.cute.typing.Layout,
)#

Computes leader threadblocks for 2CTA kernels. For 1CTA, all threadblocks are leaders.

static _compute_peer_cta_mask(
cta_layout_vmnk: cutlass.cute.typing.Layout,
)#

Computes a mask for signaling arrivals to multicasting threadblocks.

static create(
*,
num_stages: int,
producer_group: CooperativeGroup,
consumer_group: CooperativeGroup,
barrier_storage: cutlass.cute.typing.Pointer | None = None,
cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
)#

This helper function computes any necessary attributes and returns an instance of PipelineAsyncUmma. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None

consumer_release(
state: PipelineState,
)#

UMMA consumer release buffer empty, cta_group needs to be provided.

__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
cta_group: CtaGroup,
) None#
class cutlass.pipeline.PipelineUmmaAsync(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
cta_group: CtaGroup,
)#

Bases: PipelineAsync

PipelineUmmaAsync is used for UMMA producers and AsyncThread consumers (e.g. Blackwell accumulator pipelines).

cta_group: CtaGroup#
static _compute_tmem_sync_mask(
cta_layout_vmnk: cutlass.cute.typing.Layout,
)#

Computes a mask to signal completion of tmem buffers for 2CTA kernels.

static _compute_peer_cta_rank()#

Computes a mask to signal release of tmem buffers for 2CTA kernels.

static create(
*,
num_stages: int,
producer_group: CooperativeGroup,
consumer_group: CooperativeGroup,
barrier_storage: cutlass.cute.typing.Pointer | None = None,
cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
)#

This helper function computes any necessary attributes and returns an instance of PipelineUmmaAsync. :param barrier_storage: Pointer to the smem address for this pipeline’s mbarriers :type barrier_storage: cute.Pointer :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup :param consumer_group: CooperativeGroup for the consumer agent :type consumer_group: CooperativeGroup :param cta_layout_vmnk: Layout of the cluster shape :type cta_layout_vmnk: cute.Layout | None

producer_commit(
state: PipelineState,
)#

UMMA producer commit buffer full, cta_group needs to be provided.

producer_tail(
state: PipelineState,
)#

Make sure the last used buffer empty signal is visible to producer. Producer tail is usually executed by producer before exit, to avoid dangling mbarrier arrive signals after kernel exit.

Parameters:

state (PipelineState) – The pipeline state that points to next useful buffer

__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
cta_group: CtaGroup,
) None#
class cutlass.pipeline.PipelineTmaStore(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
)#

Bases: PipelineAsync

PipelineTmaStore is used for synchronizing TMA stores in the epilogue. It does not use mbarriers.

static create(
*,
num_stages: int,
producer_group: CooperativeGroup,
)#

This helper function computes any necessary attributes and returns an instance of PipelineTmaStore. :param num_stages: Number of buffer stages for this pipeline :type num_stages: Int32 :param producer_group: CooperativeGroup for the producer agent :type producer_group: CooperativeGroup

producer_acquire()#
producer_commit()#
consumer_wait()#
consumer_release()#
producer_tail()#

Make sure the last used buffer empty signal is visible to producer. Producer tail is usually executed by producer before exit, to avoid dangling mbarrier arrive signals after kernel exit.

Parameters:

state (PipelineState) – The pipeline state that points to next useful buffer

__init__(
sync_object_full: SyncObject,
sync_object_empty: SyncObject,
num_stages: int,
producer_mask: cutlass.cutlass_dsl.Int32 | None,
consumer_mask: cutlass.cutlass_dsl.Int32 | None,
) None#
class cutlass.pipeline.PipelineProducer(
pipeline,
state,
group: CooperativeGroup,
)#

Bases: object

A class representing a producer in an asynchronous pipeline.

This class manages the producer side of an asynchronous pipeline, handling synchronization and state management for producing data. It provides methods for acquiring, committing, and advancing through pipeline stages.

Variables:
  • __pipeline – The asynchronous pipeline this producer belongs to

  • __state – The current state of the producer in the pipeline

  • __group – The cooperative group this producer operates in

Examples:

pipeline = PipelineAsync.create(...)
producer, consumer = pipeline.make_participants()
for i in range(iterations):
    # Try to acquire the current buffer without blocking
    try_acquire_token = producer.try_acquire()

    # Do something else independently
    ...

    # Wait for current buffer to be empty & Move index to next stage
    # If try_acquire_token is True, return immediately
    # If try_acquire_token is False, block until buffer is empty
    handle = producer.acquire_and_advance(try_acquire_token)

    # Produce data
    handle.commit()
class ImmutableResourceHandle(
_ImmutableResourceHandle__origin: cutlass.pipeline.sm90.PipelineAsync,
_ImmutableResourceHandle__immutable_state: cutlass.pipeline.helpers.PipelineState,
)#

Bases: ImmutableResourceHandle

property barrier#

Get the barrier pointer for the current pipeline stage.

Returns:

Pointer to the barrier for the current stage

Return type:

cute.Pointer

commit()#

Signal that data production is complete for the current stage.

This allows consumers to start processing the data.

__init__(
_ImmutableResourceHandle__origin: PipelineAsync,
_ImmutableResourceHandle__immutable_state: PipelineState,
) None#
__init__(
pipeline,
state,
group: CooperativeGroup,
)#

Initialize a new Producer instance.

Parameters:
__pipeline: PipelineAsync#
__state: PipelineState#
__group: CooperativeGroup#
reset()#

Reset the count of how many handles this producer has committed.

acquire(
try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
) ImmutableResourceHandle#

Wait for the current buffer to be empty before producing data. This is a blocking operation.

Parameters:

try_acquire_token (Optional[Boolean]) – Optional token to try to acquire the buffer

Returns:

A handle to the producer for committing the data

Return type:

ImmutableResourceHandle

advance()#

Move to the next pipeline stage.

acquire_and_advance(
try_acquire_token: cutlass.cutlass_dsl.Boolean | None = None,
) ImmutableResourceHandle#

Acquire the current buffer and advance to the next pipeline stage.

This method combines the acquire() and advance() operations into a single call. It first waits for the current buffer to be empty before producing data, then advances the pipeline to the next stage.

Parameters:

try_acquire_token (Optional[Boolean]) – Token indicating whether to try non-blocking acquire. If True, returns immediately without waiting. If False or None, blocks until buffer is empty.

Returns:

A handle to the producer that can be used to commit data to the acquired buffer stage

Return type:

ImmutableResourceHandle

try_acquire() cutlass.cutlass_dsl.Boolean#

Attempt to acquire the current buffer without blocking.

This method tries to acquire the current buffer stage for producing data without waiting. It can be used to check buffer availability before committing to a blocking acquire operation.

Returns:

A boolean token indicating whether the buffer was successfully acquired

Return type:

Boolean

commit(
handle: ImmutableResourceHandle | None = None,
)#

Signal that data production is complete for the current stage.

This allows consumers to start processing the data.

Parameters:

handle (Optional[ImmutableResourceHandle]) – Optional handle to commit, defaults to None

Raises:

AssertionError – If provided handle does not belong to this producer

tail()#

Ensure all used buffers are properly synchronized before producer exit.

This should be called before the producer finishes to avoid dangling signals.

class cutlass.pipeline.PipelineConsumer(
pipeline,
state: PipelineState,
group: CooperativeGroup,
)#

Bases: object

A class representing a consumer in an asynchronous pipeline.

The Consumer class manages the consumer side of an asynchronous pipeline, handling synchronization and state management for consuming data. It provides methods for waiting, releasing, and advancing through pipeline stages.

Variables:
  • __pipeline – The asynchronous pipeline this consumer belongs to

  • __state – The current state of the consumer in the pipeline

  • __group – The cooperative group this consumer operates in

Examples:

pipeline = PipelineAsync.create(...)
producer, consumer = pipeline.make_participants()
for i in range(iterations):
    # Try to wait for buffer to be full
    try_wait_token = consumer.try_wait()

    # Do something else independently
    ...

    # Wait for buffer to be full & Move index to next stage
    # If try_wait_token is True, return immediately
    # If try_wait_token is False, block until buffer is full
    handle = consumer.wait_and_advance(try_wait_token)

    # Consume data
    handle.release(  )  # Signal buffer is empty

    # Alternative way to do this is:
    # handle.release()  # Signal buffer is empty
class ImmutableResourceHandle(
_ImmutableResourceHandle__origin: cutlass.pipeline.sm90.PipelineAsync,
_ImmutableResourceHandle__immutable_state: cutlass.pipeline.helpers.PipelineState,
)#

Bases: ImmutableResourceHandle

release()#

Signal that data production is complete for the current stage. This allows consumers to start processing the data.

__init__(
_ImmutableResourceHandle__origin: PipelineAsync,
_ImmutableResourceHandle__immutable_state: PipelineState,
) None#
__init__(
pipeline,
state: PipelineState,
group: CooperativeGroup,
)#

Initialize a new Consumer instance.

Parameters:
__pipeline: PipelineAsync#
__group: CooperativeGroup#
__state: PipelineState#
reset()#

Reset the count of how many handles this consumer has consumed.

wait(
try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
) ImmutableResourceHandle#

Wait for data to be ready in the current buffer. This is a blocking operation that will not return until data is available.

Parameters:

try_wait_token (Optional[Boolean]) – Token used to attempt a non-blocking wait for the buffer. If provided and True, returns immediately if buffer is not ready.

Returns:

An immutable handle to the consumer that can be used to release the buffer once data consumption is complete

Return type:

ImmutableResourceHandle

advance()#

Advance the consumer to the next pipeline stage.

This updates the internal state to point to the next buffer in the pipeline. Should be called after consuming data from the current buffer.

wait_and_advance(
try_wait_token: cutlass.cutlass_dsl.Boolean | None = None,
) ImmutableResourceHandle#

Atomically wait for data and advance to next pipeline stage.

This is a convenience method that combines wait() and advance() into a single atomic operation. It will block until data is available in the current buffer, then automatically advance to the next stage.

Parameters:

try_wait_token (Optional[Boolean]) – Token used to attempt a non-blocking wait for the buffer. If provided and True, returns immediately if buffer is not ready.

Returns:

An immutable handle to the consumer that can be used to release the buffer once data consumption is complete

Return type:

ImmutableResourceHandle

try_wait() cutlass.cutlass_dsl.Boolean#

Non-blocking check if data is ready in the current buffer.

This method provides a way to test if data is available without blocking. Unlike wait(), this will return immediately regardless of buffer state.

Returns:

True if data is ready to be consumed, False if the buffer is not yet ready

Return type:

Boolean

release(
handle: ImmutableResourceHandle | None = None,
)#

Signal that data consumption is complete for the current stage. This allows producers to start producing new data.

cutlass.pipeline.make_pipeline_state(
type: PipelineUserType,
stages: int,
)#

Creates a pipeline state. Producers are assumed to start with an empty buffer and have a flipped phase bit of 1.

cutlass.pipeline.pipeline_init_wait(
cta_layout_vmnk: cutlass.cute.typing.Layout | None = None,
)#

Fences the mbarrier init and syncs the threadblock or cluster

cutlass.pipeline.arrive(barrier_id: int, num_threads: int)#

The aligned flavor of arrive is used when all threads in the CTA will execute the same instruction. See PTX documentation.

cutlass.pipeline.arrive_unaligned(barrier_id: int, num_threads: int)#

The unaligned flavor of arrive can be used with an arbitrary number of threads in the CTA.

cutlass.pipeline.wait(barrier_id: int, num_threads: int)#

NamedBarriers do not have a standalone wait like mbarriers, only an arrive_and_wait. If synchronizing two warps in a producer/consumer pairing, the arrive count would be 32 using mbarriers but 64 using NamedBarriers. Only threads from either the producer or consumer are counted for mbarriers, while all threads participating in the sync are counted for NamedBarriers.

cutlass.pipeline.wait_unaligned(barrier_id: int, num_threads: int)#
cutlass.pipeline.arrive_and_wait(barrier_id: int, num_threads: int)#
cutlass.pipeline.sync(barrier_id: int = 0)#