aiq.utils.reactive.subject#

Attributes#

Classes#

Subject

A Subject is both an Observer (receives events) and an Observable (sends events).

Module Contents#

T#
OnNext#
OnError#
OnComplete#
class Subject#

Bases: aiq.utils.reactive.observable.Observable[T], aiq.utils.reactive.observer.Observer[T], aiq.utils.reactive.base.subject_base.SubjectBase[T]

A Subject is both an Observer (receives events) and an Observable (sends events). - Maintains a list of ObserverBase[T]. - No internal buffering or replay; events are only delivered to current subscribers. - Thread-safe via a lock.

Once on_error or on_complete is called, the Subject is closed.

_lock#
_closed = False#
_error: Exception | None = None#
_observers: list[aiq.utils.reactive.observer.Observer[T]] = []#
_disposed = False#
_subscribe_core(
observer: aiq.utils.reactive.observer.Observer[T],
) aiq.utils.reactive.subscription.Subscription#

Subscribe to this subject. If disposed, returns a dummy subscription. Otherwise, registers the given observer.

on_next(value: T) None#

Called by producers to emit an item. Delivers synchronously to each observer. If closed or disposed, do nothing.

on_error(exc: Exception) None#

Called by producers to signal an error. Notifies all observers.

on_complete() None#

Called by producers to signal completion. Notifies all observers, then clears them. Subject is closed.

_unsubscribe_observer(
observer: aiq.utils.reactive.observer.Observer[T],
) None#
dispose() None#

Immediately close the Subject. No future on_next, on_error, or on_complete. Clears all observers.