(Day 290) Finishing the book - Financial Data Engineering

Ivan Ivanov · October 17, 2024

Hello :) Today is Day 290!

A quick summary of today:

  • finished the financial data engineering book
  • learned about a new feature selection method

Financial DE Chapter 11 - Financial data workflows

Workflow-oriented software architectures (WOSA)

WOSA help manage the growing complexity of digital systems by organizing software transactions into structured workflows. Common in financial markets, WOSA structures processes like financial trades and credit card payments into defined steps. This enhances modularity, efficiency, and manageability in software systems, particularly for data workflows.

What is a data workflow?

A data workflow is a repeatable process involving a structured sequence of steps that transform an initial dataset into a desired output. In data engineering, workflow abstraction helps generalize workflows by focusing on their conceptual framework, not technological specifics. The dataflow paradigm, often represented as a Directed Acyclic Graph (DAG), organizes computations into a directed graph, where each task depends on others. Tasks should be atomic, idempotent, and appropriately sized to ensure efficiency, traceability, and debuggability, while aligning with business logic

image

Workflow management systems (WMS)

Once a workflow abstraction has been defined, the next step is to design a system that provides the infrastructure and tools necessary for building, orchestrating, and monitoring a WOSA

When developing a WMS some properties need to be considered:

  • flexibility: a flexible WMS is capable of managing a diverse range of tasks and workflows
  • configurability: enables users to define workflow specifications according to their needs
  • dependency management: simple WMSs allow users only to be able to define static and linear workflows; more advanced WMSs may allow for parallel workflow/task execution, dynamic and conditional task creation, and information passing between tasks

image

  • coordination patterns: two notable patterns are the Synchronous Blocking Pattern (SBP) and the Asynchronous Non-Blocking Pattern (ANBP)

image

  • scalability: assessed based on the WMS’ capacity to handle concurrent executions of workflows and tasks
  • integration with other tools

Types of financial data workflows

ETL workflows

image

Stream processing workflows

image

Lambda vs Kappa

Lambda:

image

Kappa:

image

Microservice workflows

Microservice-oriented online store order processing system:

image

Microservice workflows coordinate multiple microservices, which are independently deployable, to follow specific business logic. Designing these workflows requires balancing technical factors like cohesion and low coupling with business logic formalization, often using Domain-Driven Design (DDD). Microservice workflows can be managed via the saga pattern, using either choreography (direct communication between microservices) or orchestration (centralized control via an orchestrator). This approach is widely adopted in FinTech and traditional financial institutions for greater agility, allowing rapid development and deployment of new services.

There are two types of saga patterns:

image

ML workflows

image

The challenges involved in building reliable and high-performance ML models largely revolve around effective data management and ensuring data quality.

Chapter 12 - Hands-on projects

All of the code from the book is here

Project 1: Designing a Bank Account Management System Database with PostgreSQL

Account management is a core functionality for banks, allowing them to handle and oversee customer accounts, balances, and transactions. And this 1st project is for designing and implementing a relational db system for managing back accounts

Conceptual model: understanding business requirements

Operational and business models vary from one banking institution to another, and not all banks offer the same products and services. As such, a simple and generic bank account management system will be developed.

We’ll assume that stakeholders have defined their requirements and that these have been formalized them into entities, relationships, and constraints.

Entities

Our bank account management system needs to store data for seven types of entities: accounts, customers, loans, transactions, branches, employees, and cards. Each of these needs to store detailed information for each case

Relationships

The business team has requested that the following relationships be established between entities:

  • each account should be linked to a customer
  • each loand should be linked to a customer
  • each loan payment should be linked to a transaction and an account
  • each employee should be affiliated with a branch
  • each card should be associated with both a customer and an account

Constraints

  • account balances must never go below a customer-specific minimum amount
  • all entity records (i.e. accounts, loans, transactions) must be identified with a unique ID
  • data redundancy should be minimised
  • null values are not permitted for certain fields
  • specific fields, such as email addresses, must be unique across records

Logical model: entity relationship diagram

This stage focuses on selecting the data storage model most suitable for our system

After a thorough evaluation, the financial data engineering team has concluded that the relational model is the best fit, because this model effectively organises various entities into distinct tables and ensures data integrity through db constraints. Also the relational model supports the implementation of a normalised data structure which is essential for our system.

To implement this, we need to create the Entity Relationship Diagram (ERD) for our logical model. An ERD is a visual representation used to model the structure of a database.

image

Physical model: data definition and manipulation language

It’s time to choose a relational database technology and write the queries to create and populate our tables. Let’s assume the financial data engineering team has selected PostgreSQL as the database management system due to its high reliability and strong adherence to SQL standards.

