Hello :) Today is Day 299!
A quick summary of today:
- read a bit of Data Analysis with Python and PySpark
- evaluated gpt-4o-mini’s fine-tuning results
Data Analysis with Python and PySpark
I found this book, and its part 3, Get confident: Using machine learning with PySpark, consists of these 3 chapters:
- 12 Setting the stage: Preparing features for machine learning
- 13 Robust machine learning with ML Pipelines
- 14 Building custom ML transformers and estimators
Chapter 12 - Preparing features for machine learning
The dataset used is about food nutrition, with ~20K samples and 680 columns - here
Preparing the data for making features
Because the raw column names vary in their format (e.g. they include spaces, -, /, &, etc.), we first need to clean them:
def sanitize_column_name(name):
    """Drops unwanted characters from the column name.

    We replace spaces, dashes, and slashes with underscores, replace "&"
    with "and", and only keep alphanumeric characters and underscores.
    """
    answer = name
    for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
        answer = answer.replace(i, j)
    return "".join(
        [char for char in answer if char.isalpha() or char.isdigit() or char == "_"]
    )
food = food.toDF(*[sanitize_column_name(name) for name in food.columns])
Next is identifying which columns need what kind of transformation. For example, identifying binary columns vs multi-categorical columns for encoding.
For binary cols, the author suggests:
import pandas as pd
import pyspark.sql.functions as F

pd.set_option("display.max_rows", 1000)

is_binary = food.agg(*[(F.size(F.collect_set(x)) == 2).alias(x) for x in food.columns]).toPandas()

is_binary.unstack()
Which returns (I am pasting a truncated version):
title 0 False
rating 0 False
calories 0 False
protein 0 False
fat 0 False
sodium 0 False
cakeweek 0 False
wasteless 0 False
22_minute_meals 0 True
3_ingredient_recipes 0 True
30_days_of_groceries 0 True
Of course, the above might not be able to detect hidden categorical features, so we need to be careful and try to understand the data before moving on to more in-depth feature engineering.
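As an aside (my own snippet, not from the book), one cheap way to spot potential hidden categoricals is to count distinct values per column and eyeball anything with low cardinality:
import pyspark.sql.functions as F

# Count distinct (non-null) values per column; a handful of distinct values
# hints at a categorical column hiding behind a string or numeric type.
distinct_counts = food.agg(
    *[F.countDistinct(F.col(x)).alias(x) for x in food.columns]
).toPandas()

# Transpose the one-row result and keep columns with fewer than 10 distinct values
print(distinct_counts.T[distinct_counts.T[0] < 10])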
Next, 4 sets of columns are created:
- identifiers, which are the column(s) that contain the information unique to each record
- targets, which are the column(s) (most often one) that contain the value we wish to predict
- continuous columns, containing continuous features
- binary columns, containing binary features
IDENTIFIERS = ["title"]
CONTINUOUS_COLUMNS = [
"rating",
"calories",
"protein",
"fat",
"sodium",
]
TARGET_COLUMN = ["dessert"]
BINARY_COLUMNS = [
x
for x in food.columns
if x not in CONTINUOUS_COLUMNS
and x not in TARGET_COLUMN
and x not in IDENTIFIERS
]
Next is deleting rows where either the target is null or all of the features are null, and also imputing some missing values.
Ignoring the recipe name column (the identifier) when checking for all-null feature rows:
food = food.dropna(how="all", subset=[x for x in food.columns if x not in IDENTIFIERS])
food = food.dropna(subset=TARGET_COLUMN)
Which results in just 5 records being deleted.
Next, if we want we can impute a value of 0 to any missing values in the binary columns:
food = food.fillna(0.0, subset=BINARY_COLUMNS)
Next is taking care of extreme values among the continuous features
Before this, we need to make sure all numerical columns are read as numerical by PySpark. We can check for non-numerical values using:
from typing import Optional

import pyspark.sql.types as T


@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
    if not value:
        return True
    try:
        _ = float(value)
    except ValueError:
        return False
    return True


food.where(~is_a_number(F.col("rating"))).select(*CONTINUOUS_COLUMNS).show()
Which returns this rogue row:
+---------+------------+-------+----+------+
| rating| calories|protein| fat|sodium|
+---------+------------+-------+----+------+
| Cucumber| and Lemon "| 3.75|NULL| NULL|
+---------+------------+-------+----+------+
Now that this is fixed, the decided approach to deal with extreme values is to hardcode a maximum:
maximum = {
"calories": 3203.0,
"protein": 173.0,
"fat": 207.0,
"sodium": 5661.0,
}
for k, v in maximum.items():
    food = food.withColumn(
        k,
        F.when(F.isnull(F.col(k)), F.col(k)).otherwise(
            F.least(F.col(k), F.lit(v))
        ),
    )
Next is weeding out the rare binary occurrence columns
For this model, 10 is chosen as the threshold: we do not want binary features that have fewer than 10 occurrences of either 0.0 or 1.0.
inst_sum_of_binary_columns = [F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS]
sum_of_binary_columns = (food.select(*inst_sum_of_binary_columns).head().asDict())
num_rows = food.count()
too_rare_features = [k for k, v in sum_of_binary_columns.items() if v < 10 or v > (num_rows - 10)]
len(too_rare_features)
print(too_rare_features)
BINARY_COLUMNS = list(set(BINARY_COLUMNS) - set(too_rare_features))
Now there are 506 binary columns (167 were removed).
Feature creation and refinement
Creating custom features
food = food.withColumn(
"protein_ratio", F.col("protein") * 4 / F.col("calories")
).withColumn(
"fat_ratio", F.col("fat") * 9 / F.col("calories")
)
food = food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])
CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]
We need to be careful not to introduce multicollinear features.
Removing highly correlated features
First, we need to create a vector of the continuous features:
from pyspark.ml.feature import VectorAssembler
continuous_features = VectorAssembler(
inputCols=CONTINUOUS_COLUMNS, outputCol="continuous_features"
)
vector_food = food.select(CONTINUOUS_COLUMNS)
for x in CONTINUOUS_COLUMNS:
vector_food = vector_food.where(~F.isnull(F.col(x)))
vector_variable = continuous_features.transform(vector_food)
vector_variable.select("continuous_features").show(3, False)
+---------------------------------------------------------------------+
|continuous_features |
+---------------------------------------------------------------------+
|[2.5,426.0,30.0,7.0,559.0,0.28169014084507044,0.14788732394366197] |
|[4.375,403.0,18.0,23.0,1439.0,0.17866004962779156,0.5136476426799007]|
|[3.75,165.0,6.0,7.0,165.0,0.14545454545454545,0.38181818181818183] |
+---------------------------------------------------------------------+
Once we have that, we can use PySpark’s Correlation.corr():
from pyspark.ml.stat import Correlation
correlation = Correlation.corr(
vector_variable, "continuous_features"
)
correlation.printSchema()
# |-- pearson(continuous_features): matrix (nullable = false)
Correlation.corr() returns a DenseMatrix column type, which is like a two-dimensional vector. In order to extract the values in an easy-to-read format, we have to do a little method juggling:
- we extract a single record as a Row using head()
- a Row is like an ordered dictionary, so we can access the first (and only) field containing our correlation matrix by indexing into it
- a DenseMatrix can be converted into a pandas-compatible array by using the toArray() method on the matrix
- we can directly create a pandas DataFrame from our NumPy array. Inputting our column names as an index (in this case, they’ll play the role of ‘row names’) makes the correlation matrix very readable
correlation_array = correlation.head()[0].toArray()
correlation_pd = pd.DataFrame(
correlation_array,
index=CONTINUOUS_COLUMNS,
columns=CONTINUOUS_COLUMNS,
)
print(correlation_pd.iloc[:, :4])
rating calories protein fat
rating 1.000000 0.102257 0.113292 0.111536
calories 0.102257 1.000000 0.757837 0.918052
protein 0.113292 0.757837 1.000000 0.664899
fat 0.111536 0.918052 0.664899 1.000000
sodium 0.065225 0.516818 0.585450 0.421920
protein_ratio 0.094429 0.164735 0.600182 0.125572
fat_ratio 0.129946 0.176823 0.109188 0.424986
print(correlation_pd.iloc[:, 4:])
sodium protein_ratio fat_ratio
rating 0.065225 0.094429 0.129946
calories 0.516818 0.164735 0.176823
protein 0.585450 0.600182 0.109188
fat 0.421920 0.125572 0.424986
sodium 1.000000 0.339067 0.033819
protein_ratio 0.339067 1.000000 0.024854
fat_ratio 0.033819 0.024854 1.000000
Feature preparation with transformers and estimators
Estimators are like transformer factories. The next parts introduce estimators through the use of Imputer and MinMaxScaler.
Imputing continuous features using the Imputer estimator
The Imputer has a companion transformer/model, ImputerModel. When fit() is called on an instantiated estimator, a fully parameterized transformer (called a Model) is created.
As an example, we want our Imputer to impute the mean value to every record in the calories, protein, fat, and sodium columns when the record is null:
from pyspark.ml.feature import Imputer
OLD_COLS = ["calories", "protein", "fat", "sodium"]
NEW_COLS = ["calories_i", "protein_i", "fat_i", "sodium_i"]
imputer = Imputer(
strategy="mean",
inputCols=OLD_COLS,
outputCols=NEW_COLS,
)
imputer_model = imputer.fit(food)
CONTINUOUS_COLUMNS = (
list(set(CONTINUOUS_COLUMNS) - set(OLD_COLS)) + NEW_COLS
)
# Using the imputer:
food_imputed = imputer_model.transform(food)
Scaling our features using the MinMaxScaler estimator
The MinMaxScaler and its companion model, MinMaxScalerModel, follow the same mode of operation as the Imputer estimator:
from pyspark.ml.feature import MinMaxScaler
CONTINUOUS_NB = [x for x in CONTINUOUS_COLUMNS if "ratio" not in x]
continuous_assembler = VectorAssembler(
inputCols=CONTINUOUS_NB, outputCol="continuous"
)
food_features = continuous_assembler.transform(food_imputed)
continuous_scaler = MinMaxScaler(
inputCol="continuous",
outputCol="continuous_scaled",
)
food_features = continuous_scaler.fit(food_features).transform(
food_features
)
Chapter 13 - Robust machine learning with ML Pipelines
Transformers and estimators: The building blocks of ML in Spark
Data comes in, data comes out: The Transformer
The transformer is the first building block of an ML pipeline
In PySpark, transformers use a Params mechanism for configuration. When initializing a transformer like VectorAssembler, keyword-only arguments define its Params. Unlike typical Python attributes, accessing one of these as an attribute returns a Param object rather than the value itself; to get the actual value, the getter method (e.g., getOutputCol()) is used, while explainParam() details the Param, including its name, description, default, and current value.
For changing values, setter methods (like setOutputCol()) modify Params in place, while setParams() allows updating multiple Params simultaneously. To reset a Param to its default, the clear() method is used. Transformers are passed by reference, so using copy() ensures modifications on one instance don’t affect others.
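To make the Params mechanics concrete, here is a small sketch of my own poking at a VectorAssembler (the column names a and b are made up):
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["a", "b"], outputCol="features")

print(assembler.outputCol)                  # the Param object itself, not its value
print(assembler.getOutputCol())             # 'features', the actual value
print(assembler.explainParam("outputCol"))  # name, doc, default, and current value

assembler.setOutputCol("vec")               # setters modify the Param in place
assembler.setParams(outputCol="features")   # setParams() can update several Params at once
assembler.clear(assembler.outputCol)        # reset a Param to its default value

other = assembler.copy()                    # an independent copy: changing `other`
other.setOutputCol("features_2")            # does not affect `assembler`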
Data comes in, transformer comes out: The Estimator
In PySpark, an estimator is an essential ML pipeline component that, unlike a transformer, uses the fit() method to train on an input DataFrame and returns a fitted transformer (a Model). This distinction allows an estimator to adjust its parameters based on the input data, whereas a transformer relies solely on pre-set parameters to apply transformations. For example, the MinMaxScaler estimator takes min and max Params (defaulting to 0.0 and 1.0) as well as inputCol and outputCol. During fit(), it calculates the actual minimum (E_min) and maximum (E_max) values of the input column, maps them onto the min-max range, and outputs a fully configured transformer.
This fit()/transform() pattern is foundational in PySpark ML, where estimators (including models) adapt to data specifics before transforming. If E_min equals E_max, scaling cannot proceed as usual since the range is zero, a scenario to handle as a special case.
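A quick way to see this in action, reusing continuous_scaler and food_features from the chapter 12 snippets above (originalMin and originalMax are attributes of Spark's MinMaxScalerModel):
# fit() learns E_min / E_max from the data and returns a MinMaxScalerModel,
# which is itself a transformer (it has a transform() method)
scaler_model = continuous_scaler.fit(food_features)

print(scaler_model.originalMin)  # per-feature minimums observed during fit()
print(scaler_model.originalMax)  # per-feature maximums observed during fit()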
Building a (complete) machine learning pipeline
Pipelines build on transformers and estimators to make training, evaluating, and optimizing ML models much clearer and more explicit
The Pipeline estimator has only one Param, called stages, which takes a list of transformers and estimators.
When using pipelines, we need to remember that the data frame travels through each stage. For instance, the stage following continuous_scaler will receive the data frame as transformed by continuous_scaler. For estimator stages, the data frame stays identical, as fit() does not transform the data frame but returns a Model instead.
When calling fit() using a data frame, that data frame gets passed as an argument to the first stage. Each estimator stage gets evaluated (transformers are passed verbatim), and the resulting transformers/models form the stages of the PipelineModel.
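The next snippets call setStages() on a food_pipeline object; my notes don't show its creation, but it would be along the lines of:
from pyspark.ml import Pipeline

# An empty Pipeline: its single Param, stages, is filled in below via setStages()
food_pipeline = Pipeline(stages=[])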
Assembling the final data set with the vector column type
import pyspark.ml.feature as MF

preml_assembler = MF.VectorAssembler(
    inputCols=BINARY_COLUMNS
    + ["continuous_scaled"]
    + ["protein_ratio", "fat_ratio"],
    outputCol="features",
)
food_pipeline.setStages(
[imputer, continuous_assembler, continuous_scaler, preml_assembler]
)
food_pipeline_model = food_pipeline.fit(food)
food_features = food_pipeline_model.transform(food)
Our data frame is ready for machine learning! We have a number of records, each with:
- a target (or label) column, dessert, containing a binary input (1.0 if the recipe is a dessert, 0.0 otherwise)
- a vector of features, called features, containing all the information we want to train our machine learning model with
We provide 513 distinct features with a large number of zeroes. When storing vectors, PySpark has two choices for representing them:
- a dense representation, where a Vector in PySpark is simply a NumPy single-dimensional array object
- a sparse representation, where a Vector in PySpark is an optimized sparse vector compatible with scipy.sparse matrices
In practice, we don’t decide if a Vector is sparse or dense: PySpark will convert between the two as needed.
Dense: [0.0, 1.0, 4.0, 0.0]
Sparse: (4, [1,2], [1.0, 4.0])
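The same two example vectors, built explicitly with pyspark.ml.linalg (my own illustration, not from the book):
from pyspark.ml.linalg import Vectors

dense = Vectors.dense([0.0, 1.0, 4.0, 0.0])
# sparse(size, indices, values): only positions 1 and 2 hold non-zero values
sparse = Vectors.sparse(4, [1, 2], [1.0, 4.0])

print(dense)             # [0.0,1.0,4.0,0.0]
print(sparse)            # (4,[1,2],[1.0,4.0])
print(sparse.toArray())  # back to a dense numpy array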
We can further see the schema of our features using:
print(food_features.schema["features"])
# StructField(features,VectorUDT,true)
print(food_features.schema["features"].metadata)
{'ml_attr': {'attrs': {'numeric': [{'idx': 0, 'name': 'ham'}, {'idx': 1, 'name': 'frangelico'}, {'idx': 2, 'name': 'lunch'}, {'idx': 3, 'name': 'cherry'}, {'idx': 4, 'name': 'cookies'}, {'idx': 5, 'name': 'rice'}, {'idx': 6, 'name': 'rutabaga'}, {'idx': 7, 'name': 'kosher_for_passover'}, {'idx': 8, 'name': 'potluck'}, {'idx': 9, 'name': 'fruit'}, {'idx': 10, 'name': 'nut'}, {'idx': 11, 'name': 'lemon_juice'}, {'idx': 12, 'name': 'nutmeg'}, {'idx': 13, 'name': 'simmer'}, {'idx': 14, 'name': 'egg'}, {'idx': 15, 'name': 'meat'}, {'idx': 16, 'name': 'harpercollins'}, {'idx': 17, 'name': 'walnut'}, {'idx': 18, 'name': 'arugula'}, {'idx': 19, 'name': 'caraway'}, {'idx': 20, 'name': 'pear'}, {'idx': 21, 'name': 'missouri'}, {'idx': 22, 'name': 'thanksgiving'},
...
The column schema for an assembled vector keeps track of the features that make up its composition under the metadata attribute.
For scaled variables, since they originate from a VectorAssembler, PySpark gives them a generic name, but we can trace their names back through the original vector column (here, the continuous column produced by continuous_assembler) as needed.
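For instance, the assembler output itself carries that mapping: inspecting the metadata of the continuous column shows which original column sits at which vector index (same metadata trick as above):
print(food_features.schema["continuous"].metadata["ml_attr"]["attrs"])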
Training an ML model using a LogisticRegression classifier
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
featuresCol="features", labelCol="dessert", predictionCol="prediction"
)
food_pipeline.setStages(
[
imputer,
continuous_assembler,
continuous_scaler,
preml_assembler,
lr,
]
)
Before fitting the pipeline, we cache() the training data frame. We do this because training the model uses the data frame repeatedly, so caching it in memory provides a speed increase if your cluster has enough memory.
train, test = food.randomSplit([0.7, 0.3], 13)
train.cache()
food_pipeline_model = food_pipeline.fit(train)
results = food_pipeline_model.transform(test)
WARNING! Although PySpark will use the same seed, which should guarantee that the split will be consistent across runs, there are some cases where PySpark will break that consistency. If you want to be 100% certain about your splits, split your data frame, write each one to disk, and then read them from the disk location.
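A sketch of that write-then-read approach (the paths are made up, and spark is assumed to be the active SparkSession):
# Materialize the split once...
train.write.mode("overwrite").parquet("data/food_train.parquet")
test.write.mode("overwrite").parquet("data/food_test.parquet")

# ...then always read the exact same rows back
train = spark.read.parquet("data/food_train.parquet")
test = spark.read.parquet("data/food_test.parquet")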
Evaluating and optimizing our model
Assessing model accuracy: Confusion matrix and evaluator object
results.groupby("dessert").pivot("prediction").count().show()
+-------+----+---+
|dessert| 0.0|1.0|
+-------+----+---+
| 0.0|4944| 87|
| 1.0| 93|991|
+-------+----+---+
In Spark 3.1+ we can use the below for precision and recall:
lr_model = food_pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select("title", "dessert", "features"))
print(f"Model precision: {metrics.precisionByLabel[1]}")
print(f"Model recall: {metrics.recallByLabel[1]}")
# Model precision: 0.9192949907235621
# Model recall: 0.9142066420664207
Otherwise, we can use the RDD-based API:
from pyspark.mllib.evaluation import MulticlassMetrics
predictionAndLabel = results.select("prediction", "dessert").rdd
metrics_rdd = MulticlassMetrics(predictionAndLabel)
print(f"Model precision: {metrics_rdd.precision(1.0)}")
print(f"Model recall: {metrics_rdd.recall(1.0)}")
# Model precision: 0.9192949907235621
# Model recall: 0.9142066420664207
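As a quick sanity check, the same numbers fall straight out of the confusion matrix above (TP = 991, FP = 87, FN = 93):
tp, fp, fn = 991, 87, 93

precision = tp / (tp + fp)  # 991 / 1078 ~ 0.9193
recall = tp / (tp + fn)     # 991 / 1084 ~ 0.9142
print(precision, recall)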
True positives vs. false positives: The ROC curve
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(
labelCol="dessert",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC",
)
accuracy = evaluator.evaluate(results)
print(f"Area under ROC = {accuracy} ")
# Area under ROC = 0.9927985236918562
import matplotlib.pyplot as plt
plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], "r--")
plt.plot(
lr_model.summary.roc.select("FPR").collect(),
lr_model.summary.roc.select("TPR").collect(),
)
plt.xlabel("False positive rate")
plt.ylabel("True positive rate")
plt.show()
Optimizing hyperparameters with cross-validation
To build the set of hyperparameters we wish to evaluate our model against, we use the ParamGridBuilder, which assists in creating a Param Map. For this, we start with the builder class, to which we chain a series of addGrid() calls, each taking two parameters:
- the Param of the stage we want to modify. In this case, our LogisticRegression estimator was assigned to the variable lr, so lr.elasticNetParam is the Param in question
- the values we wish to try for the hyperparameter, passed as a list
Once we are done, we call build(), and a list of Param Maps is returned. Each element of the list is a dictionary (called Map in Scala, hence the name) of the Params that will be passed to our pipeline when fitting. Often, we want to set the hyperparameters for the model estimator, but nothing prevents us from changing the Params of another stage, for instance the preml_assembler, should we want to remove features. We just need to be consistent if we meddle with inputCol/outputCol, to avoid missing-column errors.
from pyspark.ml.tuning import ParamGridBuilder
grid_search = (
ParamGridBuilder()
.addGrid(lr.elasticNetParam, [0.0, 1.0])
.build()
)
print(grid_search)
[{Param(parent='LogisticRegression_e87ad8468abf', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}, {Param(parent='LogisticRegression_e87ad8468abf', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0}]
Now onto cross-validation. PySpark provides out-of-the-box K-fold cross-validation through the CrossValidator class
Code-wise, the CrossValidator object combines everything under a single abstraction. To build a cross-validator, we need three elements, all of which we’ve encountered so far:
- an estimator, which contains the model we wish to evaluate (here: food_pipeline)
- an estimatorParamMaps set, which is the list of Param Maps we created earlier in the section
- an evaluator, which carries the metric we wish to optimize against
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(
estimator=food_pipeline,
estimatorParamMaps=grid_search,
evaluator=evaluator,
numFolds=3,
seed=13,
collectSubModels=True,
)
cv_model = cv.fit(train)
print(cv_model.avgMetrics)
# [0.9895381256961793, 0.9895383283033521]
pipeline_food_model = cv_model.bestModel
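To see which hyperparameter value actually won, we can pair each Param Map with its averaged metric and read the Param back off the best model (my own addition; getElasticNetParam() on the fitted model assumes Spark 3.x):
# Pair each Param Map from the grid with its average metric across the folds
for params, metric in zip(grid_search, cv_model.avgMetrics):
    print({p.name: v for p, v in params.items()}, metric)

# The best pipeline's last stage is the fitted LogisticRegressionModel
best_lr = pipeline_food_model.stages[-1]
print(best_lr.getElasticNetParam())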
Getting the biggest drivers from our model: Extracting the coefficients
import pandas as pd
feature_names = ["(Intercept)"] + [
x["name"]
for x in (
food_features
.schema["features"]
.metadata["ml_attr"]["attrs"]["numeric"]
)
]
feature_coefficients = [lr_model.intercept] + list(
lr_model.coefficients.values
)
coefficients = pd.DataFrame(
feature_coefficients, index=feature_names, columns=["coef"]
)
coefficients["abs_coef"] = coefficients["coef"].abs()
print(coefficients.sort_values(["abs_coef"]))
Sample:
coef abs_coef
hanukkah 0.001985 0.001985
house_and_garden 0.004944 0.004944
pernod -0.015873 0.015873
Writing and Reading ML models
pipeline_food_model.write().overwrite().save("am_I_a_dessert_the_model")
and
from pyspark.ml.pipeline import PipelineModel
loaded_model = PipelineModel.load("am_I_a_dessert_the_model")
Creating our own transformers and estimators
Creating a custom transformer is not hard, but there are a lot of moving parts and a set of conventions to follow to make it consistent with the other transformers provided by PySpark. Our blueprint for this section follows this plan:
- Design our transformer: Params, inputs, and outputs
- Create the Params, inheriting some preconfigured ones as necessary
- Create the necessary getters and setters for those Params
- Create the initialization function to instantiate our transformer
- Create the transformation function
For Estimators the plan is:
- Outline the design of the estimator, taking into account the resulting model: inputs, outputs, fit(), and transform()
- Create the companion model class as a Model (which is a specialized Transformer) subclass
- Create the estimator as an Estimator subclass
The full code for this example is here
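As a rough sketch of that blueprint, here is a toy transformer of my own (not the book's example) that copies a column while filling nulls with 0.0, reusing the shared input/output column Params:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import pyspark.sql.functions as F


class ZeroNAFiller(
    Transformer, HasInputCol, HasOutputCol,
    DefaultParamsReadable, DefaultParamsWritable,
):
    """Toy transformer: copies inputCol to outputCol, replacing nulls with 0.0."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputCol(self, value):
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        return self._set(outputCol=value)

    def _transform(self, dataset):
        return dataset.withColumn(
            self.getOutputCol(),
            F.coalesce(F.col(self.getInputCol()), F.lit(0.0)),
        )
Usage would look like ZeroNAFiller(inputCol="rating", outputCol="rating_filled").transform(food), and it can slot into a Pipeline like any built-in transformer.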
I saw that this book has a video edition, so I might watch some of the videos of the other chapters that just go over basic PySpark functions and see if there is anything interesting. I will bookmark this part for later ^^
Evaluating gpt-4o-mini
A few days ago I mentioned starting to use company review data and fine-tuning a model for that task. I had fine-tuned gpt-4o-mini on the company dataset; however, I had yet to evaluate it on the preset metrics - RougeLSum for the text and RMSE for the scores. Well, today, after some debugging mainly due to data formatting, I got the results:
RougeLSum: 0.8768 (1 is max meaning identical, 0 is the min)
RMSE: 0.5924 (lower is better)
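For reference, here is a minimal sketch of how these two metrics can be computed (this is not my actual evaluation script; it assumes the rouge-score package and plain Python lists of predictions and references):
import math
from rouge_score import rouge_scorer


def evaluate(pred_texts, ref_texts, pred_scores, ref_scores):
    # ROUGE-Lsum (F-measure) averaged over the generated review texts
    scorer = rouge_scorer.RougeScorer(["rougeLsum"], use_stemmer=True)
    rouge = [
        scorer.score(ref, pred)["rougeLsum"].fmeasure
        for pred, ref in zip(pred_texts, ref_texts)
    ]
    # RMSE over the predicted numeric scores
    rmse = math.sqrt(
        sum((p - r) ** 2 for p, r in zip(pred_scores, ref_scores)) / len(ref_scores)
    )
    return sum(rouge) / len(rouge), rmse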
And here is a link to the model on OpenAI’s platform
Also, tomorrow is day 300 which is exciting but I don’t have anything special planned haha
That is all for today!
See you tomorrow :)