Optimizing RAG Applications with Windowing: Examining an Indexing Pipeline
Generating accurate, context-aware data in real time is becoming increasingly critical in AI and large language models (LLMs). One technique that has emerged to address this need is Retrieval-Augmented Generation (RAG). RAG enhances LLMs by integrating external data retrieval into the generation process, enabling the models to generate more accurate and context-specific responses. However, when dealing with vast amounts of data, the efficiency of this process can be significantly improved using windowing.
In this blog, we will explore the concept of windowing, its importance in LLM-based RAG applications, and how to incorporate it effectively into an indexing pipeline.
We will demonstrate this using Python code snippets that implement windowing within an indexing pipeline, designed to process and embed news articles.
What is Windowing?
Windowing is a technique in stream processing that divides continuous data flows into discrete, manageable segments called "windows." These windows allow the system to process and aggregate data in chunks rather than in a continuous stream, which is crucial when dealing with large-scale data in real-time. There are different types of windows:
- Tumbling Windows: Fixed-size, non-overlapping windows that process data in equal intervals.
- Sliding Windows: Overlapping windows that move incrementally over the data stream, useful for calculating moving averages.
- Session Windows: Windows with dynamic sizes based on periods of inactivity in the data stream, ideal for tracking user sessions or activity bursts.
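To make the two fixed-size window types concrete, here is a minimal pure-Python sketch of how a timestamp could be assigned to windows. The function names and the use of plain integer seconds are assumptions made for illustration; a streaming framework handles this bookkeeping for you.

```python
from typing import List


def tumbling_window(ts: int, length: int, align_to: int = 0) -> int:
    """Index of the single fixed-size window that contains `ts`."""
    return (ts - align_to) // length


def sliding_windows(ts: int, length: int, offset: int, align_to: int = 0) -> List[int]:
    """Indices of every overlapping window that contains `ts`.

    Window i covers [align_to + i * offset, align_to + i * offset + length).
    """
    indices = []
    i = (ts - align_to) // offset  # newest window that starts at or before ts
    # Walk back through older windows for as long as they still cover ts
    while align_to + i * offset + length > ts:
        indices.append(i)
        i -= 1
    return sorted(indices)


print(tumbling_window(18, length=19))            # 0
print(tumbling_window(19, length=19))            # 1
print(sliding_windows(12, length=10, offset=5))  # [1, 2]
```

A tumbling window always yields exactly one index, while a sliding window yields several whenever the offset is smaller than the length.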
To explore windowing concepts in more depth, and to see how to apply them with the Python-native streaming framework Bytewax, check out our article "Streaming for data scientists part II". Let's now look at why windowing belongs in your RAG toolkit for real-time applications.
Why is Windowing Important in RAG Applications?
When integrating external data into RAG systems, particularly in real-time applications, efficiently processing large volumes of data is a significant challenge. Windowing can help by:
- Enhancing Efficiency: By processing data in smaller, manageable chunks, windowing reduces the computational load and improves the overall speed of the system.
- Ensuring Relevance: Windowing helps the system focus on the most recent and relevant data, which is crucial for generating accurate responses.
- Handling Real-Time Data: Windowing allows for the effective processing of real-time data streams, ensuring that late-arriving data is appropriately managed.
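The idea of "late-arriving data" can be illustrated with a simple watermark. The sketch below is only a conceptual model, not how any particular engine implements it: the watermark trails the newest timestamp seen so far by a fixed wait, and anything older than the watermark counts as late. The function name `split_late` and the sample arrival times are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Tuple


def split_late(
    timestamps: Iterable[datetime], wait: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Separate on-time items from late ones using a simple watermark."""
    on_time: List[datetime] = []
    late: List[datetime] = []
    watermark = datetime.min.replace(tzinfo=timezone.utc)
    for ts in timestamps:
        if ts < watermark:
            late.append(ts)
        else:
            on_time.append(ts)
            # The watermark only ever moves forward
            watermark = max(watermark, ts - wait)
    return on_time, late


base = datetime(2024, 5, 29, 12, 0, 0, tzinfo=timezone.utc)
# Arrival order differs from event-time order: 0s, 10s, 9s, 3s
arrivals = [base + timedelta(seconds=s) for s in (0, 10, 9, 3)]
on_time, late = split_late(arrivals, wait=timedelta(seconds=1))
print(len(on_time), len(late))  # 3 1
```

A longer wait tolerates more disorder but delays the moment a window can be closed and handed to the embedder.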
Let's explore how windowing can be integrated into an indexing pipeline using Python.
Incorporating Windowing into an Indexing Pipeline
We'll adapt an indexing pipeline that processes news articles, cleans the text, splits the documents, and then embeds them using a document embedder. We'll use windowing to manage the processing of large volumes of data more efficiently.
The indexing pipeline is built using Haystack by deepset. We build a Haystack custom component that parses the entries of the JSONL news dataset and extracts the text under the content key, removing any HTML tags along the way.
import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from bs4 import BeautifulSoup
from dotenv import load_dotenv
from haystack import Document, Pipeline, component
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ByteStream
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import Secret
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

# Read the OpenAI API key from a local .env file
load_dotenv(".env")
open_ai_key = os.environ.get("OPENAI_API_KEY")
@component
class BenzingaNews:
    """Turn raw news records into Haystack Documents with HTML-free text."""

    @component.output_types(documents=List[Document])
    def run(self, sources: List[Dict[str, Any]]) -> Dict[str, List[Document]]:
        documents = []
        for source in sources:
            # Clean every string field of the record (headline, content, ...)
            for key in source:
                if isinstance(source[key], str):
                    source[key] = self.clean_text(source[key])

            # Skip records that have no text left after cleaning
            if source["content"] == "":
                continue

            # The cleaned content becomes the document body;
            # the full record is kept as metadata
            content = source["content"]
            document = Document(content=content, meta=source)
            documents.append(document)

        return {"documents": documents}

    def clean_text(self, text: str) -> str:
        # Remove HTML tags using BeautifulSoup
        soup = BeautifulSoup(text, "html.parser")
        text = soup.get_text()
        # Collapse runs of whitespace (including newlines) into single spaces
        text = re.sub(r"\s+", " ", text).strip()
        return text
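To see what this cleaning step does to a record without installing anything, here is a rough stand-in that swaps BeautifulSoup for the standard library's `html.parser`. The helper names and the sample string are made up for illustration, and the two parsers may differ on malformed HTML.

```python
import re
from html.parser import HTMLParser
from typing import List


class TextExtractor(HTMLParser):
    """Collects only the text nodes of an HTML fragment."""

    def __init__(self) -> None:
        super().__init__()
        self.parts: List[str] = []

    def handle_data(self, data: str) -> None:
        self.parts.append(data)


def clean_text_stdlib(text: str) -> str:
    """Strip tags, then collapse whitespace runs into single spaces."""
    extractor = TextExtractor()
    extractor.feed(text)
    extractor.close()
    return re.sub(r"\s+", " ", "".join(extractor.parts)).strip()


sample = "<p>Stocks   rallied\n\n<b>sharply</b> on Friday.</p>"
print(clean_text_stdlib(sample))  # Stocks rallied sharply on Friday.
```

Note that the blank line inside the paragraph disappears along with the tags: after cleaning, the text is a single line.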
We then combine the custom component with Haystack's predefined components and connect them into an indexing pipeline:
@component
class BenzingaEmbeder:
    """Wrap the indexing pipeline: parse, clean, split, and embed."""

    def __init__(self):
        get_news = BenzingaNews()
        document_cleaner = DocumentCleaner(
            remove_empty_lines=True,
            remove_extra_whitespaces=True,
            remove_repeated_substrings=False,
        )
        document_splitter = DocumentSplitter(split_by="passage", split_length=5)
        embedding = OpenAIDocumentEmbedder(api_key=Secret.from_token(open_ai_key))

        self.pipeline = Pipeline()
        self.pipeline.add_component("get_news", get_news)
        self.pipeline.add_component("document_cleaner", document_cleaner)
        self.pipeline.add_component("document_splitter", document_splitter)
        self.pipeline.add_component("embedding", embedding)

        self.pipeline.connect("get_news", "document_cleaner")
        self.pipeline.connect("document_cleaner", "document_splitter")
        self.pipeline.connect("document_splitter", "embedding")

    @component.output_types(documents=List[Document])
    def run(self, event: Dict[str, Any]):
        # BenzingaNews expects a list of records, so wrap the single event
        documents = self.pipeline.run({"get_news": {"sources": [event]}})
        # Save a diagram of the pipeline graph
        self.pipeline.draw("benzinga_pipeline.png")
        return documents
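One detail worth checking in this configuration: as far as I can tell, split_by="passage" cuts the text on blank lines, while clean_text has already collapsed every run of whitespace into a single space. The sketch below imitates passage splitting in plain Python (the helper `split_passages` is a hypothetical stand-in, not Haystack's implementation) to show the likely consequence.

```python
import re
from typing import List


def split_passages(text: str, split_length: int = 5) -> List[str]:
    """Cut on blank lines, then join every `split_length` passages into one chunk."""
    passages = text.split("\n\n")
    return [
        "\n\n".join(passages[i : i + split_length])
        for i in range(0, len(passages), split_length)
    ]


raw = "\n\n".join(f"Paragraph {n}." for n in range(1, 8))  # 7 passages
collapsed = re.sub(r"\s+", " ", raw).strip()               # what clean_text does

print(len(split_passages(raw)))        # 2
print(len(split_passages(collapsed)))  # 1
```

If that reading is right, each article reaches the embedder as a single chunk. Should finer chunks be needed, splitting by word or sentence may be worth testing.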
Let's explore how to execute this pipeline in the context of windowing.
Example 1: Tumbling Window for Document Embedding
In this example, we'll use a tumbling window to process and embed documents at regular intervals. This ensures that the embedding process is manageable and efficient, even when dealing with a continuous stream of news articles.
import json
import logging
import re
import time
from datetime import datetime, timedelta, timezone

import bytewax.operators as op
from bytewax.connectors.files import FileSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.operators import windowing as wop
from bytewax.operators.windowing import EventClock, TumblingWindower

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def safe_deserialize(data):
    """Safely deserialize JSON data, handling various formats."""
    try:
        parsed_data = json.loads(data)
        if isinstance(parsed_data, dict):
            event = parsed_data
        else:
            logger.info(f"Skipping unexpected data type: {data}")
            return None

        # Normalize the link field to 'url'
        if "link" in event:
            event["url"] = event.pop("link")

        if "url" in event:
            return event
        else:
            logger.info(f"Missing 'url' key in data: {data}")
            return None
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error ({e}) for data: {data}")
        return None
    except Exception as e:
        logger.error(f"Error processing data ({e}): {data}")
        return None
def parse_time(parsed_data):
    """Convert the timestamp fields from strings to UTC datetimes."""
    for item in ["created_at", "updated_at"]:
        # Replace the 'T' separator with a space and drop the trailing 'Z'
        time_min_t = re.sub("T", " ", parsed_data[item])
        time_min_ms = re.sub(r":*Z", "", time_min_t)
        time_ = time.strptime(time_min_ms, "%Y-%m-%d %H:%M:%S")
        parsed_data[item] = datetime(
            year=time_.tm_year,
            month=time_.tm_mon,
            day=time_.tm_mday,
            hour=time_.tm_hour,
            minute=time_.tm_min,
            second=time_.tm_sec,
            tzinfo=timezone.utc,
        )
    return parsed_data
embed_benzinga = BenzingaEmbeder()


def process_event(event):
    """Unpack a keyed window and embed the first article it contains."""
    event_id, event_data = event
    try:
        for single_event in event_data:
            # event_data is expected to hold the window ID alongside the list
            # of collected articles; only the list is of interest here
            if isinstance(single_event, list):
                documents = embed_benzinga.run(single_event[0])
                return documents
    except Exception as e:
        logger.error(f"Error processing window for event {event_id}: {e}")
        return None
    # No list of articles found in this window
    return None
# Set up the dataflow
flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize", input_data, safe_deserialize)
transform_data_time = op.map("timeconversion", deserialize_data, parse_time)
# Map the tuple to ensure consistent structure
map_tuple = op.map(
    "tuple_map",
    transform_data_time,
    lambda reading_data: (
        str(reading_data["id"]),
        {
            "created_at": reading_data["created_at"],
            "updated_at": reading_data["updated_at"],
            "headline": reading_data["headline"],
            "content": reading_data["content"],
        },
    ),
)

# Use each article's update time as the event time
event_time_config = EventClock(
    ts_getter=lambda e: e["updated_at"],
    wait_for_system_duration=timedelta(seconds=1),
)
align_to = datetime(2024, 5, 29, tzinfo=timezone.utc)
clock_config = TumblingWindower(align_to=align_to, length=timedelta(seconds=19))
window = wop.collect_window(
    "windowed_data", map_tuple, clock=event_time_config, windower=clock_config
)
calc = op.filter_map("embed_content", window.down, process_event)
op.output("output", calc, StdOutSink())
In this setup, the TumblingWindower processes and embeds news articles in 19-second intervals. Each window is handled independently, which keeps the embedding work in bounded batches.
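It helps to picture what process_event receives. The sketch below is a plain-Python imitation of per-key tumbling windows, assuming the windowed stream carries items shaped like (key, (window_id, [records])); that shape, the helper names, and the sample headlines are assumptions for illustration, so check them against the Bytewax version you run.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

ALIGN_TO = datetime(2024, 5, 29, tzinfo=timezone.utc)
LENGTH = timedelta(seconds=19)

Record = Dict[str, Any]


def collect_tumbling(
    items: List[Tuple[str, Record]]
) -> List[Tuple[str, Tuple[int, List[Record]]]]:
    """Group keyed records into per-key tumbling windows."""
    buckets: Dict[Tuple[str, int], List[Record]] = defaultdict(list)
    for key, record in items:
        # Dividing two timedeltas with // yields the integer window index
        window_id = (record["updated_at"] - ALIGN_TO) // LENGTH
        buckets[(key, window_id)].append(record)
    return [
        (key, (window_id, buckets[(key, window_id)]))
        for key, window_id in sorted(buckets)
    ]


def at(seconds: int) -> datetime:
    return ALIGN_TO + timedelta(seconds=seconds)


stream = [
    ("1", {"updated_at": at(5), "headline": "first version"}),
    ("2", {"updated_at": at(6), "headline": "other article"}),
    ("1", {"updated_at": at(15), "headline": "quick update"}),
    ("1", {"updated_at": at(20), "headline": "later update"}),
]
for key, (window_id, records) in collect_tumbling(stream):
    print(key, window_id, [r["headline"] for r in records])
```

Because the stream is keyed by article ID, a window only holds more than one record when the same article is updated several times within 19 seconds. Since process_event embeds only the first record of each window, such quick updates would be skipped.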
Example 2: Session Window for Processing News Bursts
Session windows are ideal when processing bursts of data, such as sudden spikes in news articles during significant events. This example demonstrates how to implement a session window to manage such scenarios.
# ... previous function definitions and imports stay the same
from bytewax.operators.windowing import SessionWindower
# Set up the dataflow
flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize", input_data, safe_deserialize)
transform_data_time = op.map("timeconversion", deserialize_data, parse_time)
# Map the tuple to ensure consistent structure
map_tuple = op.map(
    "tuple_map",
    transform_data_time,
    lambda reading_data: (
        str(reading_data["id"]),
        {
            "created_at": reading_data["created_at"],
            "updated_at": reading_data["updated_at"],
            "headline": reading_data["headline"],
            "content": reading_data["content"],
        },
    ),
)

# Use each article's update time as the event time
event_time_config = EventClock(
    ts_getter=lambda e: e["updated_at"],
    wait_for_system_duration=timedelta(seconds=1),
)
# Session windows need no alignment, only the inactivity gap
clock_config = SessionWindower(gap=timedelta(seconds=5))
window = wop.collect_window(
    "windowed_data", map_tuple, clock=event_time_config, windower=clock_config
)
calc = op.filter_map("embed_content", window.down, process_event)
op.output("output", calc, StdOutSink())
In this example, the SessionWindower dynamically adjusts the window size based on activity: a session stays open while articles keep arriving and closes after a 5-second gap. This makes it well suited to handling data bursts during significant news events.
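The grouping rule behind a session window fits in a few lines of plain Python. This is a minimal sketch with a hypothetical `sessionize` helper and made-up timestamps; how a gap of exactly 5 seconds is treated is my own choice here, and the windower in your framework may draw that boundary differently.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def sessionize(timestamps: List[datetime], gap: timedelta) -> List[List[datetime]]:
    """Group timestamps into sessions; a pause longer than `gap` starts a new one."""
    sessions: List[List[datetime]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


base = datetime(2024, 5, 29, 12, 0, 0, tzinfo=timezone.utc)
# A burst of three articles, a quiet period, then two more
burst = [base + timedelta(seconds=s) for s in (0, 2, 4, 30, 31)]
sessions = sessionize(burst, gap=timedelta(seconds=5))
print([len(s) for s in sessions])  # [3, 2]
```

Unlike tumbling windows, the number and length of sessions depend entirely on the data, so a busy news day produces fewer, larger batches.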
Example 3: Sliding Window for Continuous Data Aggregation
Sliding windows allow for continuous aggregation of data, making them ideal for scenarios where ongoing updates are necessary. Here's how to implement a sliding window in the indexing pipeline:
# ... previous function definitions and imports stay the same
from bytewax.operators.windowing import SlidingWindower
# Set up the dataflow
flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize", input_data, safe_deserialize)
transform_data_time = op.map("timeconversion", deserialize_data, parse_time)
# Map the tuple to ensure consistent structure
map_tuple = op.map(
    "tuple_map",
    transform_data_time,
    lambda reading_data: (
        str(reading_data["id"]),
        {
            "created_at": reading_data["created_at"],
            "updated_at": reading_data["updated_at"],
            "headline": reading_data["headline"],
            "content": reading_data["content"],
        },
    ),
)

# Use each article's update time as the event time
event_time_config = EventClock(
    ts_getter=lambda e: e["updated_at"],
    wait_for_system_duration=timedelta(seconds=1),
)
align_to = datetime(2024, 5, 29, tzinfo=timezone.utc)
# 10-second windows that start every 5 seconds, so neighbours overlap
clock_config = SlidingWindower(
    length=timedelta(seconds=10), offset=timedelta(seconds=5), align_to=align_to
)
window = wop.collect_window(
    "windowed_data", map_tuple, clock=event_time_config, windower=clock_config
)
calc = op.filter_map("embed_content", window.down, process_event)
op.output("output", calc, StdOutSink())
This example processes and aggregates news articles in overlapping windows, providing a continuously updated data view.
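One consequence of the overlap is worth planning for: with a 10-second length and a 5-second offset, every timestamp falls into two windows, so the same article would likely be sent to the embedder twice. A possible guard, sketched here in plain Python with hypothetical names and illustrative window IDs, is to remember which (key, updated_at) pairs have already been handled.

```python
from typing import Any, Dict, Iterable, Iterator, List, Set, Tuple

Record = Dict[str, Any]
WindowEvent = Tuple[str, Tuple[int, List[Record]]]


def drop_repeats(
    events: Iterable[WindowEvent], seen: Set[Tuple[str, Any]]
) -> Iterator[Tuple[str, List[Record]]]:
    """Yield each article version once, even if several windows contain it."""
    for key, (_window_id, records) in events:
        fresh = []
        for record in records:
            marker = (key, record["updated_at"])
            if marker not in seen:
                seen.add(marker)
                fresh.append(record)
        if fresh:
            yield key, fresh


article = {"updated_at": "2024-05-29T12:00:07Z", "headline": "Markets open higher"}
other = {"updated_at": "2024-05-29T12:00:12Z", "headline": "Fed minutes released"}
events = [
    ("42", (0, [article])),
    ("42", (1, [article])),  # same article again, from the overlapping window
    ("7", (1, [other])),
]
unique = list(drop_repeats(events, seen=set()))
print([(key, len(records)) for key, records in unique])  # [('42', 1), ('7', 1)]
```

The `seen` set grows without bound in this sketch; in a long-running dataflow it would need to be pruned or kept in managed state.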
Conclusion
Windowing is a powerful technique that can improve the efficiency and relevance of Retrieval-Augmented Generation applications, especially when dealing with large volumes of real-time data. By integrating windowing strategies like tumbling, session, and sliding windows into an indexing pipeline, you can keep embedding work in bounded batches and keep your index focused on recent data. As AI-driven, real-time applications continue to evolve, windowing will play a growing role in managing and processing data, enabling systems to generate accurate, contextually relevant information efficiently.
P.S. We've built a great community on Slack that's here to answer your questions, celebrate your wins, and support you. It's all about real, live conversations, so join us there! 💛