Hello :) Today is Day 244!
A quick summary of today:
- started reading Streaming Databases on O’Reilly
- covered a short on-demand course on advanced RAG techniques
Streaming Databases Chapter 1: Streaming Foundations
Externalizing Database Features
Write-Ahead Log
The WAL is a mechanism that allows databases to ensure data durability and consistency. The spinning disks that databases write data upon don’t offer transactions. So databases are challenged to provide transactionality atop a device that doesn’t offer transactions. WALs are a way for databases to provide transactionality without having transactional disks.
A transaction in a database refers to a sequence of one or more database operations executed as a single unit of work. These operations can include data insertion (INSERT), data modification (UPDATE), or data deletion (DELETE). The WAL acts as a buffer that can be overwritten as new changes are made. The WAL persists each change to disk before the change is applied to the data files.
When saving transactions on disk, the database follows these steps:
- The client starts a transaction by issuing a BEGIN statement
- The database writes a record to the WAL indicating that a transaction has started.
- The client makes changes to the database data.
- The client commits the transaction by issuing a COMMIT statement.
- The database writes a record to the WAL indicating that the transaction has been committed.
- The changes made by the transaction are written to disk.
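
The steps above can be sketched as a toy log in Python. This is only an illustration under my own assumptions: `TinyWAL`, the JSON record layout, and the `set` records (which log each change, something the list above leaves implicit) are hypothetical names and not how any particular database implements its WAL.

```python
import json
import os
import tempfile


class TinyWAL:
    """Toy write-ahead log (hypothetical): one JSON record per line, fsynced on append."""

    def __init__(self, path):
        self.path = path

    def append(self, record):
        # Append the record and force it to stable storage before returning.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def replay(self):
        # Rebuild state from the log, applying only committed transactions.
        pending, state = {}, {}
        with open(self.path, encoding="utf-8") as f:
            for line in f:
                rec = json.loads(line)
                if rec["op"] == "begin":
                    pending[rec["tx"]] = []
                elif rec["op"] == "set":
                    pending[rec["tx"]].append((rec["key"], rec["value"]))
                elif rec["op"] == "commit":
                    for key, value in pending.pop(rec["tx"]):
                        state[key] = value
        return state


wal = TinyWAL(os.path.join(tempfile.mkdtemp(), "toy.wal"))

# Transaction 1: BEGIN, one change, COMMIT.
wal.append({"op": "begin", "tx": 1})
wal.append({"op": "set", "tx": 1, "key": "balance", "value": 100})
wal.append({"op": "commit", "tx": 1})

# Transaction 2 never commits (simulating a crash mid-transaction).
wal.append({"op": "begin", "tx": 2})
wal.append({"op": "set", "tx": 2, "key": "balance", "value": 0})

recovered = wal.replay()
print(recovered)
```

Replaying the log ignores the uncommitted second transaction, which is the kind of all-or-nothing behavior the WAL is there to provide.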
Streaming Platforms
The most popular one is Apache Kafka, but there are alternatives. Platforms like that are distributed, scalable, and fault-tolerant systems designed to handle real-time data streams. They provide a powerful infrastructure for ingesting, storing, and processing large volumes of continuous data from various sources.
Most streaming platforms have a construct called partitions. These constructs mimic WALs in a database. Transactions are appended to partitions like transactions to a WAL. Streaming platforms can hold many partitions to distribute the stream load to promote horizontal scaling. Partitions are grouped in abstractions called topics to which applications either publish or consume transactions.
By publishing the transactions to the streaming platform, you’ve published it to all subscribers who may want to consume it. This is called a publish and subscribe model, and it’s critical to allow multiple disparate consumers to use these transactions.
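
To picture partitions, topics, and publish/subscribe together, here is a small in-memory sketch. `Topic`, `Consumer`, the CRC32-based partitioning, and the per-consumer offsets are all assumptions made for illustration; this is not Kafka's API.

```python
from collections import defaultdict
from zlib import crc32


class Topic:
    """Hypothetical topic: a named group of append-only partitions."""

    def __init__(self, name, num_partitions=3):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def publish(self, key, value):
        # The same key always maps to the same partition, so per-key order is kept.
        idx = crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[idx].append((key, value))
        return idx


class Consumer:
    """Hypothetical subscriber that tracks its own read offset per partition."""

    def __init__(self, topic):
        self.topic = topic
        self.offsets = defaultdict(int)

    def poll(self):
        # Return everything appended since this consumer's last poll.
        records = []
        for idx, partition in enumerate(self.topic.partitions):
            records.extend(partition[self.offsets[idx]:])
            self.offsets[idx] = len(partition)
        return records


orders = Topic("orders")
for i in range(6):
    orders.publish(f"customer-{i % 2}", {"order_id": i})

# Two independent subscribers each see every published record.
billing = Consumer(orders)
analytics = Consumer(orders)
billing_records = billing.poll()
analytics_records = analytics.poll()
```

Because each consumer keeps its own offsets, reading from the topic does not remove anything for the other subscribers.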
Materialized Views
In typical OLTP databases, materialized views are special types of database objects that store the results of a precomputed query or aggregation. Unlike regular views, which are virtual and dynamically generate their results based on the underlying data, materialized views store the actual data, making them physically stored in the database.
The purpose of materialized views is to improve the performance of complex queries or aggregations by precomputing and storing the results. When a query references a materialized view, the database can quickly retrieve the precomputed data from the materialized view instead of recalculating it from the base data tables. This can significantly reduce the query execution time and improve overall database performance, especially for large and resource-intensive queries
By enabling materialized views to be updated, the stored data will always be fresh; that is, the stored data is real-time data. This characteristic makes materialized views fit naturally into streaming frameworks.
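
A rough way to think about an always-fresh materialized view is a stored result that is updated incrementally with every new row. The sketch below assumes a hypothetical `OrderTotalsView` class and made-up order data; real databases do this internally and with far more machinery.

```python
from collections import defaultdict


class OrderTotalsView:
    """Toy materialized view (hypothetical): total order amount in cents per customer."""

    def __init__(self):
        self.totals = defaultdict(int)

    def apply(self, order):
        # Incremental refresh: fold one new base-table row into the stored result.
        self.totals[order["customer"]] += order["amount_cents"]

    def read(self, customer):
        # Reads hit the precomputed result instead of scanning the base table.
        return self.totals[customer]


orders = []  # stands in for the base table
view = OrderTotalsView()

for order in [
    {"customer": "alice", "amount_cents": 1500},
    {"customer": "bob", "amount_cents": 700},
    {"customer": "alice", "amount_cents": 250},
]:
    orders.append(order)
    view.apply(order)

# Recomputing from the base table gives the same answer, just more slowly.
recomputed = sum(o["amount_cents"] for o in orders if o["customer"] == "alice")
```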
Connectors
In streaming, there are two types of connectors:
Source Connectors
Source connectors read data from a data source system (e.g., a database) and make that data available as an event stream.
Sink Connectors
Sink connectors consume data from an event stream and write that data to a sink system (a database again, a data warehouse, a data lake, etc.).
In most cases, source connectors transform data at rest into streaming data (aka data in motion), whereas sink connectors transform streaming data into data at rest.
With data at rest, the data is sitting in a database or a filesystem and not moving. Data at rest tends to get processed using batching or microbatching techniques. A dataset batched from one source system to another has a beginning and an end. The applications that process batched data can be started using a job scheduler like cron, and the data processing ends when the dataset ends.
This is the opposite of streaming, or data in motion. Data in motion implies that there is neither a beginning nor an end to the data. Applications that process streaming data are always running and listening for new data to arrive on the stream.
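
As a minimal sketch of the two connector roles, assuming plain Python lists stand in for the source table and the sink store (the function names and event shape are hypothetical):

```python
def source_connector(table):
    # Turn rows at rest into a stream of events, one at a time.
    for row in table:
        yield {"op": "insert", "row": row}


def sink_connector(stream, destination):
    # Consume events from the stream and write them to a store at rest.
    for event in stream:
        destination.append(event["row"])
    return destination


source_table = [{"id": 1, "name": "tshirt"}, {"id": 2, "name": "hoodie"}]
warehouse = sink_connector(source_connector(source_table), [])
```

This toy stream ends when the list runs out, so it behaves like a batch; a real source connector would keep listening for new rows indefinitely.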
Now let’s dive into how source and sink connectors can be implemented.
Connector Middleware
Connector middleware solutions such as Kafka Connect, Meroxa, Striim, or StreamSets already provide a large number of connectors out of the box and are often extensible to serve further sources and sinks. Connector middlewares also offer horizontal scaling, monitoring, and other required features, especially for production deployments.
Kafka Connect is part of the Apache Kafka project. It is a distributed cluster in which Kafka connectors are deployed to run in parallel. These types of deployments create complexity in the streaming architecture. These clusters are bulky, and maintenance of them is arduous.
If you have a large number of data sources and sinks, these clusters often become costly and consume a lot of resources. Delegation of this integration is better solved by embedding the connectors into the systems themselves.
Embedded
An increasing number of databases offer embedded source connectors to streaming platforms; CockroachDB is an example of this. An even larger set of databases has implemented embedded sink connectors; that is, they can consume data off the event stream themselves. Examples are Apache Druid, Apache Pinot, ClickHouse, StarRocks, Apache Doris, and Rockset.
Having databases solve the integration to streaming platforms gets them closer to becoming streaming databases. If you enable databases with the ability to pull and push data into streaming platforms, streaming will naturally become a first-class citizen in the database.
Custom-Built
Connectors can be custom-built, for example, by implementing a dedicated microservice. The advantage of this approach is its flexibility; the downside is clearly the need to “reinvent the wheel”—it often doesn’t make sense to implement connectors from scratch, especially in light of the plethora of existing powerful and scalable open source connectors (e.g., the Debezium source connectors for the Kafka Connect middleware).
Chapter 2: Stream Processing Platforms
Events tend to need cleansing and preparation before they undergo analytical processing. Events also need to be enriched with context for them to be useful enough to derive insights. Analytical processing heavily relies on the accuracy and reliability of the data. By addressing issues such as missing values, inconsistencies, duplicates, and outliers, data quality is improved, leading to more reliable and accurate analytical results.
Event data preparation can also significantly impact the performance of analytical queries. By optimizing the data layout, indexing, and partitioning, the efficiency of data retrieval and processing can be improved. This includes techniques such as data denormalization, columnar storage, and indexing strategies tailored for the analytical workload. Well-prepared data can reduce the processing time and enable faster insights.
Event data preparation also plays a crucial role in ensuring data governance and compliance. This involves enforcing data security measures, anonymizing sensitive information, and adhering to privacy regulations. By properly preparing the data, organizations can maintain data integrity, protect privacy, and comply with legal and ethical requirements.
(Figure: the dotted line marks the processing stage.)
Transformation tasks tend to be resource consuming and process intensive. It’s best to complete transformations as part of a preprocessing step in a data pipeline before writing to a data store that serves it to consumers. The earlier the transformations can be done, the better—as executing them incrementally, seeing only small amounts of data at once, is much less resource consuming than having to scour through large amounts of data already at rest later.
The destination data stores are typically online analytical processing, or OLAP, data stores. OLAP data stores allow users to invoke analytical queries on the data. Real-time OLAP (RTOLAP) data stores are also OLAP but are optimized for serving real-time analytical data.
It’s best practice to avoid process-intensive tasks in the OLAP that serve analytics to the consumers. OLAPs need to reserve their resources for quickly responding to analytical queries that answer questions related to the business or provide fast insights to customers—another reason for executing the preprocessing as early as possible in the pipeline.
Stateful Transformations
- Streaming platforms like Kafka:
- Do not transform data in a way that allows them to hold state.
- Cannot perform complex transformations like aggregations and joins directly.
- Require additional components like Kafka Streams for complex transformations.
- Data Pipelines:
- Follow two common patterns for data integration and processing:
- ETL: Data is extracted, transformed, then loaded into the destination.
- ELT: Data is extracted, loaded, and then transformed.
- ETL and State Management:
- Stateful transformations in ETL require data to be temporarily stored to maintain and update information between multiple input data elements.
- Examples of stateful transformations:
- Rolling averages: Maintain sum and count of data elements to calculate averages over a sliding window.
- Sessionization: Group related events into sessions based on criteria like user activity within a time threshold.
- Deduplication: Compare incoming events with previously stored events to filter out duplicates.
- Windowed aggregations: Perform aggregations (e.g., sum, count) over data windows by holding state.
- Machine learning models: Continuously update and refine models using streaming data, requiring state to hold learned parameters.
- Stateless Transformations:
- Some streaming platforms offer stateless transformations, called functions or single message transforms.
- These include simple operations like string transformations or basic math operations, not qualifying as stateful transformations in ETL/ELT.
- Limitations of Pure Streaming Platforms:
- Kafka and similar platforms only handle data extraction (“E”).
- Loading data does not occur until stateful transformations are applied.
- Pure streaming platforms cannot directly serve user-facing applications due to the lack of cleansing, preparation, and enrichment of events.
- Role of Stream Processing Platforms:
- Stream processing platforms (e.g., Kafka Streams) hold state and perform complex transformations.
- These platforms are essential for streaming ETL pipelines, enabling real-time analytics.
- Data pipelines collect data from various sources, transform it for analysis, and deliver it to analytics tools in real-time.
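
Two of the stateful transformations listed above, rolling averages and deduplication, can be sketched in a few lines. The class and function names are hypothetical, and the window here is simply "the last N events" rather than a time-based window.

```python
from collections import deque


class RollingAverage:
    """Keeps a running sum over the last `window_size` values (the state)."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.window = deque()
        self.total = 0.0

    def update(self, value):
        # Add the new value, evict the oldest one once the window is full.
        self.window.append(value)
        self.total += value
        if len(self.window) > self.window_size:
            self.total -= self.window.popleft()
        return self.total / len(self.window)


def deduplicate(events):
    # State: the set of event IDs already seen.
    seen = set()
    for event_id in events:
        if event_id not in seen:
            seen.add(event_id)
            yield event_id


avg = RollingAverage(window_size=3)
averages = [avg.update(v) for v in [10, 20, 30, 40]]
unique_ids = list(deduplicate(["a", "b", "a", "c", "b"]))
```

Both need memory that survives between events, which is exactly what a stateless single message transform does not have.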
Data Pipelines
One of the roles of the data pipeline is to move the data from the operational data plane to the analytical data plane. The operational data plane is where the applications live, including microservices and OLTP databases. The analytical data plane is where the analytical systems live—like data warehouses, data lakes, lakehouses—all kinds of OLAP data stores. Data pipelines live in between the operational and analytical data planes, transforming and integrating data from one to the other, respectively.
Once the data is extracted, it undergoes transformation tasks previously mentioned to convert it into a suitable format for the target system or data warehouse, or OLAP data store. The transformed data is typically stored temporarily in an intermediate store.
Stream Processors
Popular ones:
- Apache Kafka Streams is a stream processing library for JVM-based programming languages that is part of the Apache Kafka project. It allows developers to build real-time applications and microservices that consume, process, and produce streams of data from/to Kafka.
- Apache Flink is a stream processor that supports both batch and stream processing and can be connected to a large variety of sources and sinks, including Kafka, Pulsar, and Kinesis, as well as databases like MongoDB and Elasticsearch. Flink views batch as a special case of streaming (streaming with bounded data). Contrary to Kafka Streams, Flink is not a library but runs on its own cluster.
- Spark Structured Streaming is a component of Apache Spark that enables stream processing. It supports a large variety of connectors and is also cluster based. It is set apart from other stream processors like Kafka Streams, Flink, and Samza by making use of mini-batching instead of native stream processing; for Spark, streaming is a special case of batch rather than the other way around, as in Flink.
- Apache Samza is a stream processor developed by LinkedIn. It supports Kafka, Azure Event Hubs, Kinesis, and HDFS and is also cluster based like Flink.
- Apache Beam is not a stream processor itself but a unified programming model and set of software development kits (SDKs) for building data processing pipelines. It provides an abstraction that allows developers to write data-processing jobs that can be executed on various distributed processing engines, such as Apache Flink, Apache Spark, Apache Samza, and Google Cloud Dataflow.
Two Types of Streams
There are two types of real-time streams of data that flow through stream processors: change streams and append-only streams.
Above, the topics are highlighted: Click event, Product, and Customer. The click event stream is append-only, which means it contains discrete, distinct events. How do we know this? Clicks are always unique. Every click event is different, even if it’s done on the same T-shirt by the same customer. The only difference is the time it was clicked. In an append-only stream, the events are associated with a timestamp that indicates when each event occurred. The timestamp provides temporal information, allowing for the ordering and sequencing of events within the stream. This also applies to other types of events, not just click events.
If clicks were written to an OLTP database, the database would run out of space very quickly because every click event is a unique event; that is, we would trigger an insert into the OLTP table for every click on the application. Hence, using an OLTP database for click events would be bad design—it’s clearly better to use an append-only stream of events here.
Conversely, change stream data almost always comes from CDC events. These events are the transactions applied to a table in the OLTP database. These transactions are inserts, updates, and deletes. These records will not grow as fast as clickstream data because some events will include updates or deletes of existing records in a table.
Also, customer data may not change a lot. Customers may change their names, emails, or phone numbers, but probably not that often. Products would have a lot more changes, but still not many compared to clicks on an application. Also, change streams tend to represent changes to dimensional data. Dimensions change slowly in a database because they typically represent properties that do not frequently or rapidly change for the entities they describe.
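
The difference between the two stream types shows up in how a consumer applies them. In this sketch (hypothetical event shapes, not a real CDC format), click events are only ever appended, while change events insert, overwrite, or remove rows keyed by ID.

```python
clicks = []      # append-only stream: every event is kept
customers = {}   # table state derived from a change stream, keyed by primary key


def on_click(event):
    # Each click is a new, distinct fact, so it is always appended.
    clicks.append(event)


def on_change(change):
    # Inserts and updates overwrite the row for that key; deletes remove it.
    if change["op"] in ("insert", "update"):
        customers[change["id"]] = change["row"]
    elif change["op"] == "delete":
        customers.pop(change["id"], None)


for ts in (1, 2, 3):
    on_click({"ts": ts, "customer_id": 7, "product": "tshirt"})

on_change({"op": "insert", "id": 7, "row": {"email": "old@example.com"}})
on_change({"op": "update", "id": 7, "row": {"email": "new@example.com"}})
on_change({"op": "insert", "id": 8, "row": {"email": "temp@example.com"}})
on_change({"op": "delete", "id": 8})
```

Three clicks leave three records behind, while four change events leave a single customer row.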
Chapter 3: Serving Real-Time Data
This chapter will talk about delivering enriched real-time data to the end user. This stage of the real-time data pipeline is the last mile streaming data takes before it’s presented to the end user.
Real-Time Expectations
Since we want to serve analytics in real time, we should consider some metrics:
- Latency: Measures the time it takes for an analytics query or computation to complete and return results. In real-time analytics, low latency is crucial to provide near-instantaneous insights to users. SLA metrics may define acceptable latency thresholds, such as average response time or maximum response time, to ensure the timely delivery of analytics results.
- Throughput and concurrency: Measures the number of analytics queries or computations that can be processed within a given time frame. It indicates the system’s capacity to handle concurrent requests and is especially important for high-volume scenarios. SLA metrics may specify a target throughput, such as queries per second or computations per minute, to ensure sufficient capacity for real-time analytics workloads.
- Data freshness: Indicates how up-to-date the analytics results are in relation to the underlying data streams. It measures the delay between when the data is generated and when it becomes available for analysis. SLA metrics may specify acceptable data freshness requirements, such as a maximum delay in seconds or minutes, to ensure that users have access to the most recent information.
- Accuracy: Measures the correctness and precision of the analytics results. SLA metrics may define acceptable error rates, confidence intervals, or validation criteria to ensure the accuracy of real-time analytics.
- Other: availability, consistency, scalability, security, and privacy.
Choosing an Analytical Data Store
Here are some examples of data stores that can satisfy these strict real-time SLAs:
- In-memory databases, such as Redis, SingleStore (formerly MemSQL), Hazelcast, or Apache Ignite, store data in memory for fast access and processing. These databases offer extremely low latency and high throughput, making them suitable for real-time analytics that require near-instantaneous responses.
- RTOLAP data stores, such as Apache Pinot, Apache Druid, ClickHouse, StarRocks, and Apache Doris, tend to be column-oriented distributed data stores. They organize data by column rather than row, enabling efficient analytics and query processing. These databases can handle large volumes of data and offer excellent scalability and high availability, making them well-suited for real-time analytics at scale.
- Hybrid transactional/analytical processing (HTAP) data stores such as TiDB or SingleStore (formerly MemSQL) support both real-time transactional processing and analytics within a single system. These databases provide the capability to serve real-time analytics directly on operational data, minimizing data movement and reducing latency.
Sourcing from a Topic
Streaming platforms provide a publish and subscribe model for distributing streaming data, which means other systems, domains, and use cases can be consuming the same data from the sink topic. The preprocessing work done in the stream processing platform may have prepared the data only to the point where all consumers can use the data, in a data format that corresponds to a common denominator for them. This leaves any consumer-specific data preparation up to the consumer to implement.
For example, Domain consumer 1 needs the timestamps in the data to be in seconds, while Domain consumer 2 needs them in the YEAR-MONTH-DAY format. The engineer who implemented the transformation in the stream processing platform could decide to serve the timestamp in milliseconds so that both consumers can derive their preferred timestamp values. This would also satisfy any future consumer who needs the data in milliseconds.
These scenarios require queries to implement the transformation to get the timestamp value to their required format. Unfortunately, this transformation will slow the query down. It would also need to be executed for every run of the query and could eventually break the SLA for latency. This would eventually cause the application or dashboard to slow down and create an unsatisfactory experience for the data consumers. Any consumer-specific data preparation left for the consumer to implement will always affect the SLAs for serving real-time analytics.
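
The per-consumer conversions in this example are small, but they run on every query. A sketch of what each consumer would have to do, assuming the shared timestamp is epoch milliseconds in UTC (the function names and the sample value are hypothetical):

```python
from datetime import datetime, timezone


def ms_to_seconds(ts_ms):
    # Domain consumer 1: epoch milliseconds -> whole epoch seconds.
    return ts_ms // 1000


def ms_to_date(ts_ms):
    # Domain consumer 2: epoch milliseconds -> YEAR-MONTH-DAY (UTC assumed).
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d")


event_ts_ms = 1_700_000_000_000  # made-up sample value
seconds_value = ms_to_seconds(event_ts_ms)
date_value = ms_to_date(event_ts_ms)
```

Doing this once at ingestion, instead of inside each query, is the motivation for the ingestion transformations covered next.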
Ingestion Transformations
In RTOLAP systems, streaming data can be preprocessed before being stored, allowing data transformations (e.g., timestamp conversion) in-flight. Most RTOLAPs do stateless preprocessing, but some, like Apache Pinot, can perform stateful transformations during ingestion, which can optimize query performance. This stateful transformation could allow for creating denormalized views, making analytical queries faster by reducing the need for real-time joins. Ingestion transformations also optimize data for analytical queries by using formats like columnar storage, which is more efficient for OLAP systems than row-based formats used in OLTP databases.
OLTP Versus OLAP
OLTP Databases
- Used for transactional workloads on the operational plane.
- Designed for real-time transactional operations, supporting CRUD (Create, Retrieve, Update, Delete) operations.
- Optimized for handling a large number of concurrent transactions, ensuring data integrity and consistency.
- Typically have normalized data structures to minimize redundancy and maintain consistency.
- Support ACID properties:
- Atomicity: Ensures transactions are all-or-nothing.
- Consistency: Maintains database rules and constraints.
- Isolation: Prevents transaction interference.
- Durability: Guarantees permanent data storage post-transaction.
OLAP Data Stores
- Used for analytical workloads on the analytical plane.
- Employ different optimization techniques compared to OLTP databases.
- Optimized for fast analytical queries, often using columnar storage for better performance.
Row- vs. Column-Based Optimization
Row-Based Format
- Data is stored row by row, similar to spreadsheets.
- Optimized for transactional processing, with better performance in retrieving and updating entire rows.
- Offers some compression techniques but can lead to higher disk I/O during queries.
Column-Based Format
- Data is stored column by column, ideal for analytical queries that focus on specific columns.
- Provides better compression ratios, reducing storage needs.
- Designed to read only the required columns, improving query efficiency and performance.
Table: Row- vs. Column-Based Optimization Differences
Properties | Row-Based | Column-Based |
---|---|---|
Data Storage | Each row is stored together, less optimized for analytical queries. | Each column is stored together, optimized for analytical queries. |
Query Performance | Suited for transactional operations. | Excels in analytical operations. |
Compression | Offers some compression techniques with overhead. | Provides better compression ratios. |
Query Efficiency | Higher disk I/O for partial queries. | Reads only necessary columns, improving performance. |
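
The same small dataset laid out both ways makes the trade-off easier to see. This is a sketch with made-up data: Python lists and dicts stand in for on-disk layouts, and run-length encoding is just one example of a compression scheme that benefits from columnar storage.

```python
from itertools import groupby

# Row-based: each record is stored together.
rows = [
    {"user": "u1", "product": "tshirt", "price": 20},
    {"user": "u2", "product": "tshirt", "price": 20},
    {"user": "u3", "product": "tshirt", "price": 20},
    {"user": "u4", "product": "hoodie", "price": 45},
]

# Column-based: each column is stored together.
columns = {name: [row[name] for row in rows] for name in rows[0]}

# An aggregate over one column touches every full row in the row layout...
revenue_from_rows = sum(row["price"] for row in rows)
# ...but only a single list in the column layout.
revenue_from_column = sum(columns["price"])


def run_length_encode(values):
    # Similar adjacent values compress well once a column is stored contiguously.
    return [(value, len(list(group))) for value, group in groupby(values)]


encoded_products = run_length_encode(columns["product"])
```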
Queries Per Second and Concurrency
Queries per second (QPS) measures how many queries an OLAP data store can process within a second. This in turn bounds the number of concurrent queries that can be invoked.
In our clickstream use case, we did not specify how many end users may be viewing the analytics. The assumption is for every end user, there will be at least one query. We did indicate that this is a real-time use case. Hence the data freshness on the dashboard is expected to be real time. Real-time dashboards tend to require a high rate of refreshes to keep the charts in the dashboard as real time as possible. There is a query for every refresh of the dashboard.
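For 1,000 users viewing clickstream analytics with a 5-second dashboard refresh rate, you will get the following formula:

$$\text{QPS} = \frac{1{,}000 \text{ queries}}{5 \text{ seconds}} = 200 \text{ queries/second}$$

This means the query needs to have a latency of 5 milliseconds. We can convert 200 queries/second to milliseconds/query by taking the inverse:

$$\frac{1}{200 \text{ queries/second}} = 0.005 \text{ seconds/query} = 5 \text{ milliseconds/query}$$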
But this might not be the case for every type of query, so it needs to be tested.
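
The arithmetic for this example (1,000 users, one query per user per refresh, a 5-second refresh) is easy to check in a few lines. The variable names are mine, and the latency budget assumes queries are served one after another.

```python
users = 1_000
refresh_interval_s = 5

# One query per user per refresh.
qps = users / refresh_interval_s

# Inverse of QPS, converted from seconds to milliseconds.
latency_budget_ms = 1_000 / qps

print(f"{qps:.0f} queries/second, {latency_budget_ms:.0f} ms per query")
```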
Serving Analytical Results
QPS is a metric that indicates good performance for queries that are invoked by users or applications. Especially for real-time use cases, analytical queries will have a high refresh rate. Re-running a query at a fixed interval like this is called polling, and high refresh rates can strain the OLAP system.
Synchronous vs. Asynchronous Queries
Synchronous (Pull) Queries
- Also known as pull queries, where the client submits a query and pulls data from the OLAP system.
- Follows a request-response pattern using a SQL dialect specific to the OLAP system.
- Provides flexibility, allowing the client to retrieve data on demand, reducing unnecessary network traffic.
- More resource-efficient as the server doesn’t need to continuously monitor and push data, but frequent pull queries can increase resource consumption.
Asynchronous (Push) Queries
- Data is pushed to clients when updates occur, eliminating the need for polling.
- Often served from topics in streaming platforms where clients subscribe to real-time updates.
- Enables real-time communication, aligning well with event-driven architectures.
- Typically uses Server-Sent Events (SSEs) for unidirectional updates or WebSockets for bidirectional communication.
- SSEs: Simpler, unidirectional, event-driven updates using HTTP protocol.
- WebSockets: Supports bidirectional, low-latency communication using a specialized protocol.
Push vs. Pull Queries
- Push Queries: Provide real-time updates with lower latency, suitable for event-driven architectures.
- Pull Queries: Offer flexibility and control over data retrieval, beneficial for reducing unnecessary network traffic and in cases where data changes infrequently.
- Hybrid Approach: Combines both methods by initially fetching data with a pull query and then subscribing to changes for real-time updates.
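
The hybrid approach can be sketched with an in-memory stand-in for the data store. `MetricStore` and its methods are hypothetical; in practice the push side would be something like SSEs, WebSockets, or a topic subscription rather than a Python callback.

```python
class MetricStore:
    """Hypothetical store that supports both pull (query) and push (subscribe)."""

    def __init__(self):
        self.value = 0
        self.subscribers = []

    def query(self):
        # Pull: the client asks for the current value on demand.
        return self.value

    def subscribe(self, callback):
        # Push: the client registers to be notified of future updates.
        self.subscribers.append(callback)

    def update(self, value):
        self.value = value
        for callback in self.subscribers:
            callback(value)


store = MetricStore()
store.update(10)

# Hybrid: fetch the current state once, then rely on pushed updates.
seen = [store.query()]
store.subscribe(seen.append)
store.update(11)
store.update(12)
```

After the initial pull, the client never polls again; every later value arrives through the subscription.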
Limitations in OLAP Data Stores
- OLAP data stores typically do not support asynchronous push queries due to their focus on serving aggregated analytical results rather than raw data.
Advanced RAG designs
Found this course in my recommended on the O’Reilly platform so decided to give it a go.
The instructor introduced RAG and a naive RAG system, along with its drawbacks:
- limited contextual understanding: it focuses on keyword matching or basic semantic search (retrieving irrelevant or partially relevant docs)
- inconsistent relevance and quality of retrieved docs: retrieved docs vary in quality and relevance (poor-quality inputs for the generation model)
- poor integration between retrieval and generation: the retriever and generator might not work in sync
- inefficient handling of large-scale data: scaling issues (taking too long to retrieve relevant docs, or missing critical info due to bad indexing)
- lack of robustness and adaptability: not adaptable to changing contexts or user needs without significant manual intervention
Advanced RAG benefits
- Pre-retrieval
- improvement of the indexing structure and user’s query
- improvement of data details, organising indexes better, adding extra info, aligning things correctly
- Post-retrieval
- combine pre-retrieval data with the original query
- re-ranking to highlight the most important content
Query expansion with generated answers
Generate partial answers to the query to get relevant context
Sample code:
```python
from helper_utils import project_embeddings, word_wrap
from pypdf import PdfReader
import os
from openai import OpenAI
from dotenv import load_dotenv
import umap

# Load environment variables from .env file
load_dotenv()

openai_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=openai_key)

reader = PdfReader("data/microsoft-annual-report.pdf")
pdf_texts = [p.extract_text().strip() for p in reader.pages]

# Filter the empty strings
pdf_texts = [text for text in pdf_texts if text]
# print(
#     word_wrap(
#         pdf_texts[0],
#         width=100,
#     )
# )

# split the text into smaller chunks
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    SentenceTransformersTokenTextSplitter,
)

character_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", ". ", " ", ""], chunk_size=1000, chunk_overlap=0
)
character_split_texts = character_splitter.split_text("\n\n".join(pdf_texts))

# print(word_wrap(character_split_texts[10]))
# print(f"\nTotal chunks: {len(character_split_texts)}")

token_splitter = SentenceTransformersTokenTextSplitter(
    chunk_overlap=0, tokens_per_chunk=256
)
token_split_texts = []
for text in character_split_texts:
    token_split_texts += token_splitter.split_text(text)

# print(word_wrap(token_split_texts[10]))
# print(f"\nTotal chunks: {len(token_split_texts)}")

import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

embedding_function = SentenceTransformerEmbeddingFunction()
# print(embedding_function([token_split_texts[10]]))

chroma_client = chromadb.Client()
chroma_collection = chroma_client.create_collection(
    "microsoft-collection", embedding_function=embedding_function
)

# extract the embeddings of the token_split_texts
ids = [str(i) for i in range(len(token_split_texts))]
chroma_collection.add(ids=ids, documents=token_split_texts)
chroma_collection.count()

query = "What was the total revenue for the year?"

results = chroma_collection.query(query_texts=[query], n_results=5)
retrieved_documents = results["documents"][0]

# for document in retrieved_documents:
#     print(word_wrap(document))
#     print("\n")


def augment_query_generated(query, model="gpt-3.5-turbo"):
    prompt = """You are a helpful expert financial research assistant.
    Provide an example answer to the given question, that might be found in a document like an annual report."""
    messages = [
        {
            "role": "system",
            "content": prompt,
        },
        {"role": "user", "content": query},
    ]

    response = client.chat.completions.create(
        model=model,
        messages=messages,
    )
    content = response.choices[0].message.content
    return content


original_query = "What was the total profit for the year, and how does it compare to the previous year?"
hypothetical_answer = augment_query_generated(original_query)

joint_query = f"{original_query} {hypothetical_answer}"
print(word_wrap(joint_query))

results = chroma_collection.query(
    query_texts=joint_query, n_results=5, include=["documents", "embeddings"]
)
retrieved_documents = results["documents"][0]

# for doc in retrieved_documents:
#     print(word_wrap(doc))
#     print("")

embeddings = chroma_collection.get(include=["embeddings"])["embeddings"]
umap_transform = umap.UMAP(random_state=0, transform_seed=0).fit(embeddings)
projected_dataset_embeddings = project_embeddings(embeddings, umap_transform)

retrieved_embeddings = results["embeddings"][0]
original_query_embedding = embedding_function([original_query])
augmented_query_embedding = embedding_function([joint_query])

projected_original_query_embedding = project_embeddings(
    original_query_embedding, umap_transform
)
projected_augmented_query_embedding = project_embeddings(
    augmented_query_embedding, umap_transform
)
projected_retrieved_embeddings = project_embeddings(
    retrieved_embeddings, umap_transform
)

import matplotlib.pyplot as plt

# Plot the projected query and retrieved documents in the embedding space
plt.figure()

plt.scatter(
    projected_dataset_embeddings[:, 0],
    projected_dataset_embeddings[:, 1],
    s=10,
    color="gray",
)
plt.scatter(
    projected_retrieved_embeddings[:, 0],
    projected_retrieved_embeddings[:, 1],
    s=100,
    facecolors="none",
    edgecolors="g",
)
plt.scatter(
    projected_original_query_embedding[:, 0],
    projected_original_query_embedding[:, 1],
    s=150,
    marker="X",
    color="r",
)
plt.scatter(
    projected_augmented_query_embedding[:, 0],
    projected_augmented_query_embedding[:, 1],
    s=150,
    marker="X",
    color="orange",
)

plt.gca().set_aspect("equal", "datalim")
plt.title(f"{original_query}")
plt.axis("off")
plt.show()  # display the plot
```
Visualisation:
Red cross: original query
Orange cross: modified
Circles: retrieved documents
Query expansion with multiple queries
Use an LLM to generate additional queries that might help retrieve the most relevant answer
Python code:
from helper_utils import project_embeddings, word_wrap
from pypdf import PdfReader
import os
from openai import OpenAI
from dotenv import load_dotenv
import numpy as np
import umap
# Load environment variables from .env file
load_dotenv()
openai_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=openai_key)
reader = PdfReader("data/microsoft-annual-report.pdf")
pdf_texts = [p.extract_text().strip() for p in reader.pages]
# Filter the empty strings
pdf_texts = [text for text in pdf_texts if text]
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
SentenceTransformersTokenTextSplitter,
)
character_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", ". ", " ", ""], chunk_size=1000, chunk_overlap=0
)
character_split_texts = character_splitter.split_text("\n\n".join(pdf_texts))
token_splitter = SentenceTransformersTokenTextSplitter(
chunk_overlap=0, tokens_per_chunk=256
)
token_split_texts = []
for text in character_split_texts:
    token_split_texts += token_splitter.split_text(text)
# now we import chromadb and the SentenceTransformerEmbeddingFunction
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
embedding_function = SentenceTransformerEmbeddingFunction()
# print(embedding_function([token_split_texts[10]]))
# we then instantiate the Chroma client and create a collection called "microsoft-collection"
chroma_client = chromadb.Client()
chroma_collection = chroma_client.create_collection(
"microsoft-collection", embedding_function=embedding_function
)
# extract the embeddings of the token_split_texts
ids = [str(i) for i in range(len(token_split_texts))]
chroma_collection.add(ids=ids, documents=token_split_texts)
chroma_collection.count()
query = "What was the total revenue for the year?"
results = chroma_collection.query(query_texts=[query], n_results=5)
retrieved_documents = results["documents"][0]
# for document in retrieved_documents:
# print(word_wrap(document))
# print("\n")
def generate_multi_query(query, model="gpt-3.5-turbo"):
    # System prompt asking the model for up to five related questions, one per line
    prompt = """
    You are a knowledgeable financial research assistant.
    Your users are inquiring about an annual report.
    For the given question, propose up to five related questions to assist them in finding the information they need.
    Provide concise, single-topic questions (without compounding sentences) that cover various aspects of the topic.
    Ensure each question is complete and directly related to the original inquiry.
    List each question on a separate line without numbering.
    """
    messages = [
        {
            "role": "system",
            "content": prompt,
        },
        {"role": "user", "content": query},
    ]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
    )
    content = response.choices[0].message.content
    # One question per line; skip blank lines so no empty query is sent to Chroma
    content = [line.strip() for line in content.split("\n") if line.strip()]
    return content
original_query = (
"What details can you provide about the factors that led to revenue growth?"
)
aug_queries = generate_multi_query(original_query)
# 1. First step show the augmented queries
for query in aug_queries:
    print("\n", query)
# 2. concatenate the original query with the augmented queries
joint_query = [
original_query
] + aug_queries # original query is in a list because chroma can actually handle multiple queries, so we add it in a list
# print("======> \n\n", joint_query)
results = chroma_collection.query(
query_texts=joint_query, n_results=5, include=["documents", "embeddings"]
)
retrieved_documents = results["documents"]
# Deduplicate the retrieved documents
unique_documents = set()
for documents in retrieved_documents:
    for document in documents:
        unique_documents.add(document)
# output the results documents
for i, documents in enumerate(retrieved_documents):
    print(f"Query: {joint_query[i]}")
    print("")
    print("Results:")
    for doc in documents:
        print(word_wrap(doc))
        print("")
    print("-" * 100)
embeddings = chroma_collection.get(include=["embeddings"])["embeddings"]
umap_transform = umap.UMAP(random_state=0, transform_seed=0).fit(embeddings)
projected_dataset_embeddings = project_embeddings(embeddings, umap_transform)
# 4. We can also visualize the results in the embedding space
original_query_embedding = embedding_function([original_query])
augmented_query_embeddings = embedding_function(joint_query)
project_original_query = project_embeddings(original_query_embedding, umap_transform)
project_augmented_queries = project_embeddings(
augmented_query_embeddings, umap_transform
)
retrieved_embeddings = results["embeddings"]
result_embeddings = [item for sublist in retrieved_embeddings for item in sublist]
projected_result_embeddings = project_embeddings(result_embeddings, umap_transform)
import matplotlib.pyplot as plt
# Plot the projected query and retrieved documents in the embedding space
plt.figure()
plt.scatter(
projected_dataset_embeddings[:, 0],
projected_dataset_embeddings[:, 1],
s=10,
color="gray",
)
plt.scatter(
project_augmented_queries[:, 0],
project_augmented_queries[:, 1],
s=150,
marker="X",
color="orange",
)
plt.scatter(
projected_result_embeddings[:, 0],
projected_result_embeddings[:, 1],
s=100,
facecolors="none",
edgecolors="g",
)
plt.scatter(
project_original_query[:, 0],
project_original_query[:, 1],
s=150,
marker="X",
color="r",
)
plt.gca().set_aspect("equal", "datalim")
plt.title(f"{original_query}")
plt.axis("off")
plt.show() # display the plot
Visualisation
We can again see the original (red) and generated (orange) queries, along with the retrieved docs (green circles).
However, some downsides are clear:
- retrieving lots of documents
- generated queries might not always be relevant or useful
- results might not always be relevant or useful
If we want to have many retrieved results, we can use re-ranking to handle that.
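Since every query returns its own list of results, the same chunk often shows up several times. Here is a minimal sketch of merging those lists while keeping the order in which chunks were first seen (a plain set drops duplicates but loses the order). The function name merge_unique and the sample data are hypothetical, just for illustration.

```python
def merge_unique(results_per_query):
    """Flatten per-query result lists, keeping only the first copy of each document."""
    seen = set()
    merged = []
    for documents in results_per_query:
        for document in documents:
            if document not in seen:
                seen.add(document)
                merged.append(document)
    return merged


# Hypothetical results for three queries, with overlapping chunks
results_per_query = [
    ["chunk A", "chunk B"],
    ["chunk B", "chunk C"],
    ["chunk A", "chunk D"],
]
print(merge_unique(results_per_query))
```

The merged list is what would then be handed to a re-ranker, so there are fewer pairs to score.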
Re-ranking
Re-score the initially retrieved documents against the query with a stronger model (here, a cross-encoder) and reorder them by that score
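The idea in isolation is a small sketch: score every (query, document) pair, sort by score, keep the top few. To keep it runnable without downloading a model, the scorer here is a toy word-overlap function that I made up as a stand-in. It is not the cross-encoder used in the script, and the names rerank and word_overlap_score are hypothetical.

```python
def rerank(query, documents, score_fn, top_k=None):
    """Order documents by score_fn(query, document), highest score first."""
    scored = [(score_fn(query, doc), doc) for doc in documents]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    ranked = [doc for _, doc in scored]
    return ranked if top_k is None else ranked[:top_k]


def word_overlap_score(query, document):
    """Toy relevance score: number of distinct words shared by query and document."""
    query_words = set(query.lower().split())
    document_words = set(document.lower().split())
    return len(query_words & document_words)


# Hypothetical documents, only for illustration
documents = [
    "The office moved to a new building",
    "Research and development spending rose",
    "Investment in research and development grew",
]
query = "investment in research and development"
print(rerank(query, documents, word_overlap_score, top_k=2))
```

In a real pipeline, score_fn would wrap the cross-encoder's prediction for a single pair, and the rest of the logic stays the same.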
Python code:
from helper_utils import word_wrap, load_chroma
from pypdf import PdfReader
import os
from openai import OpenAI
from dotenv import load_dotenv
import numpy as np
from langchain_community.document_loaders import PyPDFLoader
# Load environment variables from .env file
load_dotenv()
openai_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=openai_key)
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
embedding_function = SentenceTransformerEmbeddingFunction()
reader = PdfReader("data/microsoft-annual-report.pdf")
pdf_texts = [p.extract_text().strip() for p in reader.pages]
# Filter the empty strings
pdf_texts = [text for text in pdf_texts if text]
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
SentenceTransformersTokenTextSplitter,
)
character_splitter = RecursiveCharacterTextSplitter(
separators=["\n\n", "\n", ". ", " ", ""], chunk_size=1000, chunk_overlap=0
)
character_split_texts = character_splitter.split_text("\n\n".join(pdf_texts))
token_splitter = SentenceTransformersTokenTextSplitter(
chunk_overlap=0, tokens_per_chunk=256
)
token_split_texts = []
for text in character_split_texts:
    token_split_texts += token_splitter.split_text(text)
chroma_client = chromadb.Client()
chroma_collection = chroma_client.get_or_create_collection(
"microsoft-collect", embedding_function=embedding_function
)
# extract the embeddings of the token_split_texts
ids = [str(i) for i in range(len(token_split_texts))]
chroma_collection.add(ids=ids, documents=token_split_texts)
count = chroma_collection.count()
query = "What has been the investment in research and development?"
results = chroma_collection.query(
query_texts=query, n_results=10, include=["documents", "embeddings"]
)
retrieved_documents = results["documents"][0]
for document in results["documents"][0]:
    print(word_wrap(document))
    print("")
from sentence_transformers import CrossEncoder
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
pairs = [[query, doc] for doc in retrieved_documents]
scores = cross_encoder.predict(pairs)
print("Scores:")
for score in scores:
    print(score)
print("New Ordering:")
for o in np.argsort(scores)[::-1]:
    print(o + 1)
original_query = (
"What were the most important factors that contributed to increases in revenue?"
)
generated_queries = [
"What were the major drivers of revenue growth?",
"Were there any new product launches that contributed to the increase in revenue?",
"Did any changes in pricing or promotions impact the revenue growth?",
"What were the key market trends that facilitated the increase in revenue?",
"Did any acquisitions or partnerships contribute to the revenue growth?",
]
# concatenate the original query with the generated queries
queries = [original_query] + generated_queries
results = chroma_collection.query(
query_texts=queries, n_results=10, include=["documents", "embeddings"]
)
retrieved_documents = results["documents"]
# Deduplicate the retrieved documents
unique_documents = set()
for documents in retrieved_documents:
    for document in documents:
        unique_documents.add(document)
unique_documents = list(unique_documents)
pairs = []
for doc in unique_documents:
    pairs.append([original_query, doc])
scores = cross_encoder.predict(pairs)
print("Scores:")
for score in scores:
    print(score)
print("New Ordering:")
for o in np.argsort(scores)[::-1]:
    print(o)
# ====
top_indices = np.argsort(scores)[::-1][:5]
top_documents = [unique_documents[i] for i in top_indices]
# Concatenate the top documents into a single context
context = "\n\n".join(top_documents)
# Generate the final answer using the OpenAI model
def generate_final_answer(query, context, model="gpt-3.5-turbo"):
    prompt = """
    You are a knowledgeable financial research assistant.
    Your users are inquiring about an annual report.
    """
    messages = [
        {
            "role": "system",
            "content": prompt,
        },
        {
            "role": "user",
            "content": f"based on the following context:\n\n{context}\n\nAnswer the query: '{query}'",
        },
    ]
    response = client.chat.completions.create(
        model=model,
        messages=messages,
    )
    # Return the answer as a single string so it prints as readable text
    return response.choices[0].message.content
res = generate_final_answer(query=original_query, context=context)
print("Final Answer:")
print(res)
For my next RAG project I will definitely consider these techniques ^^
That is all for today!
See you tomorrow :)