mdx.mtmc.stream.state.behavior module

class StateManager(config: AppConfig)

Bases: object

Manages behavior state

Parameters:

config (AppConfig) – configuration for the app

behavior_state = StateManager(config)

delete_older_state() → Set[str]

Deletes behaviors in state that have not been updated for a configurable time interval

Returns:

deleted behavior keys

Return type:

Set[str]

deleted_behavior_keys = behavior_state.delete_older_state()
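
The eviction rule described for delete_older_state() can be sketched with a toy in-memory store. This is a minimal illustration, not the library's implementation: ToyBehaviorState, max_idle_sec, touch() and the explicit now argument are all hypothetical names introduced here, and the real interval is read from AppConfig.

```python
from typing import Dict, Set


class ToyBehaviorState:
    """Hypothetical stand-in for StateManager; not the library's implementation."""

    def __init__(self, max_idle_sec: float) -> None:
        # max_idle_sec is an assumed name for the configurable time interval.
        self.max_idle_sec = max_idle_sec
        # Maps behavior key -> time of the last update, in seconds.
        self._last_updated: Dict[str, float] = {}

    def touch(self, key: str, now: float) -> None:
        # Record that the behavior with this key was updated at time `now`.
        self._last_updated[key] = now

    def delete_older_state(self, now: float) -> Set[str]:
        # Collect keys whose last update is older than the allowed interval.
        stale = {
            key
            for key, updated_at in self._last_updated.items()
            if now - updated_at > self.max_idle_sec
        }
        # Remove them from state and report which keys were deleted.
        for key in stale:
            del self._last_updated[key]
        return stale


state = ToyBehaviorState(max_idle_sec=10.0)
state.touch("a", now=0.0)
state.touch("b", now=8.0)
deleted = state.delete_older_state(now=15.0)
```

Passing the clock value in explicitly keeps the sketch deterministic; a real implementation would more likely compare against the timestamps carried by the behaviors themselves.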

get_behavior(behavior_id: str) → Behavior | None

Gets a behavior given the behavior ID

Parameters:

behavior_id (str) – behavior ID

Returns:

behavior

Return type:

Optional[Behavior]

behavior = behavior_state.get_behavior(behavior_id)

get_behavior_ids_in_state() → Set[str]

Gets all the behavior IDs in state

Returns:

set of behavior IDs

Return type:

Set[str]

behavior_ids = behavior_state.get_behavior_ids_in_state()

get_behavior_keys_in_state() → Set[str]

Gets all the behavior keys (sensor-object-timestamp IDs) in state

Returns:

set of behavior keys (sensor-object-timestamp IDs)

Return type:

Set[str]

behavior_keys = behavior_state.get_behavior_keys_in_state()

get_behaviors(behavior_ids: List[str]) → List[Behavior | None]

Gets a list of behaviors given the behavior IDs

Parameters:

behavior_ids (List[str]) – list of behavior IDs

Returns:

list of behaviors

Return type:

List[Optional[Behavior]]

behaviors = behavior_state.get_behaviors(behavior_ids)
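
Because the return type is List[Optional[Behavior]], callers should expect None entries. The sketch below assumes that the result is positionally aligned with behavior_ids and that an ID not found in state yields None; the reference does not state either point explicitly. ToyBehavior and the dictionary-backed store are hypothetical stand-ins.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ToyBehavior:
    """Hypothetical stand-in; the real Behavior class has its own fields."""
    id: str


def get_behaviors(
    store: Dict[str, ToyBehavior], behavior_ids: List[str]
) -> List[Optional[ToyBehavior]]:
    # One output slot per requested ID; unknown IDs map to None (assumed behavior).
    return [store.get(behavior_id) for behavior_id in behavior_ids]


store = {"b1": ToyBehavior("b1"), "b3": ToyBehavior("b3")}
requested = ["b1", "b2", "b3"]
results = get_behaviors(store, requested)

# Pair each requested ID with its result to find the ones that were missing.
missing_ids = [bid for bid, behavior in zip(requested, results) if behavior is None]
# Keep only the behaviors that were actually found.
found = [behavior for behavior in results if behavior is not None]
```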

get_behaviors_in_state() → List[Behavior]

Gets all the behaviors in state

Returns:

list of behaviors

Return type:

List[Behavior]

behaviors = behavior_state.get_behaviors_in_state()

update_places_and_locations(sensor_state_objects: Dict[str, SensorStateObject], updated_sensor_ids: Set[str]) → None

Updates places and locations of behavior state objects

Parameters:
  • sensor_state_objects (Dict[str, SensorStateObject]) – map from sensor IDs to sensor state objects

  • updated_sensor_ids (Set[str]) – updated sensor IDs

Returns:

None

behavior_state.update_places_and_locations(sensor_state_objects, updated_sensor_ids)

update_state(behaviors: List[Behavior], preprocessor: Preprocessor) → None

Updates behavior state

Parameters:
  • behaviors (List[Behavior]) – list of behaviors

  • preprocessor (Preprocessor) – preprocessor used to sum embeddings of a behavior

Returns:

None

behavior_state.update_state(behaviors, preprocessor)
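
The preprocessor is described only as being "used to sum embeddings of a behavior". One plausible reading is that embeddings arriving for a key are summed element-wise and folded into whatever is already stored for that key. The sketch below illustrates that reading under stated assumptions: sum_embeddings, toy_update_state and the plain-list embedding format are hypothetical, and the actual Preprocessor API is not shown in this reference.

```python
from typing import Dict, List


def sum_embeddings(embeddings: List[List[float]]) -> List[float]:
    # Element-wise sum of equal-length vectors; an empty input gives an empty vector.
    if not embeddings:
        return []
    return [sum(values) for values in zip(*embeddings)]


def toy_update_state(
    state: Dict[str, List[float]], incoming: Dict[str, List[List[float]]]
) -> None:
    # For each behavior key, sum the incoming embeddings and merge with stored state.
    for key, embeddings in incoming.items():
        summed = sum_embeddings(embeddings)
        if key in state:
            # Key already in state: add the new sum to the stored sum.
            state[key] = sum_embeddings([state[key], summed])
        else:
            # First time this key is seen: store the sum as-is.
            state[key] = summed


toy_state: Dict[str, List[float]] = {}
toy_update_state(toy_state, {"k": [[1.0, 2.0], [3.0, 4.0]]})
first = list(toy_state["k"])
toy_update_state(toy_state, {"k": [[1.0, 1.0]]})
second = list(toy_state["k"])
```

Keeping a running sum rather than every individual embedding bounds memory per behavior, which is one reason a state manager might store sums; whether StateManager does so is not confirmed here.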