(Day 362) Learning about Apache Pinot + Data Mesh

Ivan Ivanov · December 28, 2024

Hello :) Today is Day 362!

A quick summary of today:

  • Apache Pinot 101
  • more of Architecting Data and Machine Learning Platforms
  • starting the Data Mesh book

Apache Pinot 101

Zach Wilson did a livestream this morning and mentioned that Pinot is a good OLAP database for real-time data, and that the concept is interesting. So I decided to look for a video ~ I found not just one video but a short playlist by Tim Berglund (the nice instructor from Confluent’s Kafka/Flink videos) - here

What is Apache Pinot?

Apache Pinot is a real-time distributed OLAP database, designed to serve OLAP workloads on streaming data with extremely low latency and high concurrency.

The essence of real-time analytics:

  • latency: as low as 10ms
  • concurrency: as many as 100,000 queries per second
  • freshness: seconds from event time till queryable in Pinot

Data model

  • Pinot uses the usual tabular data model
  • table creation and schema definition expressed in JSON
  • queries are written in SQL (a quick sketch of querying through the broker’s REST API follows this list)
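
Out of curiosity, here is a minimal sketch of what querying looks like from Python - this assumes a broker running on localhost:8099 and a made-up table called clicks, and the response shape is from memory, so treat it as a sketch:

```python
import requests

# Pinot brokers expose a SQL endpoint (default port 8099).
# The table and column names here are made up.
BROKER_URL = "http://localhost:8099/query/sql"

query = "SELECT country, SUM(clicks) FROM clicks GROUP BY country LIMIT 10"
resp = requests.post(BROKER_URL, json={"sql": query}, timeout=10)
resp.raise_for_status()

result = resp.json()
# resultTable holds the column schema and the rows themselves
for row in result["resultTable"]["rows"]:
    print(row)
```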

Architecture: Tables and Segments

Tables

  • the basic unit of data storage
  • composed of rows and columns
  • expected to scale to arbitrarily large row counts
  • defined using a schema and tableConfig JSON file
  • three varieties: offline (batch data), real-time, and hybrid
  • every column is either a metric, a dimension, or a date/time column

Segments

  • tables are split into units of storage called segments
  • similar to shards or partitions, but transparent to users
  • for offline tables, segments are created outside of Pinot and pushed into the cluster using a REST API
  • for real-time tables, segments are created automatically from events sourced by the event streaming system (like Kafka)
  • standard utilities support batch ingest from common file types (Avro, JSON, CSV)
  • APIs are available to create segments from Spark, Flink, and Hadoop

Segment structure

  • Pinot is a columnar db
  • all of a segment’s values for a single column are stored contiguously
  • dimension columns are typically dictionary-encoded
  • indexes are stored as a part of the segment
  • segments are immutable once written
  • segments have a configurable retention period

Segment structure example:

image

Zooming in on ip:

image

zooming again

image

It is many, many individual IP values stored one after another.
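
To make the dictionary-encoding idea concrete for myself, here is a toy Python sketch (just the concept, not Pinot’s actual on-disk format): each distinct value is stored once in a dictionary, and the column itself becomes a compact forward index of integer ids.

```python
# Toy illustration of dictionary encoding - concept only, not Pinot internals.
ips = ["10.0.0.1", "10.0.0.2", "10.0.0.1", "10.0.0.3", "10.0.0.1"]

# Dictionary: each distinct value stored once, assigned an integer id
dictionary = {value: idx for idx, value in enumerate(sorted(set(ips)))}

# Forward index: the column becomes a list of small ids instead of full strings
forward_index = [dictionary[value] for value in ips]

print(dictionary)     # {'10.0.0.1': 0, '10.0.0.2': 1, '10.0.0.3': 2}
print(forward_index)  # [0, 1, 0, 2, 0]
```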

Architecture: Servers and Brokers

Servers

  • the computers that host segments and do most query processing
  • they are generally segregated into real-time and offline servers
  • this is an administrative convention only; servers do not know they are one kind or another
  • real-time servers host real-time tables exclusively, and offline servers host offline tables exclusively - by convention

image

Since segments are immutable, replication is easier: we don’t need to think about which replicas are out of date, we just need to make sure we have enough copies.

Brokers

  • accept queries from clients and forward them to the appropriate servers
  • have access to metadata that helps them determine which segments are implicated in a given query, and which servers host those segments
  • for single-stage queries, they collect results returned from the servers and consolidate them to return to the client
  • for multi-stage queries, they issue a plan to several tiers of servers, which eventually return a single result set to the issuing broker

Deep store

  • the deep store is the permanent storage system for segment files
  • stores a compressed version of segment data without indexes
  • can be located on a POSIX-compatible file system, HDFS, or a cloud blob store
  • does not host backups of Zookeeper metadata

image

Architecture: Cluster Management

Controller

  • the Controller hosts the Apache Helix controller
  • responsible for cluster management (e.g. node failure detection and recovery, replication management, segment assignment)
  • maintains global metadata (table schemas and configs)
  • maintains the mapping of segments to servers
  • exposes admin APIs for viewing and changing configs
  • serves as the endpoint for offline segment uploads
  • manages real-time ingestion and segment creation
  • controllers can be configured for redundancy

Zookeeper

  • distributed, fault-tolerant, strongly consistent state store
  • provides persistence for the Controller to store cluster metadata

Multitenancy

  • Pinot supports multitenancy natively
  • every broker and server can be assigned a tenant tag
  • a table can be assigned a server tenant tag and a broker tenant tag (a rough sketch of where these go in the table config is below)
  • this allows us to ensure that certain tables are not hosted on the same hardware as others

image
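
As a rough sketch of where those tenant tags go (tenant and table names are made up, and the key names are from memory, so double-check the docs), the table config has a tenants section - shown here as a Python dict:

```python
# Hedged sketch of the tenants section of a Pinot table config.
# Tenant and table names are made up; the rest of the config is omitted.
table_config = {
    "tableName": "orders",
    "tableType": "REALTIME",
    "tenants": {
        "broker": "ordersBrokerTenant",  # brokers tagged with this tenant serve this table's queries
        "server": "ordersServerTenant",  # servers tagged with this tenant host this table's segments
    },
    # ... segmentsConfig, tableIndexConfig, etc.
}
```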

Architecture: Minions

  • a minion is a cluster component that runs tasks on an as-needed basis
  • minions offload computationally intensive work from other cluster components, like servers
  • often involved in segment creation, segment purge, and segment merging
  • we can create our own minion tasks by implementing the PinotTaskExecutor and MinionEventObserver interfaces

image

Zooming in ~

image

Indexes

Once data is ingested into Pinot, we need to read it, and reading it efficiently comes down to how the data is indexed - Pinot offers several index options (a rough sketch of where index settings live in the table config follows the image below).

image
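
The video walks through the index types with slides; as a rough, hedged sketch (column names made up, key names from memory), the per-column index choices are declared in the table config’s tableIndexConfig section:

```python
# Hedged sketch: where per-column index settings live in a Pinot table config.
table_index_config = {
    "loadMode": "MMAP",
    "sortedColumn": ["user_id"],             # at most one sorted column per table
    "invertedIndexColumns": ["country"],     # fast equality filters
    "rangeIndexColumns": ["ts"],             # range filters on numeric/time columns
    "bloomFilterColumns": ["session_id"],    # helps prune segments that can't match
    "noDictionaryColumns": ["raw_payload"],  # store raw values instead of dictionary-encoding them
}
```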

Batch (offline) ingestion

  • start by creating a schema (name, dimension fields, metric fields, datetime) and a table config (pics below, plus a rough JSON sketch after them)

image

image
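
The screenshots don’t carry over into these notes, so here is a rough sketch of what the schema and offline table config might look like - table and column names are made up and the exact keys are from memory, so treat it as a sketch:

```python
import json

# Hedged sketch of a Pinot schema: dimensions, metrics, and a date/time column.
schema = {
    "schemaName": "clicks",
    "dimensionFieldSpecs": [
        {"name": "country", "dataType": "STRING"},
        {"name": "browser", "dataType": "STRING"},
    ],
    "metricFieldSpecs": [
        {"name": "clicks", "dataType": "LONG"},
    ],
    "dateTimeFieldSpecs": [
        {"name": "ts", "dataType": "LONG",
         "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS"},
    ],
}

# Hedged sketch of the matching offline table config.
table_config = {
    "tableName": "clicks",
    "tableType": "OFFLINE",
    "segmentsConfig": {"timeColumnName": "ts", "replication": "1"},
    "tenants": {},
    "tableIndexConfig": {"loadMode": "MMAP"},
    "metadata": {},
}

print(json.dumps(schema, indent=2))
```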

We can then ingest the data.

When we run the ingest, the controller kicks off a task and tells a minion worker: go do this ingestion task with this CSV file and config. The worker creates a segment in its storage and pushes that segment to the controller, which decides which server should store it, and the segment then gets saved on that server.

image

Stream (real-time) ingestion

Usually we read from a Kafka topic, with a Pinot table acting as a consumer. Since it is real-time, we might not have enough records to create a full segment right away. Data is first gathered in a consuming segment, and after some threshold the records in that consuming segment are flushed into a committed segment. As soon as a record is written to the consuming segment it’s available for querying.

image

Realtime table config:

image

image
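
The config screenshots don’t come through either, but the Kafka-specific part sits in a streamConfigs block - roughly like this (topic/broker names made up, key names from memory):

```python
# Hedged sketch of the streaming section of a realtime Pinot table config.
stream_configs = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "click-events",  # made-up topic
    "stream.kafka.broker.list": "kafka:9092",   # made-up broker address
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.decoder.class.name":
        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
    # once the consuming segment hits this threshold, it is flushed/committed
    "realtime.segment.flush.threshold.rows": "1000000",
}
```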

When we create a table, we register it with the Controller; the Controller then tells a server that it is responsible for connecting to this Kafka cluster, and that server starts consuming.

image

When the server is writing those segment files it is asking the Controller where they should go.

Multi-stage query engine

  • Pinot’s architecture supports low-latency queries on batch and streaming data at high concurrency
  • it is optimised for queries that filter and aggregate
  • query processing occurs on servers
  • last-mile reduction happens on Brokers - which works for small result sets
  • Brokers cannot process large join datasets - that’s where the multi-stage query engine comes in

Simpler query:

image

More complex query:

image

image

image

Upsert

  • offline segments are immutable
  • real-time segments can be appended to while being consumed, but once written to the segment store, they are also immutable
  • when ingesting streams of events, this is not a problem
  • when ingesting streams of mutable entities (e.g. orders, which change status frequently), the lack of upsert capability is a blocker
  • upserts apply only to real-time tables
  • implemented through a metadata map that servers maintain per table

Put simply, if we have consumed A:1, B:2, C:3 and then we get B:4, we insert it into the segment and just update the metadata map to point at the latest record.
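
A toy sketch of that metadata-map idea (concept only, not Pinot’s implementation): the map tracks, per primary key, where the latest version of the record lives.

```python
# Toy illustration of the upsert metadata map - concept only, not Pinot internals.
# Maps primary key -> (segment, docId) of the most recent record.
metadata_map = {}


def consume(primary_key, value, segment, doc_id):
    # the record is always appended to the consuming segment...
    print(f"append {primary_key}:{value} to {segment} at doc {doc_id}")
    # ...and the map is updated so queries only see the latest version
    metadata_map[primary_key] = (segment, doc_id)


consume("A", 1, "seg_0", 0)
consume("B", 2, "seg_0", 1)
consume("C", 3, "seg_0", 2)
consume("B", 4, "seg_0", 3)  # upsert: B now points at doc 3

print(metadata_map)  # {'A': ('seg_0', 0), 'B': ('seg_0', 3), 'C': ('seg_0', 2)}
```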

image

Delete support

  • upserts originally supported re-writing a given primary key value
  • by itself, that is like adding UPDATE to INSERT in a transactional db
  • DELETE is required for ingesting CDC feeds and other use cases
  • each server maintains a metadata map
  • historically, the metadata map points to a segment/docID/timestamp tuple, which holds the most recent UPDATE
  • delete adds the equivalent of a tombstone

We see that B gets deleted:

image


Architecting Data and Machine Learning Platforms - Chapter 8 - Architectures for Streaming

The value of streaming

image

The value of a decision typically drops with time; stream processing allows an organization to make decisions in near real time

Industry use cases

  • in healthcare for real-time patient monitoring
  • in finance to detect and prevent fraud
  • in retail for personalised marketing
  • in media and entertainment to generate personalised content and ads

Streaming use cases

  • streaming ingest
  • real-time dashboards
  • stream analytics
  • continuous intelligence

Streaming ingest

Could be done in 2 ways:

  • we could aggregate events and write only the aggregates to a persistent store (called streaming ETL)
  • we could ingest the data directly into a lake or DWH (streaming ELT)

Streaming ETL

image

Two options for streaming ETL of logs data: micro-batching (shown with dashed lines) or streaming pipeline (shown with solid lines)
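
As a hedged sketch of the streaming (solid-line) option, here is roughly what an aggregate-then-write pipeline could look like with Apache Beam’s Python SDK (which the book mentions later as a write-once-run-anywhere option) - the Pub/Sub topic and BigQuery table are made up:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# Hedged sketch: count log events per minute and write only the aggregates
# to the warehouse (streaming ETL). Topic and table names are made up.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadLogs" >> beam.io.ReadFromPubSub(topic="projects/my-proj/topics/app-logs")
        | "Window" >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
        | "KeyAll" >> beam.Map(lambda event: ("logs", 1))
        | "CountPerWindow" >> beam.CombinePerKey(sum)
        | "ToRow" >> beam.Map(lambda kv: {"events_per_minute": kv[1]})
        | "WriteAggregates" >> beam.io.WriteToBigQuery(
            "my-proj:analytics.log_counts",
            schema="events_per_minute:INTEGER",
        )
    )
```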

Streaming ELT

image

Streaming ELT can be done in micro-batches (dashed lines) or as events happen (solid lines)

Streaming insert

Streaming ELT in a cloud native application can take advantage of client libraries to directly insert the data

image

Streaming from edge devices (IoT)

For streaming data from IoT devices on the edge, make use of IoT-specific functionality

image

Real-time dashboards

Live querying

Dashboards periodically query the DWH using SQL

image

Materialise some views

It is better not to embed complex SQL queries in our dashboards. Instead, we can create views that retrieve the required data and have the dashboard tool query the views.

image

Stream analytics

Useful for:

  • time-series analytics: track assets, predict impact of events, and do predictive maintenance
  • clickstream analysis: to make real-time offers, create dynamic websites, and optimize a customer journey
  • anomaly detection: predict equipment failure, prevent fraud, monitor system health

Time-series analytics

Architecture for time-series analytics:

image

Clickstream analytics

We can use a backfill pipeline to enrich clickstream data and improve identity stitching, privacy redaction, and bot detection

image

Anomaly detection

Anomaly detection involves two streaming pipelines: one to compute clusters within a sliding window and a second to compare incoming events against the clusters

image

Resilient streaming

Handling errors in streaming pipelines requires resilience to ensure uninterrupted operation:

  • store unprocessable events in a dead-letter queue for periodic inspection and logic fixes, preventing data loss (a small sketch follows this list)

  • to update a pipeline:

    • in-place update: achieve exactly-once semantics by reusing persistent stores
    • pipeline draining: ensure at-least-once processing by completing in-flight data before transitioning
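
A hedged sketch of the dead-letter-queue part, again using Beam’s Python SDK (the source here is just an in-memory stand-in): events that fail parsing get tagged and routed to a separate output instead of failing the whole pipeline.

```python
import json

import apache_beam as beam


class ParseEvent(beam.DoFn):
    """Parse JSON events; route anything unparseable to a dead-letter output."""

    def process(self, element):
        try:
            yield json.loads(element)
        except (ValueError, TypeError):
            # tagged side output instead of crashing the pipeline
            yield beam.pvalue.TaggedOutput("dead_letter", element)


with beam.Pipeline() as p:
    raw = p | "ReadRaw" >> beam.Create([b'{"id": 1}', b"not-json"])  # stand-in source
    results = raw | "Parse" >> beam.ParDo(ParseEvent()).with_outputs(
        "dead_letter", main="parsed"
    )

    results.parsed | "HandleGood" >> beam.Map(print)
    # in a real pipeline this would be written to a dead-letter topic or bucket
    results.dead_letter | "HandleBad" >> beam.Map(lambda e: print("DLQ:", e))
```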

Continuous intelligence through ML

Training model on streaming data

  • windowed training

image

We need a streaming pipeline to create a dataset consisting of data within the time window, and an automated training pipeline that is parameterized by where it obtains the training data.

  • scheduled training

We can do this in cases where a trained model will be valid for a moderately long time period.

image

  • continuous evaluation and retraining

image

We don’t want to use a model that has drifted in performance. To determine that the model has drifted, we need to employ continuous evaluation. For instance, if we have a model that predicts whether a user will buy an item, we could verify a few days later whether the user actually bought the item.

Streaming ML inference

image

Streaming ML inference involves invoking trained ML models for predictions as events occur. While embedding the model in the streaming pipeline is efficient, it doesn’t scale well for large models or diverse applications. For non-Python environments or hardware without GPUs, models are often deployed as microservices accessible via HTTP requests. Efficiency improves when processing small batches of events simultaneously, leveraging matrix operations in modern ML frameworks.
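
A toy sketch of the “model behind an HTTP microservice, called with small batches” idea - the endpoint URL and payload shape are made up:

```python
import requests

# Hypothetical model microservice - the URL and payload shape are made up.
MODEL_URL = "http://model-service:8080/predict"
BATCH_SIZE = 32

buffer = []


def on_event(event):
    """Collect incoming events and invoke the model in small batches."""
    buffer.append(event)
    if len(buffer) >= BATCH_SIZE:
        flush()


def flush():
    if not buffer:
        return
    resp = requests.post(MODEL_URL, json={"instances": buffer}, timeout=5)
    resp.raise_for_status()
    for event, prediction in zip(buffer, resp.json()["predictions"]):
        print(event, "->", prediction)  # act on the prediction here
    buffer.clear()
```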

Automated actions

image


Architecting Data and Machine Learning Platforms - Chapter 9 - Extending a data platform using Hybrid and Edge

Why multicloud?

While a single cloud provider offers simplicity, cost efficiency, and streamlined learning curves, the reality for many organizations is a multicloud environment.

Multicloud might be inevitable:

  • M&As may lead to inherited cloud infrastructure and mixing clouds
  • seeking best of multiple clouds
  • mitigating vendor lock-in and establishing exit strategies
  • addressing regulatory requirements and data residency concerns

Multicloud Architectural Patterns

Single Pane of Glass

Utilizing either BI tools or distributed process engines to connect, query, and blend data from various sources, providing a unified view for users

image

Write once, run anywhere

This can be accomplished through

  • managed open source software
  • multicloud products
  • utilizing multiple runners (e.g., Apache Beam)
  • open source abstraction layers
  • running open source software on Infrastructure as a Service (IaaS)

Bursting from On Premises to Cloud

image

We can expand on-premises data lakes into the cloud for specific workloads like large batch jobs, testing version upgrades, or experimenting with new technologies. DistCp can be used to transfer data to cloud blob storage, and PaaS clusters are spun up for processing.

Pass-Through from On Premises to Cloud

image

Similar to bursting, but the data that is moved to cloud storage can be processed by both on-prem tools and cloud-based processing engines, providing greater flexibility

Data Integration Through Streaming

We can use CDC to capture real-time data changes and enable real-time analytics, database replication, and event-driven architectures.

Adopting multicloud

Framework

Example: the Architecture Development Method (ADM) from The Open Group Architecture Framework (TOGAF)

image

Time scale

  • some orgs may never fully migrate to a public cloud
  • others might be committed to multi-year transitions
  • there are orgs who are not ready to migrate

It is important to identify the time scale at which multicloud is to be adopted

Define a target multicloud architecture

  • define your strategy: clearly articulate business goals, drivers, and identify the time scale for multicloud adoption
  • develop the team: build a ‘Cloud Center of Excellence’ with diverse skillsets to address the complexities of a multicloud environment
  • assess: thoroughly evaluate existing workloads and identify suitable migration paths, considering replatforming or redevelopment
  • design the architecture: define communication, data exchange, and integration mechanisms between components to ensure flexibility, security, performance, and cost efficiency
  • prepare a migration plan: establish clear migration modalities, timelines, effort estimations, milestones, and interdependencies between activities
  • build the landing zone: Create foundational environments on each cloud platform, addressing identity management, network connectivity, security, monitoring, and cost management
  • migrate and transform: execute the migration plan, leveraging new services and potentially rebuilding applications using cloud-native tools

Why edge computing?

Edge computing, in a nutshell, is the architectural pattern that pushes data processing to where the data is generated, even if that is at the periphery of the architecture.

Benefits

  • reliability: ensures functionality even with intermittent connectivity
  • legal/compliance: facilitates data processing adhering to local regulations
  • security: reduces risk of data exfiltration and strengthens data protection

Challenges:

  • limited computing and storage: edge devices often have specialized hardware with constraints
  • device management/remote control: updating and monitoring devices can be challenging, especially in remote locations
  • backup and restore: implementing robust backup and restore mechanisms in potentially disconnected environments requires careful consideration

Edge Computing Architectural Patterns

Smart devices

Devices themselves possess sufficient processing power to execute complex logic, such as ML algorithms. Requires significant management effort but offers high flexibility.

image

Smart gateways

‘Dumb’ devices/sensors connect to a centralized gateway that performs data processing and analysis.

image

ML Activation on the Edge

Enabling edge devices to execute ML models for real-time decision-making involves data collection, analysis, model development and training (typically in the cloud), deployment to the edge, and feedback data collection for model improvement.

image


Data Mesh

The time has come ~ finally 😆 to read the Data Mesh book

Prologue: Imagine Data Mesh

A fictional company called Daff is presented. It’s a global music and audio streaming company in 2022. Three years prior, they faced significant challenges in harnessing data for business value due to a centralised, monolithic data architecture and functionally siloed teams. This structure resulted in bottlenecks, slow data delivery, and limited data-driven experimentation.

Daff recognized the need for a new approach and embraced data mesh. This entailed a significant organizational and cultural shift, alongside the development of a self-serve data platform.

Key Features

  • decentralized domain ownership: data ownership shifted to cross-functional domain teams, promoting data accountability and alignment with business objectives
  • data as a product: data is treated as a product with defined service-level objectives (SLOs), documentation, and discoverability, ensuring data quality and usability
  • self-serve data platform: a platform that provides tools and services for building, deploying, and consuming data products, enabling autonomy and agility in data sharing and experimentation
  • federated computational governance: global data governance policies are established through collaboration between domain representatives and enforced via the platform

Benefits of data mesh for Daff:

Daff started small with data mesh, proved its benefits, and kept growing it.

  • enhanced data culture: data mesh fostered a culture of data curiosity and experimentation across domains
  • accelerated data delivery: the self-serve platform and domain ownership enabled rapid data product creation and sharing
  • improved DQ and trust: data product standards and ownership led to higher data quality and trust among data users
  • scalability and agility: the data mesh architecture enabled the addition of new data products and connections, accommodating the company’s growth and evolving business needs

Data Mesh Transformation Journey

Daff undertook a multi-year transformation journey, incrementally implementing data mesh principles, building platform capabilities, and delivering data-driven use cases. It was not a flip of a switch. They explored, and are still expanding the mesh to different domains. Data mesh is a whole new way of thinking about data, so it’s not just about the tech - it’s also about changing people’s views on data.

The book was written when data mesh was arguably still in the innovator and early adopter phase of the innovation adoption curve - and that was 2022, just two years ago ~ but I have seen people talking about data mesh throughout this year, so maybe it is picking up, or at least it is still a good framework to aim for.


Data Mesh - Chapter 1 - Data Mesh in a Nutshell

Data mesh is a decentralized sociotechnical approach to share, access, and manage analytical data in complex and large-scale environments—within or across organizations.

Data mesh is a new approach in sourcing, managing, and accessing data for analytical use cases at scale.

The outcomes

To get value from data at scale in complex and large-scale organizations, data mesh sets out to achieve these outcomes:

  • respond gracefully to change: a business’ essential complexity, volatility and uncertainty
  • sustain agility in the face of growth
  • increase the ratio of value from data to investment

The shifts

Data mesh dimensions of change

image

Is data mesh an architecture? a list of principles? an operating model?

The author has decided to classify it as a sociotechnical paradigm - an approach that recognizes the interactions between people and the technical architecture and solutions in complex organizations

The principles

In the simplest form data mesh can be described through four interacting principles

Principle of domain ownership

Decentralize the ownership of analytical data to business domains closest to the data—either the source of the data or its main consumers. Decompose the (analytical) data logically and based on the business domain it represents, and manage the life cycle of domain-oriented data independently.

Benefits:

  • the ability to scale out data sharing aligned with the axes of organizational growth
  • optimization for continuous change
  • enabling agility
  • increasing the business truthfulness of data
  • increasing solutions’ resiliency

Principle of data as a product

With this principle in place, domain-oriented data is shared as a product directly with data users—data analysts, data scientists, and so on.

Data as a product adheres to a set of usability characteristics (a toy illustration follows this list):

  • discoverable
  • addressable
  • understandable
  • trustworthy and truthful
  • natively accessible
  • interoperable and composable
  • valuable on its own
  • secure
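
Just to make this concrete for myself, here is a toy illustration (my own, not from the book) of the kind of metadata a data product might publish so that it is discoverable, addressable, understandable, and trustworthy:

```python
# Toy illustration (not from the book): self-describing metadata for a data product.
# All names, addresses, and numbers are made up.
data_product = {
    "name": "play-events",
    "domain": "media-player",                   # owning domain team
    "owner": "media-player-team@daff.example",  # made-up contact
    "output_ports": [
        {"type": "table", "address": "warehouse.media_player.play_events"},
        {"type": "stream", "address": "kafka://play-events-v1"},
    ],
    "slos": {
        "freshness_minutes": 15,      # how stale the data is allowed to be
        "completeness_pct": 99.5,
        "availability_pct": 99.9,
    },
    "documentation": "https://example.internal/data-products/play-events",
    "schema_version": "1.2.0",
}
```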

Motivations:

  • remove the possibility of creating domain-oriented data silos
  • create a data-driven innovation culture
  • create resilience to change with build-time and run-time isolation between data products and explicitly defined data-sharing contracts
  • get higher value from data by sharing and using data across organizational boundaries

Principle of the self-serve data platform

The platform services manage the full life cycle of individual data products. They manage a reliable mesh of interconnected data products. They provide mesh-level experiences such as surfacing the emergent knowledge graph and lineage across the mesh. The platform streamlines the experience of data users to discover, access, and use data products. It streamlines the experience of data providers to build, deploy, and maintain data products.

Motivations:

  • reduce total cost of decentralized data ownership
  • abstract data management complexity and reduce the cognitive load of domain teams in managing the end-to-end life cycle of their data products
  • mobilize a larger population of devs to embark on data product development and reduce the need for specialization
  • automate governance policies to create security and compliance standards for all data products

Principle of federated computational governance

This principle creates a data governance operating model based on a federated decision-making and accountability structure, with a team composed of domain representatives, data platform, and subject matter experts—legal, compliance, security, etc. The operating model creates an incentive and accountability structure that balances the autonomy and agility of domains, with the global interoperability of the mesh. The governance execution model heavily relies on codifying and automating the policies at a fine-grained level, for every data product, via the platform services.

Motivation:

  • the ability to get higher-order value from aggregation and correlation of independent yet interoperable data products
  • countering the undesirable consequences of domain-oriented decentralization
  • making it feasible to build in cross-cutting governance requirements such as security, privacy, legal compliance
  • reducing the overhead of manual sync between domains and the governance function

Interplay of the principles

image

Data mesh model at a glance

image

The data

Data mesh focuses on analytical data. It recognizes the blurry delineation of the two modes of data, introduces a new model of tight integration of the two, and yet respects the clear differences between them.

Operational data

  • supports running the business in real time through OLTP systems
  • captured, stored, and processed by applications, microservices, or systems of record
  • represents the current state of the business, designed for transactional integrity and frequent read/write access
  • private to apps/microservices but can be shared via APIs
  • used directly for business transactions and decision-making

Analytical Data

  • provides a historical, integrated, and aggregated view of business data through OLAP systems
  • optimized for insights, comparisons, trends, and training ML models
  • includes intensive read access but minimal writes, with a focus on history and future predictions
  • fuels AI, analytics, and user experience optimization

Chapters 2, 3, 4, and 5 are each about one of the four principles of data mesh. For those, I decided to casually listen to the audiobook (it’s available on O’Reilly) while following along with each chapter’s text. Tomorrow I will continue from chapter 6. I think this book is very in-depth, and understandably so - data mesh is a new way of working, designing, and thinking about data, and these initial chapters (probably the later ones too) are something to reference back to.


I started thinking about what to write for my last post 😆 I want to share some of it on LinkedIn/X/Reddit to potentially help other beginners with the resources I found helpful. I need to write something more in-depth here on the blog, but something concise and eye-catching on social media 😆

This is not related to the content of this post, but something I have been thinking about is putting model params in a YAML file and loading them from there, so that we can version them and keep track. I last saw this a few days ago in Maria Vechtomova’s LinkedIn Learning course. (I remembered to share it now while writing today’s post 😆)
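
A minimal sketch of that idea (file name and params are made up): keep the params in a versioned YAML file and load them at training time.

```python
# params.yaml (versioned alongside the code) might look like:
#
# model:
#   n_estimators: 200
#   max_depth: 8
#   learning_rate: 0.05

import yaml  # pip install pyyaml

with open("params.yaml") as f:
    params = yaml.safe_load(f)

model_params = params["model"]
print(model_params["n_estimators"], model_params["max_depth"])

# ...then pass **model_params into whatever model we are training
```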

That is all for today!

See you tomorrow :)