Spark Streaming

Spark Streaming allows on-the-fly analysis of live data streams with MongoDB. See the Apache documentation for a detailed description of Spark Streaming functionality.

This tutorial uses the Spark Shell. For more information about starting the Spark Shell and configuring it for use with MongoDB, see Getting Started.

This tutorial demonstrates how to use Spark Streaming to analyze input data from a TCP port. It uses Netcat, a lightweight network utility, to send text input to a local port, then uses Scala to count how many times each word occurs in each batch of input and write the results to a MongoDB collection.

Start Netcat from the command line:

$ nc -lk 9999

Start the Spark Shell at another terminal prompt, then import the connector and streaming packages:

import com.mongodb.spark.sql._
import org.apache.spark.streaming._

Create a new StreamingContext object and assign it to ssc. sc is a SparkContext object that is automatically created when you start the Spark Shell. The second argument specifies how often to check for new input data.

val ssc = new StreamingContext(sc, Seconds(1))

Use the socketTextStream method to create a connection to Netcat on port 9999:

val lines = ssc.socketTextStream("localhost", 9999)

Determine how many times each word occurs in each batch of lines:

val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
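The same three steps can be tried on an ordinary Scala collection, without Spark or Netcat, to see what the pipeline produces for one batch of lines. This is only a minimal sketch: WordCountSketch and countWords are hypothetical names introduced for illustration, and groupBy followed by a sum stands in locally for what reduceByKey(_ + _) does on a stream.

```scala
// Plain-Scala sketch of the word count for a single batch of lines.
// WordCountSketch and countWords are hypothetical names, not part of
// Spark or the MongoDB Spark Connector.
object WordCountSketch {
  def countWords(lines: Seq[String]): Map[String, Int] = {
    // Split every line on spaces and flatten into one sequence of words.
    val words = lines.flatMap(_.split(" "))
    // Pair each word with an initial count of 1.
    val pairs = words.map(word => (word, 1))
    // Group the pairs by word and sum the ones; a local stand-in
    // for reduceByKey(_ + _) on the stream.
    pairs.groupBy(_._1).map { case (word, ones) => (word, ones.map(_._2).sum) }
  }
}
```

For example, WordCountSketch.countWords(Seq("cats cats dogs dogs dogs")) yields a map in which cats is 2 and dogs is 3.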

Create a data structure to hold the results:

case class WordCount(word: String, count: Int)

Use a foreachRDD loop to collect results and write to the MongoDB collection specified in the Spark Connector configuration. The append mode causes data to be appended to the collection, whereas overwrite mode replaces the existing data.

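wordCounts.foreachRDD({ rdd =>
  import spark.implicits._
  // Convert each (word, count) pair in this batch to a WordCount row
  val wordCounts = rdd.map({ case (word: String, count: Int)
          => WordCount(word, count) }).toDF()
  // Append the batch to the configured MongoDB collection
  wordCounts.write.mode("append").mongo()
})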

Start listening:

ssc.start()

To give your program something to listen to, go back to the terminal prompt where you started Netcat and start typing.

hello world
cats cats dogs dogs dogs

In your MongoDB collection you’ll find something similar to the following:

{ "_id" : ObjectId("588a539927c22bd43214131f"), "word" : "hello", "count" : 1 }
{ "_id" : ObjectId("588a539927c22bd432141320"), "word" : "world", "count" : 1 }
{ "_id" : ObjectId("588a53b227c22bd432141322"), "word" : "cats", "count" : 2 }
{ "_id" : ObjectId("588a53b227c22bd432141323"), "word" : "dogs", "count" : 3 }

To end your Netcat process, press Ctrl-C. To end your Spark Shell session, run System.exit(0).