(Day 337) More about Iceberg and DQ

Ivan Ivanov · December 3, 2024

Hello :) Today is Day 337!

A quick summary of today:

  • Apache Iceberg: The Definitive Guide - Chapter 4 - Optimizing the Performance of Iceberg Tables
  • Data Quality Fundamentals - Chapter 6 - Fixing Data Quality Issues at Scale

Apache Iceberg: The Definitive Guide - Chapter 4 - Optimizing the Performance of Iceberg Tables

Compaction

Compaction takes many smaller files and rewrites them into fewer, larger files.

image

Hands-on with Compaction

Compaction strategies

| Strategy | What it does | Pros | Cons |
|---|---|---|---|
| Binpack | Combines files only; no global sorting (will do local sorting within tasks) | This offers the fastest compaction jobs. | Data is not clustered. |
| Sort | Sorts by one or more fields sequentially prior to allocating tasks (e.g., sort by field a, then within that, sort by field b) | Data clustered by often queried fields can lead to much faster read times. | This results in longer compaction jobs compared to binpack. |
| Z-order | Sorts by multiple fields that are equally weighted, prior to allocating tasks (X and Y values in this range are in one grouping; those in another range are in another grouping) | If queries often rely on filters on multiple fields, this can improve read times even further. | This results in longer-running compaction jobs compared to binpack. |
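To get a feel for what binpack does, here is a minimal sketch in plain Python. It is not Iceberg's actual algorithm: it assumes a simple first-fit-decreasing heuristic, and the `binpack` helper, the file sizes, and the 128 MB target are all made up for illustration.

```python
def binpack(file_sizes_mb, target_mb):
    """Group small files so that each group stays at or under target_mb.

    Illustrative first-fit-decreasing heuristic, not Iceberg's implementation.
    """
    groups = []  # each group would be rewritten as one larger output file
    for size in sorted(file_sizes_mb, reverse=True):
        for group in groups:
            if sum(group) + size <= target_mb:
                group.append(size)  # fits into an existing group
                break
        else:
            groups.append([size])   # nothing fits, so start a new group
    return groups


small_files = [60, 10, 30, 50, 20, 40, 45, 5]  # hypothetical sizes in MB
groups = binpack(small_files, target_mb=128)
print(len(small_files), "files ->", len(groups), "files")  # 8 files -> 3 files
```

Note that nothing here looks at the values inside the files, which is why binpack is fast but leaves the data unclustered.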

Sorting

Sorting (or “clustering”) data improves query performance by reducing the number of files that need to be scanned. When data is sorted, similar values are grouped into fewer files, making queries more efficient. For example, if NFL player data is sorted by team, querying for players from the Detroit Lions will require scanning fewer files compared to unsorted data.

image

Sorted datasets result in scanning fewer datafiles
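The file-skipping effect can be sketched in a few lines of plain Python. This is only an illustration, not Iceberg code: it assumes each file keeps the min and max of the team column (similar in spirit to the column bounds Iceberg tracks), and the helper names and team list are hypothetical.

```python
def split_into_files(rows, rows_per_file):
    """Chop a list of rows into fixed-size 'files'."""
    return [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]


def files_to_scan(files, team):
    """Count files whose min/max range could contain the requested team."""
    return sum(1 for f in files if min(f) <= team <= max(f))


teams = ["Bears", "Lions", "Packers", "Vikings"]
rows = [teams[i % 4] for i in range(16)]  # unsorted: teams are interleaved

unsorted_files = split_into_files(rows, 4)
sorted_files = split_into_files(sorted(rows), 4)

print(files_to_scan(unsorted_files, "Lions"))  # 4, every file spans Bears..Vikings
print(files_to_scan(sorted_files, "Lions"))    # 1, only one file holds Lions
```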

We can combine compacting and sorting:

image

Z-order

There are times when multiple fields are a priority when querying a table, and this is where a z-order sort may be quite helpful. With a z-order sort you are sorting the data by multiple fields at once, which gives engines a greater ability to reduce the files scanned in the final query plan.

image

So if we know what data we are looking for based on fields we z-ordered by, we can possibly avoid searching large portions of the data since it’s sorted by both fields. We can then take that quadrant and break it down even further and apply another z-order sort to the data in the quadrant, as shown in “C” in the figure. Since our search is based on multiple factors (X and Y), we could eliminate 75% of the searchable area by taking this approach.
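One common way to build such an ordering is to interleave the bits of the two values (a Morton code). The sketch below is a toy version in plain Python that assumes small non-negative integers; Iceberg's real z-order handles other types differently, and `z_value` is a name I made up.

```python
def z_value(x, y, bits=8):
    """Interleave the bits of x and y into one Z-order (Morton) value."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)  # bits of y go to odd positions
    return z


# A 4x4 grid of (x, y) points, sorted by z-value and split into 4 "files".
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: z_value(*p))
files = [ordered[i:i + 4] for i in range(0, 16, 4)]

print(files[0])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

Each "file" ends up holding exactly one quadrant of the grid, so a query for x < 2 and y < 2 only needs the first file and can skip the other three (the 75% mentioned above).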

Usage:

CALL catalog.system.rewrite_data_files(
  table => 'people',
  strategy => 'sort',
  sort_order => 'zorder(age,height)'
)

Using the sort and z-order compaction strategies not only allows you to reduce the number of files your data exists in, but also makes sure the order of the data in those files enables even more efficient query planning.

While sorting is effective, it comes with some challenges. First, as new data is ingested, it becomes unsorted, and until the next compaction job, the data remains somewhat scattered across multiple files. This occurs because new data is added to a new file and is potentially sorted within that file but not in the context of all previous records. Second, files may still contain data for multiple values of the sorted field, which can be inefficient for queries that only require data with a specific value.

Partitioning

If you know a particular field is pivotal to how the data is accessed, you may want to go beyond sorting and into partitioning. When a table is partitioned, instead of just sorting records by a field, the engine writes records with distinct values of the target field into their own datafiles.

For example, with political data we may often query a table by party:

image

Copy-on-Write Versus Merge-on-Read

Another consideration when it comes to the speed of your workloads is how you handle row-level updates. When you are adding new data, it just gets added to a new datafile, but when you want to update or delete preexisting rows, there are some considerations you need to be aware of:

  • in data lakes, and therefore in Apache Iceberg, datafiles are immutable, meaning they can’t be changed. This provides lots of benefits, such as the ability to achieve snapshot isolation (since files that old snapshots refer to will have consistent data)
  • if you’re updating 10 rows, there is no guarantee they are in the same file, so you may have to rewrite 10 files and every row of data in them to update 10 rows for the new snapshot

There are 3 approaches:

| Update style | Read speed | Write speed | Best practice |
|---|---|---|---|
| Copy-on-write | Fastest reads | Slowest updates/deletes | |
| Merge-on-read (position deletes) | Fast reads | Fast updates/deletes | Use regular compaction to minimize read costs. |
| Merge-on-read (equality deletes) | Slow reads | Fastest updates/deletes | Use more frequent compaction to minimize read costs. |

Copy-on-Write

image
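A rough sketch of the copy-on-write idea in plain Python (not Iceberg code; the file names, the `-v2` suffix, and the `cow_update` helper are hypothetical): changing one row means rewriting the whole file that contains it, while untouched files are shared between the old and new snapshot.

```python
def cow_update(snapshot, row_id, new_value):
    """Build the next snapshot: rewrite every file that holds row_id, reuse the rest."""
    next_snapshot = {}
    for name, rows in snapshot.items():
        if row_id in rows:
            rewritten = dict(rows)         # copy every row of the file...
            rewritten[row_id] = new_value  # ...just to change one of them
            next_snapshot[name + "-v2"] = rewritten
        else:
            next_snapshot[name] = rows     # untouched file, shared as-is
    return next_snapshot


s1 = {"file-a": {1: "x", 2: "y"}, "file-b": {3: "z"}}
s2 = cow_update(s1, row_id=2, new_value="Y")

print(s2)  # {'file-a-v2': {1: 'x', 2: 'Y'}, 'file-b': {3: 'z'}}
print(s1)  # the old snapshot still sees the original, unchanged files
```

Reads stay fast because a reader never has to reconcile anything; the cost is paid at write time.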

Merge-on-Read

If you are deleting a record:

  • the record is listed in a delete file
  • when a reader reads the table, it will reconcile the datafile with the delete file

If you are updating a record:

  • the record to be updated is tracked in a delete file
  • a new datafile is created with only the updated record
  • when a reader reads the table, it will ignore the old version of the record because of the delete file and use the new version in the new datafile

image

When you have a ton of data and you want to kick out a specific row, you have a couple of options:

  • we can look for the row data based on where it’s sitting in the dataset, kind of like finding your friend in a movie theater by their seat number, or
  • we can look for the row data based on what it’s made of, like picking out your friend in a crowd because they’re wearing a bright red hat

If we use the first option, we’ll use what are called positional delete files. But if we use the second option, we’ll need equality delete files.
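Here is a small sketch of how a reader could reconcile datafiles with both kinds of delete files. It is plain Python with made-up names, and it is simplified: real Iceberg also uses sequence numbers so that an equality delete only applies to older datafiles, which this toy version ignores.

```python
def read_table(data_files, position_deletes=(), equality_deletes=()):
    """Merge datafiles with delete files at read time and return the live rows."""
    deleted_positions = set(position_deletes)  # (file name, row position) pairs
    deleted_ids = set(equality_deletes)        # values of the `id` column
    live = []
    for name, rows in data_files.items():
        for pos, row in enumerate(rows):
            if (name, pos) in deleted_positions:
                continue  # "seat number" match
            if row["id"] in deleted_ids:
                continue  # "red hat" match
            live.append(row)
    return live


data_files = {
    "f1": [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Cy"}],
    "f2": [{"id": 2, "name": "Rob"}],  # new datafile holding the updated record
}
rows = read_table(
    data_files,
    position_deletes=[("f1", 1)],  # the update: hide the old Bob row by position
    equality_deletes=[3],          # the delete: drop any row where id = 3
)
print(rows)  # [{'id': 1, 'name': 'Ann'}, {'id': 2, 'name': 'Rob'}]
```

The position delete is cheap for the reader (it knows exactly which row to skip), while the equality delete forces a comparison against every row, which matches the read-speed ordering in the table above.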

Other considerations

This section outlines various techniques for optimizing performance when working with data files and tables, particularly in systems like Apache Iceberg. Key areas discussed include:

Metrics collection:

  • metrics like counts, bounds, and distinct values are tracked to optimize queries
  • overhead from metrics tracking in wide tables can be reduced by customizing column-specific metrics using table properties (e.g., none, counts, truncate, full)

Rewriting manifests:

  • consolidates manifest files to reduce overhead when listing data files without rewriting the data itself
  • helps when datafile sizes are optimal but too many manifests exist
  • Spark caching can be disabled to handle memory issues during this process

Optimizing storage:

  • periodic snapshot expiration removes old, unnecessary data files to save space but disables time-travel to expired snapshots
  • orphan files created by failed jobs can be identified and removed with a specific procedure

Write distribution mode:

  • optimizes file writes by controlling how records are distributed across tasks
  • distribution modes include none (fastest), hash (grouped by partition key), and range (grouped by sort order)
  • custom distribution modes can be specified for updates, deletes, and merges
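To see why the distribution mode matters, here is a toy model in plain Python. It assumes every write task produces one file per partition value it receives, uses round-robin placement to stand in for "none", and the `count_output_files` helper is hypothetical.

```python
import zlib


def count_output_files(partition_values, num_tasks, mode):
    """Count files written when each task writes one file per partition it sees."""
    seen_by_task = [set() for _ in range(num_tasks)]
    for i, partition in enumerate(partition_values):
        if mode == "hash":
            # all records of a partition are shuffled to the same task
            task = zlib.crc32(partition.encode()) % num_tasks
        else:
            # "none": no shuffle, records stay where they are (round-robin here)
            task = i % num_tasks
        seen_by_task[task].add(partition)
    return sum(len(partitions) for partitions in seen_by_task)


records = ["2024-12-01", "2024-12-02", "2024-12-03"] * 4  # 12 records, 3 partitions
print(count_output_files(records, num_tasks=4, mode="none"))  # 12
print(count_output_files(records, num_tasks=4, mode="hash"))  # 3
```

So "none" skips the shuffle and is fastest to write, but it can leave many small files behind, while "hash" pays for a shuffle and writes fewer files.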

Object storage optimization:

  • addresses request throttling issues in object storage by distributing files across multiple prefixes using hashing
  • improves scalability and concurrency for large partitions
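The prefix idea can be sketched like this in plain Python. The exact path layout and hash function Iceberg uses are not shown here; the `hashed_path` helper, the SHA-1 choice, and the bucket path are assumptions for illustration only.

```python
import hashlib


def hashed_path(table_location, file_name, prefix_len=8):
    """Put a hash-derived prefix in front of the file name to spread out requests."""
    digest = hashlib.sha1(file_name.encode()).hexdigest()
    return f"{table_location}/data/{digest[:prefix_len]}/{file_name}"


# Hypothetical table location and file names
for name in ["00001.parquet", "00002.parquet", "00003.parquet"]:
    print(hashed_path("s3://bucket/db/events", name))
```

Because the prefix comes from a hash, files that would otherwise pile up under one partition directory get spread over many prefixes, which is what helps with per-prefix throttling.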

Bloom filters:

  • efficiently check whether a value might be present in a datafile, which reduces unnecessary scans
  • implemented via table properties, enabling faster queries by allowing engines to skip irrelevant data files
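A bloom filter itself is a simple structure. Below is a minimal sketch in plain Python; the sizes, the SHA-256-based hashing, and the class itself are my own choices for illustration and not how Parquet or Iceberg implement it.

```python
import hashlib


class BloomFilter:
    """Tiny bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size_bits=256, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, value):
        # derive num_hashes bit positions from seeded SHA-256 digests
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, value):
        for p in self._positions(value):
            self.bits |= 1 << p

    def might_contain(self, value):
        return all((self.bits >> p) & 1 for p in self._positions(value))


# One filter per datafile: if might_contain() is False, the file can be skipped.
file_filter = BloomFilter()
for player_id in ["p-001", "p-002", "p-003"]:
    file_filter.add(player_id)

print(file_filter.might_contain("p-002"))  # True, the value was added
```

A `False` answer is definitive, so the engine can skip that file. A `True` answer only means "maybe", so the file still has to be read.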

Data Quality Fundamentals - Chapter 6 - Fixing Data Quality Issues at Scale

Fixing quality issues in software development

Principles from DevOps and site reliability engineering (SRE) can be applied to improve data quality management at scale. DevOps uses a continuous feedback loop with eight stages: plan, code, build, test, release, deploy, operate, and monitor. This cycle ensures reliable, performant software aligned with business objectives.

Adopting similar incident management practices for data systems allows teams to proactively address data quality issues, moving beyond reactive approaches. Effective incident management involves identifying, resolving, analyzing, and preventing disruptions to maintain operational continuity. By integrating these practices and investing in appropriate tools, data teams can enhance analytics scalability and reliability.

Data incident management

As data systems become increasingly distributed and companies ingest more and more data, the opportunity for error (and incidents) only increases. For decades, software engineering teams have relied on a multistep process to detect, resolve, and prevent issues from taking down their applications. As data operations mature, it’s time we treat our data systems with the same diligence, particularly when it comes to building more reliable pipelines.

The authors recommend using the ‘data reliability life cycle’, inspired by DevOps practices:

image

Incident detection

  • anomaly detection identifies deviations from normal data patterns
  • anomaly detection should be implemented across the entire data ecosystem, not just in isolated areas
  • while essential, anomaly detection alone is insufficient for effective incident management; it should be combined with testing, versioning, observability, lineage, and other tools and processes
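As a very simple example of anomaly detection, the sketch below flags a daily row count that sits too many standard deviations from recent history. The z-score approach, the threshold of 3, and the numbers are my own assumptions, not something prescribed by the book.

```python
from statistics import mean, stdev


def is_anomalous(history, latest, threshold=3.0):
    """Flag `latest` if it is more than `threshold` standard deviations from the mean."""
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        return latest != mu  # flat history: any change counts as a deviation
    return abs(latest - mu) / sigma > threshold


daily_row_counts = [1000, 1020, 980, 1010, 990, 1005, 995]  # hypothetical history

print(is_anomalous(daily_row_counts, 400))   # True, volume dropped sharply
print(is_anomalous(daily_row_counts, 1015))  # False, within normal variation
```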

Incident response

  • effective communication is key to a successful incident response
  • data teams should develop runbooks (instructions for specific services) and playbooks (step-by-step procedures for handling incidents) to streamline incident response
  • role delegation, similar to traditional site reliability engineering, can improve incident response; this includes an incident commander to coordinate tasks and communication, and an incident responder to troubleshoot the issue
  • end-to-end lineage helps to understand the impact of data downtime and quickly communicate with affected teams

Root cause analysis (RCA)

  • identifying the root cause of data incidents is often complex and requires a holistic approach
  • the ‘five whys’ approach can be a helpful framework for RCA; this involves repeatedly asking “why” to drill down to the root cause
  • the authors suggest a five-step process for RCA:
    1. examine the lineage to identify the upstream source of the problem
    2. review the code that generated the affected table to identify potential issues
    3. analyse the data for patterns and clues related to the incident
    4. investigate the operational environment for errors, delays, or infrastructure issues
    5. consult with peers for insights and past experiences with similar incidents
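Step 1 (examining the lineage) is easy to picture as a graph walk. Here is a minimal sketch in plain Python, assuming lineage is available as a dict that maps each table to its direct parents; the table names and the `upstream_tables` helper are hypothetical.

```python
def upstream_tables(lineage, table):
    """Return every table that feeds `table`, directly or indirectly."""
    seen, stack = set(), [table]
    while stack:
        for parent in lineage.get(stack.pop(), []):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


# Hypothetical lineage: table -> tables it is built from
lineage = {
    "revenue_dashboard": ["orders_daily"],
    "orders_daily": ["orders_raw", "customers_raw"],
    "churn_report": ["customers_raw"],
}

suspects = upstream_tables(lineage, "revenue_dashboard")
print(sorted(suspects))  # ['customers_raw', 'orders_daily', 'orders_raw']
```

If the dashboard looks wrong, these are the candidates to inspect first. Flipping the edges of the same graph gives the downstream impact, which is what end-to-end lineage is used for during incident response.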

Resolution

  • once the root cause is identified, the next step is to resolve the issue and communicate the resolution to stakeholders
  • resolutions can involve pausing and restarting pipelines, troubleshooting code, or implementing more permanent solutions
  • continuous communication through dedicated channels is essential during the resolution process

Blameless postmortem

  • a postmortem meeting, along with documentation, should be conducted after resolving an incident to review the incident, identify lessons learned, and improve future responses
  • postmortems should focus on learning and improvement rather than assigning blame
  • postmortems are opportunities to:
    • assess incident readiness and update runbooks
    • review monitoring, alerting, and workflow tools
    • document the incident and share it with the data team
    • revisit service-level agreements (SLAs)
    • improve system resilience and prevent future data downtime

Incident response and mitigation

A proactive approach to data reliability incorporates testing, CI/CD, discovery, and observability, all of which are necessary components of managing and preventing data incidents.

image

Establishing a routine of incident management

Establishing a routine for incident management ensures data reliability by assigning clear responsibilities and improving response times. A key role is the incident commander, who coordinates efforts, assesses severity, and maintains communication during data issues. Rotating commanders weekly or assigning them to specific datasets fosters accountability.

Key steps:

  1. route notifications to relevant team members: centralized or decentralized teams must quickly identify and notify the appropriate stakeholders using tools like Slack
  2. assess incident severity: prioritize issues based on their business impact, using lineage tools to identify critical data assets and reduce time spent on non-essential issues
  3. communicate status updates: use runbooks, status pages, and clear delegation to keep stakeholders informed and reduce duplication of effort
  4. define and align on SLOs and SLIs: measure metrics like time to detection (TTD) and time to resolution (TTR) to enhance reliability and prevent future downtime
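TTD and TTR from step 4 are simple to compute once incident timestamps are recorded. The sketch below assumes TTD is measured from when the issue occurred to when it was detected, and TTR from detection to resolution; the incident data and the `mean_ttd_ttr` helper are made up.

```python
from datetime import datetime, timedelta


def mean_ttd_ttr(incidents):
    """Average time to detection and time to resolution over a list of incidents.

    Each incident is a tuple of (occurred, detected, resolved) datetimes.
    """
    n = len(incidents)
    ttd = sum((detected - occurred for occurred, detected, _ in incidents), timedelta()) / n
    ttr = sum((resolved - detected for _, detected, resolved in incidents), timedelta()) / n
    return ttd, ttr


# Hypothetical incident log
incidents = [
    (datetime(2024, 12, 1, 8, 0), datetime(2024, 12, 1, 9, 30), datetime(2024, 12, 1, 13, 0)),
    (datetime(2024, 12, 2, 10, 0), datetime(2024, 12, 2, 10, 30), datetime(2024, 12, 2, 11, 30)),
]

ttd, ttr = mean_ttd_ttr(incidents)
print("mean TTD:", ttd)  # mean TTD: 1:00:00
print("mean TTR:", ttr)  # mean TTR: 2:15:00
```

Tracking these two numbers over time gives an SLI that the team can set an SLO against.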

Incident commanders, supported by automation and structured processes, play a crucial role in minimizing downtime and improving data pipeline resilience.


That is all for today!

See you tomorrow :)