(Day 339) Going deeper and deeper into DE

Ivan Ivanov · December 5, 2024

Hello :) Today is Day 339!

A quick summary of today:

  • finishing DQ Fundamentals
  • finishing Apache Iceberg: The Definitive Guide
  • finally reading Learning Spark 2nd ed

Data Quality Fundamentals - Chapter 8 - Democratising DQ

Treating data like a product

Regardless of what data the product visualizes, crunches, or puts to work, there are specific outcomes it should deliver:

  • increased data accessibility (surface data where people need it when they need it)
  • increased data democratization (make it easier for people to manipulate the data)
  • faster ROI on data (quicker insights)
  • time savings for the data team / data consumers
  • more precise insights (i.e. experimentation platforms)

There are also important characteristics or qualities a data product should have:

  • reliability and observability
  • scalability
  • extensibility
  • usability
  • security and compliance
  • release discipline and roadmap

Applying the Data-as-a-Product approach

  • gain stakeholder alignment early, and often
  • apply a product management mindset
  • invest in self-serve tooling
  • prioritize data quality and reliability
  • find the right team structure for your data organization

Building trust in your data platform

  • align your product’s goals with the goals of the business
  • gain feedback and buy-in from the right stakeholders
  • prioritise long-term growth and sustainability vs. short-term gains
  • sign off on baseline metrics for your data and how you measure them
  • know when to build vs buy

Assigning ownership for DQ

Blast radius caused by data downtime

Role | Facilitate Data Accessibility | Make It Easy to Interpret Data | Drive Insights and Recommendations Based on Data | Ensure Data Compliance | Maintain High Data Quality | Deliver on Data Reliability
CDO | A | A | A | A | A | A
Business Intelligence | R | R | R | I | C/I | C/I
Analytics Engineer | A | C | R | I | A | R
Data Science | C | R | R | I | R | C
Data Engineering | R | C | C | I | R | R
Data Governance | C | I | C | R | C | I
Product Manager | R | C | C | C | R | R

R = Responsible, A = Accountable, C = Consulted, and I = Informed

Certifying data

Data certification is the process by which data assets are approved for use across the organization after having met mutually agreed upon SLAs for data quality, observability, ownership/accountability, issue resolution, and communication.

Data certification requirements vary based on the needs of the business, the capacity of the data engineering team, and the availability of data, but they typically incorporate features such as the following:

  • automated quality checks for freshness, volume, schema, and distribution
  • delivery SLAs with defined uptime
  • data owners who are accountable for investigating alerts
  • alerts routed to Slack (or email)
  • set communication process for outages

7 steps to implementing a data certification program

  1. Build out your data observability capabilities
  2. Determine your data owners
  3. Understand what ‘good’ data looks like
  4. Set clear SLAs, SLOs, and SLIs for your most important data sets
  5. Develop your communication and incident management processes
  6. Determine a mechanism to tag the data as certified
  7. Train your data team and downstream consumers

Increasing data literacy

  • data catalogs
  • DBMS
  • data modelling tools
  • operational analytics dashboards

Prioritise data governance and compliance

Data governance, the management of data across organizations, is increasingly critical due to regulations like GDPR and CCPA. It ensures data availability, usability, provenance, and security but often fails in execution due to outdated methods and scalability challenges in cloud-based systems. Data catalogs, serving as metadata repositories, are emerging as essential tools, aiding in data lineage, accessibility, and PII management.

Types of data catalogs:

  1. in-house: customizable but resource-intensive and prone to visibility issues over time
  2. third-party: often ML-powered with compliance integrations but sometimes criticized for poor usability
  3. open-source: flexible but manual-intensive, lacking off-the-shelf support for diverse stacks

Despite their utility, data catalogs alone are insufficient for comprehensive governance. Modern programs require tools for data lineage, observability, and automated policy enforcement. Success also hinges on fostering a data culture that aligns teams with governance priorities. Collaboration across engineering, product, and analytics teams is essential to address governance gaps effectively and ensure data reliability at scale.

Building a DQ strategy

Make leadership accountable for DQ

Before you start trying to secure leadership and stakeholder buy-in, it’s important to be transparent about the current state of your data quality strategy. Consider how you might answer the following questions:

  • How do you measure the data quality of the assets your company collects and stores?
  • What are the key KPIs or goals you’re going to hold your data quality strategy accountable for meeting?
  • Do you have cross-functional involvement from leadership and data users in other parts of the company?
  • Who at the company will be held accountable for meeting your strategy’s KPIs and goals?
  • What checks and balances do you have to ensure KPIs are measured correctly and goals can be met?

Set DQ KPIs

  • avoid overcomplicating data quality measurements. Keep it simple: measure tangible metrics like completeness, freshness, accuracy, consistency, and validity rather than obscure ‘accuracy’ scores or other homegrown measurements
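
Two of the tangible metrics named above, completeness and validity, are simple enough to compute directly. The sketch below assumes rows arrive as a list of dictionaries; the function names and the `is_valid` predicate are hypothetical and only meant to show how plain these KPIs can be.

```python
def completeness(rows, column):
    """Share of rows where `column` is present and not None (1.0 for no rows)."""
    if not rows:
        return 1.0
    filled = sum(1 for row in rows if row.get(column) is not None)
    return filled / len(rows)


def validity(rows, column, is_valid):
    """Share of non-null values in `column` that satisfy the `is_valid` predicate."""
    values = [row[column] for row in rows if row.get(column) is not None]
    if not values:
        return 1.0
    return sum(1 for value in values if is_valid(value)) / len(values)


# Illustrative data: one null email and one malformed email.
sample_rows = [
    {"email": "a@x.com"},
    {"email": None},
    {"email": "bad"},
    {"email": "b@y.org"},
]
email_completeness = completeness(sample_rows, "email")
email_validity = validity(sample_rows, "email", lambda value: "@" in value)
```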

Spearhead a data governance program

  • be sure to communicate to stakeholders how data quality affects their functional areas, from marketing to sales, and make it easy for them to share and enforce with their teams. Focus on short-term or quick wins to get traction while promoting and executing on the long-term strategy

Automate your lineage and data governance tooling

  • invest in automated tools that can quickly validate, monitor, and alert for data quality issues as they arise

Create a communications plan

  • the final step is to put together a robust and comprehensive, program-level communications plan that will keep leadership in the loop, stakeholders aligned with your project’s progress, and data stewards abreast of their marching orders

DQ Fundamentals - Chapter 9 - Data Quality in the Real World: Conversations and Case Studies

Building a data mesh for greater DQ

Traditional systems like siloed data warehouses and lakes fail to meet the growing demands of modern organizations. A data mesh offers a solution by adopting a distributed, domain-oriented, and self-serve design inspired by microservices and domain-driven design principles.

Key features:

  1. domain-oriented ownership: data ownership is federated among domains, each responsible for managing their own ETL pipelines and treating data as a product. Functional domain owners handle ingestion, transformation, and aggregation of data for analytics or operational purposes

  2. self-serve platform: centralized infrastructure supports domain-specific pipelines, enabling teams to abstract technical complexities while maintaining autonomy over their processes

  3. interoperability and standards: a universal interoperability layer standardizes governance, data quality, formatting, and metadata to ensure cross-domain collaboration and consistency. Each domain adheres to service-level agreements and quality measures

Why implement a data mesh?

The authors put together some questions to help decide whether a company should build a data mesh:

  • Quantity of data sources: How many data sources does your company have?
  • Size of your data team: How many data analysts, data engineers, and product managers (if any) do you have on your data team?
  • Number of data domains: How many functional teams (marketing, sales, operations, etc.) rely on your data sources to drive decision making, how many products does your company have, and how many data-driven features are being built? Add the total
  • Data engineering bottlenecks: How frequently is the data engineering team a bottleneck to the implementation of new data products on a scale of 1 to 10, with 1 being “never” and 10 being “always”?
  • Data governance: How much of a priority is data governance for your organization on a scale of 1 to 10, with 1 being “I could care less” and 10 being “It keeps me up all night”?

Score breakdown:

  • 1–15: given the size and unidimensionality of your data ecosystem, you may not need a data mesh
  • 15–30: the organization is maturing rapidly and may even be at a crossroads in terms of really being able to lean into data. We strongly suggest incorporating some data mesh best practices and concepts so that a later migration might be easier
  • 30 or above: the data organization is an innovation driver for your company, and a data mesh will support any ongoing or future initiatives to democratize data and provide self-serve analytics across the enterprise
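
The questionnaire can be turned into a small calculator. This sketch assumes the score is the plain sum of the five answers, which the notes imply but do not state, and it resolves the overlapping band edges by putting exactly 15 in the lowest band and exactly 30 in the highest. The function names and the short band labels are made up for illustration.

```python
def data_mesh_score(sources, team_size, domains, bottleneck, governance):
    """Sum the five answers; `bottleneck` and `governance` are 1-10 self-ratings."""
    for name, value in (("bottleneck", bottleneck), ("governance", governance)):
        if not 1 <= value <= 10:
            raise ValueError(f"{name} must be between 1 and 10, got {value}")
    return sources + team_size + domains + bottleneck + governance


def recommendation(score):
    """Map a score to one of the three bands from the score breakdown."""
    if score >= 30:
        return "a data mesh will support your initiatives"
    if score > 15:
        return "adopt some data mesh practices"
    return "you may not need a data mesh"


# Example: 4 sources, 5 people, 3 domains, bottleneck 6/10, governance 7/10.
example_score = data_mesh_score(4, 5, 3, 6, 7)
example_advice = recommendation(example_score)
```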

Case study insights from a game company

  • “Building your own data stack pays off, as it gives you all of these capabilities and enables you to be data-driven on your product development or working on your team.”
  • “We’ve been through a lot of iterations over our data platform, so you have to choose and be able to understand when it’s the right time to change technology, for which right amount of data, for which process that you’re running.”
  • “It’s very important to have a higher degree of data observability if you want to establish trust in your data. It’s important that you are able to understand when there is a problem, and that you’re able to indicate that easily.”
  • “It’s important to get the basics right before advancing to more advanced data applications. In our case, we should have hired analysts earlier to make more use of the data.”
  • “Establishing a data-driven culture is quite important and sometimes even more important than building the right tech stack.”

Making metadata work for the business

Lineage can be useful for:

  • understanding data changes that will impact consumers and determining the best course of action to resolve them
  • troubleshooting the root cause of an issue when data assets break
  • communicating the impact of broken data to consumers

The metadata (including but not limited to lineage) should answer more than the basic “who, what, where, when, why?” about your data. It should enable your customers (internal or external) to be equipped with up-to-date and accurate answers to questions that relate back to your customer’s pain points and use cases, including:

  • does this data matter?
  • what does this data represent?
  • is this data relevant and important to my stakeholders?
  • can I use this data in a secure and compliant way?
  • where does the answer to this question come from?
  • who is relying on this asset when I’m making a change to it?
  • can we trust this data?

The true power of metadata lies in where, when, and how we use it—specifically, how we apply it to a specific, timely problem we are trying to solve. In addition to collecting metadata and building metadata solutions, data teams also need to ask themselves:

  • what purpose is this metadata serving?
  • how can I apply it to solve real and relevant customer pain points?

Deciding when to get started with DQ at your company

  • you’ve recently migrated to the cloud
  • your data stack is scaling with more data sources, more tables, and more complexity
  • your data team is growing
  • your team is spending at least 30% of their time firefighting DQ issues
  • your team has more data consumers than they did 1 yr ago
  • your company is moving to a self-service analytics model
  • data is a key part of the customer value proposition

Apache Iceberg: The Definitive Guide - Chapter 10 - Apache Iceberg in Production

Apache Iceberg Metadata Tables

One of the most powerful features of Apache Iceberg is that from its robust metadata, several metadata tables can be generated that can be used to help monitor the health of the table and diagnose where bottlenecks may exist.

The history table

The history metadata table records the table’s evolution. Each of the four fields in this table provides unique insights into the table’s history

  • the made_current_at field records the timestamp at which the snapshot became the table’s current snapshot
  • the snapshot_id field serves as a unique identifier for each snapshot. This identifier enables you to track and reference specific snapshots within the table’s history
  • the parent_id field provides the unique ID of the parent snapshot of the current snapshot. This effectively maps out the lineage of each snapshot, thus facilitating the tracking of the table’s evolution over time
  • the is_current_ancestor field indicates whether a snapshot is an ancestor of the table’s current snapshot. This boolean value (true or false) helps identify snapshots that are part of the table’s present state lineage and those that have been invalidated from table rollbacks

We can use the history metadata table for data recovery and version control as well as to identify table rollbacks.
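
As a rough sketch of the rollback use case, the helper below builds a Spark SQL query that lists snapshots which are no longer ancestors of the current state. The helper name is hypothetical, `made_current_at` is the history table's timestamp column, and the table name is interpolated directly, so only trusted identifiers should be passed in.

```python
def rolled_back_snapshots_sql(table: str) -> str:
    """Build a query for snapshots invalidated by a rollback."""
    return (
        "SELECT snapshot_id, parent_id, made_current_at "
        f"FROM {table}.history "
        "WHERE is_current_ancestor = false "
        "ORDER BY made_current_at"
    )


history_query = rolled_back_snapshots_sql("catalog.db.table")
# With an active SparkSession named `spark` (assumed to exist):
# spark.sql(history_query).show()
```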

The metadata_log_entries table

The metadata_log_entries metadata table keeps track of the evolution of the table by logging the metadata files generated during table updates.

  • the timestamp field records the exact date and time when the metadata was updated
  • the file field indicates the location of the metadata file that corresponds to that particular metadata log entry
  • the latest_snapshot_id field provides the identifier of the most recent snapshot at the time of the metadata update
  • the latest_schema_id field contains the ID of the schema being used when the metadata log entry was created
  • the latest_sequence_number field signifies the order of the metadata updates

We can use the metadata_log_entries metadata table to find the latest snapshot with a previous schema.

The snapshots table

The snapshots metadata table is essential for tracking dataset versions and histories. It maintains metadata about every snapshot for a given table, representing a consistent view of the dataset at a specific time.

  • the committed_at field signifies the precise timestamp when the snapshot was created
  • the snapshot_id field is a unique identifier for each snapshot
  • the operation field lists a string of the types of operations that occurred, such as APPEND and OVERWRITE
  • the parent_id field links to the snapshot ID of the snapshot’s parent
  • the manifest_list field offers detailed insights into the files comprising the snapshot. It’s like a directory or inventory that keeps a record of all the datafiles associated with a given snapshot
  • the summary field holds metrics about the snapshot, such as the number of added or deleted files, number of records, and other statistical data that provides a quick glance into the snapshot’s content

The files table

The files metadata table showcases the current datafiles within a table and furnishes detailed information about each of them, from their location and format to their content and partitioning specifics.

  • the content field represents the type of content in the file: 0 signifies a datafile, 1 a position delete file, and 2 an equality delete file
  • the file_path field gives the exact location of each file
  • the file_format field indicates the format of the datafile
  • the spec_id field corresponds to the partition spec ID that the file adheres to
  • the partition field provides a representation of the datafile’s specific partition
  • the record_count field reports the number of records contained within each file
  • the file_size_in_bytes field provides the total size of the file in bytes, while column_sizes furnishes the sizes of the individual columns
  • the value_counts, null_value_counts, and nan_value_counts fields give the per-column counts of values, null values, and NaN values
  • the lower_bounds and upper_bounds fields hold the minimum and maximum values in each column
  • the key_metadata field contains implementation-specific metadata, if any exists
  • the split_offsets field provides the offsets at which the file is split into smaller segments for parallel processing
  • the equality_ids and sort_order_id fields correspond to the IDs relating to equality delete files, if any exist, and the ID of the table’s sort order, if it has one

The manifests table

The manifests metadata table details each of the table’s current manifest files. This table offers an array of useful information that assists in understanding the table’s structure and changes over time.

  • the path field provides the filepath where the manifest is stored
  • the length field shows the size of the manifest file
  • the partition_spec_id field indicates the specification ID of the partition that the manifest file is associated with
  • the added_snapshot_id field provides the ID of the snapshot that added this manifest file, offering a link between snapshots and manifests
  • three count fields—added_data_files_count, existing_data_files_count, and deleted_data_files_count—respectively relay the number of new files added in this manifest, the number of existing datafiles that were added in previous snapshots, and the number of files deleted in this manifest
  • the partition_summaries field is an array of field_summary structs that summarize partition-level statistics. It contains the following information: contains_null, contains_nan, lower_bound, and upper_bound

With the manifests metadata table, users can perform various operations, including finding manifests that need rewriting, summing the total number of files added per snapshot, finding snapshots where files were deleted, and determining whether the table is sorted well.

The partitions table

The partitions metadata table provides a snapshot of how the data in a table is divided into distinct, nonoverlapping regions, known as partitions. Each row represents a specific partition within the table.

  • the partition field represents the actual partition values, usually based on certain columns of your data
  • the record_count field indicates the total number of records within a given partition
  • the file_count field gives the total number of datafiles present in the partition
  • the spec_id field corresponds to the ID of the partition specification used to generate this partition

For unpartitioned tables, the partitions metadata table will have a single record that will contain only the record_count and file_count fields, as no partitioning is applied to such tables

While the partitions metadata table provides a snapshot of the current state of partitions, it’s important to note that delete files are not applied. As a result, in certain scenarios, partitions may be listed even though all their data rows have been marked for deletion by delete files

There are many use cases for the partitions metadata table, including finding how many files are in a partition, summing the total size in bytes of a partition, and finding the number of partitions per partition scheme. For instance, you may want to see how many files are in a partition, because if a particular partition has a large number of files, it may be a candidate for compaction.
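
The compaction use case can be sketched as a query builder. The helper name and the `max_files` threshold are hypothetical, and the query assumes a partitioned table, since unpartitioned tables have no partition column to select.

```python
def compaction_candidates_sql(table: str, max_files: int) -> str:
    """Build a query for partitions holding more than `max_files` files."""
    return (
        "SELECT partition, file_count, record_count "
        f"FROM {table}.partitions "
        f"WHERE file_count > {int(max_files)} "
        "ORDER BY file_count DESC"
    )


partitions_query = compaction_candidates_sql("catalog.db.table", 100)
# With an active SparkSession named `spark` (assumed to exist):
# spark.sql(partitions_query).show()
```

The right threshold depends on file sizes and query patterns, so 100 here is only a placeholder.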

The all_data_files table

It provides comprehensive details about every datafile across all valid snapshots in the table.

  • the first field, content, signifies the type of the file. A value of 0 indicates a datafile, 1 a position delete file, and 2 an equality delete file
  • the file_path field is a string that represents the complete path to the datafile
  • the file_format field indicates the format of the datafile
  • the spec_id field corresponds to the ID of the partition specification used to generate this partition
  • the partition field represents the partition to which this datafile belongs
  • the record_count field gives the total number of records within the file, while file_size_in_bytes represents the size of the datafile in bytes
  • the column_sizes field provides a map between the column ID and the size of that column in bytes
  • the value_counts field gives a map that represents the total count of values for each column in the datafile
  • the lower_bounds and upper_bounds fields are maps that store the minimum and maximum values for each column in the datafile
  • the key_metadata field contains implementation-specific metadata
  • the split_offsets field provides information about split points within the file
  • the equality_ids field relates to equality deletes and helps in identifying rows deleted by equality deletes
  • the sort_order_id field contains the ID of the sort order used to write the datafile
  • the readable_metrics field is a derived field that provides a human-readable representation of the file’s metadata including column size, value counts, null counts, and lower and upper bounds

Some of the use cases include finding the largest file across all snapshots, finding the total file size across all snapshots, and assessing partitions across snapshots.

The all_manifests table

It provides detailed insights into every manifest file across all valid snapshots in the table

  • the first field, content, signifies the type of the file, similar to the all_data_files table. A value of 0 indicates the manifest tracks datafiles; a value of 1 indicates that it tracks delete files
  • the path field is a string representing the complete path to the manifest file
  • the length field represents the size of the manifest file in bytes
  • the partition_spec_id field corresponds to the ID of the partition specification used to write this manifest file
  • the added_snapshot_id field represents the ID of the snapshot when the manifest was created
  • the partition_summaries field is an array of structures, where each structure provides a summary for a specific partition in the manifest file
  • the reference_snapshot_id field represents the ID of the snapshot that this record is associated with

Some of the use cases for this table include finding all manifests for a particular snapshot, monitoring the growth of manifests from snapshot to snapshot, and getting the total size of all valid manifests.

The refs table

It provides a list of all the named references within an Iceberg table. Named references can be thought of as pointers to specific snapshots of the table data, providing an ability to bookmark or version the table state.

  • the first field, name, represents the unique identifier for a named reference
  • the type can be one of two values: BRANCH, a mutable reference that can be moved to a new snapshot, or TAG, an immutable reference that, once created, always points to the same snapshot
  • the max_reference_age_in_ms field indicates the maximum duration in milliseconds that a snapshot can be referenced. This age is measured from the time the snapshot was added to the table. If the age of a snapshot exceeds this duration, it will no longer be valid and will be a candidate for cleanup during maintenance operations
  • the min_snapshots_to_keep field provides a lower limit on the number of snapshots to keep in the table history. The Iceberg table will always maintain at least this many snapshots, even if they are older than the max_snapshot_age_in_ms setting
  • the max_snapshot_age_in_ms field indicates the maximum age in milliseconds for any snapshot in the table. Snapshots that exceed this age could be removed by the maintenance operations, unless they are protected by the min_snapshots_to_keep setting

The refs metadata table helps you understand and manage your table’s snapshot history and retention policy, making it a crucial part of maintaining data versioning and ensuring that your table’s size is under control

The entries table

It offers insightful details about each operation that has been performed on the table’s data and delete files across all snapshots. Each row in this table captures operations that affected many files at a certain point in the table’s history, making it an essential resource for understanding the evolution of your dataset.

  • the first field, status, is an integer that indicates whether a file was added or deleted in the snapshot. A value of 0 represents an existing file, while 1 indicates an added file and 2 a deleted file
  • snapshot_id is the unique identifier of the snapshot in which the operation took place
  • the sequence_number field indicates the order of operations. This is a global counter across all snapshots of the table, and it increments for each change made, whether the change is an addition, a modification, or a deletion
  • data_file is a struct that encapsulates extensive details about the file involved in the operation

Using the metadata tables in conjunction

We can join the above tables together and gain extra insight.

  • get data on all the files added in a snapshot
  • get a detailed overview of the lifecycle of a particular datafile
  • track the evolution of the table by partition across snapshots
  • monitor files associated with a particular branch
  • find file differences between two branches of a table
  • find the growth in storage by the latest snapshot of each branch
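
As a sketch of the first use case, the query below joins the entries and snapshots tables to list the files added in one snapshot, together with when and how that snapshot was committed. The helper name and the snapshot ID are hypothetical; the column names are the ones described in the sections above, with `file_path` and `file_size_in_bytes` read from the data_file struct.

```python
def files_added_in_snapshot_sql(table: str, snapshot_id: int) -> str:
    """Build a join of entries and snapshots for files added in one snapshot."""
    return (
        "SELECT s.committed_at, s.operation, "
        "e.data_file.file_path, e.data_file.file_size_in_bytes "
        f"FROM {table}.entries e "
        f"JOIN {table}.snapshots s ON e.snapshot_id = s.snapshot_id "
        # status = 1 marks an entry as an added file
        f"WHERE e.status = 1 AND e.snapshot_id = {int(snapshot_id)}"
    )


join_query = files_added_in_snapshot_sql("catalog.db.table", 1234567890)
# With an active SparkSession named `spark` (assumed to exist):
# spark.sql(join_query).show(truncate=False)
```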

Isolation of changes with branches

Table branching and tagging (Iceberg)

Branching allows creating independent snapshot lineages, useful for isolating data for validation (e.g., ingestion-validation branches). This ensures data integrity during testing before merging into the main table.

Tagging provides named references to snapshots, aiding reproducibility (e.g., tagging snapshots for end-of-quarter audits). Tags specify retention duration and enable easy access to historical states.
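
A minimal sketch of the corresponding DDL, assuming Spark with the Iceberg SQL extensions enabled. The helper names, the branch and tag names, and the snapshot ID are all made up for illustration.

```python
def create_branch_sql(table: str, branch: str) -> str:
    """Build the statement that creates a branch from the table's current state."""
    return f"ALTER TABLE {table} CREATE BRANCH {branch}"


def create_tag_sql(table: str, tag: str, snapshot_id: int, retain_days: int) -> str:
    """Build the statement that tags a snapshot and sets how long to retain it."""
    return (
        f"ALTER TABLE {table} CREATE TAG {tag} "
        f"AS OF VERSION {int(snapshot_id)} RETAIN {int(retain_days)} DAYS"
    )


branch_ddl = create_branch_sql("catalog.db.table", "ingestion_validation")
tag_ddl = create_tag_sql("catalog.db.table", "end_of_quarter", 1234567890, 365)
# With an active SparkSession named `spark` (assumed to exist):
# spark.sql(branch_ddl)
# spark.sql(tag_ddl)
```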

Catalog branching and tagging (Nessie)

Branching facilitates safe data validation across multiple tables. Example: weekly_ingest_branch allows testing data before merging into the main production branch.

Tagging marks specific catalog versions for tracking and reproducing states (e.g., quarterly analytics tags). Simplifies accessing historical data.

Benefits of it all:

  • facilitates the safe and isolated testing and validation of new data batches
  • enables easy reproduction of data at regular intervals, such as at the end of each quarter, improving the reliability of analytics
  • assists in maintaining an audit trail of data changes over time
  • aids in identifying different versions of the table for different analytics requirements

Multitable transactions

Multitable transactions ensure atomicity and data consistency by treating multiple operations across tables as a single unit of work, where all changes succeed or are rolled back on failure. This prevents data inconsistencies, supports isolation, and is crucial for systems like supply chain management. Using Nessie, transactions can be isolated in a catalog branch, allowing safe multitable operations. Once validated, changes can be merged into the main branch or discarded, ensuring reliable and consistent data management.

Rolling back changes

  • at the table level with rollback_to_snapshot, rollback_to_timestamp, set_current_snapshot, cherrypick_snapshot
  • at the catalog level with git-like version control
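
The table-level procedures are invoked with Spark's CALL syntax. The sketch below covers two of them; the helper names, catalog name, snapshot ID, and timestamp are placeholders, and the strings are interpolated directly, so only trusted values should be passed in.

```python
def rollback_to_snapshot_sql(catalog: str, table: str, snapshot_id: int) -> str:
    """Build a CALL that rolls the table back to a specific snapshot."""
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {int(snapshot_id)})"


def rollback_to_timestamp_sql(catalog: str, table: str, timestamp: str) -> str:
    """Build a CALL that rolls the table back to its state at `timestamp`."""
    return (
        f"CALL {catalog}.system.rollback_to_timestamp("
        f"'{table}', TIMESTAMP '{timestamp}')"
    )


snapshot_rollback = rollback_to_snapshot_sql("catalog", "db.table", 1234567890)
timestamp_rollback = rollback_to_timestamp_sql("catalog", "db.table", "2024-12-01 00:00:00")
# With an active SparkSession named `spark` (assumed to exist):
# spark.sql(snapshot_rollback)
```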

Apache Iceberg: The Definitive Guide - Chapter 11 - Streaming with Apache Iceberg

  • scalability and performance

Apache Iceberg is designed to efficiently store and retrieve information from large datasets. The file management procedures enable it to optimize performance of an ever-changing/growing dataset, making it an excellent choice for streaming analytics.

  • schema evolution

As data changes over time, the structure of the data (the schema) may need to evolve as well. Apache Iceberg allows for schema evolution without interrupting ongoing data streaming processes, making it easier to adapt to changing data requirements

  • reliability

Apache Iceberg provides snapshot isolation, meaning that each transaction operates on an unchanging snapshot of the table. This feature ensures consistent and reliable data, even in environments with many concurrent operations.

  • time travel

Iceberg stores a full history of the table’s metadata, which allows time-travel queries that can access previous versions of the table.
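
Time-travel queries in Spark SQL take either a version or a timestamp. A small sketch, with hypothetical helper names and placeholder values; the version can be a numeric snapshot ID or the name of a branch or tag.

```python
def as_of_version_sql(table: str, version) -> str:
    """Build a query against a snapshot ID (int) or a branch/tag name (str)."""
    ref = str(version) if isinstance(version, int) else f"'{version}'"
    return f"SELECT * FROM {table} VERSION AS OF {ref}"


def as_of_timestamp_sql(table: str, timestamp: str) -> str:
    """Build a query against the table as it was at `timestamp`."""
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


by_snapshot = as_of_version_sql("catalog.db.table", 1234567890)
by_tag = as_of_version_sql("catalog.db.table", "end_of_quarter")
by_time = as_of_timestamp_sql("catalog.db.table", "2024-12-01 00:00:00")
# With an active SparkSession named `spark` (assumed to exist):
# spark.sql(by_time).show()
```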

Streaming with Spark

Spark streaming benefits:

  • fault tolerance
  • integration
  • real-time processing
  • window operations
  • high throughput
  • multiple data sources

Streaming with Flink

Flink benefits:

  • event time processing
  • fault tolerance
  • backpressure handling
  • high throughput and low latency
  • windowing and complex event processing
  • integration
  • exactly-once semantics

Streaming with Kafka Connect

Key features:

  • high throughput
  • fault tolerance
  • real-time integration
  • durability
  • wide integration
  • exactly-once semantics
  • connector framework

The Iceberg Kafka Sink

A Kafka sink is a component in Apache Kafka that allows data to be consumed from Kafka topics and written to external systems or databases. It plays a crucial role in data integration and data flow by enabling seamless transfer of data from Kafka to various data stores, databases, or analytics platforms.

The Apache Iceberg Sink Connector is a specialized Kafka sink connector that is used for writing data from Kafka into Iceberg tables. Its main features include:

  • commit coordination
  • exactly-once delivery semantics
  • multitable fanout
  • row mutations
  • upsert mode

Apache Iceberg: The Definitive Guide - Chapter 14 - Real-World Use Cases of Apache Iceberg

Ensuring High-Quality Data with Write-Audit-Publish in Apache Iceberg

The Write-Audit-Publish (WAP) pattern provides a systematic approach to ensure that data is of good quality

  1. Write: Data is first extracted from sources and written to a nonproduction location, isolating production data from potential inconsistencies
  2. Audit: Once staged, the data undergoes a thorough validation process. This could involve inspecting null or duplicate values, validating data types, and checking data integrity.
  3. Publish: After validation, the data is atomically pushed to production tables, ensuring that consumers see the entire updated dataset or none of it.

WAP using Iceberg’s branching

The WAP process at the table level

Create a branch

# Create a Branch on the Table called 'etl_branch'
spark.sql("ALTER TABLE catalog.db.table CREATE BRANCH etl_branch").show()

# Get a count of all records in the table
spark.sql("SELECT COUNT(*) as total_records FROM catalog.db.table").show()

# List all Table References
spark.sql("SELECT * FROM catalog.db.table.refs").show()

Write the data

# Enabling the Apache Iceberg WAP feature
spark.sql("ALTER TABLE catalog.db.table SET TBLPROPERTIES ('write.wap.enabled'='true')")

# Setting the table for WAP
spark.conf.set('spark.wap.branch', 'etl_branch')

# Insert New Records Into the Table
spark.sql("INSERT INTO catalog.db.table SELECT * FROM new_data")

# The Number of Records on the 'etl_branch' - will see the table has additional records
spark.sql("SELECT COUNT(*) as total_records FROM catalog.db.table VERSION AS OF 'etl_branch'").show()

# Count of Records on the Main Branch
spark.sql("SELECT COUNT(*) as total_records FROM catalog.db.table VERSION AS OF 'main'").show()

Audit the data

After writing the new data into the isolated local branch, etl_branch, it is essential to ensure that this new dataset stands up to the organization’s quality standards. The audit phase acts as a checkpoint where we subject our data to rigorous evaluation, ensuring its fitness for purpose.

from pyspark.sql.functions import col, count, when

# df = dataframe with our "catalog.db.table" table
# Check for nulls in each column
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# Show the result
null_counts.show()

# df = dataframe with our "catalog.db.table" table
# Group by all columns and count
duplicates = df.groupBy(df.columns).count()

# Keep only groups with count > 1, which indicates duplicates
duplicates = duplicates.filter(col("count") > 1)

# Optionally, drop the count column if not needed
duplicates = duplicates.drop("count")

# Show the result (duplicates)
duplicates.show()

from datetime import datetime
from pyspark.sql.functions import col

# df = dataframe with our "catalog.db.table" table
# Define your date range
start_date = datetime(2024, 1, 1)  # for example, Jan 1, 2024
end_date = datetime(2024, 1, 31)  # for example, Jan 31, 2024

# Convert the date column to date type if it's not already
df = df.withColumn("date_column", col("date_column").cast("date"))

# Filter the DataFrame to find records within the date range
within_range = df.filter((col("date_column") >= start_date) & (col("date_column") <= end_date))

# Count the records that fall within the desired date range
count_within_range = within_range.count()

We compare this count to the difference between the count of the records on the main branch and the count on etl_branch. If they don’t match, we can inspect whether any records have incorrect dates or null dates causing the inconsistency.
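The comparison described above can be written as a small check. This is only a sketch: `main_count` and `branch_count` stand for the two `COUNT(*)` results from the earlier queries, the helper name `unexplained_rows` is made up, and the numbers in the usage line are invented for illustration.

```python
def unexplained_rows(main_count: int, branch_count: int, count_within_range: int) -> int:
    """Rows written to the branch that do not fall inside the expected date range."""
    # Rows that exist on etl_branch but not yet on main
    newly_written = branch_count - main_count
    # Anything left over has a date outside the range, or no date at all
    return newly_written - count_within_range


# Invented numbers: 250 new rows on the branch, only 240 of them inside the range
suspicious = unexplained_rows(main_count=1000, branch_count=1250, count_within_range=240)
```

A non-zero result is the signal to go looking for records with incorrect or null dates.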

If we identify problems, we can fix the data.

# 1. Identifying Null Records
# Check for nulls in each column and create a filter condition
is_null_condition = [col(c).isNull() for c in df.columns]
combined_null_condition = is_null_condition[0]
for condition in is_null_condition[1:]:
    combined_null_condition = combined_null_condition | condition

# 2. Identifying Duplicated Records
# Group by all columns, count occurrences, and filter for counts greater than 1
duplicates_condition = df.groupBy(df.columns) \
                         .count() \
                         .filter(col("count") > 1) \
                         .drop("count") \
                         .distinct()

# 3. Creating DataFrame for Records Needing Remediation
# Union the null and duplicated conditions to find all records needing remediation
records_needing_remediation = (
    df.filter(combined_null_condition)
      .union(df.join(duplicates_condition, df.columns, "inner"))
      .distinct()
)

# 4. Creating DataFrame for Valid Records
# Use exceptAll to find records that are neither null nor duplicated
valid_records = df.exceptAll(records_needing_remediation)

# Show the records needing remediation
records_needing_remediation.show()

# Show the valid records
valid_records.show()

# Overwrite the table with only the validated records
valid_records.write.format("iceberg").mode("overwrite").save("catalog.db.table")

Now our branch has new validated records, and we’ve shipped off the invalid records to our stakeholders, who can fix them for later backfilling.

Publish the changes

# Query the list of references for the table
spark.sql("SELECT * FROM catalog.db.table.refs").show()

# Cherry-picking the snapshot from 'etl_branch' over to 'main'
spark.sql("CALL catalog.system.cherrypick_snapshot('db.table', 2668401536062194692)").show()

# Record count on the 'main' branch
spark.sql("SELECT count(*) FROM catalog.db.table VERSION AS OF 'main'").show()

# Record count on the 'etl_branch' branch - both should be identical
spark.sql("SELECT count(*) FROM catalog.db.table VERSION AS OF 'etl_branch'").show()

# Turn off the WAP feature
spark.conf.unset('spark.wap.branch')

Running BI Workloads on the Data Lake

The workflow would typically go as follows:

  1. raw data is landed as Apache Iceberg tables in the data lake
  2. virtual data marts and data products are created by creating layers of logical views on these tables
  3. reflections are enabled on a view that a dashboard will be made from. An Apache Iceberg representation of the aggregates will be created behind the scenes
  4. a dashboard will be created on the source, which will feel performant as aggregate queries are executed against the reflection instead of the raw sources

Land the data into the data lake

Once your data is in the form of Apache Iceberg tables, you just need to connect your data lake storage or Apache Iceberg catalog (Hive, Nessie, AWS Glue) to Dremio to get access to those tables.

Create virtual data marts/data products

In data warehouses, cleanup, validation, and business logic are typically applied through layered datasets organized for different business units, often referred to as data marts. Dremio facilitates the creation of virtual data marts on the data lakehouse by using logical views instead of physical tables. These views, built on raw Apache Iceberg tables, encapsulate cleanup, validation, and business logic and can be organized into folders for each business unit, effectively structuring data into data marts or data products.

image

Create a reflection to accelerate our dashboard

If we are working with a really large dataset to fuel a dashboard, we may want to create an aggregate reflection to ensure performance

For example, if we were creating a dashboard of Ohio shipments based on the oh_shipments view, we could enable an aggregate reflection on that table with a few clicks on the Dremio UI or through a simple SQL query:

ALTER TABLE arctic.logistics.oh_shipments
  CREATE AGGREGATE REFLECTION oh_shipments_agg
  USING
  DIMENSIONS (shipper_id, destination_city, shipping_method)
  MEASURES (shipment_id (COUNT), total_cost (SUM), delivery_time (AVG))
  LOCALSORT BY (shipper_id, destination_city);

This will create a reflection optimized for the particular dimensions and measures needed for our dashboard; behind the scenes it is an Apache Iceberg table of precomputed aggregates called oh_shipments_agg. Our analysts don't need to be aware of the existence of this reflection: Dremio will swap in oh_shipments_agg anytime it sees aggregate queries coming in for oh_shipments across the same dimensions and measures. Also, since our source tables are Apache Iceberg tables, the reflection can be refreshed incrementally, allowing it to maintain near-real-time freshness.
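To make the idea of precomputed aggregates concrete, here is a conceptual plain-Python sketch. It is not how Dremio implements reflections; the sample rows, `build_aggregate`, and `total_cost_for` are all invented for illustration. It only shows why a dashboard query can be answered from a small aggregate table instead of the raw rows.

```python
from collections import defaultdict

# Made-up shipment rows standing in for the oh_shipments view
shipments = [
    {"shipper_id": 1, "shipping_method": "ground", "total_cost": 10.0},
    {"shipper_id": 1, "shipping_method": "ground", "total_cost": 14.0},
    {"shipper_id": 2, "shipping_method": "air", "total_cost": 30.0},
]


def build_aggregate(rows, dimensions, measure):
    """Precompute COUNT and SUM of `measure` for each combination of dimensions."""
    agg = defaultdict(lambda: {"count": 0, "sum": 0.0})
    for row in rows:
        key = tuple(row[d] for d in dimensions)
        agg[key]["count"] += 1
        agg[key]["sum"] += row[measure]
    return dict(agg)


# Built once (and refreshed as data changes), then reused by every dashboard query
reflection = build_aggregate(shipments, ("shipper_id", "shipping_method"), "total_cost")


def total_cost_for(shipper_id, shipping_method):
    """Answer an aggregate query from the precomputed table, not the raw rows."""
    return reflection[(shipper_id, shipping_method)]["sum"]
```

The query function never scans `shipments`, which is the same trade the reflection makes: extra storage and refresh work in exchange for cheap aggregate reads.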

Implementing CDC with Iceberg

By focusing on incremental changes—whether they involve inserts, updates, or deletions—CDC ensures that data systems remain synchronized without needing to repeatedly process extensive static data

A sample CDC in a mart:

image

Create Iceberg tables

# table 1 - inventory
spark.sql('''CREATE TABLE glue.test.inventory(
  product_id int,
  product_name string,
  stock_level int,
  price int,
  last_updated date) USING iceberg''')

# Use DATE literals in ISO format so the values land in the date column as intended
spark.sql('''INSERT INTO glue.test.inventory VALUES
(1, 'Pasta-thin', 60, 45, DATE '2023-03-25'),
(2, 'Bread-white', 55, 6, DATE '2023-03-10'),
(3, 'Eggs-nonorg', 100, 8, DATE '2023-03-12'),
(4, 'Sausage-pork', 72, 25, DATE '2023-03-29'),
(5, 'Coffee-vanilla', 30, 45, DATE '2023-03-12'),
(6, 'Maple Syrup', 20, 85, DATE '2023-03-29'),
(7, 'Protein Bar', 120, 5, DATE '2023-03-15')
''')

# and table 2 - inventory_summary

# Numeric types so the later MERGE can join on product_id and add to total_stock
spark.sql('''CREATE TABLE glue.test.inventory_summary(
    product_id int,
    total_stock bigint,
    avg_price double) USING iceberg''')

spark.sql('''INSERT INTO glue.test.inventory_summary
SELECT
    product_id,
    SUM(stock_level) AS total_stock,
    AVG(price) AS avg_price
FROM glue.test.inventory
GROUP BY product_id
''')

Apply updates from operational systems

Iceberg allows us to do row-level updates with transactional guarantees

spark.sql('''UPDATE glue.test.inventory
SET stock_level = stock_level - 15
WHERE product_name = 'Bread-white' ''')

Create the change log view to capture changes

We’ll leverage Iceberg’s built-in Spark procedure, create_changelog_view(), to execute this task. To capture changes, we have two options: use start and end snapshot IDs or use start and end timestamps. In this example, we’ll use specific snapshots as our reference points, with the start occurring after the initial inventory table record ingestion and the end following the ETL job execution. To obtain these snapshot IDs, we’ll query the default history metadata table provided by Apache Iceberg

spark.sql("SELECT * FROM glue.test.inventory.history").toPandas()

For example, let’s assume that from inspecting our history table we’ve determined the target snapshot IDs are 4816648710583642722 and 2557325773776943708
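One way to pick those two IDs programmatically is sketched below in plain Python. The rows mimic the `made_current_at` and `snapshot_id` columns of the history table; the snapshot IDs are the ones from the text, but the timestamps are invented and `snapshot_window` is a hypothetical helper. As far as I understand the Iceberg docs, `start-snapshot-id` is exclusive, so starting from the snapshot written by the initial ingestion captures only what changed afterwards.

```python
from datetime import datetime

# Rows shaped like Iceberg's `history` metadata table (timestamps are invented)
history = [
    {"made_current_at": datetime(2023, 3, 30, 9, 5), "snapshot_id": 2557325773776943708},
    {"made_current_at": datetime(2023, 3, 30, 9, 0), "snapshot_id": 4816648710583642722},
]


def snapshot_window(rows):
    """Return (start_id, end_id): the oldest and the newest snapshot in the history."""
    ordered = sorted(rows, key=lambda row: row["made_current_at"])
    return ordered[0]["snapshot_id"], ordered[-1]["snapshot_id"]


start_id, end_id = snapshot_window(history)
```

With the real table, the same rows could come from `spark.sql("SELECT * FROM glue.test.inventory.history").collect()`.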

spark.sql("""CALL glue.system.create_changelog_view(
    table => 'glue.test.inventory',
    options => map(
        'start-snapshot-id', '4816648710583642722',
        'end-snapshot-id', '2557325773776943708'
    ))""")

This procedure creates a change log view called inventory_changes that allows us to see the changes made to the table between the first and second snapshots specified

We can query it:

spark.sql("SELECT * FROM inventory_changes").toPandas()

Merge changed data in the aggregated table

The last step involves updating the downstream aggregated inventory_summary table, utilized by BI reports to extract store stock-level insights

We’ll first create a view from our change log data that is made up of the aggregated changes. We can then merge those aggregated updates into our inventory_summary table to have an updated summary table for our BI dashboard:

## Create the Aggregated View
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW aggregated_changes AS 
    SELECT
        product_id,
        SUM(CASE 
            WHEN _change_type = 'INSERT' THEN stock_level
            WHEN _change_type = 'DELETE' THEN -stock_level
            ELSE 0 END) AS total_stock_change,
        AVG(price) AS new_avg_price
    FROM
        inventory_changes
    GROUP BY
        product_id
""")

## Merge the Aggregated View into our Inventory Summary
spark.sql("""
    MERGE INTO glue.test.inventory_summary AS target
    USING aggregated_changes AS source
    ON target.product_id = source.product_id
    WHEN MATCHED THEN 
        UPDATE SET 
            target.total_stock = target.total_stock + source.total_stock_change
    WHEN NOT MATCHED THEN 
        INSERT (product_id, total_stock, avg_price)
        VALUES (source.product_id, source.total_stock_change, source.new_avg_price)
""")

In a production environment, these CDC operations would typically be automated (using Airflow, etc.) and continuously monitored for performance and reliability.
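To check my understanding of the aggregate-then-merge step above, here is the same logic in plain Python for the 'Bread-white' update (stock 55 down by 15). This is a sketch under one assumption: that the update shows up in the change log as a DELETE of the old row plus an INSERT of the new row. The rows and helper names are made up for illustration.

```python
from collections import defaultdict

# Assumed changelog rows for the 'Bread-white' update (stock_level 55 -> 40)
changes = [
    {"product_id": 2, "stock_level": 55, "_change_type": "DELETE"},
    {"product_id": 2, "stock_level": 40, "_change_type": "INSERT"},
]

# Mirrors the CASE expression: INSERT adds stock, DELETE removes it, anything else is 0
SIGN = {"INSERT": 1, "DELETE": -1}


def aggregate_changes(rows):
    """Net stock change per product, like the aggregated_changes view."""
    delta = defaultdict(int)
    for row in rows:
        delta[row["product_id"]] += SIGN.get(row["_change_type"], 0) * row["stock_level"]
    return dict(delta)


def merge_summary(summary, delta):
    """Apply the net changes: update matched products, insert unmatched ones."""
    merged = dict(summary)
    for product_id, change in delta.items():
        merged[product_id] = merged.get(product_id, 0) + change
    return merged


summary = {2: 55}
new_summary = merge_summary(summary, aggregate_changes(changes))
```

The net change for product 2 is -15, so the summary moves from 55 to 40, matching the UPDATE applied to the inventory table.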


Streamed

After the above books I decided to check out Learning Spark 2nd edition.

Following the book’s suggestion I created a community edition account on databricks so that I can play around and execute some of the code myself

image

For now the Scala Spark and PySpark APIs look almost identical, maybe because I am not looking at advanced stuff yet. I chose to follow along in Scala just to gain at least minimal familiarity with the language and its peculiarities, but there is a Scala and a Python version of each chapter's code.

I continued reading and following the book's code casually after the stream, mainly because so far it is just going over introductory Spark syntax, but I can already see why this book is so highly regarded.


That is all for today!

See you tomorrow :)