Hello :) Today is Day 290!
A quick summary of today:
- finished the financial data engineering book
- learned about a new feature selection method
Financial DE Chapter 11 - Financial data workflows
Workflow-oriented software architectures (WOSA)
WOSAs help manage the growing complexity of digital systems by organizing software transactions into structured workflows. Common in financial markets, a WOSA structures processes like financial trades and credit card payments into defined steps. This enhances modularity, efficiency, and manageability in software systems, particularly for data workflows.
What is a data workflow?
A data workflow is a repeatable process involving a structured sequence of steps that transform an initial dataset into a desired output. In data engineering, workflow abstraction helps generalize workflows by focusing on their conceptual framework, not technological specifics. The dataflow paradigm, often represented as a Directed Acyclic Graph (DAG), organizes computations into a directed graph where each task depends on others. Tasks should be atomic, idempotent, and appropriately sized to ensure efficiency, traceability, and debuggability, while aligning with business logic.
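To make the DAG idea concrete, here is a tiny hypothetical sketch (mine, not the book's) of a workflow abstraction in which each task declares the tasks it depends on and the graph is executed in topological order:

from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical tasks and their dependencies (a tiny DAG)
tasks = {
    'extract_trades': set(),
    'clean_trades': {'extract_trades'},
    'aggregate_daily': {'clean_trades'},
    'export_report': {'aggregate_daily'},
}

def run(task_name: str) -> None:
    # Each task should be atomic and idempotent: re-running it with the
    # same inputs should always produce the same output.
    print(f'running {task_name}')

# Execute the tasks in an order that respects the dependencies
for task_name in TopologicalSorter(tasks).static_order():
    run(task_name)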
Workflow management systems (WMS)
Once a workflow abstraction has been defined, the next step is to design a system that provides the infrastructure and tools necessary for building, orchestrating, and monitoring a WOSA.
When developing a WMS, several properties need to be considered:
- flexibility: a flexible WMS is capable of managing a diverse range of tasks and workflows
- configurability: enables users to define workflow specifications according to their needs
- dependency management: simple WMSs allow users only to be able to define static and linear workflows; more advanced WMSs may allow for parallel workflow/task execution, dynamic and conditional task creation, and information passing between tasks
- coordination patterns: two notable patterns are the Synchronous Blocking Pattern (SBP) and the Asynchronous Non-Blocking Pattern (ANBP); a small sketch of the difference follows this list
- scalability: assessed based on the WMS’ capacity to handle concurrent executions of workflows and tasks
- integration with other tools
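As a toy illustration of the two coordination patterns (my own sketch, not from the book): in the blocking pattern the caller waits for each task to finish before starting the next one, while in the non-blocking pattern tasks are scheduled concurrently and awaited together.

import asyncio
import time

def fetch_price_blocking(symbol: str) -> str:
    time.sleep(1)  # simulate a slow downstream call
    return f'{symbol}: done'

async def fetch_price_non_blocking(symbol: str) -> str:
    await asyncio.sleep(1)  # same call, but it doesn't block the event loop
    return f'{symbol}: done'

# Synchronous Blocking Pattern: roughly 3 seconds, one call at a time
for symbol in ['AAPL', 'MSFT', 'GOOG']:
    print(fetch_price_blocking(symbol))

# Asynchronous Non-Blocking Pattern: roughly 1 second, the calls overlap
async def main():
    results = await asyncio.gather(*(fetch_price_non_blocking(s) for s in ['AAPL', 'MSFT', 'GOOG']))
    print(results)

asyncio.run(main())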
Types of financial data workflows
ETL workflows
Stream processing workflows
Lambda vs Kappa
Lambda: maintains two layers, a batch layer that periodically recomputes results over the full historical dataset and a speed layer that processes recent events with low latency; results from both layers are merged at serving time
Kappa: drops the batch layer and treats everything as a stream; historical results are recomputed by replaying the event log through the same streaming code
Microservice workflows
Microservice-oriented online store order processing system:
Microservice workflows coordinate multiple microservices, which are independently deployable, to follow specific business logic. Designing these workflows requires balancing technical factors like cohesion and low coupling with business logic formalization, often using Domain-Driven Design (DDD). Microservice workflows can be managed via the saga pattern, using either choreography (direct communication between microservices) or orchestration (centralized control via an orchestrator). This approach is widely adopted in FinTech and traditional financial institutions for greater agility, allowing rapid development and deployment of new services.
There are two types of saga patterns: choreography and orchestration.
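A very rough toy sketch of the difference (mine, not from the book):

# Choreography: each service reacts to an event and emits the next one; there is no central controller
def payment_service(order_id: int) -> str:
    print(f'charging order {order_id}')
    return 'PaymentCompleted'

def inventory_service(order_id: int) -> str:
    print(f'reserving stock for order {order_id}')
    return 'StockReserved'

subscriptions = {'OrderPlaced': payment_service, 'PaymentCompleted': inventory_service}

def choreograph(order_id: int) -> None:
    event = 'OrderPlaced'
    while event in subscriptions:
        event = subscriptions[event](order_id)

# Orchestration: a central coordinator calls the services in a fixed order
# (this is the approach Netflix Conductor takes in project 3 below)
def orchestrate(order_id: int) -> None:
    for service in (payment_service, inventory_service):
        service(order_id)

choreograph(1)
orchestrate(1)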
ML workflows
The challenges involved in building reliable and high-performance ML models largely revolve around effective data management and ensuring data quality.
Chapter 12 - Hands-on projects
All of the code from the book is here
Project 1: Designing a Bank Account Management System Database with PostgreSQL
Account management is a core functionality for banks, allowing them to handle and oversee customer accounts, balances, and transactions. This first project involves designing and implementing a relational database system for managing bank accounts.
Conceptual model: understanding business requirements
Operational and business models vary from one banking institution to another, and not all banks offer the same products and services. As such, a simple and generic bank account management system will be developed.
We’ll assume that stakeholders have defined their requirements and that these have been formalized into entities, relationships, and constraints.
Entities
Our bank account management system needs to store data for seven types of entities: accounts, customers, loans, transactions, branches, employees, and cards. Each of these entities stores its own set of detailed attributes.
Relationships
The business team has requested that the following relationships be established between entities:
- each account should be linked to a customer
- each loan should be linked to a customer
- each loan payment should be linked to a transaction and an account
- each employee should be affiliated with a branch
- each card should be associated with both a customer and an account
Constraints
- account balances must never go below a customer-specific minimum amount
- all entity records (e.g. accounts, loans, transactions) must be identified with a unique ID
- data redundancy should be minimised
- null values are not permitted for certain fields
- specific fields, such as email addresses, must be unique across records
Logical model: entity relationship diagram
This stage focuses on selecting the data storage model most suitable for our system.
After a thorough evaluation, the financial data engineering team has concluded that the relational model is the best fit, because it effectively organises the various entities into distinct tables and ensures data integrity through database constraints. The relational model also supports a normalised data structure, which is essential for our system.
To implement this, we need to create the Entity Relationship Diagram (ERD) for our logical model. An ERD is a visual representation used to model the structure of a database.
Physical model: data definition and manipulation language
It’s time to choose a relational database technology and write the queries to create and populate our tables. Let’s assume the financial data engineering team has selected PostgreSQL as the database management system due to its high reliability and strong adherence to SQL standards.
Next is the coding part. I will share some of the code as I follow along with the book, but everything is in the GitHub repo above.
Structure:
In the docker-compose.yaml there are two services: postgres and pgadmin.
The rest is just creating the tables from the created schema and inserting data into those tables.
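To give a flavour of how the constraints above translate into the schema, here is a small hypothetical sketch of creating one table with psycopg2 (the table and column names are mine; the full schema lives in the repo):

import os
import psycopg2

# Hypothetical DDL for a single table; the actual schema covers all seven entities
CREATE_ACCOUNTS = """
CREATE TABLE IF NOT EXISTS accounts (
    account_id      SERIAL PRIMARY KEY,               -- every record gets a unique ID
    customer_id     INTEGER NOT NULL,                 -- null values are not permitted
    email           VARCHAR(255) UNIQUE,              -- must be unique across records
    balance         NUMERIC(15, 2) NOT NULL,
    minimum_balance NUMERIC(15, 2) NOT NULL DEFAULT 0,
    CHECK (balance >= minimum_balance)                -- balance never below the customer-specific minimum
);
"""

with psycopg2.connect(
    host=os.environ.get('POSTGRES_HOST', 'localhost'),
    port=os.environ.get('POSTGRES_PORT', 5432),
    dbname=os.environ.get('POSTGRES_DB'),
    user=os.environ.get('POSTGRES_USER'),
    password=os.environ.get('POSTGRES_PASSWORD')
) as connection:
    with connection.cursor() as cursor:
        cursor.execute(CREATE_ACCOUNTS)
    connection.commit()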
Project 2: Designing a Financial Data ETL Workflow with Mage and Python
(Mage getting another shout out in a new book; nice)
Workflow definition
Data retrieval
- fetch adjusted intraday time series history for the past month using an API endpoint
- store the raw data in the db, recording the ingestion timestamp
- data aggregation
- deduplication
- data export
Database design
Two tables will be needed: one for the raw intraday data retrieved from the API, and another for the transformed daily data product at the end of the workflow.
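The aggregation and deduplication steps could look roughly like this (a sketch with made-up column names; the actual transform blocks are in the repo):

import pandas as pd

def intraday_to_daily(raw: pd.DataFrame) -> pd.DataFrame:
    # Drop rows ingested twice for the same symbol and interval (deduplication)
    deduplicated = raw.drop_duplicates(subset=['symbol', 'timestamp']).sort_values('timestamp')

    # Aggregate intraday bars into one daily row per symbol (data aggregation)
    deduplicated = deduplicated.assign(date=pd.to_datetime(deduplicated['timestamp']).dt.date)
    daily = deduplicated.groupby(['symbol', 'date']).agg(
        open=('open', 'first'),
        high=('high', 'max'),
        low=('low', 'min'),
        close=('close', 'last'),
        volume=('volume', 'sum'),
    ).reset_index()
    return daily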
Local practice
The docker-compose.yaml for this project contains: mage, postgres, and pgadmin.
After populating some data in the db, I created the pipeline in Mage:
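For reference, a Mage transformer block for the deduplication step would look roughly like this (a sketch following Mage's standard block template; the real pipeline blocks are in the repo, and the column names are my assumptions):

import pandas as pd

if 'transformer' not in dir():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def deduplicate(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
    # Remove duplicate intraday rows before the daily aggregation step
    return df.drop_duplicates(subset=['symbol', 'timestamp'])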
Project 3: Designing a Microservice Workflow with Netflix Conductor, PostgreSQL, and Python
Workflow definition
The first step in designing a microservice workflow is to define its structure. Here are the microservices that will constitute the OMS system:
- order acknowledgement service: handles the acknowledgment of customer orders. It receives a client order, persists it in the database, and returns an acknowledgment message to the customer
- payment processing service: processes customer payment transactions and returns a message informing the customer about the status of their payment operation
- stock and inventory service: manages the inventory and stock levels of products available for sale. It tracks and checks the quantity of each product in stock, books and updates inventory based on incoming orders, and returns a message to the customer about the stock booking
- shipping service: manages the shipment of orders. This service coordinates with shipping carriers to book a delivery, generate a tracking number, and update the delivery status of orders. It returns a message informing the client about their upcoming delivery
- notification service: sends notifications to customers at various stages of the order fulfillment process
Database design
Local testing
The docker compose here consists of: conductor, pgadmin, postgres, and jupyterlab
This project is a bit more detailed, so I will share some of the code, especially since this is the first time I am trying out Netflix’s Conductor.
First, create tables: python3 database/create_tables.py
Second, populate them: python3 database/populate_tables.py
Third, create and register the microservice workflow. The code is already prepared for me, with each service in its own folder containing a service.py and a worker.py.
This is the delivery_scheduling_service
# service.py
import psycopg2
import os
import string
import random
from datetime import datetime, timedelta
def generate_tracking_number():
    return ''.join(random.choices(string.digits, k=10))

def generate_delivery_date():
    # Get the current date
    current_date = datetime.now()
    # Calculate the date 3 days from now (let's assume delivery takes place after 3 days)
    date_after_3_days = current_date + timedelta(days=3)
    # Format the date as YYYY-MM-DD
    formatted_date = date_after_3_days.strftime('%Y-%m-%d')
    return formatted_date

def schedule_delivery(order_id: int):
    # Connection parameters
    connection_params = {
        'host': os.environ.get('POSTGRES_HOST'),
        'port': os.environ.get('POSTGRES_PORT'),
        'database': os.environ.get('POSTGRES_DB'),
        'user': os.environ.get('POSTGRES_USER'),
        'password': os.environ.get('POSTGRES_PASSWORD')
    }
    try:
        # Establish a connection to the PostgreSQL database
        with psycopg2.connect(**connection_params) as connection:
            # Create a cursor
            with connection.cursor() as cursor:
                # Schedule delivery
                tracking_number = generate_tracking_number()
                delivery_date = generate_delivery_date()
                cursor.execute(
                    "UPDATE DeliverySchedule SET delivery_service = 'DHL Express', expected_delivery_date=%s, tracking_number=%s WHERE order_id = %s;",
                    (delivery_date, tracking_number, order_id)
                )
                # Commit the transaction
                connection.commit()
        print("Delivery added to DeliverySchedule table successfully!")
        return {'success': True, 'order_id': order_id, 'notification': 'Your order has been shipped and is on its way to the delivery address.'}
    except psycopg2.Error as e:
        print("Error: Unable to add delivery to DeliverySchedule table:", e)
        return {'success': False, 'order_id': order_id, 'notification': 'Your order could not be processed due to an unexpected error. Please contact customer support for assistance.'}
And worker.py
from conductor.client.worker.worker_task import worker_task
from delivery_scheduling_service.service import schedule_delivery as sd
import uuid
@worker_task(task_definition_name='schedule_delivery')
def schedule_delivery(order_id: int) -> dict:
    result = sd(order_id=order_id)
    return result
Next, I need to register the order workflow: python3 workflow/register_order_workflow.py
Here is the register_order_workflow.py:
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from order_workflow import order_workflow
def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    workflow = order_workflow(workflow_executor=workflow_executor)
    workflow.register(True)
    return workflow

def main():
    # The app is connected to http://localhost:8080/api by default
    try:
        api_config = Configuration()
        workflow_executor = WorkflowExecutor(configuration=api_config)
        # Registering the workflow (Required only when the app is executed the first time)
        workflow = register_workflow(workflow_executor)
        print(f'Successfully registered workflow {workflow.name} - version {workflow.version}')
    except Exception as e:
        print(e)

if __name__ == '__main__':
    main()
The executed workflow is the one below, which chains together all the workers, each calling its respective service:
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from order_acknowledgement_service.worker import acknowledge_order
from notification_service.worker import notify_customer
from payment_processing_service.worker import process_payment
from stock_and_inventory_service.worker import update_inventory
from delivery_scheduling_service.worker import schedule_delivery
def order_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    workflow = ConductorWorkflow(
        name='Order Workflow',
        executor=workflow_executor,
        version=2,
        description='Executes the services required to process a customer order.'
    )
    # Acknowledge Order task
    acknowledge_order_task = acknowledge_order(
        task_ref_name='acknowledge_order_ref',
        customer_id=workflow.input('customer_id'),
        products=workflow.input('products'),
        payment_amount=workflow.input('payment_amount'),
        delivery_address=workflow.input('delivery_address')
    )
    # Notify the customer of the acknowledgment task
    notify_acknowledgement_task = notify_customer(
        task_ref_name='notify_acknowledgement_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=acknowledge_order_task.output('notification')
    )
    # Process payment task
    process_payment_task = process_payment(
        task_ref_name='process_payment_ref',
        order_id=acknowledge_order_task.output('order_id')
    )
    # Notify customer of payment task
    notify_payment_task = notify_customer(
        task_ref_name='notify_payment_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=process_payment_task.output('notification')
    )
    # Update inventory
    update_inventory_task = update_inventory(
        task_ref_name='update_inventory_ref',
        order_id=acknowledge_order_task.output('order_id')
    )
    # Notify customer of inventory booking
    notify_inventory_task = notify_customer(
        task_ref_name='notify_inventory_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=update_inventory_task.output('notification')
    )
    # Schedule delivery
    schedule_delivery_task = schedule_delivery(
        task_ref_name='schedule_delivery_ref',
        order_id=acknowledge_order_task.output('order_id')
    )
    # Notify customer of delivery schedule
    notify_delivery_task = notify_customer(
        task_ref_name='notify_delivery_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=schedule_delivery_task.output('notification')
    )
    # define the flow structure
    workflow >> acknowledge_order_task >> \
        notify_acknowledgement_task >> \
        process_payment_task >> \
        notify_payment_task >> \
        update_inventory_task >> \
        notify_inventory_task >> \
        schedule_delivery_task >> \
        notify_delivery_task
    return workflow
Once I executed the command to register the workflow, I can see it in the UI:
Clicking on it takes me to:
To execute the workflow with Python, I run python3 workflow/execute_order_workflow.py, and the file looks like:
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from order_workflow import order_workflow
def main():
    # The app is connected to http://localhost:8080/api by default
    api_config = Configuration()
    workflow_executor = WorkflowExecutor(configuration=api_config)
    # Starting the worker polling mechanism
    task_handler = TaskHandler(configuration=api_config)
    task_handler.start_processes()
    workflow_run = workflow_executor.execute(
        name='Order Workflow',
        version=2,
        workflow_input={
            'customer_id': 1,
            'products': {1: 1, 2: 3},
            'payment_amount': 2000,
            'delivery_address': '456 Broadway, New York, NY 10013, United States'
        },
    )
    task_handler.stop_processes()

if __name__ == '__main__':
    main()
The terminal shows these logs:
In conductor, I can see the run in the Executions tab:
And view all kinds of information for the run:
Project 4: Designing a Financial Reference Data Store with OpenFIGI, PermID, and GLEIF APIs
This project was just downloading financial identifiers from the above sources and saving them to the database.
For instance - download and export BIC-to-LEI mapping data:
api_url = "https://mapping.gleif.org/api/v2/bic-lei/165fb003-5254-498f-a3cd-6e7dfccb8075/download"
bic_to_lei_dataframe = download_and_extract_zip(api_url)
bic_to_lei_dataframe.head()
And then insert this into postgres.
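The download_and_extract_zip helper isn't shown here; a minimal version of it might look something like this (my own sketch using requests and pandas, not necessarily how the book implements it):

import io
import zipfile

import pandas as pd
import requests

def download_and_extract_zip(url: str) -> pd.DataFrame:
    # Download the zip archive into memory
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    # Read the first CSV found inside the archive into a DataFrame
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        csv_name = next(name for name in archive.namelist() if name.endswith('.csv'))
        with archive.open(csv_name) as csv_file:
            return pd.read_csv(csv_file)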
Overall
Great book! Definitely in my ‘hall of fame’
Stream
- read the overall basics of skrub
- experimented with shap_select
That is all for today!
See you tomorrow :)