nat.object_store.in_memory_object_store#

Classes#

InMemoryObjectStoreConfig

Object store that stores objects in memory. Objects are not persisted when the process shuts down.

InMemoryObjectStore

Implementation of ObjectStore that stores objects in memory. Objects are not persisted when the process shuts down.

Functions#

in_memory_object_store(config, builder)

Module Contents#

class InMemoryObjectStoreConfig(/, **data: Any)#

Bases: nat.data_models.object_store.ObjectStoreBaseConfig

Object store that stores objects in memory. Objects are not persisted when the process shuts down.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

class InMemoryObjectStore#

Bases: nat.object_store.interfaces.ObjectStore

Implementation of ObjectStore that stores objects in memory. Objects are not persisted when the process shuts down.

_lock#
_store: dict[str, nat.object_store.models.ObjectStoreItem]#
async put_object(
key: str,
item: nat.object_store.models.ObjectStoreItem,
) None#

Save an ObjectStoreItem in the object store with the given key. If the key already exists, raise an error.

Args:

key (str): The key to save the item under. item (ObjectStoreItem): The item to save.

Raises:

KeyAlreadyExistsError: If the key already exists.

async upsert_object(
key: str,
item: nat.object_store.models.ObjectStoreItem,
) None#

Save an ObjectStoreItem in the object store with the given key. If the key already exists, update the item.

Args:

key (str): The key to save the item under. item (ObjectStoreItem): The item to save.

async get_object(key: str) nat.object_store.models.ObjectStoreItem#

Get an ObjectStoreItem from the object store by key.

Args:

key (str): The key to get the item from.

Returns:

ObjectStoreItem: The item retrieved from the object store.

Raises:

NoSuchKeyError: If the item does not exist.

async delete_object(key: str) None#

Delete an ObjectStoreItem from the object store by key.

Args:

key (str): The key to delete the item from.

Raises:

NoSuchKeyError: If the item does not exist.

async in_memory_object_store(
config: InMemoryObjectStoreConfig,
builder: nat.builder.builder.Builder,
)#