Getting started with Spark Structured Streaming and Kafka

I was recently doing the Cloud Computing Specialization on Coursera, whose capstone project involves processing a set of datasets first in batch and then in streaming mode. For the streaming part I wanted to use Spark, and that's when I came across this new streaming project called Spark Structured Streaming, which I ended up using for the capstone, along with other technologies like Kafka and Cassandra.

Spark Structured Streaming is a new streaming engine built on top of the Spark SQL engine and Datasets. With it you can create streaming applications using a higher-level API, without really having to care about some of the nuances required by the previous RDD-based Spark Streaming, like writing intermediate results.

The Spark Structured Streaming Programming Guide already does a great job of covering this new engine, so the idea of this article is to focus on writing a simple application. Kafka is currently pretty much a no-brainer choice for most streaming applications, so we'll walk through a use case integrating Spark Structured Streaming and Kafka.

To keep the application simple, we'll use a minimal PageView event containing the date, the page viewed and the user. This event is mapped to the following case class:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

case class PageView(date: String, page: String, user: String)

object PageView {

  // Parses a CSV line of the form "<timestamp>,<page>,<user>"
  def apply(line: String): PageView = {
    val Array(at, page, user) = line.split(",")
    PageView(extractDate(at), page, user)
  }

  // Keeps only the date part, e.g. "2017-06-20 23:22:44.111" -> "2017-06-20"
  private def extractDate(at: String): String = {
    val inputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
    val outputFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
    LocalDateTime.parse(at, inputFormatter).format(outputFormatter)
  }

}

A set of these events would look something like this:

2017-06-20 23:22:44.111,/page1,user1
2017-06-20 23:23:44.111,/page2,user100
2017-06-20 23:24:44.111,/page2,user101
2017-06-20 23:24:45.111,/page3,user1
2017-06-21 12:24:44.111,/page3,user1
2017-06-21 13:24:44.111,/page3,user101
2017-06-21 14:24:44.111,/page2,user101
2017-06-21 15:24:44.111,/page3,user2
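
As a quick sanity check (not part of the streaming application itself), the companion apply shown above can parse one of these lines directly, keeping only the date part that we'll later group by:

// Parsing the first sample line with PageView's companion apply
val view = PageView("2017-06-20 23:22:44.111,/page1,user1")
// view == PageView("2017-06-20", "/page1", "user1")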

Processing events

In our streaming application, we want to find out two pieces of information:

  • The most accessed pages ever
  • The most active users per day

The code for fulfilling these requirements can be seen below:

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{Dataset, SparkSession}

object StreamingApp extends App {

  // 1
  val spark = SparkSession
    .builder()
    .appName("spark-streaming")
    .getOrCreate()

  import spark.implicits._

  // 2
  spark.udf.register("deserialize", (message: String) => PageView(message))

  // 3
  val ds = createDataSet(spark)

  // 4
  val mostViewedPages = ds
    .groupBy($"page")
    .count()
  // 5
  val mostActiveUsersByDay = ds
    .groupBy($"date", $"user")
    .count()

  // 6
  val mostViewedPagesQuery = startStreamingQueryFrom(mostViewedPages)
  val mostActiveUsersByDayQuery = startStreamingQueryFrom(mostActiveUsersByDay)

  // 7
  Seq(mostViewedPagesQuery, mostActiveUsersByDayQuery)
    .foreach(_.awaitTermination())

  def createDataSet(spark: SparkSession): Dataset[PageView] = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<kafka-hosts>")
      .option("subscribe", "<kafka-topic>")
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("""deserialize(CAST(value as STRING)) AS message""")
      .select($"message".as[PageView])
  }

  def startStreamingQueryFrom[T](dataset: Dataset[T]): StreamingQuery =
    dataset.writeStream
      .outputMode("update")
      .format("console")
      .start()
}

Before going through the individual parts of this class, it's worth noting that the Spark API used here (Dataset) is really similar to the one used when running batch jobs, so you can reuse your previous knowledge and code. There's no streaming-specific code, like accumulating state, which was required with the original RDD-based Spark Streaming.
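
To make that similarity concrete, here is a minimal sketch of a batch version of the mostViewedPages aggregation, assuming the same SparkSession and implicits as above and reading the CSV lines from a plain text file ("<events-path>" is just a placeholder):

// Hypothetical batch equivalent: only the read side changes, the
// groupBy/count logic is exactly the same as in the streaming version
val batchPageViews: Dataset[PageView] = spark.read
  .textFile("<events-path>")
  .map(line => PageView(line))

batchPageViews
  .groupBy($"page")
  .count()
  .show()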

I’ll try to break this into smaller parts, so it’s easier to understand:

  • 1: Just creates a SparkSession so that we can import its implicits, which is done right below it.
  • 2: We register a user-defined function responsible for taking a line (in the CSV format) and converting it to an instance of PageView. This function is named deserialize.
  • 3: The method createDataSet is responsible for creating a stream of PageView objects. Here the specifics of the data source need to be set. In this case, as we're reading data from Kafka, we set some related properties, like the bootstrap servers and the topic. The Kafka messages (both key and value) are always deserialized as Array[Byte], so we cast the value to String and then invoke our deserialize UDF.
  • 4: We process the stream of data to create a dataset with the most viewed pages. The Dataset API is quite expressive: note that we just need to group the data by page and then count it. Under the hood there's a lot going on here; Spark SQL's Catalyst optimizer will try to convert this expression into the most efficient set of operations. Also, even though we're processing a continuous stream of data, we don't need any special code for that at this point.
  • 5: Similar to the step above, but in this case we're counting the number of page views per user, grouped by day.
  • 6: So far we just have plain Dataset objects; nothing has really happened yet. Each Spark Structured Streaming operation is represented by a StreamingQuery. This abstraction is responsible for continuously applying the dataset rules/operations as new data arrives. When creating a StreamingQuery, a few important parameters need to be set, like the output mode and the sink/format. In this example the results are just being output to the console, but in a real application you'd probably write them to another messaging system or to some persistent storage, like a distributed file system or a database (see the sketch after this list).
  • 7: To finish, we leave our queries running until the application is stopped. It's interesting to notice that multiple queries can be started within a single application and Spark will automatically take care of distributing them across the cluster.
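
As a rough sketch of what step 6 could look like with a real sink instead of the console, the aggregated rows could be serialized to JSON and written back to another Kafka topic. The output topic and checkpoint directory below are placeholders, and this is just one of several possible sinks:

// Hypothetical alternative to startStreamingQueryFrom: writing the results to a
// Kafka topic instead of the console. The Kafka sink expects a string or binary
// "value" column, so each row is serialized to JSON first, and a checkpoint
// location is required so the query can recover from failures.
def startKafkaSinkQueryFrom[T](dataset: Dataset[T]): StreamingQuery =
  dataset
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .outputMode("update")
    .format("kafka")
    .option("kafka.bootstrap.servers", "<kafka-hosts>")
    .option("topic", "<output-topic>")
    .option("checkpointLocation", "<checkpoint-dir>")
    .start()

Regardless of the sink, when several queries share one application you can also block on spark.streams.awaitAnyTermination() instead of awaiting each query individually.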

Running this application against a Kafka topic receiving the set of events shown above could produce different intermediate results, depending on when the events arrive, but the final result would be the same, looking something like this:

+----------+-------+-----+
| date     | user  |count|
+----------+-------+-----+
|2017-06-21| user1 |    1|
|2017-06-21|user101|    2|
|2017-06-20|user100|    1|
|2017-06-21| user2 |    1|
|2017-06-20|user101|    1|
|2017-06-20| user1 |    2|
+----------+-------+-----+

+------+-----+
| page |count|
+------+-----+
|/page3|    4|
|/page1|    1|
|/page2|    3|
+------+-----+

Conclusion

The idea of this post was just to give a brief overview of Spark Structured Streaming. I really encourage you to go through its documentation, as it covers this topic in much more detail and touches on points not discussed here, like window operations on event time, watermarking and fault-tolerance guarantees.
