(Day 340) Exploring Spark's ML library

Ivan Ivanov · December 6, 2024

Hello :) Today is Day 340!

A quick summary of today:

Predicting underground demand for Daejeon’s Subway Line 1

I saw this competition (in Korean) from my university's Big Data department and noticed that its finals were being held offline (in person) this morning at 10:30am, so I decided to go and check out what kind of models people made. (I am extra keen after Vincent from probabl made this video on time-series predictions.)

Team 1 - Using XGBoost

  • added data about the area surrounding each station
  • added temperature data
  • 10.59 MAE
  • used K-fold and GridSearchCV

Team 2 - AutoGluon

  • looked into special (one-time/annual) events that happened during the time period, and added a new column that says which event occurred near which station
  • added vacation information data (dates when people might be travelling)
  • features: year month week weekday time station event rain vacation; target: count
  • 10.7 MAE
  • no mention of CV

Team 3

  • extra features: date features, month_sin, day_sin, time_sin, weather features, station surrounding features, month X weather interactive features
  • RandomForest - 14.16 MAE (I missed the CatBoost MAE)
  • combined RF + CatBoost -> rf 0.9 + catboost 0.1 = 14.03 MAE
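
A minimal sketch of two of Team 3's ideas, the sine-encoded date features and the 0.9/0.1 weighted blend. The function names and the toy numbers are my own assumptions, not the team's code or data.

```python
import math

def cyclical_sin(value, period):
    """Map a periodic value (e.g. month 1-12) onto a sine wave."""
    return math.sin(2 * math.pi * value / period)

def blend(pred_a, pred_b, weight_a=0.9):
    """Weighted average of two models' predictions."""
    return [weight_a * a + (1 - weight_a) * b for a, b in zip(pred_a, pred_b)]

def mae(y_true, y_pred):
    """Mean absolute error."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

# Toy numbers, purely illustrative (not the competition data)
y_true = [100, 120, 80]
rf_pred = [110, 115, 90]   # stand-in for RandomForest predictions
cb_pred = [90, 130, 70]    # stand-in for CatBoost predictions

blended = blend(rf_pred, cb_pred, weight_a=0.9)
print("month_sin for March:", cyclical_sin(3, 12))
print("RF MAE:", mae(y_true, rf_pred), "blended MAE:", mae(y_true, blended))
```

One thing to keep in mind: a sine feature alone maps different months to the same value (for example months 6 and 12 both land near 0), so it is commonly paired with a matching cosine feature.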

Team 4

  • TimeXer library which is based on a recent paper
  • added weather features, holiday features
  • I think the MAE was around 30
  • they shared their repo which is awesome as I did not expect anyone to share their code publicly

Team 5

  • explored the difference between using the mean and median as predictors for demand
  • did not present a model, they joined the Big Data department this semester so they are quite new and presented their code for cleaning, transforming the data, and using mean/median for predictions👏
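
A small sketch of why the mean vs median comparison matters when the metric is MAE: the median is the constant that minimizes absolute error, so a few spike days hurt the mean more. The counts below are made up for illustration.

```python
from statistics import mean, median

def mae_of_constant(y_true, constant):
    """MAE when the same constant is predicted for every observation."""
    return sum(abs(y - constant) for y in y_true) / len(y_true)

# Hypothetical hourly passenger counts with one event-day spike
counts = [10, 12, 11, 13, 90]

mean_pred = mean(counts)
median_pred = median(counts)
print("mean:", mean_pred, "median:", median_pred)
print("MAE with mean:", mae_of_constant(counts, mean_pred))
print("MAE with median:", mae_of_constant(counts, median_pred))
```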

Team 6

  • used data after 2023 only (removing covid data)
  • showed their code on the ppt
  • used ARIMA, used auto_arima to find its hyperparams (p,d,q)

Definitely some interesting ideas ^^ Glad I went

Data Pipelines Pocket Reference - Chapter 1 - Introduction to Data Pipelines

What are they?

Data pipelines are sets of processes that move and transform data from various sources to a destination where new value can be derived. They are the foundation of analytics, reporting, and machine learning capabilities.

Who builds them?

Data engineers play a critical role in analytics teams by building and maintaining data pipelines to support machine learning, data science, and timely insights. Their responsibilities go beyond loading data into warehouses, focusing on scalable production systems, data validity, and timeliness through testing, alerting, and contingency planning.

Key skills:

  1. SQL and data warehousing: expertise in high-performance SQL and warehousing fundamentals for efficient database querying and collaboration
  2. programming: proficiency in Python or Java, with emerging interest in Go, tailored to the team’s tech stack
  3. distributed computing: knowledge of platforms like Apache Spark and Hadoop to handle high-volume data processing
  4. system administration: proficiency with Linux commands, cloud services and basic system tasks like log analysis and security troubleshooting
  5. goal-oriented mentality: understanding the purpose behind pipelines to make better architectural decisions aligned with team and stakeholder needs

Why build them?

They are the key behind the fancy dashboards that executives see. Data pipelines do more than just extract data from sources and load them into simple database tables or flat files for analysts to use. Raw data is refined along the way to clean, structure, normalize, combine, aggregate, and at times anonymize or otherwise secure it

Data pipelines ensure that the proper data is delivered so the rest of the analytics organization can focus their time on what they do best: delivering insights

How are they built?

There are open- and closed-source solutions. Many are written in Python, but there are solutions in other languages, like Java, as well.


Data Pipelines Pocket Reference - Chapter 2 - A Modern Data Infrastructure

Diversity of data sources

The majority of organizations have dozens, if not hundreds, of data sources that feed their analytics endeavors.

Source system ownership

It’s typical for an analytics team to ingest data from source systems that are built and owned by the organization as well as from third-party tools and vendors.

Ingestion interface and data structure

Regardless of who owns the source data, how you get it and in what form is the first thing a data engineer will examine when building a new data ingestion.

Ingestion interfaces include databases, CSV files, JSON files, Kafka streams, etc.

Data volume

Orgs have both small and big datasets

Data cleanliness and validity

Just as there is great diversity in data sources, the quality of source data varies greatly. It’s important to understand the limitations and deficiencies of source data and address them in the appropriate sections of your pipelines.

Cloud data warehouses and data lakes

Three things transformed the landscape of analytics and data warehousing over the last 10 years, and they’re all related to the emergence of the major public cloud providers

  • the ease of building and deploying data pipelines, data lakes, warehouses, and analytics processing in the cloud. No more waiting on IT departments and budget approval for large up-front costs. Managed services—databases in particular—have become mainstream
  • continued drop in storage costs in the cloud
  • the emergence of highly scalable, columnar databases, such as Amazon Redshift, Snowflake, and Google BigQuery

Data ingestion tools

The book discusses three of the most common tools that help with ingesting data from various sources: Singer, Stitch, and Fivetran.

Data transformation and modelling tools

The terms data modeling and data transformation are often used interchangeably, however the book differentiates them

Data transformation: Transforming data is a broad term that is signified by the T in an ETL or ELT process. A transformation can be something as simple as converting a timestamp stored in a table from one time zone to another. It can also be a more complex operation that creates a new metric from multiple source columns that are aggregated and filtered through some business logic.
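
As a sketch of the simplest kind of transformation mentioned above, here is a time zone conversion in plain Python. The function name and the choice of UTC to Korea Standard Time are my own example, not from the book.

```python
from datetime import datetime, timedelta, timezone

# Korea Standard Time as a fixed UTC+9 offset (Korea does not observe DST)
KST = timezone(timedelta(hours=9), name="KST")

def utc_to_kst(ts_utc: str) -> str:
    """Convert a naive ISO-8601 timestamp, assumed to be UTC, to KST."""
    dt = datetime.fromisoformat(ts_utc).replace(tzinfo=timezone.utc)
    return dt.astimezone(KST).isoformat()

converted = utc_to_kst("2024-12-06T01:30:00")
print(converted)
```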

Data modeling: Data modeling is a more specific type of data transformation. A data model structures and defines data in a format that is understood and optimized for data analysis. A data model is usually represented as one or more tables in a data warehouse.

Workflow orchestration platforms

Luigi, AWS Glue, Airflow, Kubeflow, others.


Chapter 3 - Common Data Pipeline Patterns

ETL and ELT

ETL and ELT are widely used data processing patterns in data warehousing, business intelligence, and more recently, in data science and machine learning pipelines. They describe methods of preparing data for analysis by analysts and reporting tools, with their primary distinction being the order of the transform and load steps.

The emergence of ELT over ETL

ETL was the standard data pipeline pattern for decades but has been largely supplanted by ELT due to advancements in modern, cloud-based columnar data warehouses like Snowflake and Amazon Redshift

ETL challenges

  • older data warehouses lacked the storage and compute power to process large raw datasets
  • they were row-based, optimized for transactional use cases, but inefficient for bulk queries common in analytics
  • transformations had to occur outside the warehouse before loading data

Emergence of ELT:

  • modern columnar databases support scalable, efficient processing of large datasets
  • features like I/O efficiency, data compression and distributed queries make it cost-effective to load raw data first and perform transformations within the warehouse
  • column-based storage optimizes analytics by reducing disk I/O, focusing only on relevant columns, and enabling better data compression

Impact:

  • ELT enables streamlined pipelines where data is extracted, loaded into a warehouse, and transformed directly within the database
  • this approach suits analytics, ML, and data product development by leveraging the strengths of modern data warehouses
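
A toy sketch of the ELT order of operations, using an in-memory SQLite database as a stand-in for a cloud warehouse (SQLite is row-based, so this only illustrates the flow, not the performance argument). Table names and rows are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for a data warehouse

# Extract: rows as they arrive from a hypothetical source system (amounts are still text)
raw_orders = [(1, "KR", "19.90"), (2, "KR", "5.10"), (3, "BG", "7.00")]

# Load: store the raw data untouched
conn.execute("CREATE TABLE raw_orders (order_id INTEGER, country TEXT, amount TEXT)")
conn.executemany("INSERT INTO raw_orders VALUES (?, ?, ?)", raw_orders)

# Transform: cast and aggregate with SQL inside the database, after loading
conn.execute("""
    CREATE TABLE revenue_by_country AS
    SELECT country, ROUND(SUM(CAST(amount AS REAL)), 2) AS revenue
    FROM raw_orders
    GROUP BY country
""")

result = conn.execute(
    "SELECT country, revenue FROM revenue_by_country ORDER BY country"
).fetchall()
print(result)
```

In an ETL pipeline the cast and aggregation would instead happen in a separate process before anything is written to the warehouse.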

EtLT subpattern

When ELT emerged as the dominant pattern, it became clear that doing some transformation after extraction, but before loading, was still beneficial. However, instead of transformation involving business logic or data modeling, this type of transformation is more limited in scope (hence a small t 😆). Some examples:

  • deduplicate records in a table
  • parse URL parameters into individual components
  • mask or otherwise obfuscate sensitive data

These types of transforms are either fully disconnected from business logic or, in the case of something like masking sensitive data, at times required as early in a pipeline as possible for legal or security reasons
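
A minimal sketch of the masking case: replacing an email address with a salted hash before the record is loaded. The salt value, field names, and records are hypothetical, and a real pipeline would keep the salt in a secret store.

```python
import hashlib

def mask_email(email: str, salt: str = "demo-salt") -> str:
    """Return a short, stable pseudonym for an email address."""
    digest = hashlib.sha256((salt + email.lower()).encode("utf-8")).hexdigest()
    return digest[:16]

# Hypothetical extracted records
records = [
    {"user": "a@example.com", "page": "/home"},
    {"user": "A@example.com", "page": "/cart"},
]

# The small "t": mask before loading, leave everything else untouched
masked = [{**r, "user": mask_email(r["user"])} for r in records]
print(masked)
```

Because the same input always produces the same pseudonym, analysts can still count distinct users without seeing the address.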

ELT for data analysis

With the emergence of ELT, data analysts have become more autonomous and empowered to deliver value from data without being ‘blocked’ by data engineers.

ELT for data science

Data pipelines for data science teams are similar to those for data analysis but differ in their focus as data scientists require more granular and often raw data for exploration and predictive modeling, unlike data analysts who build metrics and dashboards. This distinction influences pipeline design to meet the specific needs of data scientists.

ELT for data products and ML

Data is used for more than analysis, reporting, and predictive models. It’s also used for powering data products. Some common examples of data products include the following:

  • a content recommendation engine that powers a video streaming home screen
  • a personalized search engine on an e-commerce website
  • an application that performs sentiment analysis on user-generated restaurant reviews

Steps in ML pipelines

  • data ingestion
  • data preprocessing
  • model training
  • model deployment

Any good ML pipeline will also include gathering feedback for improving the model


Chapter 4 - Data ingestion

This chapter gave code examples on ingesting data from different sources and how to set that up

  • mysql
  • postgres
  • mongodb
  • rest api
  • kafka

Chapter 5 - Data Ingestion: Loading Data

This chapter also mostly shows code snippets for setting up Amazon Redshift/Snowflake and using them as a data warehouse.


Chapter 6 - Transforming Data

Noncontextual transformations

Deduping records in a table

There are a number of reasons it happens:

  • an incremental data ingestion mistakenly overlaps a previous ingestion time window and picks up some records that were already ingested in a previous run
  • duplicate records were inadvertently created in a source system
  • data that was backfilled overlapped with subsequent data loaded into the table during ingestion

We can use GROUP BY and HAVING, DISTINCT, or window functions (ROW_NUMBER) to achieve that in SQL
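
A small sketch of two of these approaches, run against an in-memory SQLite table so it is self-contained. The table name and rows are invented; the book's examples run against a real warehouse.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "shipped"), (2, "pending"), (2, "pending"), (3, "shipped"), (3, "shipped")],
)

# Detect duplicates: any (order_id, status) pair that appears more than once
dupes = conn.execute("""
    SELECT order_id, status, COUNT(*) AS n
    FROM orders
    GROUP BY order_id, status
    HAVING COUNT(*) > 1
    ORDER BY order_id
""").fetchall()

# Remove duplicates: build a clean copy with DISTINCT
conn.execute("CREATE TABLE orders_dedup AS SELECT DISTINCT order_id, status FROM orders")
deduped = conn.execute(
    "SELECT order_id, status FROM orders_dedup ORDER BY order_id"
).fetchall()

print(dupes)
print(deduped)
```

DISTINCT only works when the duplicate rows are identical in every column; when they differ in something like a load timestamp, a window function that ranks rows per key is the more flexible option.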

Parsing URLs

  • using Python's urllib.parse (urlsplit and parse_qs)
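
A minimal sketch of splitting a URL into columns with the standard library's urllib.parse. The URL and the output column names are made up for the example.

```python
from urllib.parse import urlsplit, parse_qs

url = "https://www.example.com/products?utm_source=newsletter&utm_medium=email&id=42"

parts = urlsplit(url)
params = parse_qs(parts.query)  # each value is a list, e.g. {"id": ["42"]}

def first(name):
    """Return the first value of a query parameter, or None if it is absent."""
    return params.get(name, [None])[0]

row = {
    "domain": parts.netloc,
    "path": parts.path,
    "utm_source": first("utm_source"),
    "utm_medium": first("utm_medium"),
    "utm_campaign": first("utm_campaign"),  # not present in this URL
}
print(row)
```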

When to transform? During or after ingestion?

Data transformations without business context can be run either during or after data ingestion from a technical standpoint. However, there are some reasons you should consider running them as part of the ingestion process (the EtLT pattern):

  • the transformation is easiest to do using a language besides SQL
  • the transformation is addressing a data quality issue

Data modelling foundations

  • measures: these are things you want to measure! Examples include a count of customers and a dollar value of revenue
  • attributes: these are things by which you want to filter or group in a report or dashboard. Examples include dates, customer names, and countries
  • granularity: the level of detail stored in a data model
  • source tables: tables that were loaded into the data warehouse or data lake via a data ingestion
  • slowly changing dimensions (I recommend checking out Zach Wilson’s lectures for this! They are awesome!)
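
To make the terms concrete, here is a toy data model built in SQLite: orders is the source table, order_count and revenue are the measures, order_date and country are the attributes, and the granularity is daily. All names and rows are my own illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Source table: one row per order, as it might look after ingestion
conn.execute("CREATE TABLE orders (order_ts TEXT, country TEXT, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
    ("2024-12-05 09:00:00", "KR", 10.0),
    ("2024-12-05 18:30:00", "KR", 5.0),
    ("2024-12-05 11:00:00", "BG", 7.0),
    ("2024-12-06 08:15:00", "KR", 3.0),
])

# Data model at daily granularity: measures grouped by attributes
conn.execute("""
    CREATE TABLE daily_revenue AS
    SELECT DATE(order_ts) AS order_date,
           country,
           COUNT(*) AS order_count,
           SUM(amount) AS revenue
    FROM orders
    GROUP BY DATE(order_ts), country
""")

model = conn.execute(
    "SELECT order_date, country, order_count, revenue "
    "FROM daily_revenue ORDER BY order_date, country"
).fetchall()
print(model)
```

Once the model is stored at daily granularity, hourly questions can no longer be answered from it, which is why the level of detail is worth deciding up front.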

Learning Spark, 2nd Edition - Chapter 8 - Structured Streaming

Evolution of the Apache Spark stream processing engine

The advent of micro-batch stream processing

Spark's first streaming engine, Spark Streaming (DStreams), introduced the idea of micro-batch stream processing, where the streaming computation is modeled as a continuous series of small, map/reduce-style batch processing jobs (hence, ‘micro-batches’) on small chunks of the stream data

Spark Streaming divides the data from the input stream into, say, 1-second micro-batches. Each batch is processed in the Spark cluster in a distributed manner with small deterministic tasks that generate the output in micro-batches. Breaking down the streaming computation into these small tasks gives us two advantages over the traditional, continuous-operator model:

  • Spark’s agile task scheduling can very quickly and efficiently recover from failures and straggler executors by rescheduling one or more copies of the tasks on any of the other executors
  • the deterministic nature of the tasks ensures that the output data is the same no matter how many times the task is reexecuted. This crucial characteristic enables Spark Streaming to provide end-to-end exactly-once processing guarantees, that is, the generated output results will be such that every input record was processed exactly once

Lessons learned from DStreams

Despite all the advantages, the DStream API was not without its flaws

  • lack of a single API for batch and stream processing
  • lack of separation between logical and physical plans
  • lack of native support for event-time windows

Structured streaming

Based on these lessons from DStreams, Structured Streaming was designed from scratch with one core philosophy—for developers, writing stream processing pipelines should be as easy as writing batch pipelines. In a nutshell, the guiding principles of Structured Streaming are:

  • a single, unified programming model and interface for batch and stream processing
  • a broader definition of stream processing

The programming model of Structured streaming

Structured Streaming views a data stream as an unbounded, continuously appended table

Every new record received in the data stream is like a new row being appended to the unbounded input table. Structured Streaming will not actually retain all the input, but the output produced by Structured Streaming until time T will be equivalent to having all of the input until T in a static, bounded table and running a batch job on the table.

The last part of the model is the output mode. There are 3:

  • append mode: only the new rows appended to the result table since the last trigger will be written to the external storage. This is applicable only in queries where existing rows in the result table cannot change (e.g. a map on an input stream)
  • update mode: only the rows that were updated in the result table since the last trigger will be changed in the external storage. This mode works for output sinks that can be updated in place, such as a MySQL table
  • complete mode: the entire updated result table will be written to external storage
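
A pure-Python simulation (no Spark involved) of what update mode and complete mode would write for a running word count. This is only my mental model of the semantics described above; the function name is made up. Append mode is left out because rows of a running count keep changing, which is the situation append mode does not allow.

```python
from collections import Counter

def run_triggers(batches):
    """Yield (update_rows, complete_rows) for each micro-batch of words."""
    result = Counter()  # the running result table
    for batch in batches:
        changed = set(batch)          # keys whose counts change in this trigger
        result.update(batch)
        update_rows = {w: result[w] for w in sorted(changed)}  # update mode
        complete_rows = dict(sorted(result.items()))           # complete mode
        yield update_rows, complete_rows

outputs = list(run_triggers([["cat", "dog"], ["dog", "owl"]]))
for trigger, (update_rows, complete_rows) in enumerate(outputs):
    print(trigger, "update:", update_rows, "complete:", complete_rows)
```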

The fundamentals of a structured streaming query

5 steps to define a streaming query

  1. define input sources
  2. transform data (stateless vs stateful transformations)
  3. define output sink and output mode
  4. specify processing details (triggering details, checkpoint location)
  5. start the query

Full example

# In Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split
spark = SparkSession...
lines = (spark
  .readStream.format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load())

words = lines.select(explode(split(col("value"), "\\s")).alias("word"))
counts = words.groupBy("word").count()
checkpointDir = "..."
streamingQuery = (counts
  .writeStream
  .format("console")
  .outputMode("complete")
  .trigger(processingTime="1 second")
  .option("checkpointLocation", checkpointDir)
  .start())
streamingQuery.awaitTermination()

Under the hood of an active streaming query

  1. Spark SQL analyzes and optimizes this logical plan to ensure that it can be executed incrementally and efficiently on streaming data

  2. Spark SQL starts a background thread that continuously executes the following loop:

  • based on the configured trigger interval, the thread checks the streaming sources for the availability of new data
  • if available, the new data is processed by running a micro-batch. From the optimized logical plan, an optimized Spark execution plan is generated that reads the new data from the source, incrementally computes the updated result, and writes the output to the sink according to the configured output mode
  • for every micro-batch, the exact range of data processed (e.g., the set of files or the range of Apache Kafka offsets) and any associated state are saved in the configured checkpoint location so that the query can deterministically reprocess the exact range if needed
  3. This loop continues until the query is terminated, which can occur for one of the following reasons:
  • a failure has occurred in the query (either a processing error or a failure in the cluster)
  • the query is explicitly stopped using streamingQuery.stop()
  • if the trigger is set to Once, then the query will stop on its own after executing a single micro-batch containing all the available data

A key point to remember about Structured Streaming is that under the hood it uses Spark SQL to process the data. As such, the full power of Spark SQL’s hyperoptimized execution engine is utilized to maximize the stream processing throughput, providing key performance advantages.

Recovering from failures with exactly-once guarantees

To restart a terminated query in a new process, create a new Spark session, redefine DataFrames, and use the same checkpoint location when starting the query. This ensures the query resumes from where it left off, leveraging checkpoint data like Kafka offsets to process records precisely and maintain fault tolerance.

Key points for ensuring exactly-once guarantees:

  1. replayable streaming sources: the source must support rereading the incomplete micro-batch range
  2. deterministic computations: transformations must yield consistent results with the same input
  3. idempotent sinks: the sink must handle duplicate micro-batches to prevent redundant writes
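
A toy sketch of the third point, an idempotent sink. After a failure the same micro-batch may be replayed, so the sink has to recognize a batch it has already written. The class and its batch-ID bookkeeping are my own illustration, not a Spark API.

```python
class IdempotentSink:
    """Toy sink that remembers the last committed micro-batch ID."""

    def __init__(self):
        self.rows = []
        self.last_batch_id = -1

    def write(self, batch_id, rows):
        """Write a micro-batch once; return False if it was a replay."""
        if batch_id <= self.last_batch_id:
            return False  # already committed, skip to avoid duplicate output
        self.rows.extend(rows)
        self.last_batch_id = batch_id
        return True

sink = IdempotentSink()
sink.write(0, ["a", "b"])
sink.write(1, ["c"])
replayed = sink.write(1, ["c"])  # same batch re-executed after a simulated failure
print(sink.rows, replayed)
```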

Allowed modifications during query restarts:

  • transformations: minor changes like adding filters to handle corrupted data
  • source and sink options: modifications depend on specific source/sink semantics (e.g. changing console sink options is allowed; changing socket host/port is not)
  • processing details: the checkpoint location must remain unchanged, but settings like trigger intervals can be updated

Monitoring an active query

We can use a StreamingQuery instance to get current metrics and status. We can also publish metrics through Dropwizard Metrics by setting spark.sql.streaming.metricsEnabled to true, or write a custom StreamingQueryListener to send stats to a location of choice.

Streaming data sources and sinks

Reading from files

  • all the files must be of the same format and are expected to have the same schema
  • each file must appear in the directory listing atomically—that is, the whole file must be available at once for reading, and once it is available, the file cannot be updated or modified
  • when there are multiple new files to process but it can only pick some of them in the next micro-batch (e.g., because of rate limits), it will select the files with the earliest timestamps. Within the micro-batch, however, there is no predefined order of reading of the selected files; all of them will be read in parallel

Writing to files

  • Structured Streaming achieves end-to-end exactly-once guarantees when writing to files by maintaining a log of the data files that have been written to the directory
  • if you change the schema of the result DataFrame between restarts, then the output directory will have data in multiple schemas. These schemas have to be reconciled when querying the directory

Kafka

We can read/write from/to Kafka topics

Custom sources and sinks

We can use foreachBatch() and foreach() methods to implement custom logic to write to our storage.

Data transformations

Stateless

All projection operations (e.g. select(), explode(), map(), flatMap()) and selection operations (e.g., filter(), where()) process each input record individually without needing any information from previous rows. This lack of dependence on prior input data makes them stateless operations.

Stateful

The simplest example of a stateful transformation is DataFrame.groupBy().count(), which generates a running count of the number of records received since the beginning of the query. In every micro-batch, the incremental plan adds the count of new records to the previous count generated by the previous micro-batch. This partial count communicated between plans is the state. This state is maintained in the memory of the Spark executors and is checkpointed to the configured location in order to tolerate failures.

Each micro-batch reads a new set of words, shuffles them within the executors to group them, computes the counts within the micro-batch, and finally adds them to the running counts to produce the new counts. These new counts are both the output and the state for the next micro-batch, and hence they are cached in the memory of the executors. The next micro-batch of data is grouped between executors in exactly the same way as before, so that each word is always processed by the same executor, and can therefore locally read and update its running count.
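
A pure-Python simulation of that paragraph: words are routed to a fixed "executor" by a deterministic hash, so each executor can keep and update its own slice of the running counts. The two-executor setup and the function names are assumptions for illustration; Spark's real shuffle and state store are far more involved.

```python
import zlib
from collections import Counter

NUM_EXECUTORS = 2
# One state store per simulated executor, kept between micro-batches
state = [Counter() for _ in range(NUM_EXECUTORS)]

def executor_for(word):
    """Deterministic routing: the same word always maps to the same executor."""
    return zlib.crc32(word.encode("utf-8")) % NUM_EXECUTORS

def process_micro_batch(words):
    """Shuffle words to executors, update local state, return the full result."""
    for word in words:
        state[executor_for(word)][word] += 1
    merged = Counter()
    for local_counts in state:
        merged.update(local_counts)
    return dict(merged)

process_micro_batch(["cat", "dog", "cat"])
totals = process_micro_batch(["dog", "owl"])
print(totals)
```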

For all stateful operations, Structured Streaming ensures the correctness of the operation by automatically saving and restoring the state in a distributed manner. Depending on the stateful operation, all you may have to do is tune the state cleanup policy such that old keys and values can be automatically dropped from the cached state

There are two types of stateful operations:

  • managed stateful operations: these automatically identify and clean up old state, based on an operation-specific definition of “old.” You can tune what is defined as old in order to control the resource usage (e.g., executor memory used to store state). The operations that fall into this category are those for streaming aggregations, stream–stream joins, streaming deduplication

  • unmanaged stateful operations: these operations let you define your own custom state cleanup logic. The operations in this category are MapGroupsWithState and FlatMapGroupsWithState

Stateful streaming aggregations

  • aggregations not based on time (global vs grouped)
  • aggregations with event-time windows (we can wait for late data with watermarks and a watermark delay which defines how long the engine will wait for late data to arrive)
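
A simplified simulation of the watermark idea: the watermark trails the maximum event time seen so far by the watermark delay, and events older than it are treated as too late. The real engine applies this to window state rather than filtering single events like this, so take it as a rough sketch; event times are plain integers (say, minutes) and the function name is made up.

```python
def filter_late_events(batches, delay):
    """Split event times into (accepted, dropped) using a trailing watermark."""
    max_event_time = None
    accepted, dropped = [], []
    for batch in batches:
        # The watermark for this micro-batch comes from earlier batches only
        watermark = None if max_event_time is None else max_event_time - delay
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)   # arrived too late
            else:
                accepted.append(event_time)
        if batch:
            batch_max = max(batch)
            if max_event_time is None or batch_max > max_event_time:
                max_event_time = batch_max
    return accepted, dropped

accepted, dropped = filter_late_events([[10, 12], [25, 11], [14, 30]], delay=10)
print(accepted, dropped)
```

In the second batch the event at 11 is late but still accepted, because the watermark at that point is 12 - 10 = 2. In the third batch the watermark has moved to 25 - 10 = 15, so the event at 14 is dropped.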

Streaming joins

Stream-Static joins

  • stream–static joins are stateless operations, and therefore do not require any kind of watermarking
  • the static DataFrame is read repeatedly while joining with the streaming data of every micro-batch, so you can cache the static DataFrame to speed up the reads
  • if the underlying data in the data source on which the static DataFrame was defined changes, whether those changes are seen by the streaming query depends on the specific behavior of the data source. For example, if the static DataFrame was defined on files, then changes to those files (e.g., appends) will not be picked up until the streaming query is restarted

Stream-Stream joins

Inner

  • combine matching events from two streams based on a condition (e.g. adId)
  • buffers unmatched events; optional watermarking limits state size and ensures cleanup

Outer

  • includes all events, using NULLs for unmatched ones
  • requires watermarking to limit state and ensure correct results
  • introduces delay to confirm no future matches

Performance tuning

Structured Streaming in Spark requires tuning strategies distinct from batch processing due to smaller data volumes and continuous operation.

  1. Cluster resource provisioning:
    • allocate resources to balance performance and cost
    • stateless queries require more cores; stateful queries need more memory
  2. Shuffle partition configuration:
    • reduce shuffle partitions compared to batch jobs for lower overhead and higher throughput
    • for stateful operations with frequent triggers, limit partitions to 2–3 times the allocated cores
  3. Source rate limits:
    • prevent instability during data surges by setting rate limits for supported sources
    • avoid setting limits too low to prevent resource underutilization
    • limits can’t mitigate sustained data rate increases, leading to growing latencies
  4. Managing multiple queries:
    • running multiple queries in the same SparkContext may bottleneck task scheduling or exceed driver memory limits
    • use separate scheduler pools (spark.scheduler.pool) for fair resource allocation between queries
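
A tiny sketch of the shuffle-partition rule of thumb from point 2, with the Spark calls shown only as comments because they need a live session. The helper name and the pool name are hypothetical.

```python
def suggest_shuffle_partitions(allocated_cores, factor=2):
    """Rule of thumb from above: 2-3 shuffle partitions per allocated core."""
    if not 2 <= factor <= 3:
        raise ValueError("factor is expected to be between 2 and 3")
    return allocated_cores * factor

n_partitions = suggest_shuffle_partitions(allocated_cores=8)
print(n_partitions)

# With an existing SparkSession named `spark` (not created here), the values
# would be applied roughly like this:
#   spark.conf.set("spark.sql.shuffle.partitions", str(n_partitions))
#   spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool_a")  # made-up pool name
```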

Streamed

I covered the notebooks from chapter 9 (Spark with Delta Lake) and ch. 10 (Spark ML) on stream and they were very interesting. Spark’s ML library has depth for sure.

From the notebooks I also got these 3 extra resources I need to read:

After the stream I checked the chapter 11 notebooks, and this caught my attention: distributed training + inference

XGBoost can be included directly in a PySpark Pipeline even though it's not part of the library

from sparkdl.xgboost import XgboostRegressor
from pyspark.ml import Pipeline

params = {"n_estimators": 100, "learning_rate": 0.1, "max_depth": 4, "random_state": 42, "missing": 0}

xgboost = XgboostRegressor(**params)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, xgboost])
pipeline_model = pipeline.fit(trainDF)

Distributed Inference with mapInPandas

import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

with mlflow.start_run(run_name="sklearn-rf-model") as run:
  # Import the data
  spark_df = spark.read.csv("dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-numeric.csv", header=True, inferSchema=True).drop("zipcode")
  df = spark_df.toPandas()
  X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), df[["price"]].values.ravel(), random_state=42)

  # Create model, train it, and create predictions
  rf = RandomForestRegressor(n_estimators=100, max_depth=10)
  rf.fit(X_train, y_train)
  predictions = rf.predict(X_test)

  # Log model
  mlflow.sklearn.log_model(rf, "random-forest-model")

  # Log params
  mlflow.log_param("n_estimators", 100)
  mlflow.log_param("max_depth", 10)

  # Log metrics
  mlflow.log_metric("mse", mean_squared_error(y_test, predictions))
  mlflow.log_metric("mae", mean_absolute_error(y_test, predictions))  
  mlflow.log_metric("r2", r2_score(y_test, predictions))
# Create Spark DataFrame and apply model to Spark DataFrame.
sparkDF = spark.createDataFrame(X_train)
# Apply the model in parallel with mapInPandas
def predict(iterator):
  model_path = f"runs:/{run.info.run_uuid}/random-forest-model" # Load model
  model = mlflow.sklearn.load_model(model_path)
  for features in iterator:
    yield pd.concat([features, pd.Series(model.predict(features), name="prediction")], axis=1)
    
display(sparkDF.mapInPandas(predict, """`host_total_listings_count` DOUBLE,`neighbourhood_cleansed` BIGINT,`latitude` DOUBLE,`longitude` DOUBLE,`property_type` BIGINT,`room_type` BIGINT,`accommodates` DOUBLE,`bathrooms` DOUBLE,`bedrooms` DOUBLE,`beds` DOUBLE,`bed_type` BIGINT,`minimum_nights` DOUBLE,`number_of_reviews` DOUBLE,`review_scores_rating` DOUBLE,`review_scores_accuracy` DOUBLE,`review_scores_cleanliness` DOUBLE,`review_scores_checkin` DOUBLE,`review_scores_communication` DOUBLE,`review_scores_location` DOUBLE,`review_scores_value` DOUBLE, prediction double"""))

# Define a function that takes all the data for a given device, trains a model, saves it as a nested run, and returns a DataFrame
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

def train_model(df_pandas: pd.DataFrame) -> pd.DataFrame:
  """
  Trains an sklearn model on grouped instances
  """
  # Pull metadata
  device_id = df_pandas["device_id"].iloc[0]
  n_used = df_pandas.shape[0]
  run_id = df_pandas["run_id"].iloc[0] # Pulls run ID to do a nested run
  
  # Train the model
  X = df_pandas[["feature_1", "feature_2", "feature_3"]]
  y = df_pandas["label"]
  rf = RandomForestRegressor()
  rf.fit(X, y)

  # Evaluate the model
  predictions = rf.predict(X)
  mse = mean_squared_error(y, predictions) # Note we could add a train/test split
 
  # Resume the top-level training
  with mlflow.start_run(run_id=run_id):
    # Create a nested run for the specific device
    with mlflow.start_run(run_name=str(device_id), nested=True) as run:
      mlflow.sklearn.log_model(rf, str(device_id))
      mlflow.log_metric("mse", mse)
      
      artifact_uri = f"runs:/{run.info.run_id}/{device_id}"
      # Create a return pandas DataFrame that matches the schema above
      returnDF = pd.DataFrame([[device_id, n_used, artifact_uri, mse]], 
        columns=["device_id", "n_used", "model_path", "mse"])

  return returnDF 
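# Use applyInPandas on grouped data
# (df and trainReturnSchema are assumed to be defined in earlier notebook cells)
import pyspark.sql.functions as f

with mlflow.start_run(run_name="Training session for all devices") as run:
  run_id = run.info.run_id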
  
  modelDirectoriesDF = (df
    .withColumn("run_id", f.lit(run_id)) # Add run_id
    .groupby("device_id")
    .applyInPandas(train_model, schema=trainReturnSchema)
  )
  
combinedDF = (df
  .join(modelDirectoriesDF, on="device_id", how="left")
)

display(combinedDF)

Joblib to parallelize the evaluation of sklearn models

from joblib import parallel_backend # sklearn.utils.parallel_backend was removed in newer scikit-learn versions
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
import pandas as pd
from joblibspark import register_spark

register_spark() # register spark backend

spark_df = spark.read.csv("dbfs:/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-numeric.csv", header=True, inferSchema=True).drop("zipcode")
df = spark_df.toPandas()
X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), df[["price"]].values.ravel(), random_state=42)

rf = RandomForestRegressor(random_state=42)
param_grid = {"max_depth": [2, 5, 10], "n_estimators": [20, 50, 100]}
gscv = GridSearchCV(rf, param_grid, cv=3)

with parallel_backend("spark", n_jobs=3):
  gscv.fit(X_train, y_train)
  
# Uses R2 to score the models
print(gscv.cv_results_)
print(gscv.best_estimator_)

Tomorrow I will cover chapter 12 - related to Spark 3 (one of the earlier versions) and will see what else ~


That is all for today!

See you tomorrow :)