class Sink extends Serializable
primarily comprises of multiple writers of kafka sink and a milvus sink, example writer shown below
//example writer implementation val behaviorWriter = new ForeachWriter[Option[Behavior]]{ log.info("init writer") var producer: KProducerByteArray = _ override def open(partitionId: Long, version: Long) = { producer = new KProducerByteArray(brokers) true } override def process(value: Option[Behavior]) = { //enrich Behavior object as needed if(value.isDefined){ val b = value.get producer.send(behaviorTopic, b.getSensor.id, b.toByteArray) } } override def close(errorOrNull: Throwable) = { producer.close } } //use of the above in StreamProcessor class val behaviorQuery = dataset // dataset of Message .groupByKey(msg => msg.key) //represents unique object-id .mapGroupsWithState(processingTimeTimeout())(stateMgmt.updateBehavior) .writeStream .option("checkpointLocation", checkpoint_behavior) //checkpoint location .foreach(sink.behaviorWriter) //write behavior to kafka
- Alphabetic
- By Inheritance
- Sink
- Serializable
- Serializable
- AnyRef
- Any
- Hide All
- Show All
- Public
- All
Instance Constructors
- new Sink(config: Map[String, String])
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
val
behaviorEWithTripwireWriter: ForeachWriter[Option[(Behavior, Behavior)]] { ... /* 2 definitions in type refinement */ }
behavior writer used for tripwire, take two inputs, the original trajectory and tracklet to be used for tripwire event detection
-
val
behaviorEWriter: ForeachWriter[Option[Behavior]] { ... /* 2 definitions in type refinement */ }
behavior writer for cartesian / euclidean space
-
val
behaviorIWriter: ForeachWriter[Option[(Behavior, Behavior)]] { ... /* 2 definitions in type refinement */ }
behavior writer in Image Coordinates, used for tripwire, take two inputs, the original trajectory and tracklet to be used for tripwire event detection
-
val
behaviorInferenceWriter: ForeachWriter[(String, List[Behavior])] { ... /* 2 definitions in type refinement */ }
behavior writer for Inference As a Service, inference is done using Triton Inference Server the current inference service is used find cluster index in which a trajectory belong
behavior writer for Inference As a Service, inference is done using Triton Inference Server the current inference service is used find cluster index in which a trajectory belong
In future more services like trajectory smoothing, pose estimation, gesture and gaze estimation will be added
-
val
behaviorWriter: ForeachWriter[Option[Behavior]] { ... /* 2 definitions in type refinement */ }
behavior writer used for geo spatial use case
behavior writer used for geo spatial use case
Apart from persisting object behavior across time, also check of anomalies like abnormal movement, speed violation and vehicle stopping
-
def
clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native() @HotSpotIntrinsicCandidate()
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
val
frameListWriter: ForeachWriter[List[FrameMessage]] { ... /* 2 definitions in type refinement */ }
frame list writer used for enhanced frame
-
val
frameWriter: ForeachWriter[FrameMessage] { ... /* 2 definitions in type refinement */ }
frame writer used for enhanced frame
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
lazy val
log: Logger
init logger
init logger
- Annotations
- @transient()
- val milvusWriter: ForeachWriter[(String, List[Behavior])]
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
toString(): String
- Definition Classes
- AnyRef → Any
- def upsert(behaviors: List[Behavior]): Unit
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
Deprecated Value Members
-
def
finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] ) @Deprecated
- Deprecated