Quick tip: Using the SingleStore Spark Connector with SingleStore Notebooks

Akmal Chaudhri - Mar 31 - Dev Community

Abstract

Continuing our series on using Apache Spark with SingleStore, we'll use a simple example to demonstrate how we can write a Spark DataFrame to SingleStore and read the data back from SingleStore into a new Spark DataFrame, using the SingleStore Spark Connector.

The notebook file used in this article is available on GitHub.

Create a SingleStore Cloud account

A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:

  • Workspace Group Name: Spark Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: spark-demo
  • Size: S-00

We'll make a note of the password and store it in the secrets vault using the name password.

Create a new notebook

From the left navigation pane in the cloud portal, we'll select DEVELOP > Data Studio.

In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.

Figure 1. New Notebook.

We'll call the notebook spark_connector_demo, select a Blank notebook template from the available options, and save it in the Personal location.

Fill out the notebook

First, let's install Java:

!conda install -y --quiet -c conda-forge openjdk

Next, we'll create a directory to store some jar files:

import os
os.makedirs("jars", exist_ok=True)

We'll now download some jar files, as follows:

import requests

def download_jar(url, destination):
    # Fetch the jar and fail loudly on HTTP errors instead of saving an error page
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with open(destination, "wb") as f:
        f.write(response.content)

jar_urls = [
    ("https://repo1.maven.org/maven2/com/singlestore/singlestore-jdbc-client/1.2.4/singlestore-jdbc-client-1.2.4.jar", "jars/singlestore-jdbc-client-1.2.4.jar"),
    ("https://repo1.maven.org/maven2/com/singlestore/singlestore-spark-connector_2.12/4.1.8-spark-3.5.0/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar", "jars/singlestore-spark-connector_2.12-4.1.8-spark-3.5.0.jar"),
    ("https://repo1.maven.org/maven2/org/apache/commons/commons-dbcp2/2.12.0/commons-dbcp2-2.12.0.jar", "jars/commons-dbcp2-2.12.0.jar"),
    ("https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.12.0/commons-pool2-2.12.0.jar", "jars/commons-pool2-2.12.0.jar"),
    ("https://repo1.maven.org/maven2/io/spray/spray-json_3/1.3.6/spray-json_3-1.3.6.jar", "jars/spray-json_3-1.3.6.jar")
]

# Download each jar into the jars directory
for url, destination in jar_urls:
    download_jar(url, destination)

print("JAR files downloaded successfully")

These jar files include the SingleStore JDBC Client and the SingleStore Spark Connector, as well as their supporting libraries: Apache Commons DBCP2 and Apache Commons Pool2 for connection pooling, and spray-json for JSON processing.
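Each URL above follows the standard Maven Central repository layout: the group ID with its dots replaced by slashes, then the artifact ID, the version, and finally a file named `artifactId-version.jar`. As a minimal sketch, a small helper can build the same `(url, destination)` pairs from Maven coordinates, which may make it easier to bump a version later. The helper names `maven_jar_url` and `jar_destination` are hypothetical, not part of any library.

```python
# Base URL of the Maven Central repository used in the jar list above
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(group_id: str, artifact_id: str, version: str) -> str:
    """Build a Maven Central download URL from Maven coordinates.

    Hypothetical helper following the standard repository layout:
    <group path>/<artifact>/<version>/<artifact>-<version>.jar
    """
    group_path = group_id.replace(".", "/")
    file_name = f"{artifact_id}-{version}.jar"
    return f"{MAVEN_CENTRAL}/{group_path}/{artifact_id}/{version}/{file_name}"


def jar_destination(artifact_id: str, version: str, directory: str = "jars") -> str:
    """Local path for a downloaded jar, matching the jars/ directory used above."""
    return f"{directory}/{artifact_id}-{version}.jar"


# (group, artifact, version) coordinates for the jars listed above
coordinates = [
    ("com.singlestore", "singlestore-jdbc-client", "1.2.4"),
    ("com.singlestore", "singlestore-spark-connector_2.12", "4.1.8-spark-3.5.0"),
    ("org.apache.commons", "commons-dbcp2", "2.12.0"),
    ("org.apache.commons", "commons-pool2", "2.12.0"),
    ("io.spray", "spray-json_3", "1.3.6"),
]

# Same (url, destination) pairs as the hand-written list
jar_urls = [
    (maven_jar_url(g, a, v), jar_destination(a, v)) for g, a, v in coordinates
]
```

The resulting list has the same shape as the hand-written one, so it could be passed to the same download loop unchanged.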

Now we are ready to create a SparkSession:

from pyspark.sql import SparkSession

# Create a Spark session that loads the downloaded jars
spark = (
    SparkSession.builder
    .config("spark.jars", ",".join([destination for _, destination in jar_urls]))
    .appName("Spark Connector Test")
    .getOrCreate()
)

# Only show errors in the Spark logs
spark.sparkContext.setLogLevel("ERROR")
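The `spark.jars` setting takes a single comma-separated string of jar paths, which is why the destinations are joined with `","`. Because a failed or partial download would probably only surface later, when the connector is first used, it may be worth checking the files before building the session. The sketch below does that; `spark_jars_value` is a hypothetical helper, and treating an empty file as a failed download is an assumption.

```python
import os


def spark_jars_value(jar_paths):
    """Join jar paths into the comma-separated string that spark.jars expects.

    Hypothetical helper: raises if any listed jar is missing or empty, so a failed
    download is caught before the SparkSession is created.
    """
    problems = [
        path for path in jar_paths
        if not os.path.isfile(path) or os.path.getsize(path) == 0
    ]
    if problems:
        raise FileNotFoundError(f"Missing or empty jar files: {problems}")
    return ",".join(jar_paths)
```

In the notebook, it would replace the inline join: `.config("spark.jars", spark_jars_value([destination for _, destination in jar_urls]))`.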

Next, we'll create a simple DataFrame:

# Create a DataFrame
data = [("Peter", 27), ("Paul", 28), ("Mary", 29)]
df = spark.createDataFrame(data, ["Name", "Age"])

Now we'll show the DataFrame:

# Show the content of the DataFrame
df.show()

The output should be as follows:

+-----+---+
| Name|Age|
+-----+---+
|Peter| 27|
| Paul| 28|
| Mary| 29|
+-----+---+

A database is required, so we'll create one by running the following in a SQL cell:

DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;

Next, we'll parse the notebook's connection URL so that we can extract the host and port:

from sqlalchemy import create_engine

# connection_url is predefined in SingleStore Notebooks for the selected workspace
db_connection = create_engine(connection_url)
url = db_connection.url

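Now we'll retrieve the password from the secrets vault and build the host:port endpoint that the connector needs:

# get_secret reads the value stored in the secrets vault earlier
password = get_secret("password")
host = url.host
port = url.port
# The connector expects the endpoint as "host:port"
cluster = host + ":" + str(port)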
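The SQLAlchemy engine is used here only to pull the host and port out of the connection URL. As an alternative sketch, the standard library's `urllib.parse.urlsplit` can extract the same fields, and the helper can fall back to SingleStore's default port 3306 if the URL omits one (the code above would otherwise produce `host:None`). The helper name `ddl_endpoint` is hypothetical, and the example URL is made up; it is not necessarily the exact format of the notebook's `connection_url`.

```python
from urllib.parse import urlsplit

DEFAULT_PORT = 3306  # SingleStore's default MySQL-protocol port


def ddl_endpoint(connection_url: str) -> str:
    """Return 'host:port' for spark.datasource.singlestore.ddlEndpoint.

    Hypothetical helper: falls back to the default port when the URL has none.
    """
    parts = urlsplit(connection_url)
    if parts.hostname is None:
        raise ValueError("No host found in the connection URL")
    port = parts.port if parts.port is not None else DEFAULT_PORT
    return f"{parts.hostname}:{port}"


# Made-up URL, for illustration only
example_url = "singlestoredb://admin:secret@svc-example.example.com:3306/spark_demo"
print(ddl_endpoint(example_url))  # svc-example.example.com:3306
```

Note that `urlsplit` lowercases the hostname, which is harmless for DNS names.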

We'll now configure the connector with the DDL endpoint, the user and password, and query pushdown (setting disablePushdown to false keeps pushdown enabled):

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
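All four settings share the `spark.datasource.singlestore.` prefix. A minimal sketch that keeps the prefix in one place might look like the following. It accepts any object with a `set(key, value)` method, so it should work with `spark.conf`. The helper `apply_singlestore_conf` and the `FakeConf` stand-in (used only to try the helper without Spark) are hypothetical.

```python
PREFIX = "spark.datasource.singlestore."


def apply_singlestore_conf(conf, endpoint, user, password, disable_pushdown=False):
    """Set the SingleStore connector options on a Spark runtime config.

    Hypothetical helper: `conf` is expected to be spark.conf, or anything
    with a set(key, value) method.
    """
    options = {
        "ddlEndpoint": endpoint,
        "user": user,
        "password": password,
        # Passed as the string "true"/"false", matching the cell above
        "disablePushdown": str(disable_pushdown).lower(),
    }
    for name, value in options.items():
        conf.set(PREFIX + name, value)


class FakeConf:
    """Minimal stand-in for spark.conf, used only for trying the helper without Spark."""

    def __init__(self):
        self.values = {}

    def set(self, key, value):
        self.values[key] = value

    def get(self, key):
        return self.values[key]
```

In the notebook, the equivalent call would be `apply_singlestore_conf(spark.conf, cluster, "admin", password)`.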

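We'll now write the DataFrame to a table called demo in the spark_demo database, using LZ4 compression for the data load:

(df.write
    .format("singlestore")
    .option("loadDataCompression", "LZ4")  # compress the data sent during the load
    .mode("overwrite")  # replace any existing data in the table
    .save("spark_demo.demo")  # database.table
)

The connector creates the demo table for us, so we don't need to define it in advance.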

Next, we'll read the data back into a new DataFrame:

new_df = (spark.read
    .format("singlestore")
    .load("spark_demo.demo")
)

Now we'll show the new DataFrame:

# Show the content of the DataFrame
new_df.show()

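The rows are read back without any ordering, so they may appear in a different order from the original DataFrame. The output could be as follows: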

+-----+---+
| Name|Age|
+-----+---+
| Paul| 28|
| Mary| 29|
|Peter| 27|
+-----+---+
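Because the row order can differ, a direct equality check against the original data may fail even when the round trip worked. Comparing the rows as multisets avoids this. Here is a minimal sketch using `collections.Counter`. In the notebook, the inputs would be the original `data` list and `[tuple(row) for row in new_df.collect()]` (PySpark `Row` objects are tuples). The helper name `same_rows` is hypothetical.

```python
from collections import Counter


def same_rows(expected, actual):
    """True if both sequences contain the same rows, ignoring order.

    Counter treats the rows as a multiset, so duplicate rows are still counted.
    """
    return Counter(map(tuple, expected)) == Counter(map(tuple, actual))


data = [("Peter", 27), ("Paul", 28), ("Mary", 29)]
read_back = [("Paul", 28), ("Mary", 29), ("Peter", 27)]  # order as in the sample output

print(read_back == data)           # False: the order differs
print(same_rows(data, read_back))  # True: the same rows
```

Within Spark itself, checking that both `df.exceptAll(new_df)` and `new_df.exceptAll(df)` are empty is an equivalent test that avoids collecting the data to the driver.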

Finally, we'll stop the SparkSession:

# Stop the Spark session
spark.stop()

Summary

In this simple example, we saw how to configure Spark with the appropriate jar files, create a simple Spark DataFrame, and write it to a SingleStore database. We then read the data back into a new DataFrame and confirmed that the data were the same. In the next article, we'll look at some additional features of the SingleStore Spark Connector.
