PySpark and Apache Spark Broadcast Mechanism

Adi Polak - Apr 6 '20 - Dev Community



Apache Spark is based on distributed computation and distributed data concepts. Each machine/task gets a piece of the data to process.

Many times, we will need something like a lookup table or a set of parameters to base our calculations on. Those parameters are static and won't change during the calculation; they are read-only params.

Broadcast variables are used when static (read-only) variables need to be shared across executors.

Why Should We Use It?

Without broadcast variables, these variables are serialized together with every task that uses them, so they are shipped to the executors again for every transformation and action; this can cause network overhead. With broadcast variables, however, they are shipped once to each executor and cached there for future reference. See the example next.
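To make the saving concrete, here is a back-of-the-envelope sketch in plain Python (no Spark needed). It assumes a simplified cost model in which a non-broadcast variable travels once with every task, while a broadcast travels once per executor. The function name and the sample numbers are made up for illustration, and real Spark adds compression and block-based transfer between executors that this ignores.

```python
def bytes_shipped(payload_bytes, num_tasks, num_executors, use_broadcast):
    """Rough estimate of the bytes sent over the network for one shared variable.

    Simplified model (an assumption, not Spark's exact accounting):
    - without broadcast, the variable is serialized into every task
    - with broadcast, each executor fetches it once and caches it
    """
    copies = num_executors if use_broadcast else num_tasks
    return payload_bytes * copies


# Hypothetical numbers: a 10 MB lookup table, 2,000 tasks, 20 executors
payload = 10 * 1024 * 1024
without_broadcast = bytes_shipped(payload, num_tasks=2000, num_executors=20, use_broadcast=False)
with_broadcast = bytes_shipped(payload, num_tasks=2000, num_executors=20, use_broadcast=True)

print(f"without broadcast: {without_broadcast / 1024**2:.0f} MB")
print(f"with broadcast:    {with_broadcast / 1024**2:.0f} MB")
```

Under these assumed numbers the difference is about 20,000 MB versus 200 MB, a factor of 100, which is simply the ratio of tasks to executors.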

Python code sample with PySpark:

Here, we create a broadcast from a list of strings.
We load a Parquet file into a Spark DataFrame and filter the DataFrame based on the broadcast value. The broadcast is shipped to the executors only once (one network call per executor). If we used a list without the broadcasting mechanism, the whole list would be sent along with every task that references it, which results in many more network transfers.

The number of transfers then grows with the number of tasks (at least one per partition, for every stage that uses the list) rather than with the number of executors. Since we are talking about Big Data computation, the number of executors is usually much smaller than the number of tasks, so these repeated transfers clutter our cluster.
In the end, we release the executors' dedicated memory by calling unpersist() on the broadcast variable (words_new.unpersist() in the sample).

from pyspark.sql import SparkSession
import pyspark.sql.functions as func

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("App Name") \
    .getOrCreate() # starts spark session locally

sc = spark.sparkContext

words_new = sc.broadcast(["list of values to broadcast", "spark", "python"])
data = words_new.value  # accessing the value stored in the broadcast on the driver

df = spark.read.parquet('some_file_with_header.parquet')  # loading Parquet file into a Spark DataFrame
# filtering the DataFrame based on the broadcast list with isin;
# note that .value is read on the driver here, so isin receives a plain Python list
filtered = df.filter(func.col('name').isin(words_new.value))

# asks each executor to release its cached copy of the broadcast;
# call this only after the actions that use the broadcast have run
words_new.unpersist()
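
One caveat with the sample above: words_new.value is evaluated on the driver, so isin receives an ordinary Python list. For the executors to fetch the broadcast themselves, .value has to be read inside the function that runs on them. The following is a minimal sketch of that pattern, not the only way to do it. make_name_filter and run_example are hypothetical helper names, and a small in-memory DataFrame stands in for the Parquet file.

```python
def make_name_filter(broadcast_words):
    """Build a predicate that reads the broadcast on the executor.

    Only the lightweight broadcast handle is captured in the closure;
    .value is looked up when the predicate runs inside a task.
    """
    def keep(row):
        return row["name"] in broadcast_words.value
    return keep


def run_example():
    # Imported here so the helper above can be defined without Spark installed
    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("Broadcast sketch") \
        .getOrCreate()
    sc = spark.sparkContext

    # A set gives constant-time membership checks on the executors
    words = sc.broadcast({"spark", "python"})

    # Hypothetical sample data standing in for the Parquet file
    df = spark.createDataFrame([("spark",), ("scala",), ("python",)], ["name"])

    # collect() is an action, so the filter actually runs here
    kept = df.rdd.filter(make_name_filter(words)).collect()

    # Release the executors' cached copies only after the action has finished
    words.unpersist()
    spark.stop()
    return [row["name"] for row in kept]
```

Calling run_example() on a machine with PySpark installed should return the names that appear in the broadcast set. Because transformations are lazy, unpersist() is called after collect(); releasing the broadcast earlier would force Spark to re-send it when the action finally runs.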

That was Apache Spark Broadcast with PySpark in UNDER 3 min! It is part of the Apache Spark Bitesize series.
