(Day 177) Spark for batch processing

Ivan Ivanov · June 26, 2024

Hello :) Today is Day 177!

A quick summary of today:

  • started Module 5: batch processing from the data eng zoomcamp
    • Spark
    • Spark operations
    • Connecting PySpark to GCS

What is batch processing of data?

  • a method of executing data processing tasks on a large volume of collected data all at once, rather than in real time. It is often done at scheduled times (e.g. hourly, daily, weekly, or several times per hour/minute) or when sufficient data has accumulated

Technologies used for batch processing:

  • Python scripts
  • SQL
  • Spark
  • Flink

Advantages of batch jobs:

  • easy to manage
  • easy to retry failed jobs
  • easy to scale

Disadvantages:

  • delay in getting fresh data

Spark is the most popular batch processing tool, and its Python API, PySpark, is widely used. I used PySpark during my placement at Lloyds Banking Group back in 2019-2020, so there was not much new info~ The most important bit is that Spark works with clusters, and in order to utilise Spark’s power in handling large datasets, we need to partition our data.

Then, I got a refresher on the Spark cluster architecture (image is from Google)

We are the client: we connect through a SparkContext and submit the functions to execute. In the Spark cluster there is a cluster master that manages the workload among the workers. In turn, each worker node runs multiple executors (which carry out individual tasks) and stores partitions of the data.
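
To make that concrete, here is a minimal sketch of the client side ~ creating a SparkSession locally (the master URL and app name are just placeholders; on a real cluster the master would point at the cluster manager instead):

```python
from pyspark.sql import SparkSession

# Minimal local setup for illustration ~ local[*] uses all local cores;
# on a real cluster this would be the cluster master's URL instead
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("day177-batch") \
    .getOrCreate()

# The underlying SparkContext is what we actually "connect" through
sc = spark.sparkContext
```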

Spark groupBy (thank you GPT for the nice summary)

Stage Creation:

  • Spark divides the computation into stages based on the shuffle boundaries.
  • For a group by operation, data often needs to be shuffled so that records with the same key end up on the same partition

Shuffling:

  • Spark performs a shuffle operation to redistribute data across the cluster.
  • Each partition will send its records to the appropriate partition based on the hash of the group by key.
  • This step is expensive because it involves disk I/O and network I/O

Aggregation:

  • After the shuffle, each node performs a local aggregation on its partition of the data.
  • Spark uses combiners to perform partial aggregation before shuffling to reduce the amount of data transferred

Result Collection:

  • The results from each node are combined to form the final output
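
To make the groupBy flow above concrete, here is a small PySpark sketch (assuming the SparkSession named spark from above; the trips data and column names are made up for illustration):

```python
from pyspark.sql import functions as F

# Toy trips data ~ in reality this would be the taxi dataset
df_trips = spark.createDataFrame(
    [("green", "2021-01", 12.5), ("green", "2021-01", 7.0), ("yellow", "2021-02", 30.0)],
    ["service", "month", "total_amount"],
)

# groupBy triggers a shuffle: rows with the same (service, month) key
# are moved to the same partition before the final aggregation
df_agg = df_trips.groupBy("service", "month").agg(
    F.sum("total_amount").alias("revenue"),
    F.count("*").alias("num_trips"),
)

df_agg.show()
```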

Spark join

Logical Plan:

  • Spark creates a logical plan based on the join operation specified. This plan is then optimized.

Physical Plan:

  • The logical plan is converted into a physical plan that specifies how the join should be executed (e.g., broadcast join, sort-merge join, etc.)

Execution Strategy:

  • Broadcast Join: If one of the DataFrames is small enough, it can be broadcasted to all worker nodes to avoid shuffling
  • Sort-Merge Join: Both DataFrames are sorted by the join key and then merged. This requires shuffling both DataFrames
  • Shuffle Hash Join: DataFrames are shuffled based on the join key, and a hash table is built for the join

Shuffling:

  • Depending on the join type, Spark may shuffle the data across nodes to ensure that matching keys from both DataFrames are on the same partition

Joining:

  • The actual join operation is performed on the partitions, and results are combined
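
And a small join sketch along the same lines (again illustrative data; the zone lookup table and column names are made up):

```python
from pyspark.sql import functions as F

# Toy lookup table (small) and trips table (imagine it is large)
df_zones = spark.createDataFrame(
    [(1, "EWR"), (2, "Queens")],
    ["zone_id", "borough"],
)
df_trips = spark.createDataFrame(
    [(1, 12.5), (2, 7.0), (2, 30.0)],
    ["pickup_zone_id", "total_amount"],
)

# Default for two large tables: sort-merge join, which shuffles both sides on the key
joined = df_trips.join(df_zones, df_trips.pickup_zone_id == df_zones.zone_id)

# Broadcast join: the small zones table is shipped to every executor,
# so the large trips table does not need to be shuffled
joined_broadcast = df_trips.join(
    F.broadcast(df_zones),
    df_trips.pickup_zone_id == df_zones.zone_id,
)

joined_broadcast.show()
```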

Then, using a download bash script, I downloaded some taxi data as .csv.gz, transformed it to parquet with Spark, and uploaded it to GCS.

Bash script to download the data (ran it for green and yellow taxi for both 2020 and 2021):
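
The exact script is not reproduced here, but roughly it loops over the services, years, and months and downloads each monthly .csv.gz file. Here is the same idea sketched in Python (the URL pattern and folder layout are illustrative, not necessarily the exact ones I used):

```python
import os
import urllib.request

# Assumed URL pattern for the zoomcamp's taxi data releases
URL_PREFIX = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"

for service in ["green", "yellow"]:
    for year in [2020, 2021]:
        for month in range(1, 13):
            fname = f"{service}_tripdata_{year}-{month:02d}.csv.gz"
            url = f"{URL_PREFIX}/{service}/{fname}"
            local_path = f"data/raw/{service}/{year}/{month:02d}/{fname}"

            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            print(f"downloading {url} -> {local_path}")
            urllib.request.urlretrieve(url, local_path)
```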

Code to transform to parquet
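
Again, not the exact code, but a minimal sketch of the conversion (paths and partition counts are illustrative; assumes the SparkSession from earlier):

```python
# Read each month's .csv.gz and write it back out as parquet
for service in ["green", "yellow"]:
    for year in [2020, 2021]:
        for month in range(1, 13):
            input_path = f"data/raw/{service}/{year}/{month:02d}/"
            output_path = f"data/pq/{service}/{year}/{month:02d}/"

            df = spark.read \
                .option("header", "true") \
                .csv(input_path)

            # repartition so each month is written as several smaller parquet files
            df.repartition(4) \
                .write.parquet(output_path, mode="overwrite")
```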

I installed the gcloud cli and uploaded the data through it ~ here is the data in GCS (in the pq folder)

Next, to load the data from GCS using PySpark, I first needed to install the extra Google Cloud Storage connector for Hadoop so that Spark can read gs://mage-zoomcamp-ii/…. And after some debugging on Spark not being able to connect to an IP ~

Creating a context
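
Roughly, it looks like this ~ a sketch that assumes a local master, a service-account key file, and the GCS connector jar (the paths and jar version are placeholders):

```python
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Placeholder paths ~ point these at your own key file and connector jar
credentials_location = "/path/to/google_credentials.json"
gcs_connector_jar = "./lib/gcs-connector-hadoop3-latest.jar"

conf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("gcs-test") \
    .set("spark.jars", gcs_connector_jar) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

sc = SparkContext(conf=conf)

# Tell Hadoop how to resolve gs:// paths
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()
```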

And connecting and getting the data ~
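
Something like this (the path is based on the pq folder mentioned above):

```python
# Read the parquet files straight from the GCS bucket
df_green = spark.read.parquet("gs://mage-zoomcamp-ii/pq/green/*/*")

df_green.printSchema()
df_green.show(5)
```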

The Spark debugging took me a lot of time ~ it had to do with the default IP (127.0.0.1) I had set in spark-defaults.conf for Spark to connect to. Because I was trying to use GCP, it could not connect, so I removed the IP; now the config is just (pic) and it works.

This is not all from this module ~ there is a bit left, but I will leave that for tomorrow. I uploaded the code I used today to my repo.

Also, a small update from the lab ~ I wrote the first version of my (at the moment titled) ‘Previous works’ section. ^^

That is all for today!

See you tomorrow :)