Hello :) Today is Day 336!
A quick summary of today:
- more about Apache Iceberg
- testing Spark jobs
Apache Iceberg: The Definitive Guide - Chapter 3 - Lifecycle of Write and Read Queries
The Apache Iceberg table format provides high-performance queries during reads and writes, allowing you to run OLAP workloads directly on the data lake. What facilitates this performance is the way the various components of the Iceberg table format are designed. It is therefore critical to understand the structure of these components so that query engines can effectively use them for faster query planning and execution.
Apache Iceberg’s components
Catalog layer
A catalog holds the references to the current metadata pointer, that is, the latest metadata file for each table. Irrespective of whether you are doing a read operation or a write operation, the catalog is the first component that a query engine interacts with. In the case of reads, the engine reaches out to the catalog to learn about the current state of the table, and for writes, the catalog is used to adhere to the schema defined and to know about the table’s partitioning scheme.
Metadata layer
The metadata layer in Apache Iceberg consists of three components: metadata files, manifest lists, and manifest files. Each time a query engine writes something to an Iceberg table, a new metadata file is created atomically and is defined as the latest version of the metadata file. This ensures that a linear history of the table commits, and it helps during scenarios such as concurrent writes (i.e., multiple engines writing data simultaneously). Also, during read operations, engines will always see the latest version of the table. Query engines interact with the manifest lists to get information about partition specifications that help them skip the nonrequired manifest files for faster performance. Finally, information from the manifest files, such as upper and lower bounds for a specific column, null value counts, and partition-specific data, is used by the engine for file pruning.
Data layer
Query engines filter through the metadata files to read the datafiles required by a particular query efficiently. On the write side, datafiles get written on the file storage, and the related metadata files are created and updated accordingly.
Writing queries in Apache Iceberg
Overview:
The write process in Apache Iceberg involves a series of steps that enable query engines to efficiently insert and update data. When a write query is initiated, it is sent to the engine for parsing. The catalog is then consulted to ensure consistency and integrity in the data and to write the data as per the defined partition strategies. The datafiles and metadata files are then written based on the query. Finally, the catalog file is updated to reflect the latest metadata, enabling subsequent read operations to access the most up-to-date version of the data
Creating a table
# Spark SQL
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_amount DECIMAL(10, 2),
order_ts TIMESTAMP
)
USING iceberg
PARTITIONED BY (HOUR(order_ts))
# Dremio
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
order_amount DECIMAL(10, 2),
order_ts TIMESTAMP
)
PARTITION BY (HOUR(order_ts))
First, the query is sent to the query engine for parsing. Then, since it is a CREATE statement, the engine will start creating and defining the table.
Then, the engine starts creating a metadata file named v1.metadata.json in the data lake filesystem to store information about the table. Based on the information on the table path, /path/to/warehouse/db1/table1, the engine writes the metadata file. It then defines the schema of the table orders by specifying the columns and data types and stores it in the metadata file. Finally, it assigns a unique identifier to the table: table-uuid. Once the query executes successfully, the metadata file v1.metadata.json is written to the data lake file storage
If we inspect the metadata file we can fine:
{
"table-uuid" : "072db680-d810-49ac-935c-56e901cad686",
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "order_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "customer_id",
"required" : false,
"type" : "long"
}, {
"id" : 3,
"name" : "order_amount",
"required" : false,
"type" : "decimal(10, 2)"
}, {
"id" : 4,
"name" : "order_ts",
"required" : false,
"type" : "timestamptz"
}
},
"partition-spec" : [ {
"name" : "order_ts_hour",
"transform" : "hour",
"source-id" : 4,
"field-id" : 1000
} ]
}
This is the current state of the table - created and empty. In Iceberg terms, this is called a snapshot. Since there is no actual data in the table, so there are no datafiles in your data lake. Therefore, the snapshot doesn’t point to any manifest list; hence, there are no manifest files.
Finally, the engine updates the current metadata pointer to point to the v1.metadata.json file in the catalog file version-hint.text, as this is the present state of the table.
Insert query
# Spark SQL/Dremio's SQL Query Engine
INSERT INTO orders VALUES (
123,
456,
36.17,
'2023-03-07 08:10:23'
)
The query is sent to the query engine for parsing. Since this is an INSERT statement, the engine needs information about the table, such as its schema, to start with query planning.
First, the query engine makes a request of the catalog to determine the location of the current metadata file and then reads it.
Although the engine’s motivation in this case is to insert new datafiles, it still interacts with the catalog, primarily for two reasons:
- it needs to understand the current schema of the table to adhere to it
- it needs to learn about the partitioning scheme to organize data accordingly while writing
After the engine learns about the table schema and the partitioning scheme, it starts writing the new datafiles and the related metadata files. Here’s what happens in this process.
The engine first writes the records as a Parquet datafile (Parquet is the default, but this can be changed) based on the hourly defined partitioning scheme of the table. Additionally, if a sort order is defined for the table, records will be sorted before being written into the datafile
After writing the datafile, the engine creates a manifest file. This manifest file is given information about the path of the actual datafile the engine created. In addition, the engine writes statistical information, such as the upper and lower bounds of a column and the null value counts, in the manifest file, which is highly beneficial for the query engine to prune files and provide the best performance. The engine computes this information while processing the data it’s going to write, so this is a relatively lightweight operation, at least compared to a process that starts from scratch and has to compute the statistics.
Here is a part of a manifest file’s content
{
"data_file" : {
"file_path" :
"s3://datalake/db1/orders/data/order_ts_hour=2023-03-07-08/0_0_0.parquet",
"file_format" : "PARQUET",
"block_size_in_bytes" : 67108864,
"null_value_counts" : [],
"lower_bounds" : {
"array": [{
"key": 1,
"value": 123
}],
}
"upper_bounds" : {
"array": [{
"key": 1,
"value": 123
}],
},
}
}
Next, the engine creates a manifest list to keep track of the manifest file. If existing manifest files are associated with this snapshot, those will also be added to this new manifest list. The engine writes this file to the data lake with information such as the manifest file’s path, the number of datafiles/rows added or deleted, and statistics about partitions, such as the lower and upper bounds of the partition columns. Again, the engine already has all this information, so it’s a lightweight operation to have these statistics. This information helps read queries exclude any nonrequired manifest files, facilitating faster queries
Here’s a part of the content of the manifest list:
{
"manifest_path":
"s3://datalake/db1/orders/metadata/62acb3d7-e992-4cbc-8e41-58809fcacb3e.avro",
"manifest_length": 6152,
"added_snapshot_id": 8333017788700497002,
"added_data_files_count": 1,
"added_rows_count": 1,
"deleted_rows_count": 0,
"partitions": {
"array": [ {
"contains_null": false,
"lower_bound": {
"bytes": "¹Ô\\u0006\\u0000"
},
"upper_bound": {
"bytes": "¹Ô\\u0006\\u0000"
}
} ]
}
}
Finally, the engine creates a new metadata file, v2.metadata.json, with a new snapshot, s1, by considering the existing metadata file, v1.metadata.json (previously current), while keeping track of the previous snapshot, s0. This new metadata file includes information about the manifest list created by the engine, with details such as the manifest list filepath, snapshot ID, and summary of the operation. Also, the engine makes a reference that this manifest list (or snapshot) is now the current one
Here is what the content of the new metadata file looks like:
"current-snapshot-id" : 8333017788700497002,
"refs" : {
"main" : {
"snapshot-id" : 8333017788700497002,
"type" : "branch"
}
},
"snapshots" : [ {
"snapshot-id" : 8333017788700497002,
"summary" : {
"operation" : "append",
"added-data-files" : "1",
"added-records" : "1",
},
"manifest-list" : "s3://datalake/db1/orders/metadata/
snap-8333017788700497002-1-4010cc03-5585-458c-9fdc-188de318c3e6.avro",
} ],
Now the engine goes to the catalog again to ensure that no other snapshots were committed while this INSERT operation was being run. By doing this validation, Iceberg guarantees no interference in operations in a scenario where multiple writers write data concurrently. With any write operation, Iceberg creates metadata files optimistically, assuming that the current version will remain unchanged until the writer commits. Upon completing the write, the engine commits atomically by switching the table’s metadata file pointer from the existing base version to the new version, v2.metadata.json, which now becomes the current metadata file.
Merge query
# Spark SQL
MERGE INTO orders o
USING (SELECT * FROM orders_staging) s
ON o.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET order_amount = s.order_amount
WHEN NOT MATCHED THEN INSERT *;
# Dremio
MERGE INTO orders o
USING (SELECT * FROM orders_staging) s
ON o.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET order_amount = s.order_amount
WHEN NOT MATCHED THEN INSERT (order_id, customer_id, order_amount, order_ts)
VALUES (s.order_id, s.customer_id, s.order_amount, s.order_ts)
The query engine begins by parsing the query and retrieving metadata, including the schema and partitioning details, from the catalog. It then loads data from both tables into memory to determine matching records. For this example, the operation uses the copy-on-write (COW) strategy, rewriting datafiles entirely when changes occur.
Matching records, such as an updated order_amount for order_id=123, are written to a new datafile, while non-matching records, like a new order with order_id=124, are treated as regular inserts and written to a separate partition based on the timestamp. After writing the updated datafiles, the engine creates a manifest file referencing the new file paths and includes statistics like value counts and column bounds.
A new metadata file is generated to reflect these changes, pointing to the updated manifest files and snapshot information. The catalog is updated with the location of the latest metadata file to commit the changes. The process ensures data consistency and tracks modifications efficiently, adhering to the Iceberg table format’s design.
Reading queries in Apache Iceberg
When a read query is initiated, it is sent to the query engine first. The engine leverages the catalog to retrieve the latest metadata file location, which contains critical information about the table’s schema and other metadata files, such as the manifest list that ultimately leads to the actual datafiles. Statistical information about columns is used in this process to limit the number of files being read, which helps improve query performance.
The SELECT query
# Spark SQL/Dremio Sonar
SELECT *
FROM orders
WHERE order_ts BETWEEN '2023-01-01' AND '2023-01-31'
- The query engine interacts with the catalog to get the current metadata file (v3.metadata.json)
- It then gets the current-snapshot-id (S2 in this case) and the manifest list location for that snapshot
- The manifest file path is then retrieved from the manifest list
- The engine determines the datafile path based on the partition filter (2023-03-07-08) from the manifest file
- The matching data from the required datafile is then returned to the user
The time-travel query
# Spark SQL
SELECT * FROM catalog.db.orders.history;
# Dremio
SELECT * FROM TABLE (table_history('orders'))
We get a table with cols like: made_current_at, snapshot_id, parent_id, is_current_ancestor
To perform the time travel:
# Spark SQL
SELECT * FROM orders
TIMESTAMP AS OF '2023-03-07 20:45:08.914'
# Dremio Sonar
SELECT * FROM orders
AT TIMESTAMP '2023-03-07 20:45:08.914'
or
# Spark SQL
SELECT *
FROM orders
VERSION AS OF 8333017788700497002
# Dremio
SELECT *
FROM orders
AT SNAPSHOT 8333017788700497002
- The query engine interacts with the catalog to get the current metadata file (v3.metadata.json)
- It then selects the snapshot (S1 in this case) based on either the timestamp or the version ID supplied in the time-travel query and gets the manifest list location for that snapshot
- The manifest file path is then retrieved from the manifest list
- The engine determines the datafile path based on the partition filter (2023-03-07-08) from the manifest file
- The matching data from the required datafile is then returned to the user
Testing in Spark by Zach Wilson
Not that much notes for this one ~
For the lab we used a library called chispa to create some unit tests.
Example:
from pyspark.sql import SparkSession
query = """
WITH teams_deduped AS (
SELECT *, ROW_NUMBER() OVER(PARTITION BY team_id ORDER BY team_id) as row_num
FROM teams
)
SELECT
team_id AS identifier,
'team' AS `type`,
map(
'abbreviation', abbreviation,
'nickname', nickname,
'city', city,
'arena', arena,
'year_founded', CAST(yearfounded AS STRING)
) AS properties
FROM teams_deduped
WHERE row_num = 1
"""
def do_team_vertex_transformation(spark, dataframe):
dataframe.createOrReplaceTempView("teams")
return spark.sql(query)
def main():
spark = SparkSession.builder \
.master("local") \
.appName("players_scd") \
.getOrCreate()
output_df = do_team_vertex_transformation(spark, spark.table("players"))
output_df.write.mode("overwrite").insertInto("players_scd")
And the test:
from chispa.dataframe_comparer import *
from ..jobs.team_vertex_job import do_team_vertex_transformation
from collections import namedtuple
TeamVertex = namedtuple("TeamVertex", "identifier type properties")
Team = namedtuple("Team", "team_id abbreviation nickname city arena yearfounded")
def test_vertex_generation(spark):
input_data = [
Team(1, "GSW", "Warriors", "San Francisco", "Chase Center", 1900),
Team(1, "GSW", "Bad Warriors", "San Francisco", "Chase Center", 1900),
]
input_dataframe = spark.createDataFrame(input_data)
actual_df = do_team_vertex_transformation(spark, input_dataframe)
expected_output = [
TeamVertex(
identifier=1,
type='team',
properties={
'abbreviation': 'GSW',
'nickname': 'Warriors',
'city': 'San Francisco',
'arena': 'Chase Center',
'year_founded': '1900'
}
)
]
expected_df = spark.createDataFrame(expected_output)
assert_df_equality(actual_df, expected_df, ignore_nullable=True)
And the homework is to convert 2 more queries from week 1-2 to spark jobs and write a test for them but I will do that in the coming days ~
That is all for today!
See you tomorrow :)