(Day 344) Checking out the first chapters from Andriy Burkov's new book

Ivan Ivanov · December 10, 2024

Hello :) Today is Day 344!

A quick summary of today:

  • new Andriy Burkov book
  • more about Flink and Kafka

Andriy Burkov’s new book on LLMs

I saw that Andriy Burkov put out the first 4 chapters of his upcoming book online for free ~ so it’s time to check them out.


Chapter 1 - ML Basics

AI

  • the first chatbot, ELIZA (1966), answered based on pattern matching of the input
  • AI winters 1974-1980 and 1987-2000

image

  • the term AI was somewhat of a taboo during the AI winters, and researchers relabelled their work with terms like pattern matching or smart decision systems
  • things have picked up again since 2012

ML

  • decision boundaries
  • decision trees
  • random forests
  • SVMs

Model

  • y = f(x)
  • f(x) = wx + b
  • MSE and the loss function
  • an overview of differentiation and derivatives
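Written out (my notation, not necessarily the book's), the linear model and its mean squared error over N training pairs (x_i, y_i) are:

    f(x) = wx + b
    \mathrm{MSE}(w, b) = \frac{1}{N} \sum_{i=1}^{N} \bigl(y_i - (w x_i + b)\bigr)^2

Minimising this loss with respect to w and b is what the overview of derivatives is building towards.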

4-step ML process

  1. collect a dataset
  2. define the model’s structure
  3. define the loss function
  4. minimise the loss function

Chapter 2 - Vectors, Matrices, and Neural networks

Vectors

  • dot product
  • element-wise sum and product
  • unit vectors, zero vectors, vector norm

image

Unit vectors are useful since their dot product equals the cosine of the angle between them, and computing the dot product is fast. When documents are represented as unit vectors, finding similar ones to a query becomes efficient by calculating the dot product between the query’s vector and the document vectors.
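As a small illustration (my code, not the book's), normalising two vectors and taking their dot product gives the cosine similarity described above:

// A minimal sketch: cosine similarity via the dot product of unit vectors.
public class CosineSimilarity {
    static double dot(double[] a, double[] b) {
        double s = 0.0;
        for (int i = 0; i < a.length; i++) s += a[i] * b[i];
        return s;
    }

    static double[] toUnit(double[] v) {
        double norm = Math.sqrt(dot(v, v));   // vector norm
        double[] u = new double[v.length];
        for (int i = 0; i < v.length; i++) u[i] = v[i] / norm;
        return u;
    }

    public static void main(String[] args) {
        double[] doc = {1.0, 2.0, 0.0};
        double[] query = {2.0, 1.0, 1.0};
        // Once both vectors are unit vectors, a single dot product gives
        // the cosine of the angle between them.
        System.out.println(dot(toUnit(doc), toUnit(query)));
    }
}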

Neural networks

  • a fixed non-linear function is applied to the outputs of trainable linear functions
  • deeper structure compared to linear models

Each unit has the structure φ(wx + b), where φ is an activation function. With multiple layers, these units are stacked
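As a tiny illustration (my code, not the book's), one unit with a ReLU activation looks like this; stacking layers means feeding each layer's outputs into the next layer's units:

// One unit: a trainable linear function wx + b followed by a fixed
// non-linearity phi (here ReLU). Illustration only.
public class Unit {
    static double relu(double z) { return Math.max(0.0, z); }

    static double unit(double[] w, double[] x, double b) {
        double z = b;
        for (int i = 0; i < x.length; i++) z += w[i] * x[i];
        return relu(z);                       // phi(wx + b)
    }

    public static void main(String[] args) {
        double[] w = {0.5, -1.0};
        double[] x = {2.0, 1.0};
        System.out.println(unit(w, x, 0.25)); // prints 0.25
    }
}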

image

Matrices

  • sum and product of matrices
  • transposing a matrix
  • matrix-vector operations

There are these very neat graphs showing how it works:

image
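In code, the matrix-vector product from the list above is just a dot product of each matrix row with the vector (my sketch, not from the book):

// Matrix-vector product: each output element is the dot product of one
// matrix row with the vector.
public class MatVec {
    public static void main(String[] args) {
        double[][] A = {{1, 2}, {3, 4}, {5, 6}};   // 3x2 matrix
        double[] x = {10, 1};                      // 2-dimensional vector

        double[] y = new double[A.length];         // result is 3-dimensional
        for (int i = 0; i < A.length; i++)
            for (int j = 0; j < x.length; j++)
                y[i] += A[i][j] * x[j];

        System.out.println(java.util.Arrays.toString(y));   // [12.0, 34.0, 56.0]
    }
}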

Gradient Descent

  • intro to the cross-entropy loss
  • intro to gradient descent
    • initialise params
    • compute preds
    • compute the gradient
    • update w and b based on a learning rate
    • calculate the loss
    • iterate
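A minimal sketch of this loop for the linear model from Chapter 1 (my code, not the book's):

// One-variable linear regression trained with gradient descent on MSE.
public class GradientDescent {
    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4};
        double[] y = {3, 5, 7, 9};           // generated by y = 2x + 1
        double w = 0.0, b = 0.0;             // initialise params
        double lr = 0.01;                    // learning rate

        for (int step = 0; step < 5000; step++) {
            double gradW = 0.0, gradB = 0.0, loss = 0.0;
            for (int i = 0; i < x.length; i++) {
                double pred = w * x[i] + b;            // compute preds
                double err = pred - y[i];
                gradW += 2 * err * x[i] / x.length;    // dMSE/dw
                gradB += 2 * err / x.length;           // dMSE/db
                loss += err * err / x.length;          // calculate the loss
            }
            w -= lr * gradW;                           // update w and b
            b -= lr * gradB;
            if (step % 1000 == 0) System.out.printf("step=%d loss=%.6f%n", step, loss);
        }
        System.out.printf("w=%.3f b=%.3f%n", w, b);    // approaches 2 and 1
    }
}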

Chapter 3 - Representing text

Bag of Words

  1. create a vocabulary: a list of all unique words in the corpus
  2. vectorize documents: represent each document as a feature vector where each dimension of the vector indicates whether a word from the vocabulary is present, absent, or its frequency in the document

Feature vectors can be organized into a document-term matrix, where rows represent documents, and columns represent tokens.
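As a small sketch (my code, not the book's), the two steps above for a toy corpus:

import java.util.*;

// Bag-of-words count vectors for a tiny corpus, organised as a
// document-term matrix (rows = documents, columns = terms).
public class BagOfWords {
    public static void main(String[] args) {
        List<String> corpus = List.of("the cat sat", "the dog sat on the mat");

        // 1. Create a vocabulary: all unique words in the corpus.
        SortedSet<String> vocab = new TreeSet<>();
        for (String doc : corpus) vocab.addAll(Arrays.asList(doc.split(" ")));
        List<String> terms = new ArrayList<>(vocab);

        // 2. Vectorize documents: each cell holds the term's frequency
        //    in that document.
        int[][] docTermMatrix = new int[corpus.size()][terms.size()];
        for (int d = 0; d < corpus.size(); d++)
            for (String word : corpus.get(d).split(" "))
                docTermMatrix[d][terms.indexOf(word)]++;

        System.out.println(terms);
        for (int[] row : docTermMatrix) System.out.println(Arrays.toString(row));
    }
}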

image

In natural languages, word frequencies follow Zipf’s Law. According to the law, a word’s frequency is inversely proportional to its rank in a frequency table. For example, the second most frequent word occurs half as often as the most frequent one. Consequently, most words in a document-term matrix are rare, resulting in a sparse matrix filled with many zeros.

BoW cannot capture the order or context of words - n-grams can help with this, but then the vocabulary grows significantly. BoW also does not handle out-of-vocabulary words well - words not seen in the original vocabulary. All of this is addressed with word embeddings

Word embeddings

Word embeddings address the limitations of traditional bag-of-words (BoW) and one-hot encoding by representing words as dense vectors that encode semantic relationships. Unlike one-hot vectors, which are sparse and carry little information, embeddings are lower-dimensional, non-sparse, and map semantically similar words like films and movies to vectors with high cosine similarity.

The word2vec skip-gram algorithm is a popular method for learning embeddings: it trains a model to predict a word’s context from the word itself (the related CBOW variant does the reverse)

image

The image shows a skip-gram model with a skip-gram size of 5 and an embedding layer of 300 units

Word embeddings are fundamental in NLP and modern models also integrate subword tokenization, such as byte-pair encoding, to enhance representation capabilities.

Byte-Pair Encoding (BPE)

BPE addresses the difficulty of out-of-vocabulary words by splitting words into smaller pieces called subwords. A rough code sketch follows the steps below.

  1. Initialization:
    • use a text corpus, splitting each word into individual characters (e.g. ‘hello’ -> “h e l l o”)
    • the initial vocabulary comprises all unique characters in the corpus
  2. Iterative merging:
    • count adjacent symbol pairs: identify all pairs of adjacent symbols in the corpus (e.g. ‘h e’, ‘e l’, ‘l l’, ‘l o’)
    • select the most frequent pair: choose the pair with the highest count (e.g. ‘l l’)
    • merge the selected pair: replace all occurrences of the pair with a single merged symbol (e.g. ‘l l’ → ‘ll’, transforming ‘h e l l o’ into ‘h e ll o’)
    • update the vocabulary: add the new merged symbol to the vocabulary
  3. Repeat:
    • continue merging the most frequent pairs until the vocabulary reaches the desired size
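A rough sketch of this merge loop (my code, for illustration only - not the book's implementation):

import java.util.*;

// Each word is a list of symbols; we repeatedly count adjacent pairs,
// pick the most frequent one, and merge every occurrence of it.
public class BytePairEncoding {
    public static void main(String[] args) {
        List<List<String>> words = new ArrayList<>();
        for (String w : List.of("hello", "hell", "yellow"))
            words.add(new ArrayList<>(Arrays.asList(w.split(""))));

        int merges = 5;  // in practice: until the vocab reaches the desired size
        for (int m = 0; m < merges; m++) {
            // Count adjacent symbol pairs across the corpus.
            Map<String, Integer> pairCounts = new HashMap<>();
            for (List<String> w : words)
                for (int i = 0; i + 1 < w.size(); i++)
                    pairCounts.merge(w.get(i) + " " + w.get(i + 1), 1, Integer::sum);
            if (pairCounts.isEmpty()) break;

            // Select the most frequent pair and merge all its occurrences.
            String best = Collections.max(pairCounts.entrySet(), Map.Entry.comparingByValue()).getKey();
            String[] pair = best.split(" ");
            for (List<String> w : words) {
                for (int i = 0; i + 1 < w.size(); i++) {
                    if (w.get(i).equals(pair[0]) && w.get(i + 1).equals(pair[1])) {
                        w.set(i, pair[0] + pair[1]);
                        w.remove(i + 1);
                    }
                }
            }
            System.out.println("merged '" + best + "' -> " + words);
        }
    }
}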

Chapter 4 - Language Model Basics

Language model

An LM predicts the next token in a sequence by estimating its conditional probability based on previous tokens. It assigns a probability to all possible next tokens, enabling the selection of the most likely next one. Such models are also called autoregressive because for the next prediction they only use the preceding elements.

There are also masked LMs like BERT that mask a word in a sentence and try to predict it based on its surroundings

Evaluating LMs

  • perplexity (see the formula after this list): a perplexity of 5 (for instance) means that, on average, the model acts as if it selects from roughly 5 equally likely options for each word in the given context. A perplexity of 5 is relatively low, showing the model is confident in its predictions for this sequence. But for a proper evaluation, perplexity must be measured on a much larger test set
  • ROUGE: used for comparing summarisations and machine translations
  • human evaluation (Likert scale ratings, pairwise comparison, Elo ratings)
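For reference (my notation, not necessarily the book's), perplexity is the exponentiated average negative log-likelihood of the test tokens:

    \mathrm{PPL}(w_1, \dots, w_N) = \exp\!\left(-\frac{1}{N} \sum_{i=1}^{N} \log p(w_i \mid w_1, \dots, w_{i-1})\right)

If the model always spread its probability evenly over 5 next-token options, each p would be 1/5 and the perplexity would come out to exactly 5.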

Introduction to dataflow programming

Dataflow graphs

A dataflow program describes how data flows between operations

image

This is a logical dataflow graph to continuously count hashtags (nodes represent operators and edges denote data dependencies)

The above is a logical graph because it shows a high-level view of the computational logic

In order to execute a dataflow program, its logical graph is converted into a physical dataflow graph, which specifies in detail how the program is executed. If we use distributed computing, the physical plan might look like:

image

Data parallelism and Task parallelism

We can exploit parallelism in dataflow graphs in different ways

  • we can partition the input data and have tasks of the same operation execute on the data subsets in parallel (data parallelism)
  • we can have tasks from different operators performing computations on the same or different data in parallel (task parallelism)

Data exchange strategies

They define how data items are assigned to tasks in a physical dataflow graph. They can also be automatically chosen by the execution engine depending on the semantics of the operators or explicitly imposed by the dataflow programmer. Here are some strategies:

  • forward: it sends data from a task to a receiving task. If both tasks are located on the same physical machine (which is often ensured by task schedulers), this exchange strategy avoids network communication
  • broadcast: it sends every data item to all parallel tasks of an operator. Because this strategy replicates data and involves network communication, it is fairly expensive
  • key-based: it partitions data by a key attribute and guarantees that data items having the same key will be processed by the same task
  • random: it uniformly distributes data items to operator tasks in order to evenly distribute the load across computing tasks

image
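The chapter stays conceptual here, but as a reference point (my sketch, not from the book), these strategies map onto Flink's DataStream API roughly like this:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExchangeStrategies {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> hashtags = env.fromElements("#flink", "#kafka", "#flink");

        // key-based: items with the same key go to the same task
        hashtags.keyBy(tag -> tag).print();

        // broadcast: every item is replicated to all parallel tasks of the next operator
        hashtags.broadcast().print();

        // random (shuffle): items are uniformly distributed across tasks;
        // forward() keeps data on the same task and so avoids the network
        hashtags.shuffle().print();

        env.execute("exchange-strategies");
    }
}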

Processing streams in parallel

Latency and Throughput

Latency indicates how long it takes for an event to be processed. Essentially, it is the time interval between receiving an event and seeing the effect of processing this event in the output.

Throughput is a measure of the system’s processing capacity—its rate of processing. That is, throughput tells us how many events the system can process per time unit.

Operations on data streams

  • stateless vs stateful operations

This was covered very well by the Kafka 101 course on Confluent. Definitely recommend that course. And to be honest this whole book’s material is probably covered by the free video courses on Confluent 😆 I am reading in order to confirm my knowledge and see if there is something new mentioned.

Time semantics

Processing time

Processing time is the time of the local clock on the machine where the operator processing the stream is being executed. A processing-time window includes all events that happen to have arrived at the window operator within a time period, as measured by the wall clock of its machine

image

Event time

Event time is the time when an event in the stream actually happened. Event time is based on a timestamp that is attached to the events of the stream. Timestamps usually exist inside the event data before they enter the processing pipeline (e.g. the event creation time)

image

Operations based on event time are predictable and their results are deterministic. An event time window computation will yield the same result no matter how fast the stream is processed or when the events arrive at the operator.

Handling event delays can be a challenge

Watermarks

A watermark is a progress metric used in event-time window processing to signal when all events up to a certain timestamp are expected to have been received. Watermarks help manage out-of-order events and trigger computations based on event time.

Tradeoffs

  • eager watermarks: lower latency but less confidence, requiring mechanisms to handle late events
  • relaxed watermarks: higher confidence but increased processing latency

Challenges:

  • perfect watermarking is difficult due to unpredictable delays in distributed systems (e.g. straggler tasks, disconnected users)
  • systems must handle late events through strategies like ignoring, logging, or using them to adjust results

Processing vs Event time

Processing time and event time serve different purposes in data processing. Processing time is advantageous when low latency is critical, as it immediately triggers computation after a specified duration, ignoring late or out-of-order events. This makes it suitable for applications prioritizing speed over accuracy, such as real-time monitoring dashboards or scenarios requiring a direct representation of event streams, like outage detection. However, results depend on processing speed and lack determinism. Conversely, event time ensures deterministic results by handling late or out-of-order events, making it ideal for use cases requiring accuracy and completeness over immediacy.

State and consistency models

Stateful operations impose a few challenges:

  • state management
  • state partitioning
  • state recovery

Task failures

For each event in the input stream, a task performs the following steps:

  1. receives the event, storing it in a local buffer
  2. possibly updates internal state
  3. produces an output record.

A failure can occur during any of these steps and the system has to clearly define its behavior in a failure scenario.

Result guarantees

  • at-most-once
  • at-least-once
  • exactly-once
  • end-to-end exactly-once

Kafka Streams 101

image

At the heart of Kafka is the log, which is simply a file where records are appended. The log is immutable, but you usually can’t store an infinite amount of data, so you can configure how long your records live.

The storage layer for Kafka is called a broker, and the log resides on the broker’s filesystem. A topic is simply a logical construct that names the log—it’s effectively a directory within the broker’s filesystem.

You put data into Kafka with producers and get it out with consumers: Producers send a produce request with records to the log, and each record, as it arrives, is given a special number called an offset, which is just the logical position of that record in the log. Consumers send a fetch request to read records, and they use the offsets as bookmarks, like placeholders. For example, a consumer will read up to offset number 5, and when it comes back, it will start reading at offset number 6. Consumers are organized into groups, with partition data distributed among the members of the group.
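A minimal sketch of that flow with the plain Java clients (my code, not the course's; the broker address and topic name are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceAndConsume {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producers append records to the log; each record gets an offset.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("movements", "Robin", "Leeds"));
        }

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "demo-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");

        // Consumers fetch records and track their position via offsets.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("movements"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("offset=%d key=%s value=%s%n", r.offset(), r.key(), r.value()));
        }
    }
}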

Connectors

Connectors are an abstraction over producers and consumers. Perhaps you need to export database records to Kafka. You configure a source connector to listen to certain database tables, and as records come in, the connector pulls them out and sends them to Kafka. Sink connectors do the opposite: If you want to write records to an external store such as MongoDB, for example, a sink connector reads records from a topic as they come in and forwards them to your MongoDB instance.

Definition

Kafka Streams is a Java library: we write code, create a JAR file, and then start our standalone application that streams records to and from Kafka (it runs separately from the brokers). And we can run Kafka Streams on anything from a laptop all the way up to a large server.

Basic operations

  • mapping (map((key, value) -> ..))
  • map values (mapValues(value -> value.substring(5)))
  • filtering (filter((key, value) -> Long.parseLong(value) > 1000))

It’s important to think of these operations as creating a new event stream, rather than modifying an existing one.
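A small sketch (my code, not the course's) wiring these operations into a complete Kafka Streams application; the topic names and broker address are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class BasicOperationsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "basic-operations-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders =
            builder.stream("raw-orders", Consumed.with(Serdes.String(), Serdes.String()));

        orders
            .mapValues(value -> value.substring(5))                 // drop a fixed-length prefix
            .filter((key, value) -> Long.parseLong(value) > 1000)   // keep only large amounts
            .to("large-orders", Produced.with(Serdes.String(), Serdes.String()));  // a new stream

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}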

KTables

At the start the videos mentioned that keys across records in event streams are completely independent of one another, even if they are identical.

Update Streams are the exact opposite: if a new record comes in with the same key as an existing record, the existing record will be overwritten.

Define a KTable

 StreamsBuilder builder = new StreamsBuilder();
 KTable<String, String> firstKTable = 
    builder.table(inputTopic, 
    Materialized.with(Serdes.String(), Serdes.String()));

Operations

  • mapping
  • mapping values
  • filtering

GlobalKTable

The main difference between a KTable and a GlobalKTable is that a KTable shards data between Kafka Streams instances, while a GlobalKTable gives each instance a full copy of the data. You typically use a GlobalKTable with lookup data. There are also some idiosyncrasies regarding joins between a GlobalKTable and a KStream.

 StreamsBuilder builder = new StreamsBuilder();
 GlobalKTable<String, String> globalKTable = 
    builder.globalTable(inputTopic, 
    Materialized.with(Serdes.String(), Serdes.String()));

Serialisation

image

The Kafka brokers receive and send bytes
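Since the brokers only deal in bytes, a Kafka Streams app declares how to turn objects into bytes and back via serdes. A small fragment in the same snippet style as above (my sketch, with placeholder topic names):

StreamsBuilder builder = new StreamsBuilder();
// Serdes tell Kafka Streams how to (de)serialise keys and values.
KStream<String, Long> amounts =
    builder.stream("amounts", Consumed.with(Serdes.String(), Serdes.Long()));

amounts.to("amounts-copy", Produced.with(Serdes.String(), Serdes.Long()));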

JOINs

KStream<String, String> leftStream = builder.stream("topic-A");
KStream<String, String> rightStream = builder.stream("topic-B");

ValueJoiner<String, String, String> valueJoiner = (leftValue, rightValue) -> {
    return leftValue + rightValue;
};
leftStream.join(rightStream, 
                valueJoiner, 
                JoinWindows.of(Duration.ofSeconds(10)));

  • Stream-Stream joins:
    • combine two event streams into a new event stream
    • join events based on a common key
    • records arrive within a defined window of time
    • possible to compute a new value type
    • keys are available in read-only mode and can be used in computing the new value
  • Stream-Table joins:
    • KStream-KTable
    • KStream-GlobalKTable
  • Table-Table joins

Stateful operations

  • reduce

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Long> myStream = builder.stream("topic-A");
    Reducer<Long> reducer = (longValueOne, longValueTwo) -> longValueOne + longValueTwo;
    myStream.groupByKey().reduce(reducer,
                                 Materialized.with(Serdes.String(), Serdes.Long()))
                         .toStream().to("output-topic");

  • aggregate

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> myStream = builder.stream("topic-A");

    Aggregator<String, String, Long> characterCountAgg =
        (key, value, charCount) -> value.length() + charCount;
    myStream.groupByKey().aggregate(() -> 0L,
                                    characterCountAgg,
                                    Materialized.with(Serdes.String(), Serdes.Long()))
                         .toStream().to("output-topic");

Stateful operations don’t emit results immediately. Kafka Streams has an internal buffering mechanism that caches results. Two factors control when the cache emits records: records are emitted when the cache is full (by default 10MB per instance, divided equally among the state stores), and by default, Kafka Streams calls commit every 30 seconds (you don’t call commit yourself). At this point, you would see an update. In order to see every update that comes through your aggregation, you can set your cache size to zero (which is also useful for debugging).

Even with caching, you will get multiple results, so for a single and final stateful result, you should use suppression overloads with aggregate/reduce operations.

Windowing

Hopping

A hopping window is bound by time: You define the size of the window, but then it advances in increments smaller than the window size, so you end up with overlapping windows. You might have a window size of 30 seconds with an advance size of five seconds. Data points can belong to more than one window.

Tumbling

It’s a hopping window with an advance size that’s the same as its window size. So basically you just define a window size of 30 seconds. When those 30 seconds are up, you get a new 30-second window. So you don’t get duplicate results like you do with the overlapping hopping windows.

Sessions

So with session windows, you define an inactivityGap. Basically, as long as a record comes in within the inactivityGap, your session keeps growing. So theoretically, if you’re keeping track of something and it’s a very active key, your session will continue to grow.

Sliding

A sliding window is bound by time, but its endpoints are determined by user activity

Grace periods

With the exception of session windows, which are behavior-driven, all windows have the concept of a grace period. A grace period is an extension to the size of a window. Specifically, it allows events with timestamps greater than the window-end (but less than the window-end plus the grace period) to be included in the windowed calculation.
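In the same snippet style as the aggregations above (my sketch, not the course's code; topic names are placeholders), a hopping window with a grace period looks roughly like this:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> pageViews = builder.stream("page-views");

pageViews
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30))        // window size
                           .advanceBy(Duration.ofSeconds(5))  // hop -> overlapping windows
                           .grace(Duration.ofSeconds(10)))    // accept slightly late events
    .count(Materialized.with(Serdes.String(), Serdes.Long()))
    .toStream((windowedKey, count) -> windowedKey.key())      // drop the window part of the key
    .to("page-view-counts", Produced.with(Serdes.String(), Serdes.Long()));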

(I am writing this as I watch, and there is a lot of overlapping material with the previous Kafka 101 and Flink 101 videos, which is kind of understandable since we are talking about streaming, but I guess these courses are meant to be standalone rather than taken in a series one after another)

Testing streams

Kafka Streams unit testing can be challenging due to the need for broker connections, which are expensive for tests. While integration tests with a live broker are necessary, they should be limited. The TopologyTestDriver addresses this by allowing unit tests for a Kafka Streams topology without requiring a broker, supporting end-to-end tests at a faster pace. It works by mocking the Kafka components and using a MockSchemaRegistry for Schema Registry-related tests.

In the TopologyTestDriver, you create and configure your topology, instantiate the driver, and then use TestInputTopic to send records through the topology. The records can have timestamps for triggering stream-time behavior, and wall-clock time can be advanced using advanceWallClockTime.

TestOutputTopic is used to mock the sink nodes, allowing verification of results. For Kafka Streams DSL operators using lambdas, it’s often better to write a concrete class for easier unit testing.
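A rough sketch of such a test (my code, not the course's): a tiny upper-casing topology exercised through TopologyTestDriver without any broker.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseTopologyTest {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");  // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input =
                driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output =
                driver.createOutputTopic("output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("key-1", "hello");
            System.out.println(output.readKeyValue());   // KeyValue(key-1, HELLO)
        }
    }
}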

Though TopologyTestDriver is useful for unit tests, integration testing against a live broker is still essential, particularly for stateful operations. The TestContainers library is recommended for controlling the broker lifecycle in integration tests, offering reduced testing time by sharing containers across tests.


Intro to ksqlDB

Definition

ksqlDB allows you to build stream processing applications on top of Apache Kafka with the ease of building traditional applications on a relational database. Using SQL to describe what you want to do rather than how, it makes it easy to build Kafka-native applications for processing streams of real-time data. Some key ksqlDB use cases include:

  • materialized caches
  • streaming ETL pipelines
  • event-driven microservices

image

If we have a stream, some basic operations we can do with ksql are:

  • filter
  • join streams
  • aggregate streams

image

Hands-on

Creating a stream

CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR)
  WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');

Add data

INSERT INTO MOVEMENTS VALUES ('Allison', 'Denver');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('Robin', 'Ilkley');
INSERT INTO MOVEMENTS VALUES ('Allison', 'Boulder');

image

Filtering

CREATE STREAM ORDERS_NY AS
  SELECT *
    FROM ORDERS
   WHERE ADDRESS->STATE='New York';

JOINs

CREATE STREAM ORDERS_ENRICHED AS
SELECT O.*,
       I.*,
       O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       LEFT OUTER JOIN ITEMS I
       ON O.ITEMID = I.ID;

Flattening nested records

CREATE STREAM ORDERS_FLAT AS
  SELECT TIMESTAMPTOSTRING(ORDERTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
         ORDERID,
         ITEMID,
         ORDERUNITS,
         ADDRESS->STREET AS ADDRESS_STREET,
         ADDRESS->CITY   AS ADDRESS_CITY,
         ADDRESS->STATE  AS ADDRESS_STATE
    FROM ORDERS;

Splitting streams

CREATE STREAM ORDERS_US AS
   SELECT * FROM ORDERS_COMBINED
   WHERE SOURCE = 'US';

CREATE STREAM ORDERS_UK AS
   SELECT * FROM ORDERS_COMBINED
   WHERE SOURCE = 'UK';

CREATE STREAM ORDERS_OTHER AS
   SELECT * FROM ORDERS_COMBINED
   WHERE SOURCE != 'US'
   AND SOURCE != 'UK';

Stateful aggregations

CREATE TABLE ORDERS_PER_HOUR_BY_MAKE AS
  SELECT MAKE,
         COUNT(*) AS ORDER_COUNT,
         CAST(SUM(TOTAL_ORDER_VALUE) AS DECIMAL(9,2)) AS TOTAL_ORDER_VALUE
    FROM ORDERS_ENRICHED
          WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY MAKE
  EMIT CHANGES;

Pull vs Push queries

  • Pull is like a normal select - shows a point in time view
  • Push has the EMIT CHANGES which runs indefinitely; it tells us all value changes

Transform functions

CREATE STREAM transformed 
  AS SELECT id,
  TRANSFORM(map_field,(k,v)=> UCASE(k), (k, v)=> (v * 2))
FROM stream1 EMIT CHANGES;

Reduce functions

CREATE STREAM reduced 
  AS SELECT name,
  REDUCE(scores,0,(s,x)=> (s+x)) AS total
FROM stream2 EMIT CHANGES;

Filter functions

CREATE STREAM filtered 
  AS SELECT id,
  FILTER(numbers,x => (x%2 = 0)) AS even_numbers
FROM stream3 EMIT CHANGES;

Schema Registry 101

image

image

image

The schema workflow

Example writing a schema file with Avro

image

In Protobuf

image

There are also more complex data types that we can include in the schemas (maps, enums, unions, arrays)

Effective schema management involves:

  • schema IDs
  • schema registration
  • schema versioning
  • viewing and retrieving schemas

Schema lifecycle

image

Schema compatibility

Producer and consumer clients have expectations of the structure of the object they’re working with.

Schema Registry offers compatibility verification. The available compatibility modes and associated checks establish guardrails that guide you to safely update schemas and allow you to keep your clients operational as they are updated with the changes.

Schema compatibility modes

image

Transitive schema compatibility modes

image

Schema compatibility considerations

image

  • with Backward compatibility you can delete fields and add fields with default values; consumer clients are updated first
  • with Forward compatibility you can delete fields with default values and add new fields; producer clients are updated first
  • with Full compatibility, any deleted or added field must have a default value. Since every change to the schema has a default value, the order in which you update your clients doesn’t matter

That is all for today!

See you tomorrow :)