Hello :) Today is Day 365!
A quick summary of today:
- watching feature store videos by Hopsworks academy
- using the Hopsworks python API and the UI
Learning about Feature Stores w/ Hopsworks Academy
Batch & Real-Time Machine Learning Systems; A better framework with Hopsworks Feature Store
They mention that an ML model trained on static data doesn’t qualify as an ML system - it’s a one-off experiment that can only generate one-off predictions on that static data. An ML system needs new data coming in, and the model generates more value as it keeps making predictions on that fresh data.
This is their model for building ML systems - they call it ‘The Feature, Training, and Inference pipeline’ model.
An ML system consists of these 3 pipelines.
The 3 pipelines can be developed separately, which is good for system modularity - different roles/teams can even develop/maintain each pipeline (if needed).
Pipelines also need to be orchestrated, and given the modularity we can run each one on a different platform depending on what we have and the use case.
We also need to make sure we test each part of our pipelines:
And then we can do A/B testing, etc.
A new model might have been trained on a new set of features, so we need to connect model versions to feature versions - and Hopsworks helps with this.
Hopsworks stitches all these pipelines together, and each pipeline’s outputs can be stored in Hopsworks as well.
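To make the idea concrete, here is a tiny toy sketch of the FTI split in Python (my own illustration, not code from the video) - the three pipelines only talk to each other through shared stores:
import pandas as pd

feature_store = {}   # toy stand-in for the feature store
model_registry = {}  # toy stand-in for the model registry

def feature_pipeline(raw_df: pd.DataFrame) -> None:
    # Feature pipeline: raw data in, engineered features out, written to the store
    features = raw_df.assign(tenure_years=raw_df["tenure_months"] / 12)
    feature_store["churn_features"] = features

def training_pipeline() -> None:
    # Training pipeline: read historical features, "train" a model, register it
    df = feature_store["churn_features"]
    threshold = df["tenure_years"].mean()  # trivial stand-in for a real model
    model_registry["churn_model"] = threshold

def inference_pipeline(new_df: pd.DataFrame) -> pd.Series:
    # Inference pipeline: fetch the latest model and score new feature rows
    threshold = model_registry["churn_model"]
    return (new_df["tenure_months"] / 12 < threshold).astype(int)

raw = pd.DataFrame({"tenure_months": [3, 40, 12]})
feature_pipeline(raw)
training_pipeline()
print(inference_pipeline(raw))  # 1 = predicted churn in this toy example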
Feature Pipelines in Production with Hopsworks: Code, Deployment & Monitoring
Pipelines in prod:
- managing the codebase - e.g. on GitHub
- deployment
We can schedule and run jobs from the Hopsworks UI:
We can also see more detailed info about each run:
- monitoring - we can set alerts
What is a feature store for ML?
They help:
- maintain models
- train models
- improve models
Features are the data for AI, and as such can be regarded as the fuel for AI.
Features are used throughout all the stages of an ML lifecycle and managing them can be a challenge as they need to be properly organised, stored and readily available for models. This is where the feature store comes in. It manages, stores, and provides access to both historical and real-time features.
Who does it help?
Mostly data engineers (DEs), data scientists (DSs), and ML engineers (MLEs).
The feature store helps make more models and productionise them, faster. Its value comes from its ability to bridge the gap between different stages of the model lifecycle and the many stakeholders involved.
Deploying Managed Hopsworks Via the UI or Terraform
This video was just showing how to set up Hopsworks either through the UI - connecting with AWS/GCP/Azure - or using Hopsworks’ pre-set Terraform setup.
Real-Time Inference: How to retrieve data from Hopsworks
The Hopsworks API provides us with very low-latency options for retrieving features for real-time inference cases like fraud detection.
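For example, the online store can be queried per entity key through a feature view. Here is a rough sketch based on the hsfs feature view API - the feature view name and the key below are made up for illustration:
import hopsworks

# Connect to the project and its feature store (API key picked up from the environment)
project = hopsworks.login()
fs = project.get_feature_store()

# A feature view previously created for the fraud model (name/version assumed)
fraud_fv = fs.get_feature_view(name="fraud_feature_view", version=1)

# Prepare the online serving client, then fetch a single feature vector
# by primary key - this hits the low-latency online store
fraud_fv.init_serving(1)
feature_vector = fraud_fv.get_feature_vector(entry={"cc_num": 4467360740682089})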
Features can be viewed in the Feature View:
Once we click on fraud, we can see:
More detailed info. We can even see the label_encoder tag, which means that column has had a transformation applied to it (i.e. scikit-learn’s LabelEncoder).
And we can interact with this using Hopsworks’ Python API.
I will just give a list of the Hopsworks features discussed, as I will do a Hopsworks tutorial I found after this ~
- we can create train/test splits alongside extra info like stats and see them through the UI (see the sketch after this list)
- creating a feature view: a curated selection of features (and optionally labels) from one or more feature groups within a feature store; it serves as a logical representation tailored for specific machine learning models
We can also see lineage:
- external feature groups: allow users to register features stored in external systems (like Snowflake, BigQuery, S3) within Hopsworks, providing a unified platform for data scientists to access all features
- streaming feature pipelines: we can use different streaming platforms (e.g. Spark Streaming, Beam, Flink)
- feature groups: a structured table of features with an online store for fast, real-time access to the latest feature values and an offline store for historical data used in training and batch inference
- versioning: there is schema (metadata for feature views, feature groups - the features included, tags, other metadata) and data versioning (actual data versioning)
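Here is a rough sketch of the first point - creating a train/test split from a feature view. I’m going off the hsfs API here, and the feature view name is assumed, so details may differ:
import hopsworks

project = hopsworks.login()
fs = project.get_feature_store()

# A feature view assumed to exist already (name/version made up for illustration)
feature_view = fs.get_feature_view(name="fraud_feature_view", version=1)

# Create a train/test split; this materialises a training dataset whose
# statistics can then be inspected in the Hopsworks UI
X_train, X_test, y_train, y_test = feature_view.train_test_split(test_size=0.2)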
Hopsworks Tutorial - Part 01: Feature Pipeline
I created a profile on hopsworks.ai and there was a tutorial button with:
So I clicked on the 1st one and it took me to this Colab notebook.
Loaded some data and logged in to Hopsworks with an API key ~
- create a feature group
customer_info_fg = fs.get_or_create_feature_group(
    name="customer_info",
    version=1,
    description="Customer info for churn prediction.",
    primary_key=['customerID'],
    event_time="datetime",
)
- insert data into the feature group
customer_info_fg.insert(customer_info_df)
I inserted data for all 3 datasets:
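For reference, the other two feature groups follow the same pattern. The descriptions, primary keys, and dataframe names below are my assumptions; the group names are the ones Part 2 retrieves:
demography_fg = fs.get_or_create_feature_group(
    name="customer_demography_info",
    version=1,
    description="Customer demography info for churn prediction.",
    primary_key=['customerID'],
)
demography_fg.insert(demography_df)

subscriptions_fg = fs.get_or_create_feature_group(
    name="customer_subscription_info",
    version=1,
    description="Customer subscription info for churn prediction.",
    primary_key=['customerID'],
    event_time="datetime",
)
subscriptions_fg.insert(subscriptions_df)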
Next is Part 2
Hopsworks Feature Store - Part 02: Training Pipeline
Load and select features from feature groups
# Retrieve feature groups
customer_info_fg = fs.get_feature_group(
    name="customer_info",
    version=1,
)
demography_fg = fs.get_feature_group(
    name="customer_demography_info",
    version=1,
)
subscriptions_fg = fs.get_feature_group(
    name="customer_subscription_info",
    version=1,
)

selected_features = customer_info_fg.select_except(["customerid", "datetime"]) \
    .join(demography_fg.select_except(["customerid"])) \
    .join(subscriptions_fg.select_except(["datetime"]))
Transformation
# Load transformation functions from the feature store
min_max_scaler = fs.get_transformation_function(name="min_max_scaler")
label_encoder = fs.get_transformation_function(name="label_encoder")

# Define lists of numerical and categorical features
numerical_features = ["tenure", "monthlycharges", "totalcharges"]
categorical_features = [
    "multiplelines", "internetservice", "onlinesecurity", "onlinebackup",
    "deviceprotection", "techsupport", "streamingmovies", "streamingtv",
    "phoneservice", "paperlessbilling", "contract", "paymentmethod", "gender",
    "dependents", "partner",
]

# Map features to their corresponding transformation functions
transformation_functions = []

# For numerical features, use the min_max_scaler transformation
for feature in numerical_features:
    transformation_functions.append(min_max_scaler(feature))

# For categorical features, use the label_encoder transformation
for feature in categorical_features:
    transformation_functions.append(label_encoder(feature))
Create a feature view
A feature view allows you to define a schema in the form of a query with filters, specify the model’s target feature/label, and attach additional transformation functions.
feature_view = fs.get_or_create_feature_view(
    name='churn_feature_view',
    version=1,
    labels=["churn"],
    transformation_functions=transformation_functions,
    query=selected_features,
)
I can see it in the UI
Split data, train a model, check eval metrics
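The notebook uses XGBoost for this step; roughly, it looks like the sketch below (from memory, so the exact code may differ):
import seaborn as sns
from xgboost import XGBClassifier
from sklearn.metrics import classification_report, confusion_matrix

# Create a train/test split from the feature view (this also registers
# a training dataset in Hopsworks, with statistics visible in the UI)
X_train, X_test, y_train, y_test = feature_view.train_test_split(test_size=0.2)

# Train a gradient-boosted tree classifier on the transformed features
model = XGBClassifier()
model.fit(X_train, y_train.values.ravel())

# Evaluate on the held-out split
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))

# Confusion matrix plot; the resulting figure is saved next to the model below
figure_cm = sns.heatmap(confusion_matrix(y_test, y_pred), annot=True, fmt="d")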
Then we can get the Hopsworks model registry:
mr = project.get_model_registry()
Model schema
The model needs to be set up with a Model Schema, which describes the inputs and outputs for a model.
A Model Schema can be automatically generated from training examples, as shown below.
from hsml.schema import Schema
from hsml.model_schema import ModelSchema
# Create input schema using X_train
input_schema = Schema(X_train)
# Create output schema using y_train
output_schema = Schema(y_train)
# Create a ModelSchema object specifying the input and output schemas
model_schema = ModelSchema(
    input_schema=input_schema,
    output_schema=output_schema,
)
# Convert the model schema to a dictionary
model_schema.to_dict()
Here is the dict model schema:
{'input_schema': {'columnar_schema': [{'name': 'seniorcitizen',
'type': 'int64'},
{'name': 'label_encoder_contract_', 'type': 'int64'},
{'name': 'label_encoder_dependents_', 'type': 'int64'},
{'name': 'label_encoder_deviceprotection_', 'type': 'int64'},
{'name': 'label_encoder_gender_', 'type': 'int64'},
{'name': 'label_encoder_internetservice_', 'type': 'int64'},
{'name': 'label_encoder_multiplelines_', 'type': 'int64'},
{'name': 'label_encoder_onlinebackup_', 'type': 'int64'},
{'name': 'label_encoder_onlinesecurity_', 'type': 'int64'},
{'name': 'label_encoder_paperlessbilling_', 'type': 'int64'},
{'name': 'label_encoder_partner_', 'type': 'int64'},
{'name': 'label_encoder_paymentmethod_', 'type': 'int64'},
{'name': 'label_encoder_phoneservice_', 'type': 'int64'},
{'name': 'label_encoder_streamingmovies_', 'type': 'int64'},
{'name': 'label_encoder_streamingtv_', 'type': 'int64'},
{'name': 'label_encoder_techsupport_', 'type': 'int64'},
{'name': 'min_max_scaler_monthlycharges_', 'type': 'float64'},
{'name': 'min_max_scaler_tenure_', 'type': 'float64'},
{'name': 'min_max_scaler_totalcharges_', 'type': 'float64'}]},
'output_schema': {'columnar_schema': [{'name': 'churn', 'type': 'int64'}]}}
Then we can save artifacts and the model:
model_dir = "churn_model"
if not os.path.isdir(model_dir):
os.mkdir(model_dir)
model.save_model(model_dir + "/model.json")
figure_cm.figure.savefig(model_dir + '/confusion_matrix.png')
# Create a model in the model registry
model = mr.python.create_model(
name="churnmodel",
description="Churn Model",
input_example=X_train.sample(),
model_schema=model_schema,
)
model.save(model_dir)
And I can see it in the UI:
And inside I can see all other info:
Next is Part 3
Fetch and test the model
from xgboost import XGBClassifier

# Retrieve the model from the model registry
retrieved_model = mr.get_model(
    name="churnmodel",
    version=1,
)

# Download the saved model files to a local directory
saved_model_dir = retrieved_model.download()

# Initialize the model
model = XGBClassifier()

# Load the model from a saved JSON file
model.load_model(saved_model_dir + "/model.json")
And then we can use it as usual for predictions.
We can get batch data from our feature view:
# Initialize batch scoring
feature_view.init_batch_scoring(1)
# Get the batch data
batch_data = feature_view.get_batch_data()
# Display the first three rows of the batch_data
batch_data.head(3)
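The whole batch can then be scored with the retrieved model - a quick sketch:
# Score the batch with the retrieved XGBoost model
predictions = model.predict(batch_data)
predictions[:10]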
We can also access the transformation functions attached to the feature view and use their stored statistics to map the transformed values back to the original ones (e.g. undoing the min-max scaling and the label encoding):
df_all = batch_data.copy()

fv_transformation_functions = feature_view._batch_scoring_server.model_dependent_transformation_functions

for transformation_function in fv_transformation_functions:
    udf = transformation_function.hopsworks_udf
    if udf.function_name == "min_max_scaler":
        transformed_features = udf.transformation_features[0]
        transformed_feature_name = udf.output_column_names[0]
        stats = udf.transformation_statistics
        df_all[transformed_features] = df_all[transformed_feature_name].map(
            lambda x: x * (stats.feature.max - stats.feature.min) + stats.feature.min
        )

    if udf.function_name == "label_encoder":
        transformed_features = udf.transformation_features[0]
        transformed_feature_name = udf.output_column_names[0]
        stats = udf.transformation_statistics
        unique_data = sorted([value for value in stats.feature.unique_values])
        index_to_value = {index: value for index, value in enumerate(unique_data)}
        df_all[transformed_features] = df_all[transformed_feature_name].map(
            lambda x: index_to_value[x]
        )
This comes at the end of the notebook, but we can also enable extra stats, correlations and graphs for feature groups:
customer_info_fg = fs.get_feature_group("customer_info", version=1)

customer_info_fg.statistics_config = {
    "enabled": True,
    "histograms": True,
    "correlations": True,
}

customer_info_fg.update_statistics_config()
customer_info_fg.compute_statistics()
In the UI we can see:
On the Hopsworks repo they have a few more notebook tutorials, which is great.
I saw that Zach Wilson posted two new lectures + labs but I forgot my tablet so I will cover them by myself tomorrow as well.
I will do them on my own as I think this is the last study material I will post on the blog 😢
Today’s post is a bit shorter and earlier as I am going out to meet the New Year :)
Tomorrow I will post about my journey, what I learned, thoughts, list of books & courses, summary ~
Happy New Year 🥳
That is all for today!
And for the last time ~ See you tomorrow :)