This post explores the powerful combination of PySpark and AWS Glue for streamlining data ETL (Extract, Transform, Load) processes. We'll delve into:
- PySpark: Harnessing the power of Spark for large-scale data analysis and transformations from the familiar Python environment.
- AWS Glue: Simplifying and scaling ETL workflows with a fully managed, serverless service on AWS.
The Challenge:
Efficiently transferring data from an RDS MySQL database to an S3 data lake.
The Solution:
- Defining the ETL Job: Moving data from the "stats", "ability", and "info" tables in MySQL to S3.
- Setting Up Glue Studio: Selecting "Author code with a script editor", establishing IAM roles, and downloading the MySQL JDBC driver and uploading it to S3 (see the upload sketch after this list).
- Coding with PySpark: Utilizing the provided code template for a smooth workflow:
  - Creating a SparkSession.
  - Adding the JDBC driver.
  - Defining a function to extract data from tables.
  - Reading data from multiple tables.
  - Transforming the "capture_rate" in the "info" table.
  - Partitioning data into timestamp-based subfolders.
  - Writing data to S3 in Parquet format.
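Before writing the script itself, the driver JAR has to be uploaded to S3 so the job can load it. Here is a minimal sketch using boto3 (the bucket name and local file path are placeholders, not values from the original setup):

import boto3

# Upload the locally downloaded MySQL Connector/J JAR to S3.
s3 = boto3.client("s3")
s3.upload_file(
    Filename="mysql-connector-j-8.3.0.jar",  # local path to the downloaded driver
    Bucket="<your-bucket>",
    Key="mysql-connector-j-8.3.0.jar",
)

Dragging the file into the bucket from the S3 console works just as well.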
I have prepared a code template to make it easier to get started:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col
from datetime import datetime
spark = (
    SparkSession.builder
    # Put the MySQL JDBC driver JAR on the Spark classpath. (addPyFile only affects the
    # Python path; in Glue you can also pass the JAR via the job's "Dependent JARs path".)
    .config("spark.jars", "s3://<your-bucket>/mysql-connector-j-8.3.0.jar")
    .getOrCreate()
)
jdbc_url = "jdbc:mysql://<your-host>:3306/<your-database>"
connection_properties = {"user": "admin", "password": "********", "driver": "com.mysql.cj.jdbc.Driver"}
# Read a single MySQL table into a Spark DataFrame over JDBC
def extract_df_to_s3(spark_session, jdbc_url, connection_properties, table_name):
    df = spark_session.read.jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
    return df
# Read data from multiple tables
df_info = extract_df_to_s3(spark, jdbc_url, connection_properties, "info")
df_ability = extract_df_to_s3(spark, jdbc_url, connection_properties, "ability")
df_stats = extract_df_to_s3(spark, jdbc_url, connection_properties, "stats")
# Transform capture_rate in the info table and cast to int
df_info = df_info.withColumn("capture_rate", regexp_extract("capture_rate", r"^\D*(\d+).*$", 1)) \
.withColumn("capture_rate", col("capture_rate").cast("int"))
# Generate timestamp subfolders for each DataFrame
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
info_subfolder = f"pokemon_info_processed/{timestamp_str}"
ability_subfolder = f"pokemon_ability_processed/{timestamp_str}"
stats_subfolder = f"pokemon_stat_processed/{timestamp_str}"
# Write DataFrames to separate folders
df_info.write.parquet(f"s3://<your-bucket>-datalake/{info_subfolder}", mode="overwrite")
df_ability.write.parquet(f"s3://<your-bucket>-datalake/{ability_subfolder}", mode="overwrite")
df_stats.write.parquet(f"s3://<your-bucket>-datalake/{stats_subfolder}", mode="overwrite")
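To make the "capture_rate" step more concrete, here is a tiny standalone example of the same regular expression; the sample value is only an assumed format, since the strings in your "info" table may look different:

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample value; real capture_rate strings may differ.
sample = spark.createDataFrame([("45 (5.9% with full HP)",)], ["capture_rate"])

# Grab the first run of digits and cast it to an integer.
cleaned = sample.withColumn(
    "capture_rate",
    regexp_extract("capture_rate", r"^\D*(\d+).*$", 1).cast("int"),
)
cleaned.show()  # the sample above becomes 45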
Now you can run the job and check whether the results appear in the S3 data lake bucket. ✨
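If you prefer to verify from code rather than the S3 console, a quick check (reusing the variables and placeholder paths from the script above) is to read one of the output folders back:

# Read one of the freshly written folders back and inspect a few rows.
check_df = spark.read.parquet(f"s3://<your-bucket>-datalake/{info_subfolder}")
check_df.show(5)
print(f"Rows written for info: {check_df.count()}")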