Next is the coding part, I will share some of the code as I will follow along the book, but everything is in the github repo above.

Structure:

image

In the docker-compose.yaml there are two services: postgres and pgadmin

The rest is just creating the tables from the created schema and inserting data into those tables.

Project 2: Designing a Financial Data ETL Workflow with Mage and Python

(Mage getting another shout out in a new book; nice)

Workflow definition

Data retrieval

  1. fetch adjusted intraday time series history for the past month using an API endpoint
  2. store the raw data in the db, recording the ingestion timestamp
  3. data aggregation
  4. deduplication
  5. data export

Database design

Two tables will be needed: one for the raw intraday data retrieved from the API, and another for the transformed daily data product at the end of the workflow

image

Local practice

The docker-compose.yaml for this project contains: mage, postgres, and pgadmin

After populating some data in the db, I created the pipeline in Mage:

image

image

Project 3: Designing a Microservice Workflow with Netflix Conductor, PostgreSQL, and Python

Workflow definition

The first step in designing a microservice workflow is to define its structure. Here are the microservices that will constitute the OMS system:

  • order acknowledgement service: handles the acknowledgment of customer orders. It receives a client order, persists it in the database, and returns an acknowledgment message to the customer
  • payment processing service: processes customer payment transactions and returns a message informing the customer about the status of their payment operation
  • stock and inventory service: manages the inventory and stock levels of products available for sale. It tracks and checks the quantity of each product in stock, books and updates inventory based on incoming orders, and returns a message to the customer about the stock booking
  • shipping service: manages the shipment of orders. This service coordinates with shipping carriers to book a delivery, generate a tracking number, and update the delivery status of orders. It returns a message informing the client about their upcoming delivery
  • notification service: sends notifications to customers at various stages of the order fulfillment process

image

Database design

image

Local testing

The docker compose here consists of: conductor, pgadmin, postgres, and jupyterlab

This project is a bit more detailed so I will share some of the code. And also because this is the 1st time I am trying out Netflix’s conductor

First, create tables: python3 database/create_tables.py

Second, populate them: python3 database/populate_tables.py

image

Third, create and register the microservice workflow. It is prepared for me and each service is in its own folder with a service and a worker.py

image

This is the delivery_scheduling_service

# service.py

import psycopg2
import os
import string
import random
from datetime import datetime, timedelta

def generate_tracking_number():
    return ''.join(random.choices(string.digits, k = 10))

def generate_delivery_date():
    # Get the current date
    current_date = datetime.now()
    # Calculate the date after 3 days (let's assume delivery takes places after 3 days)
    date_after_10_days = current_date + timedelta(days=3)
    # Format the date as YYYY-MM-DD
    formatted_date = date_after_10_days.strftime('%Y-%m-%d')
    return formatted_date


def schedule_delivery(order_id: int):
    # Connection parameters
    connection_params = {
        'host': os.environ.get('POSTGRES_HOST'),
        'port': os.environ.get('POSTGRES_PORT'),
        'database': os.environ.get('POSTGRES_DB'),
        'user': os.environ.get('POSTGRES_USER'),
        'password': os.environ.get('POSTGRES_PASSWORD')
    }
    try:
        # Establish a connection to the PostgreSQL database
        with psycopg2.connect(**connection_params) as connection:
            # Create a cursor
            with connection.cursor() as cursor:
                # Schedule delivery
                tracking_number = generate_tracking_number()
                delivery_date = generate_delivery_date()

                cursor.execute(
                    "UPDATE DeliverySchedule SET delivery_service = 'DHL Express', expected_delivery_date=%s, tracking_number=%s WHERE order_id = %s;",
                    (delivery_date, tracking_number, order_id)
                )

                # Commit the transaction
                connection.commit()

        print("Delivery added to DeliverySchedule table successfully!")
        return {'success': True, 'order_id': order_id, 'notification': 'Your order has been shipped and is on its way to the delivery address.'}
    except psycopg2.Error as e:
        print("Error: Unable to add delivery to DeliverySchedule table:", e)
        return {'success': False, 'order_id': order_id, 'notification': 'Your order could not be processed due to an unexpected error. Please contact customer support for assistance.'}

And worker.py

from conductor.client.worker.worker_task import worker_task
from delivery_scheduling_service.service import schedule_delivery as sd
import uuid

@worker_task(task_definition_name='schedule_delivery')
def schedule_delivery(order_id: int) -> dict:
    result = sd(order_id=order_id)
    return result

Next, I need to register the order workflow python3 workflow/register_order_workflow.py

Here is the register_order_workflow.py:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from order_workflow import order_workflow


