Hello :) Today is Day 346!
A quick summary of today:
- chapter 3 of Stream Processing with Apache Flink
- reading the Flink learn docs
- starting an Airflow 101 course by Astronomer
Stream Processing with Apache Flink - Chapter 3 - The Architecture of Apache Flink
System architecture
Components of a Flink setup
A Flink setup consists of four different components that work together to execute streaming applications: a JobManager, a ResourceManager, a TaskManager, and a Dispatcher. Since Flink is implemented in Java and Scala, all components run on JVMs.
- JobManager is the master process controlling the execution of a single application in Flink. When an application is submitted, it receives a JobGraph and accompanying JAR file, which it then transforms into an ExecutionGraph—a physical dataflow graph of parallel tasks. The JobManager’s critical role involves requesting the necessary TaskManager slots from the ResourceManager and subsequently distributing these tasks across available TaskManagers. Throughout the execution, it maintains central coordination responsibilities, such as managing checkpoints and ensuring the overall orchestration of the application’s workflow
- ResourceManager acts as the resource allocation backbone of Flink. Its primary function is managing TaskManager slots, which represent Flink’s fundamental processing resources. When a JobManager requests slots, the ResourceManager coordinates with TaskManagers to offer available processing capacity. If insufficient slots are available, it can interact with the underlying resource provider to provision additional containers for launching new TaskManager processes. This dynamic resource management ensures efficient utilization of computational resources and enables flexible scaling of Flink applications
- TaskManagers serve as the worker processes in the Flink ecosystem, typically running multiple instances within a cluster. Each TaskManager provides a specific number of slots that determine its task execution capacity. After initialization, these workers register their slots with the ResourceManager and await instructions to offer them to JobManagers. Once assigned tasks, TaskManagers execute them and maintain data exchange capabilities with other TaskManagers running tasks from the same application. This distributed execution model allows Flink to process complex dataflows across multiple concurrent computing units
- Dispatcher functions as a persistent service that spans job executions, providing a REST interface for application submission. It starts a JobManager for each submitted application and hands over the execution details. The Dispatcher also hosts a web dashboard that offers insights into job executions, making it a valuable monitoring and management component. Its role as an HTTP entry point is particularly useful for clusters situated behind firewalls, though whether a Dispatcher is needed depends on how the application is deployed.
Application deployment
- framework style: the application is packaged in a JAR file and submitted by a client to a running service
- library style: the application is bundled into a Docker-style container image
Task execution
A TaskManager can execute several tasks at the same time. These tasks can be subtasks of the same operator (data parallelism) or of a different application (job parallelism). The TaskManager offers a certain number of processing slots to control the number of tasks it is able to execute concurrently. A processing slot can execute one slice of an application, i.e., one parallel task of each operator of the application.
A TaskManager executes its tasks as multiple threads within the same JVM process. Threads are more lightweight than separate processes and have lower communication costs, but they do not strictly isolate tasks from each other. Hence, a single misbehaving task can kill the whole TaskManager process and all tasks that run on it.
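To make the relationship between slots and parallelism a bit more concrete, here is a minimal sketch in the Scala DataStream API (the values are made up, not from the book, and it uses the API generation the book is written against): the environment-level parallelism applies to every operator unless an operator overrides it, and each parallel task has to be scheduled into a processing slot.
```scala
import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4) // default parallelism for all operators of this application

    env
      .fromElements(1, 2, 3, 4, 5, 6)   // sources like this one run as a single task
      .map(_ * 2).setParallelism(2)     // this operator runs as 2 parallel tasks
      .filter(_ % 3 == 0)               // inherits the environment parallelism (4 tasks)
      .print()

    env.execute("parallelism sketch")
  }
}
```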
Highly available setup
Streaming applications are supposed to run 24/7, so it's important that their execution doesn't stop even if some process fails. To recover from failures, the system first needs to restart failed processes, and second, restart the application and recover its state.
- TaskManager failures: if, in a Flink setup with four TaskManagers each providing two slots, one TaskManager fails, the JobManager will ask the ResourceManager to provide more processing slots. If these are not available, the JobManager cannot restart the application until enough slots become available
- JobManager failures: the JobManager controls the execution of a streaming application and keeps metadata about its execution, so the application cannot continue processing if the responsible JobManager process disappears. Flink supports a high-availability mode, based on Apache ZooKeeper, that migrates the responsibility and metadata for a job to another JobManager in case the original one disappears. When a JobManager fails, all tasks that belong to its application are automatically cancelled. A new JobManager that takes over the work of the failed master performs the following steps:
- it requests the storage locations from ZooKeeper to fetch the JobGraph, the jar file, and the state handles of the last checkpoint of the app from the remote storage
- it requests processing slots from the ResourceManager to continue executing the app
- it restarts the app and resets the state of all its tasks to the last complete checkpoint
When running an application as a library deployment in a container environment, such as Kubernetes, failed JobManager or TaskManager containers are usually automatically restarted by the container orchestration service.
Data transfer in Flink
The TaskManagers take care of shipping data from sending tasks to receiving tasks. The network component of a TaskManager collects records in buffers before they are shipped - records are not shipped one by one but batched into buffers.
Each TaskManager has a pool of network buffers (by default 32 KB in size) to send and receive data. In order to enable a smooth pipelined data exchange, a TaskManager must be able to provide enough buffers to serve all outgoing and incoming connections concurrently
When a sender task and a receiver task run in the same TaskManager process, the sender task serializes the outgoing records into a byte buffer and puts the buffer into a queue once it is filled. The receiving task takes the buffer from the queue and deserializes the incoming records. Hence, data transfer between tasks that run on the same TaskManager does not cause network communication.
Event-time processing
Timestamps
When Flink processes a data stream in event-time mode, it evaluates time-based operators based on the timestamps of records. For example, a time-window operator assigns records to windows according to their associated timestamp
Watermarks
In addition to record timestamps, a Flink event-time application must also provide watermarks. Watermarks are used to derive the current event time at each task in an event-time application. Time-based operators use this time to trigger computations and make progress
They have 2 basic properties:
- they must be monotonically increasing to ensure the event-time clocks of tasks are progressing and not going backward
- they are related to record timestamps - a watermark with a timestamp T indicates that all subsequent records should have timestamps > T
When a task receives a watermark, the following actions take place:
- the task updates its internal event-time clock based on the watermark’s timestamp
- the task’s time service identifies all timers with a time smaller than the updated event time. For each expired timer, the task invokes a callback function that can perform a computation and emit records
- the task emits a watermark with the updated event time
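To tie timestamps and watermarks together, here is a minimal sketch in the Scala DataStream API. The Reading case class, the data, and the 5-second bound on out-of-orderness are made-up assumptions, and the calls follow the API generation used in the book (some are deprecated in newer Flink versions).
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// A made-up record type carrying its own event-time timestamp in milliseconds.
case class Reading(id: String, timestampMillis: Long, value: Double)

object EventTimeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // evaluate time-based operators on event time

    val readings: DataStream[Reading] = env.fromElements(
      Reading("a", 1000L, 1.0), Reading("a", 4000L, 2.0), Reading("a", 2500L, 3.0))

    val withTimestampsAndWatermarks = readings.assignTimestampsAndWatermarks(
      // emits watermarks that trail the largest seen timestamp by 5 seconds,
      // so the event-time clock only moves forward (monotonically increasing)
      new BoundedOutOfOrdernessTimestampExtractor[Reading](Time.seconds(5)) {
        override def extractTimestamp(r: Reading): Long = r.timestampMillis
      })

    withTimestampsAndWatermarks
      .keyBy(_.id)
      .timeWindow(Time.seconds(10)) // event-time tumbling window; fires once the watermark passes its end
      .reduce((a, b) => Reading(a.id, math.max(a.timestampMillis, b.timestampMillis), a.value + b.value))
      .print()

    env.execute("event-time sketch")
  }
}
```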
State management
A task receives some input data. While processing the data, the task can read and update its state and compute its result based on its input data and state
In Flink, state is always associated with a specific operator. In order to make Flink’s runtime aware of the state of an operator, the operator needs to register its state. There are two types of state, operator state and keyed state, that are accessible from different scopes
Operator state
Operator state is scoped to an operator task. This means that all records processed by the same parallel task have access to the same state. Operator state cannot be accessed by another task of the same or a different operator
Keyed state
Keyed state is maintained and accessed with respect to a key defined in the records of an operator’s input stream. Flink maintains one state instance per key value and partitions all records with the same key to the operator task that maintains the state for this key. When a task processes a record, it automatically scopes the state access to the key of the current record and all records with the same key access the same state.
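Here is a minimal sketch of keyed state in the Scala DataStream API, assuming a made-up stream of (key, value) pairs and an arbitrary alert threshold; Flink automatically scopes the ValueState access to the key of the record currently being processed.
```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Keeps one running sum per key and emits an alert string when it crosses the threshold.
class ThresholdAlert(threshold: Long) extends RichFlatMapFunction[(String, Long), String] {
  // one state instance exists per key; access is scoped to the current record's key
  private var sumState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    sumState = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("runningSum", classOf[Long]))
  }

  override def flatMap(in: (String, Long), out: Collector[String]): Unit = {
    val current = sumState.value() // unset state unboxes to 0L here
    val updated = current + in._2
    sumState.update(updated)
    if (updated > threshold) out.collect(s"key ${in._1} exceeded $threshold (sum = $updated)")
  }
}

object KeyedStateSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements(("a", 40L), ("b", 10L), ("a", 70L), ("b", 30L))
      .keyBy(_._1)                       // partitions records so each key's state lives on one task
      .flatMap(new ThresholdAlert(100L))
      .print()
    env.execute("keyed state sketch")
  }
}
```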
Checkpoints, Savepoints, and State recovery
Consistent checkpoints
A consistent checkpoint of a stateful streaming application is a copy of the state of each of its tasks at a point when all tasks have processed exactly the same input. This can be explained by looking at the steps of a naive algorithm that takes a consistent checkpoint of an application.
The book illustrates this with an application that has a single source task consuming a stream of increasing numbers (1, 2, 3, and so on). The stream of numbers is partitioned into a stream of even and a stream of odd numbers. Two tasks of a sum operator compute the running sums of all even and all odd numbers. The source task stores the current offset of its input stream as state. The sum tasks persist the current sum value as state
Recovery from a consistent checkpoint
During the execution of a streaming application, Flink periodically takes consistent checkpoints of the application’s state. In case of a failure, Flink uses the latest checkpoint to consistently restore the application’s state and restarts the processing
An application is recovered in three steps:
- restart the whole application
- reset the states of all stateful tasks to the latest checkpoint
- resume the processing of all tasks
Flink’s checkpointing algorithm produces consistent distributed checkpoints from streaming applications without stopping the whole application. However, it can increase the processing latency. Thankfully, there are tweaks that can alleviate the performance impact under certain conditions.
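For reference, here is a minimal sketch of how periodic checkpointing is typically enabled and configured on the execution environment; the interval, mode, and pause values are arbitrary, and the min-pause setting is one knob that can limit the impact of checkpointing on processing latency.
```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

object CheckpointingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(10000L) // take a consistent checkpoint every 10 seconds
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L) // leave breathing room between checkpoints

    env.fromElements(1, 2, 3).map(_ + 1).print() // placeholder pipeline
    env.execute("checkpointing sketch")
  }
}
```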
Savepoints
Savepoints are consistent snapshots of an application’s state, created using the same algorithm as checkpoints but with additional metadata. Unlike checkpoints, savepoints are not automatically created or cleaned up; they must be explicitly triggered by the user or an external scheduler. While checkpoints primarily support failure recovery, savepoints enable additional use cases beyond recovery.
Stream Processing with Apache Flink - Chapter 5 - The DataStream API
This chapter was mainly about following the examples in Scala (or Java) and creating the pieces below (a small sketch combining a few of them follows the lists):
- an input stream
- basic transformations (map, filter, flatmap)
- keyedStream transformations (keyBy, rolling aggs, reduce)
- multistream transformations (union, connect, coMap, coFlatMap, split and select)
In addition:
- setting up parallelism
- supported data types
- defining keys and fields
- defining custom functions
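Here is the small sketch mentioned above, combining a few of the listed transformations (map, filter, keyBy, and a rolling reduce) on made-up data:
```scala
import org.apache.flink.streaming.api.scala._

object TransformationsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements(1, 2, 3, 4, 5, 6, 7, 8)
      .map(n => (if (n % 2 == 0) "even" else "odd", n)) // basic transformation to (parity, value)
      .filter(_._2 > 1)                                  // drop small values
      .keyBy(_._1)                                       // KeyedStream partitioned by parity
      .reduce((a, b) => (a._1, a._2 + b._2))             // rolling sum per key
      .print()

    env.execute("transformations sketch")
  }
}
```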
After this I checked out the remaining few chapters and it seems they are structured like documentation of sorts, going over the theoretical concepts covered in the previous chapters and showing the code for them, so I guess this is a very nice resource when starting to build your 1st Flink streaming job.
Stream
I found some Flink learn material on the Flink docs.
Not my best stream 😆 maybe because I had a minor headache 😆
Second stream on Airflow 101
I found that Astronomer has a free online academy with a series of courses on Airflow and I got very excited 🥳 After resting following the 1st stream, I decided to start the Apache Airflow 101 course and cover some of the 1st module. The courses are amazing and I already learned a bit about Airflow's internals: web server, metadata database, scheduler, executor, tasks, operators, and how they communicate with each other. Excited to continue with the other courses on the Astronomer academy.
That’s all for today!
See you tomorrow :)