
Spark Connector SparkR API

Source Code

For the source code that contains the examples below, see introduction.R.

Prerequisites

  • Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.
  • A running MongoDB instance (version 2.6 or later).
  • Spark 1.6.x.
  • Scala 2.10.x if using the mongo-spark-connector_2.10 package
  • Scala 2.11.x if using the mongo-spark-connector_2.11 package

sparkR Shell

This tutorial uses the sparkR shell. When starting the sparkR shell, you can specify:

  • the --packages option to download the MongoDB Spark Connector package. The following packages are available:

    • mongo-spark-connector_2.10 for use with Scala 2.10.x
    • mongo-spark-connector_2.11 for use with Scala 2.11.x
  • the --conf option to configure the MongoDB Spark Connector. These settings configure the SparkConf object.

    Note

    When specifying the Connector configuration via SparkConf, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuration Options.

For example,

./bin/sparkR  --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
              --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \
              --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
  • The spark.mongodb.input.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), the collection (myCollection) from which to read data, and the read preference.
  • The spark.mongodb.output.uri specifies the MongoDB server address (127.0.0.1), the database to connect to (test), and the collection (myCollection) to which to write data.

The examples in this tutorial will use this database and collection.

In the sparkR shell, the SparkContext is available as sc, and the SQL context is available as sqlContext.
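
If you want to confirm that both objects were created when the shell started, the following quick check works in plain R (a minimal sketch; it is not part of the original tutorial):

exists("sc")          # TRUE: the SparkContext created by the sparkR shell
exists("sqlContext")  # TRUE: the SQL context used by the examples below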

The SparkR API Basics

The SparkR API works with DataFrames and uses the underlying Scala DataFrame implementation.

Write to MongoDB


To create a DataFrame, use the createDataFrame method to convert an R data.frame to a Spark DataFrame. To save the DataFrame to MongoDB, use the write.df() method. For example:

# Create an R data.frame of character names and ages.
charactersRdf <- data.frame(list(name=c("Bilbo Baggins", "Gandalf", "Thorin", "Balin", "Kili", "Dwalin", "Oin", "Gloin", "Fili", "Bombur"),
                                 age=c(50, 1000, 195, 178, 77, 169, 167, 158, 82, NA)))
# Convert the R data.frame to a Spark DataFrame.
charactersSparkdf <- createDataFrame(sqlContext, charactersRdf)
# Write the DataFrame to the collection specified in spark.mongodb.output.uri.
write.df(charactersSparkdf, "", source = "com.mongodb.spark.sql.DefaultSource", mode = "overwrite")

Read from MongoDB

To load the collection into a DataFrame, use the read.df() method with com.mongodb.spark.sql.DefaultSource as the source.

# Read the collection specified in spark.mongodb.input.uri into a DataFrame.
characters <- read.df(sqlContext, source = "com.mongodb.spark.sql.DefaultSource")
printSchema(characters)

In the sparkR shell, the operation prints the following output:

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: double (nullable = true)
 |-- name: string (nullable = true)
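
To preview a few of the documents that were read, you can also call head() on the DataFrame (an optional check that is not part of the original example):

head(characters)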

SQL

SQL can be used to filter data. To run SQL queries over the data, you must first register the DataFrame as a temporary table.

The following example registers a temporary table characters, then uses SQL to filter for characters with ages greater than or equal to 100:

registerTempTable(characters, "characters")
centenarians <- sql(sqlContext, "SELECT name, age FROM characters WHERE age >= 100")
head(centenarians)

In the sparkR shell, the operation prints the following output:

     name  age
1 Gandalf 1000
2  Thorin  195
3   Balin  178
4  Dwalin  169
5     Oin  167
6   Gloin  158
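
Because centenarians is itself a DataFrame, it can be written back to MongoDB with write.df() as shown earlier. The following sketch saves it to a hypothetical centenarians collection in the test database; the collection option name is an assumption based on the connector's output configuration keys and is not part of the original tutorial:

# Write the query result to a separate collection (collection name is hypothetical).
write.df(centenarians, "", source = "com.mongodb.spark.sql.DefaultSource",
         mode = "overwrite", collection = "centenarians")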