Spark Structured Streaming - Custom Session Windows
Spark Structured Streaming allows us to perform aggregations over a sliding event-time window in a straightforward manner
out of the box using windowing operation on event time and watermarking.
Use case
Continue to sum the wins of a user for the duration of the session, and when there is a deposit event in the stream, deposit the current balance and close the session.
Windowing and watermarking
Windowing and watermarking by default work with time based windows. In our use case this does not work as we want to close the session when we get a signal to deposit the current balance.
Result
We can achieve this by implementing a custom session window based on the mapGroupsWithState function
Step 0: Define the Custom session classes
case class Transaction(sessionId: String, winAmount: Double, deposit: Boolean)
case class SessionTrackingValue(totalSum: Double)
case class SessionUpdate(sessionId: String, currentBalance: Double, depositCurrentBalance: Boolean)
Create a local SparkSession running locally utilizing all cores
val spark: SparkSession = SparkSession
.builder
.master("local[*]")
.appName(getClass.getName)
.getOrCreate()
Create a socket stream bound to port 9999 on localhost
val socketStream: DataFrame = spark.readStream
// socket as stream input
.format("socket")
// connect to socket port localhost:9999 waiting for incoming stream
.option("host", "localhost")
.option("port", 9999)
.load()
Map the input to a Transaction case class
import spark.implicits._
val transactions = socketStream
.as[String]
.map(inputLine => {
val fields = inputLine.split(",")
Transaction(fields(0), fields(1).toDouble, Try(fields(2).toBoolean).getOrElse(false))
})
If the deposit
field is contained in the input, it will be populated into the transaction.
If it is not contained in the input, we use .getOrElse
to give it a value of false
Create a KeyValueGroupedDataset of kv pairs (sessionId, Transaction)
val idSessionKv: KeyValueGroupedDataset[String, Transaction] = transactions.groupByKey(x => x.sessionId)
Use .mapGroupsWithState to check whether state exists and perform action on the state
val sessionUpdates: Dataset[SessionUpdate] = idSessionKv.mapGroupsWithState[SessionTrackingValue, SessionUpdate](GroupStateTimeout.NoTimeout()) {
// mapGroupsWithState: key: K, it: Iterator[V], s: GroupState[S]
case (sessionId: String, eventsIter: Iterator[Transaction], state: GroupState[SessionTrackingValue]) => {
val events = eventsIter.toSeq
val updatedSession =
if (state.exists) {
val existingState = state.get
val updatedEvents = SessionTrackingValue(existingState.totalSum + events.map(_.winAmount).sum)
updatedEvents
}
else {
SessionTrackingValue(events.map(event => event.winAmount).sum)
}
state.update(updatedSession)
val toCloseSession = events.exists(_.deposit)
// when there is an deposit in the event, close the session by removing the state
if (toCloseSession) {
// here we could perform a specific action when we receive the end of the session signal (store, send, update other state)
// in this case we would just deposit the current balance to a data store
// state.save() .. TODO unimplemented for this example
state.remove()
SessionUpdate(sessionId, updatedSession.totalSum, depositCurrentBalance = true)
}
else {
SessionUpdate(sessionId, updatedSession.totalSum, depositCurrentBalance = false)
}
}
}
TODO: explain
Run the app
val query: StreamingQuery = sessionUpdates
.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
Full code github-repo