package stream
Type Members
-
case class
ObjectState(id: String, start: Timestamp, end: Timestamp, points: List[Coordinate], sampling: Int = 1, lastXPoints: List[Coordinate] = List.empty, object: Object = null, embeddings: Array[Embedding] = Array.empty) extends Product with Serializable
Tracks detections of a given object over a period of time. This includes geo points (i.e. latitude and longitude), timestamps, object appearance/embeddings and, in future, other attributes such as Pose, Gaze and Gesture. Used for internal state management.
- id
unique id of the object; the default implementation uses sensor-id + object-id
- start
start timestamp of the object detection
- end
end timestamp of the object detection
- points
List of geo points or Locations
- sampling
point sampling factor; if it is x, the sampling ratio is 1/x of the points
- embeddings
array of embeddings to represent a given object
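The `id` and `sampling` parameters above can be illustrated with a minimal sketch. It is not the library's implementation: `Coordinate` and `TrackState` here are hypothetical stand-ins, the `-` separator in the id is an assumption, and reading `sampling = x` as "keep every x-th point" is one plausible interpretation of the 1/x ratio.

```scala
import java.sql.Timestamp

object ObjectStateSketch {
  // Hypothetical stand-in for the library's Coordinate type.
  final case class Coordinate(lat: Double, lon: Double)

  // Reduced, illustrative version of ObjectState (object and embeddings omitted).
  final case class TrackState(
      id: String,
      start: Timestamp,
      end: Timestamp,
      points: List[Coordinate],
      sampling: Int = 1
  )

  // Assumed interpretation of `sampling`: keep every x-th point, i.e. a 1/x ratio.
  def samplePoints(points: List[Coordinate], sampling: Int): List[Coordinate] = {
    require(sampling >= 1, "sampling must be at least 1")
    points.zipWithIndex.collect { case (p, i) if i % sampling == 0 => p }
  }

  // Default id scheme described above: sensor-id + object-id (the separator is an assumption).
  def trackId(sensorId: String, objectId: String): String = s"$sensorId-$objectId"
}
```

With `sampling = 1` every point is kept, which matches the default value in the signature.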
-
class
Sink extends Serializable
Primarily comprises multiple writers for the Kafka sink and a Milvus sink; an example writer is shown below.
//example writer implementation
val behaviorWriter = new ForeachWriter[Option[Behavior]] {
  log.info("init writer")
  var producer: KProducerByteArray = _

  override def open(partitionId: Long, version: Long) = {
    producer = new KProducerByteArray(brokers)
    true
  }

  override def process(value: Option[Behavior]) = {
    //enrich Behavior object as needed
    if (value.isDefined) {
      val b = value.get
      producer.send(behaviorTopic, b.getSensor.id, b.toByteArray)
    }
  }

  override def close(errorOrNull: Throwable) = {
    producer.close
  }
}

//use of the above in StreamProcessor class
val behaviorQuery = dataset // dataset of Message
  .groupByKey(msg => msg.key) //represents unique object-id
  .mapGroupsWithState(processingTimeTimeout())(stateMgmt.updateBehavior)
  .writeStream
  .option("checkpointLocation", checkpoint_behavior) //checkpoint location
  .foreach(sink.behaviorWriter) //write behavior to kafka
-
class
StateMgmt extends Serializable
State management for tracking objects over a period of time; used when the metadata is in protobuf.
//init state management
val stateMgmt = new StateMgmt(config)

//usage for dataset of Message
dataset
  .groupByKey(msg => msg.key)
  .mapGroupsWithState(processingTimeTimeout())(stateMgmt.updateBehavior)
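The body of `updateBehavior` is not shown on this page. In Spark, a function passed to `mapGroupsWithState` receives the group key, an `Iterator` of that key's values and a `GroupState[S]`. The sketch below illustrates only the general shape of such an update in plain Scala, with `Option` standing in for `GroupState` so it runs without Spark. `Detection`, `TrackState` and `update` are hypothetical names, not part of this package.

```scala
import java.sql.Timestamp

object StateUpdateSketch {
  // Hypothetical stand-in for the library's Coordinate type.
  final case class Coordinate(lat: Double, lon: Double)

  // Hypothetical, simplified message: one detection of one object.
  final case class Detection(key: String, ts: Timestamp, location: Coordinate)

  // Reduced, illustrative per-object state.
  final case class TrackState(id: String, start: Timestamp, end: Timestamp, points: List[Coordinate])

  // Folds one micro-batch of detections for a key into the previous state, if any.
  // Returns None only when there is no previous state and the batch is empty.
  def update(key: String, batch: Iterator[Detection], previous: Option[TrackState]): Option[TrackState] = {
    val sorted = batch.toList.sortBy(_.ts.getTime)
    sorted.foldLeft(previous) { (state, d) =>
      state match {
        case None =>
          Some(TrackState(key, d.ts, d.ts, List(d.location)))
        case Some(s) =>
          // Widen the time window and append the new location.
          val start = if (d.ts.before(s.start)) d.ts else s.start
          val end = if (d.ts.after(s.end)) d.ts else s.end
          Some(s.copy(start = start, end = end, points = s.points :+ d.location))
      }
    }
  }
}
```

In a real streaming job the previous state would be read from and written back to the `GroupState` handle, and a timeout would decide when a track is finished.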
-
case class
jObjectState(id: String, start: Timestamp, end: Timestamp, points: List[jCoordinate], attributes: String = null) extends Product with Serializable
Tracks detections of a given object over a period of time. This includes geo points (i.e. latitude and longitude), timestamps, object appearance and, in future, other attributes such as Pose, Gaze and Gesture. Used for internal state management.
- id
unique id of the object; the default implementation uses sensor-id + object-id
- start
start timestamp of the object detection
- end
end timestamp of the object detection
- points
List of geo points or Locations
- attributes
object-specific attributes (for a person or a vehicle), stored as a JSON string so that the state is portable across different object types
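As an illustration of why a JSON string keeps `attributes` portable, the sketch below serializes a flat attribute map for any object type into one string field. It is an assumption-laden example: the attribute names are invented, `toJson` is a hypothetical helper, and the escaping is deliberately minimal (quotes and backslashes only), so a real JSON library would be preferable in practice.

```scala
object AttributesSketch {
  // Minimal escaping: handles only backslashes and double quotes.
  private def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Serializes a flat map of string attributes to a JSON object.
  // Keys are sorted so the output is stable.
  def toJson(attrs: Map[String, String]): String =
    attrs.toSeq
      .sortBy(_._1)
      .map { case (k, v) => s"${quote(k)}:${quote(v)}" }
      .mkString("{", ",", "}")

  // Hypothetical attribute sets for two object types; both fit the same String field.
  val personAttributes: String = toJson(Map("type" -> "person", "direction" -> "north"))
  val vehicleAttributes: String = toJson(Map("type" -> "vehicle", "color" -> "red"))
}
```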
-
class
jSink extends AnyRef
Primarily comprises multiple writers for the Kafka sink; an example writer is shown below.
//example writer implementation
val behaviorWriter = new ForeachWriter[Option[Behavior]] {
  log.info("init frame writer")
  var producer: KProducer = _

  override def open(partitionId: Long, version: Long) = {
    producer = new KProducer(brokers)
    true
  }

  override def process(value: Option[Behavior]) = {
    //enrich Behavior object as needed
    if (value.isDefined) {
      val b = value.get
      val j = Util.jsonString(b)
      producer.send(framesTopic, b.sensorId, j)
    }
  }

  override def close(errorOrNull: Throwable) = {
    producer.close
  }
}

//use of the above in StreamProcessor class
val behaviorQuery = dataset // dataset of Message
  .groupByKey(msg => msg.key) //represents unique object-id
  .mapGroupsWithState(processingTimeTimeout())(stateMgmt.updateBehavior)
  .writeStream
  .option("checkpointLocation", checkpoint_behavior) //checkpoint location
  .foreach(kafkasink.behaviorWriter) //write behavior to kafka
-
class
jStateMgmt extends Serializable
State management for tracking objects over a period of time; used when the metadata is in JSON.
// init state management
val stateMgmt = new StateMgmt(config)

//usage for dataset of Message
dataset
  .groupByKey(msg => msg.key)
  .mapGroupsWithState(processingTimeTimeout())(stateMgmt.updateBehavior)