Quick tip: Using SingleStoreDB with Delta Lake

Akmal Chaudhri - Mar 1 '23 - Dev Community

Abstract

This short article will show how to install Delta Lake with Apache Spark for use with SingleStoreDB. We'll use the SingleStore Spark Connector to read data from SingleStoreDB and write it to Delta Lake, then read the data back from Delta Lake and write it into SingleStoreDB.

The notebook file used in this article is available on GitHub.

Introduction

Previously, we explored how to use Apache Iceberg with SingleStoreDB via Spark Dataframes. In this article, we'll focus on Delta Lake and provide details on one possible configuration for using it with SingleStoreDB from a Python notebook environment.

For production environments, please use a robust file system for your Delta Lake.

Create a SingleStoreDB Cloud account

A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Iris Demo Group as our Workspace Group Name and iris-demo as our Workspace Name. We'll make a note of the password and store it in the secrets vault using the name password.

Notebook

Let's now start to fill out our notebook.

Install Software

First, we'll need to install a Java runtime for Spark, along with pyspark and delta-spark:

!conda install -y --quiet -c conda-forge openjdk

!pip install delta-spark --quiet
!pip install pyspark --quiet

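The SparkSession configuration in the next step also expects the SingleStore Spark Connector JARs to be available locally, referenced through a jar_urls list of (source URL, local path) pairs. As a rough, hypothetical sketch of how those files might be fetched (the artifact versions shown are assumptions; pick ones that match your Spark and Scala versions):

import os
import urllib.request

# Hypothetical (source URL, local path) pairs for the SingleStore Spark
# Connector and the MariaDB JDBC client it depends on. The versions are
# assumptions -- adjust them to your Spark and Scala versions.
jar_urls = [
    ("https://repo1.maven.org/maven2/com/singlestore/singlestore-spark-connector_2.12/4.1.3-spark-3.3.0/singlestore-spark-connector_2.12-4.1.3-spark-3.3.0.jar",
     "jars/singlestore-spark-connector.jar"),
    ("https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/3.1.2/mariadb-java-client-3.1.2.jar",
     "jars/mariadb-java-client.jar")
]

os.makedirs("jars", exist_ok = True)

# Download each JAR to its local destination
for source, destination in jar_urls:
    urllib.request.urlretrieve(source, destination)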
Once the installation is complete, we'll prepare our SparkSession:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Create Spark session with Delta Lake integration.
# jar_urls holds (source URL, local path) pairs for the SingleStore Spark
# Connector JARs downloaded earlier; only the local paths are passed to Spark.
builder = (SparkSession.builder
             .config("spark.jars", ",".join([destination for _, destination in jar_urls]))
             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .appName("Spark Delta Lake Test")
)

# Use configure_spark_with_delta_pip to add the Delta Lake JARs to the session
spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

Next, we'll download some data, load it into a Pandas DataFrame and create a database to hold it.
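A minimal sketch of this step, assuming the classic Iris CSV file from the UCI repository as the data source and column names matching the table shown later:

import pandas as pd

# Load the Iris dataset into a Pandas DataFrame. The URL and column names
# below are assumptions -- adjust them to your data source.
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "species"]

pandas_df = pd.read_csv(url, names = columns)

The database itself can be created with a statement such as CREATE DATABASE IF NOT EXISTS iris_db; before moving on.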

Connect to SingleStoreDB

First, we'll provide connection details for SingleStoreDB:

from sqlalchemy import create_engine

# connection_url holds the connection string for the SingleStoreDB workspace
db_connection = create_engine(connection_url)
url = db_connection.url

We'll also write the Pandas data to the database.
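A minimal sketch of that write, assuming the Pandas DataFrame from earlier is called pandas_df and that connection_url points at the iris_db database:

# Write the Pandas DataFrame to an iris table, replacing it if it already exists
pandas_df.to_sql(
    "iris",
    con = db_connection,
    if_exists = "replace",
    index = False
)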

We'll now set some parameters for the SingleStore Spark Connector:

# cluster holds the workspace endpoint; password was stored earlier in the secrets vault
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

Create Dataframe from SingleStoreDB, Write to Delta Lake

We can create a Spark Dataframe from SingleStoreDB, as follows:

iris_df = (spark.read
                .format("singlestore")
                .load("iris_db.iris")
)

Next, we'll check the data:

iris_df.show(5)

Example output:

+------------+-----------+------------+-----------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|        species|
+------------+-----------+------------+-----------+---------------+
|         6.4|        3.1|         5.5|        1.8| Iris-virginica|
|         4.9|        2.4|         3.3|        1.0|Iris-versicolor|
|         4.8|        3.4|         1.9|        0.2|    Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|    Iris-setosa|
|         5.0|        3.3|         1.4|        0.2|    Iris-setosa|
+------------+-----------+------------+-----------+---------------+

Let's now write the data to the Delta Lake:

(iris_df.write
        .format("delta")
        .save("warehouse/delta-table")
)

Create Dataframe from Delta Lake, Write to SingleStoreDB

Now, let's read the data back from the Delta Lake:

new_iris_df = (spark.read
                    .format("delta")
                    .load("warehouse/delta-table")
)

Next, we'll check the data:

new_iris_df.show(5)

Example output:

+------------+-----------+------------+-----------+---------------+
|sepal_length|sepal_width|petal_length|petal_width|        species|
+------------+-----------+------------+-----------+---------------+
|         5.1|        3.5|         1.4|        0.2|    Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|    Iris-setosa|
|         5.0|        3.5|         1.3|        0.3|    Iris-setosa|
|         5.7|        2.8|         4.1|        1.3|Iris-versicolor|
|         6.5|        3.0|         5.5|        1.8| Iris-virginica|
+------------+-----------+------------+-----------+---------------+
only showing top 5 rows

Let's now write the data to SingleStoreDB:

(new_iris_df.write
            .format("singlestore")
            .option("loadDataCompression", "LZ4")
            .mode("overwrite")
            .save("iris_db.new_iris")
)

We can check that the new_iris table was created, and we can query the data:

SELECT * FROM new_iris LIMIT 5;

Summary

Using Spark Dataframes, we can seamlessly work with SingleStoreDB and Delta Lake.
