Navigation

Spark Connector Python API

Source Code

For the source code that contains the examples below, see introduction.py.

Prerequisites

  • Basic working knowledge of MongoDB and Apache Spark. Refer to the MongoDB documentation and Spark documentation.
  • Running MongoDB instance (version 2.6 or later).
  • Spark 1.6.x.
  • Scala 2.10.x if using the mongo-spark-connector_2.10 package
  • Scala 2.11.x if using the mongo-spark-connector_2.11 package

pyspark Shell

This tutorial uses the pyspark shell. When starting the pyspark shell, you can specify:

  • the --packages option to download the MongoDB Spark Connector package. The following packages are available:

    • mongo-spark-connector_2.10 for use with Scala 2.10.x
    • mongo-spark-connector_2.11 for use with Scala 2.11.x
  • the --conf option to configure the MongoDB Spark Connnector. These settings configure the SparkConf object.

    Note

    When specifying the Connector configuration via SparkConf, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the Configuration Options.

For example,

./bin/pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \
              --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" \
              --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
  • The spark.mongodb.input.uri specifies the MongoDB server address(127.0.0.1), the database to connect (test), and the collection (myCollection) from which to read data, and the read preference.
  • The spark.mongodb.output.uri specifies the MongoDB server address(127.0.0.1), the database to connect (test), and the collection (myCollection) to which to write data.

The examples in this tutorial will use this database and collection.

In the pyspark shell, the SparkContext is available as sc, and the SQLContext is available as sqlContext.

MongoDB Python API Basics

The python API works via DataFrames and uses the underlying Scala DataFrame.

Write to MongoDB

The Python API works via DataFrames and underlying Scala DataFrame.

To create a DataFrame, use the SQLContext.createDataFrame() method. To save the DataFrame to MongoDB, use the DataFrameWriter.save() method. For example:

charactersRdd = sc.parallelize([("Bilbo Baggins",  50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77),
   ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)])
characters = sqlContext.createDataFrame(charactersRdd, ["name", "age"])
characters.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()

Read from MongoDB

To load the collection into a DataFrame, use the sqlContext.read() method with com.mongodb.spark.sql.DefaultSource as the format.

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()

In the pyspark shell, the operation prints the following output:

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

SQL

SQL can be used to filter data. To use SQL, you need to register a temporary table first, and then you can run SQL queries over the data.

The following example registers a temporary table characters, then uses SQL to filter for characters with ages greater than or equal to 100:

df.registerTempTable("characters")
centenarians = sqlContext.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

In the pyspark shell, the operation prints the following output:

+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Oin| 167|
|  Gloin| 158|
+-------+----+