Hello :) Today is Day 344!
A quick summary of today:
- new Andriy Burkov book
- more about Flink and Kafka
Andriy Burkov’s new book on LLMs
I saw that Andriy Burkov put out the first 4 chapters of his upcoming book online for free ~ so it’s time to check them out.
Chapter 1 - ML Basics
AI
- the first chatbot, ELIZA (1966), which answered based on pattern matching of the input
- AI winters 1974-1980 and 1987-2000
- the term AI was somewhat of a taboo during the AI winters, and researchers relabelled their work as things like pattern matching or smart decision systems
- things picked up again from 2012 onwards
ML
- decision boundaries
- decision trees
- random forests
- SVMs
Model
- y = f(x)
- f(x) = wx + b
- MSE and the loss function
- an overview of differentiation and derivatives
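Writing those two bullets out (standard notation, not necessarily the book's exact symbols): the linear model and the MSE loss it is trained to minimise over a dataset of N pairs (x_i, y_i):
f(x) = wx + b, \qquad \mathrm{MSE}(w, b) = \frac{1}{N}\sum_{i=1}^{N}\bigl(y_i - (w x_i + b)\bigr)^2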
4-step ML process
- collect a dataset
- define the model’s structure
- define the loss function
- minimise the loss function
Chapter 2 - Vectors, Matrices, and Neural networks
Vectors
- dot product
- element-wise sum and product
- unit vectors, zero vectors, vector norm
Unit vectors are useful since their dot product equals the cosine of the angle between them, and computing the dot product is fast. When documents are represented as unit vectors, finding similar ones to a query becomes efficient by calculating the dot product between the query’s vector and the document vectors.
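In symbols (standard notation, not taken from the book): for unit vectors the cosine similarity reduces to a plain dot product,
\cos\theta = \frac{u \cdot v}{\lVert u \rVert \, \lVert v \rVert} = u \cdot v \quad \text{when } \lVert u \rVert = \lVert v \rVert = 1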
Neural networks
- a fixed non-linear function is applied to the outputs of trainable linear functions
- deeper structure compared to linear models
They are of the structure φ(wx + b), where φ is an activation function, and with layers these units are stacked.
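Written out for two hidden layers (my notation, a sketch rather than the book's exact formulation):
h^{(1)} = \phi\!\left(W^{(1)} x + b^{(1)}\right), \quad h^{(2)} = \phi\!\left(W^{(2)} h^{(1)} + b^{(2)}\right), \quad y = W^{(3)} h^{(2)} + b^{(3)}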
Matrices
- sum and product of matrices
- transposing a matrix
- matrix-vector operations
There are some very neat diagrams in the book showing how these operations work.
Gradient Descent
- intro to the cross-entropy loss
- intro to gradient descent
- initialise params
- compute preds
- compute the gradient
- update w and b based on a learning rate
- calculate the loss
- iterate
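A minimal sketch of that loop in Java for the single-feature linear model f(x) = wx + b with MSE, on a tiny made-up dataset (my illustration, not code from the book):
public class GradientDescentDemo {
    public static void main(String[] args) {
        // tiny made-up dataset: y ≈ 2x + 1
        double[] xs = {1, 2, 3, 4};
        double[] ys = {3, 5, 7, 9};
        // 1) initialise params
        double w = 0.0, b = 0.0;
        double learningRate = 0.05;
        for (int step = 0; step < 1000; step++) {
            // 2) compute predictions and the gradient of the MSE w.r.t. w and b
            double gradW = 0.0, gradB = 0.0, loss = 0.0;
            for (int i = 0; i < xs.length; i++) {
                double pred = w * xs[i] + b;
                double error = pred - ys[i];
                gradW += 2 * error * xs[i] / xs.length;
                gradB += 2 * error / xs.length;
                loss += error * error / xs.length;
            }
            // 3) update w and b based on the learning rate
            w -= learningRate * gradW;
            b -= learningRate * gradB;
            // 4) track the loss and iterate
            if (step % 200 == 0) {
                System.out.printf("step=%d loss=%.4f w=%.3f b=%.3f%n", step, loss, w, b);
            }
        }
    }
}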
Chapter 3 - Representing text
Bag of Words
- create a vocabulary: a list of all unique words in the corpus
- vectorize documents: represent each document as a feature vector where each dimension of the vector indicates whether a word from the vocabulary is present, absent, or its frequency in the document
Feature vectors can be organized into a document-term matrix, where rows represent documents, and columns represent tokens.
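To make the two steps concrete, here is a rough Java sketch (the corpus and whitespace tokenisation are made up for illustration):
import java.util.*;
public class BagOfWordsDemo {
    public static void main(String[] args) {
        List<String> corpus = List.of("the cat sat", "the dog sat on the mat");
        // 1) create a vocabulary: all unique words in the corpus
        Map<String, Integer> vocab = new LinkedHashMap<>();
        for (String doc : corpus) {
            for (String token : doc.split(" ")) {
                vocab.putIfAbsent(token, vocab.size());
            }
        }
        // 2) vectorize documents: one count vector per document
        //    (these are the rows of the document-term matrix)
        for (String doc : corpus) {
            int[] counts = new int[vocab.size()];
            for (String token : doc.split(" ")) {
                counts[vocab.get(token)]++;
            }
            System.out.println(doc + " -> " + Arrays.toString(counts));
        }
    }
}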
In natural languages, word frequencies follow Zipf’s Law. According to the law, a word’s frequency is inversely proportional to its rank in a frequency table. For example, the second most frequent word occurs half as often as the most frequent one. Consequently, most words in a document-term matrix are rare, resulting in a sparse matrix filled with many zeros.
BoW cannot capture the order or context of words - n-grams can be used for this, but then the vocabulary grows significantly. BoW also does not handle out-of-vocabulary words - words not seen when building the vocabulary. These limitations are addressed with word embeddings.
Word embeddings
Word embeddings address the limitations of traditional bag-of-words (BoW) and one-hot encoding by representing words as dense vectors that encode semantic relationships. Unlike one-hot vectors, which are sparse and carry little information, embeddings are lower-dimensional, non-sparse, and map semantically similar words like films and movies to vectors with high cosine similarity.
The word2vec skip-gram algorithm is a popular method for learning embeddings. It trains a model to predict a word’s context from the word itself (the companion CBOW variant does the reverse).
The image is of a skip-gram model with a skip-gram window size of 5 and an embedding layer of 300 units.
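To make the skip-gram idea concrete, a small sketch (my own, not from the book) that generates the (centre word, context word) training pairs for a window of ±2:
public class SkipGramPairsDemo {
    public static void main(String[] args) {
        String[] tokens = "the quick brown fox jumps".split(" ");
        int window = 2; // context words considered on each side of the centre word
        for (int i = 0; i < tokens.length; i++) {
            for (int j = Math.max(0, i - window); j <= Math.min(tokens.length - 1, i + window); j++) {
                if (j == i) continue;
                // the model is trained to predict tokens[j] given tokens[i]
                System.out.println("(" + tokens[i] + ", " + tokens[j] + ")");
            }
        }
    }
}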
Word embeddings are fundamental in NLP and modern models also integrate subword tokenization, such as byte-pair encoding, to enhance representation capabilities.
Byte-Pair Encoding (BPE)
It addresses the difficulty of out-of-vocabulary words by splitting words into smaller pieces called subwords
- Initialization:
- use a text corpus, splitting each word into individual characters (e.g. ‘hello’ -> “h e l l o”)
- the initial vocabulary comprises all unique characters in the corpus
- Iterative merging:
- count adjacent symbol pairs: identify all pairs of adjacent symbols in the corpus (e.g. ‘h e’, ‘e l’, ‘l l’, ‘l o’)
- select the most frequent pair: choose the pair with the highest count (e.g.’l l’)
- merge the selected pair: replace all occurrences of the pair with a single merged symbol (e.g. ‘l l’ → ‘ll’, transforming ‘h e l l o’ into ‘h e ll o’)
- update the vocabulary: add the new merged symbol to the vocabulary
- Repeat:
- continue merging the most frequent pairs until the vocabulary reaches the desired size
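A compact Java sketch of those steps on a toy corpus (a real tokenizer tracks pair counts far more efficiently and merges all overlapping occurrences, but the loop is the same idea):
import java.util.*;
public class BpeDemo {
    public static void main(String[] args) {
        // each word is pre-split into characters separated by spaces
        List<String> corpus = new ArrayList<>(List.of("h e l l o", "h e l p", "y e l l"));
        for (int merge = 0; merge < 5; merge++) {
            // count adjacent symbol pairs across the corpus
            Map<String, Integer> pairCounts = new HashMap<>();
            for (String word : corpus) {
                String[] symbols = word.split(" ");
                for (int i = 0; i + 1 < symbols.length; i++) {
                    pairCounts.merge(symbols[i] + " " + symbols[i + 1], 1, Integer::sum);
                }
            }
            if (pairCounts.isEmpty()) break;
            // select the most frequent pair and merge it everywhere
            String best = Collections.max(pairCounts.entrySet(), Map.Entry.comparingByValue()).getKey();
            String merged = best.replace(" ", "");
            for (int i = 0; i < corpus.size(); i++) {
                String padded = " " + corpus.get(i) + " ";
                padded = padded.replace(" " + best + " ", " " + merged + " ");
                corpus.set(i, padded.trim());
            }
            System.out.println("merged '" + best + "' -> " + corpus);
        }
    }
}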
Chapter 4 - Language Model Basics
Language model
An LM predicts the next token in a sequence by estimating its conditional probability based on previous tokens. It assigns a probability to all possible next tokens, enabling the selection of the most likely next one. Such models are also called autoregressive because for the next prediction they only use the preceding elements.
There are also masked LMs like BERT that mask a word in a sentence and try to predict it based on its surroundings
Evaluating LMs
- perplexity: a perplexity of 5 (for instance) means that, on average, the model acts as if it selects from roughly 5 equally likely options for each word in the given context. A perplexity of 5 is relatively low, showing the model is confident in its predictions for this sequence. But for a proper evaluation, perplexity must be measured on a much larger test set
- ROUGE: used for comparing summarisations and machine translations
- human evaluation (likert scale ratings, pairwise comparison, elo ratings)
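For reference, the standard definition of perplexity for a sequence of N tokens (the chapter's notation may differ):
\mathrm{PPL}(w_1,\dots,w_N) = \exp\left(-\frac{1}{N}\sum_{i=1}^{N}\log p(w_i \mid w_1,\dots,w_{i-1})\right)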
Stream Processing with Apache Flink - Chapter 2 - Stream Processing Fundamentals
Introduction to dataflow programming
Dataflow graphs
A dataflow program describes how data flows between operations
This is a logical dataflow graph to continuously count hashtags (nodes represent operators and edges denote data dependencies)
The above is a logical graph because it shows a high-level view of the computational logic.
In order to execute a dataflow program, its logical graph is converted into a physical dataflow graph, which specifies in detail how the program is executed; with distributed computing, each operator is typically split into several parallel tasks in the physical plan.
Data parallelism and Task parallelism
We can exploit parallelism in dataflow graphs in different ways
- we can partition the input data and have tasks of the same operation execute on the data subsets in parallel (data parallelism)
- we can have tasks from different operators performing computations on the same or different data in parallel (task parallelism)
Data exchange strategies
They define how data items are assigned to tasks in a physical dataflow graph. Strategies can be chosen automatically by the execution engine depending on the semantics of the operators, or explicitly imposed by the dataflow programmer. Here are some strategies:
- forward: it sends data from a task to a receiving task. If both tasks are located on the same physical machine (which is often ensured by task schedulers), this exchange strategy avoids network communication
- broadcast: it sends every data item to all parallel tasks of an operator. Because this strategy replicates data and involves network communication, it is fairly expensive
- key-based: it partitions data by a key attribute and guarantees that data items having the same key will be processed by the same task
- random: it uniformly distributes data items to operator tasks in order to evenly distribute the load across computing tasks
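As a tiny illustration of the key-based strategy above (my own sketch, not Flink code): routing by a hash of the key guarantees that all items with the same key land on the same task.
// hypothetical routing helper, just to illustrate key-based partitioning
int numTasks = 4;
String key = "#flink";
int targetTask = Math.floorMod(key.hashCode(), numTasks);
// every record carrying the key "#flink" is sent to the same task index
System.out.println("key " + key + " -> task " + targetTask);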
Processing streams in parallel
Latency and Throughput
Latency indicates how long it takes for an event to be processed. Essentially, it is the time interval between receiving an event and seeing the effect of processing this event in the output.
Throughput is a measure of the system’s processing capacity—its rate of processing. That is, throughput tells us how many events the system can process per time unit.
Operations on data streams
- stateless vs stateful operations
This was covered very well by the Kafka 101 on Confluent. Definitely recommend that course. And to be honest this whole book’s material is probably covered by the free video courses on Confluent 😆 I am reading in order to confirm my knowledge and see if there is something new mentioned.
Time semantics
Processing time
Processing time is the time of the local clock on the machine where the operator processing the stream is being executed. A processing-time window includes all events that happen to have arrived at the window operator within a time period, as measured by the wall clock of its machine
Event time
Event time is the time when an event in the stream actually happened. Event time is based on a timestamp that is attached to the events of the stream. Timestamps usually exist inside the event data before they enter the processing pipeline (e.g. the event creation time)
Operations based on event time are predictable and their results are deterministic. An event time window computation will yield the same result no matter how fast the stream is processed or when the events arrive at the operator.
Handling event delays can be a challenge
Watermarks
A progress metric used in event-time window processing to signal when all events up to a certain timestamp are expected to have been received. They help manage out-of-order events and trigger computations based on event time.
Tradeoffs
- eager watermarks: lower latency but less confidence, requiring mechanisms to handle late events
- relaxed watermarks: higher confidence but increased processing latency
Challenges:
- perfect watermarking is difficult due to unpredictable delays in distributed systems (e.g. straggler tasks, disconnected users)
- systems must handle late events through strategies like ignoring, logging, or using them to adjust results
Processing vs Event time
Processing time and event time serve different purposes in data processing. Processing time is advantageous when low latency is critical, as it immediately triggers computation after a specified duration, ignoring late or out-of-order events. This makes it suitable for applications prioritizing speed over accuracy, such as real-time monitoring dashboards or scenarios requiring a direct representation of event streams, like outage detection. However, results depend on processing speed and lack determinism. Conversely, event time ensures deterministic results by handling late or out-of-order events, making it ideal for use cases requiring accuracy and completeness over immediacy.
State and consistency models
State operations impose a few challenges:
- state management
- state partitioning
- state recovery
Task failures
A task is a processing step that, for each event in its input stream, performs the following steps:
- receives the event, storing it in a local buffer
- possibly updates internal state
- produces an output record.
A failure can occur during any of these steps and the system has to clearly define its behavior in a failure scenario.
Result guarantees
- at-most-once
- at-least-once
- exactly-once
- end-to-end exactly-once
Kafka Streams 101
At the heart of Kafka is the log, which is simply a file where records are appended. The log is immutable, but you usually can’t store an infinite amount of data, so you can configure how long your records live.
The storage layer for Kafka is called a broker, and the log resides on the broker’s filesystem. A topic is simply a logical construct that names the log—it’s effectively a directory within the broker’s filesystem.
You put data into Kafka with producers and get it out with consumers: producers send a produce request with records to the log, and each record, as it arrives, is given a special number called an offset, which is just the logical position of that record in the log. Consumers send a fetch request to read records, and they use the offsets as bookmarks, like placeholders. For example, a consumer will read up to offset number 5, and when it comes back, it will start reading at offset number 6. Consumers are organized into groups, with partition data distributed among the members of the group.
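A minimal sketch of that flow with the plain Java clients (the topic name and configuration are made up, imports are omitted like in the other snippets; the calls are the standard KafkaProducer/KafkaConsumer ones):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// producer: appends records to the log; the broker assigns each one an offset
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("movements", "Robin", "Leeds"));
}
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "movement-readers"); // consumers in a group share partitions
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// consumer: fetches records and tracks its position in the log via offsets
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(List.of("movements"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(r -> System.out.println(r.offset() + ": " + r.key() + " -> " + r.value()));
}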
Connectors
Connectors are an abstraction over producers and consumers. Perhaps you need to export database records to Kafka. You configure a source connector to listen to certain database tables, and as records come in, the connector pulls them out and sends them to Kafka. Sink connectors do the opposite: If you want to write records to an external store such as MongoDB, for example, a sink connector reads records from a topic as they come in and forwards them to your MongoDB instance.
Definition
Kafka Streams is a Java library: we write code, create a JAR file, and then start our standalone application that streams records to and from Kafka (it doesn’t run on the same node as the broker). And we can run Kafka Streams on anything from a laptop all the way up to a large server.
Basic operations
- mapping: map((key, value) -> ..)
- mapping values: mapValues(value -> value.substring(5))
- filtering: filter((key, value) -> Long.parseLong(value) > 1000)
It’s important to think of these operations as creating a new event stream, rather than modifying an existing one.
KTables
At the start, the videos mentioned that records in an event stream are completely independent of one another, even if they have identical keys.
Update streams are the exact opposite: if a new record comes in with the same key as an existing record, the existing record is overwritten. A KTable is such an update stream.
Define a KTable
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> firstKTable =
builder.table(inputTopic,
Materialized.with(Serdes.String(), Serdes.String()));
Operations
- mapping
- mapping values
- filtering
GlobalKTable
The main difference between a KTable and a GlobalKTable is that a KTable shards data between Kafka Streams instances, while a GlobalKTable gives each instance a full copy of the data. You typically use a GlobalKTable with lookup data. There are also some idiosyncrasies regarding joins between a GlobalKTable and a KStream.
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> globalKTable =
builder.globalTable(inputTopic,
Materialized.with(Serdes.String(), Serdes.String()));
Serialisation
The Kafka brokers only receive and send bytes, so Kafka Streams needs serialisers and deserialisers (SerDes) to convert between those bytes and Java objects.
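A small sketch of the two usual ways to provide them (defaults via config, or per-operation with Consumed/Produced); the topic names are placeholders, the property names are the standard StreamsConfig ones:
// option 1: default serdes for the whole application
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// option 2: per-operation serdes, overriding the defaults
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> counts =
    builder.stream("counts-topic", Consumed.with(Serdes.String(), Serdes.Long()));
counts.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));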
JOINs
KStream<String, String> leftStream = builder.stream("topic-A");
KStream<String, String> rightStream = builder.stream("topic-B");
ValueJoiner<String, String, String> valueJoiner = (leftValue, rightValue) -> {
return leftValue + rightValue;
};
leftStream.join(rightStream,
valueJoiner,
JoinWindows.of(Duration.ofSeconds(10)));
- Stream-Stream joins:
- combine two event streams into a new event stream
- join events based on a common key
- records arrive within a defined window of time
- possible to compute a new value type
- keys are available in read-only mode and can be used in computing the new value
- Stream-Table joins:
- KStream-KTable
- KStream-GlobalKTable
- Table-Table joins
Stateful operations
- reduce
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> myStream = builder.stream("topic-A");
Reducer<Long> reducer = (longValueOne, longValueTwo) -> longValueOne + longValueTwo;
myStream.groupByKey().reduce(reducer,
Materialized.with(Serdes.String(), Serdes.Long()))
.toStream().to("output-topic");
- aggregate
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> myStream = builder.stream("topic-A");
Aggregator<String, String, Long> characterCountAgg =
(key, value, charCount) -> value.length() + charCount;
myStream.groupByKey().aggregate(() -> 0L,
characterCountAgg,
Materialized.with(Serdes.String(), Serdes.Long()))
.toStream().to("output-topic");
Stateful operations don’t emit results immediately. Kafka Streams has an internal buffering mechanism that caches results. Two factors control when the cache emits records: records are emitted when the cache is full (the cache size, 10MB by default, is split evenly among an instance’s state stores), and by default Kafka Streams calls commit every 30 seconds (you don’t call commit yourself). At that point you would see an update. In order to see every update that comes through your aggregation, you can set the cache size to zero (which is also useful for debugging).
Even with caching, you will get multiple intermediate results, so for a single, final stateful result you should use suppression (the suppress operator) together with your aggregate/reduce operations.
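For illustration, roughly what that looks like applied to the reduce example above (my sketch; for windowed aggregations you would typically use Suppressed.untilWindowCloses instead of untilTimeLimit):
// buffer updates and only emit the latest value per key after the time limit
myStream.groupByKey()
    .reduce(reducer, Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(5), Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to("output-topic");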
Windowing
Hopping
A hopping window is bound by time: You define the size of the window, but then it advances in increments smaller than the window size, so you end up with overlapping windows. You might have a window size of 30 seconds with an advance size of five seconds. Data points can belong to more than one window.
Tumbling
It’s a hopping window with an advance size that’s the same as its window size. So basically you just define a window size of 30 seconds; when the 30 seconds are up, a new 30-second window starts. You don’t get the duplicate results you get from the overlapping windows in hopping windows.
Sessions
So with session windows, you define an inactivityGap. Basically, as long as a record comes in within the inactivityGap, your session keeps growing. So theoretically, if you’re keeping track of something and it’s a very active key, your session will continue to grow.
Sliding
A sliding window is bound by time, but its endpoints are determined by user activity
Grace periods
With the exception of session windows, which are behavior-driven, all windows have the concept of a grace period. A grace period is an extension to the size of a window. Specifically, it allows events with timestamps greater than the window-end (but less than the window-end plus the grace period) to be included in the windowed calculation.
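A sketch of how these window types are declared in the Kafka Streams DSL (the durations are arbitrary, and I’m using the older of()/advanceBy()/grace() style here):
// hopping: 30-second windows advancing every 5 seconds, with a 10-second grace period
TimeWindows hopping = TimeWindows.of(Duration.ofSeconds(30))
    .advanceBy(Duration.ofSeconds(5))
    .grace(Duration.ofSeconds(10));
// tumbling: the advance equals the window size, so there is no overlap
TimeWindows tumbling = TimeWindows.of(Duration.ofSeconds(30));
// session: windows keep growing as long as records arrive within the inactivity gap
SessionWindows sessions = SessionWindows.with(Duration.ofMinutes(5));
// sliding: bound by time, endpoints driven by record timestamps
SlidingWindows sliding =
    SlidingWindows.withTimeDifferenceAndGrace(Duration.ofSeconds(30), Duration.ofSeconds(10));
// used with a grouped stream, e.g. groupedStream.windowedBy(hopping).count();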
(I am writing this as I watch, and there is a lot of overlapping material with the previous Kafka 101 and Flink 101 videos, which is kind of understandable since we are talking about streaming, but I guess these courses are meant to be standalone rather than taken in a series one after another)
Testing streams
Kafka Streams unit testing can be challenging due to the need for broker connections, which are expensive for tests. While integration tests with a live broker are necessary, they should be limited. The TopologyTestDriver addresses this by allowing unit tests for a Kafka Streams topology without requiring a broker, supporting end-to-end tests at a faster pace. It works by mocking the Kafka components and using a MockSchemaRegistry for Schema Registry-related tests.
With the TopologyTestDriver, you create and configure your topology, instantiate the driver, and then use TestInputTopic to send records through the topology. The records can have timestamps for triggering stream-time behavior, and wall-clock time can be advanced using advanceWallClockTime.
TestOutputTopic is used to mock the sink nodes, allowing verification of results. For Kafka Streams DSL operators using lambdas, it’s often better to write a concrete class for easier unit testing.
Though TopologyTestDriver is useful for unit tests, integration testing against a live broker is still essential, particularly for stateful operations. The TestContainers library is recommended for controlling the broker lifecycle in integration tests, offering reduced testing time by sharing containers across tests.
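A rough sketch of the TopologyTestDriver flow (the topic names, serdes, and the trivial topology are placeholders of my own):
// build some topology (placeholder: it just uppercases values)
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .mapValues(value -> value.toUpperCase())
    .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, String> input = driver.createInputTopic(
        "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
    TestOutputTopic<String, String> output = driver.createOutputTopic(
        "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
    // no broker involved: the record flows synchronously through the topology
    input.pipeInput("key", "hello");
    System.out.println(output.readValue()); // prints HELLO
}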
Intro to ksqlDB
Definition
ksqlDB allows you to build stream processing applications on top of Apache Kafka with the ease of building traditional applications on a relational database. Using SQL to describe what you want to do rather than how, it makes it easy to build Kafka-native applications for processing streams of real-time data. Some key ksqlDB use cases include:
- materialized caches
- streaming ETL pipelines
- event-driven microservices
If we have a stream, some basic operations we can do with ksqlDB:
- filter
- join streams
- aggregate streams
Hands-on
Creating a stream
CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');
Add data
INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
INSERT INTO MOVEMENTS VALUES ('Allison', 'Boulder');
Filtering
CREATE STREAM ORDERS_NY AS
SELECT *
FROM ORDERS
WHERE ADDRESS->STATE='New York';
JOINs
CREATE STREAM ORDERS_ENRICHED AS
SELECT O.*,
I.*,
O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
FROM ORDERS O
LEFT OUTER JOIN ITEMS I
ON O.ITEMID = I.ID;
Flattening nested records
CREATE STREAM ORDERS_FLAT AS
SELECT TIMESTAMPTOSTRING(ORDERTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
ORDERID,
ITEMID,
ORDERUNITS,
ADDRESS->STREET AS ADDRESS_STREET,
ADDRESS->CITY AS ADDRESS_CITY,
ADDRESS->STATE AS ADDRESS_STATE
FROM ORDERS;
Splitting streams
CREATE STREAM ORDERS_US AS
SELECT * FROM ORDERS_COMBINED
WHERE SOURCE = 'US';
CREATE STREAM ORDERS_UK AS
SELECT * FROM ORDERS_COMBINED
WHERE SOURCE = 'UK';
CREATE STREAM ORDERS_OTHER AS
SELECT * FROM ORDERS_COMBINED
WHERE SOURCE != 'US'
AND SOURCE != 'UK';
Stateful aggregations
CREATE TABLE ORDERS_PER_HOUR_BY_MAKE AS
SELECT MAKE,
COUNT(*) AS ORDER_COUNT,
CAST(SUM(TOTAL_ORDER_VALUE) AS DECIMAL(9,2)) AS TOTAL_ORDER_VALUE
FROM ORDERS_ENRICHED
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE
EMIT CHANGES;
Pull vs Push queries
- Pull is like a normal select - shows a point in time view
- Push has the EMIT CHANGES clause, which runs indefinitely; it tells us about all value changes
Transform functions
CREATE STREAM transformed
AS SELECT id,
TRANSFORM(map_field,(k,v)=> UCASE(k), (k, v)=> (v * 2))
FROM stream1 EMIT CHANGES;
Reduce functions
CREATE STREAM reduced
AS SELECT name,
REDUCE(scores,0,(s,x)=> (s+x)) AS total
FROM stream2 EMIT CHANGES;
Filter functions
CREATE STREAM filtered
AS SELECT id,
FILTER(numbers,x => (x%2 = 0)) AS even_numbers
FROM stream3 EMIT CHANGES;
Schema Registry 101
The schema workflow
Example writing a schema file with Avro
In Protobuf
There are also more complex data types that we can include in the schemas (maps, enums, unions, arrays)
Effective schema management involves:
- schema IDs
- schema registration
- schema versioning
- viewing and retrieving schemas
Schema lifecycle
Schema compatibility
Producer and consumer clients have expectations of the structure of the object they’re working with.
Schema Registry offers compatibility verification. The available compatibility modes and associated checks establish guardrails that guide you to safely update schemas and allow you to keep your clients operational as they are updated with the changes.
Schema compatibility modes
Transitive schema compatibility modes
Schema compatibility considerations
- with Backward compatibility you can delete fields and add fields with default values and you update the consumer clients first
- with Forward compatibility you can delete fields with default values and add new fields. You need to update your producer clients first under forward compatibility
- with Full compatibility, both deleted or added fields must have a default value. Since every change to the schema has a default value, the order in which you update your clients doesn’t matter
That is all for today!
See you tomorrow :)