def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    workflow = order_workflow(workflow_executor=workflow_executor)
    workflow.register(True)
    return workflow


def main():
    # The app is connected to http://localhost:8080/api by default
    try:
        api_config = Configuration()
    
        workflow_executor = WorkflowExecutor(configuration=api_config)
    
        # Registering the workflow (Required only when the app is executed the first time)
        workflow = register_workflow(workflow_executor)
    
        print(f'Successfully registered workflow {workflow.name} - version {workflow.version}')
    except Exception as e:
        print(e)

if __name__ == '__main__':
    main()

The executed workflow is this one which contains all the workers which call their respective service:

from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from order_acknowledgement_service.worker import acknowledge_order
from notification_service.worker import notify_customer
from payment_processing_service.worker import process_payment
from stock_and_inventory_service.worker import update_inventory
from delivery_scheduling_service.worker import schedule_delivery

def order_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    workflow = ConductorWorkflow(
        name='Order Workflow',
        executor=workflow_executor,
        version=2,
        description='Executes the services required to process a customer order.'
    )

    # Acknowledge Order task
    acknowledge_order_task = acknowledge_order(
        task_ref_name='acknowledge_order_ref',
        customer_id=workflow.input('customer_id'),
        products=workflow.input('products'),
        payment_amount=workflow.input('payment_amount'),
        delivery_address=workflow.input('delivery_address')
    )

    # Notify the customer of the acknowledgment task
    notify_acknowledgement_task = notify_customer(
        task_ref_name='notify_acknowledgement_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=acknowledge_order_task.output('notification')
    )

    # Process payment task
    process_payment_task = process_payment(
        task_ref_name='process_payment_ref',
        order_id=acknowledge_order_task.output('order_id')
    )

    # Notify customer of payment task
    notify_payment_task = notify_customer(
        task_ref_name='notify_payment_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=process_payment_task.output('notification')
    )

    # Update inventory
    update_inventory_task = update_inventory(
        task_ref_name='update_inventory_ref',
        order_id=acknowledge_order_task.output('order_id')
    )

    # Notify customer of inventory booking
    notify_inventory_task = notify_customer(
        task_ref_name='notify_inventory_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=update_inventory_task.output('notification')
    )

    # Schedule delivery
    schedule_delivery_task = schedule_delivery(
        task_ref_name='schedule_delivery_ref',
        order_id=acknowledge_order_task.output('order_id'),
    )

    # Notify customer of delivery schedule
    notify_delivery_task = notify_customer(
        task_ref_name='notify_delivery_ref',
        order_id=acknowledge_order_task.output('order_id'),
        notification_text=schedule_delivery_task.output('notification')
    )

    # define the flow structure
    workflow >> acknowledge_order_task >> \
        notify_acknowledgement_task >> \
        process_payment_task >> \
        notify_payment_task >> \
        update_inventory_task >> \
        notify_inventory_task >> \
        schedule_delivery_task >> \
        notify_delivery_task

    return workflow

Once I executed the bash command to register the workflow, I can see it in the UI:

image

Clicking on it takes me to:

image

To execute the workflow with python (python3 workflow/execute_order_workflow.py), and the file looks like:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from order_workflow import order_workflow


def main():
    # The app is connected to http://localhost:8080/api by default
    api_config = Configuration()

    workflow_executor = WorkflowExecutor(configuration=api_config)

    # Starting the worker polling mechanism
    task_handler = TaskHandler(configuration=api_config)
    task_handler.start_processes()

    workflow_run = workflow_executor.execute(name='Order Workflow',
                                             version=2,
                                             workflow_input={
                                                 'customer_id': 1,
                                                 'products': {1: 1, 2: 3},
                                                 'payment_amount': 2000,
                                                 'delivery_address': '456 Broadway, New York, NY 10013, United States'},
                                             )

    task_handler.stop_processes()


if __name__ == '__main__':
    main()

The terminal shows these logs:

image

In conductor, I can see the run in the Executions tab:

image

And view all kinds of information for the run:

image

image

Project 4: Designing a Financial Reference Data Store with OpenFIGI, PermID, and GLEIF APIs

This project was just downloading financial identifiers from the above sources and saving them to the database.

For instance - download and export BIC-to-LEI mapping data:

api_url = "https://mapping.gleif.org/api/v2/bic-lei/165fb003-5254-498f-a3cd-6e7dfccb8075/download"
bic_to_lei_dataframe = download_and_extract_zip(api_url)
bic_to_lei_dataframe.head()

image

And then insert this into postgres.

Overall

Great book! Definitely in my ‘hall of fame’

Stream


That is all for today!

See you tomorrow :)