Optimizing RAG Applications with Windowing: Examining an Indexing Pipeline

By Laura Funderburk

Generating accurate, context-aware responses in real time is becoming increasingly critical for applications built on large language models (LLMs). One technique that has emerged to address this need is Retrieval-Augmented Generation (RAG). RAG enhances LLMs by integrating external data retrieval into the generation process, enabling the models to generate more accurate and context-specific responses. When the external data is large or arrives continuously, however, indexing it becomes a bottleneck, and windowing can make that step considerably more efficient.

In this blog, we will explore the concept of windowing, its importance in LLM-based RAG applications, and how to incorporate it effectively into an indexing pipeline.

We will demonstrate this using Python code snippets that implement windowing within an indexing pipeline, designed to process and embed news articles.

What is Windowing?

Windowing is a technique in stream processing that divides continuous data flows into discrete, manageable segments called "windows." These windows allow the system to process and aggregate data in chunks rather than in a continuous stream, which is crucial when dealing with large-scale data in real-time. There are different types of windows:

  • Tumbling Windows: Fixed-size, non-overlapping windows that process data in equal intervals.
  • Sliding Windows: Overlapping windows that move incrementally over the data stream, useful for calculating moving averages.
  • Session Windows: Windows with dynamic sizes based on periods of inactivity in the data stream, ideal for tracking user sessions or activity bursts.
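
The three window types can be sketched in plain Python, without any streaming framework. This is a minimal illustration on integer timestamps, not how Bytewax implements windowing; the function names, the sample events, and the choice to ignore windows that would start before time 0 are all assumptions made for the example.

```python
from typing import Dict, List


def tumbling(timestamps: List[int], length: int) -> Dict[int, List[int]]:
    """Assign each timestamp to exactly one fixed-size window."""
    windows: Dict[int, List[int]] = {}
    for ts in timestamps:
        windows.setdefault(ts // length, []).append(ts)
    return windows


def sliding(timestamps: List[int], length: int, offset: int) -> Dict[int, List[int]]:
    """Assign each timestamp to every window [k*offset, k*offset + length) that contains it."""
    windows: Dict[int, List[int]] = {}
    for ts in timestamps:
        k = ts // offset  # index of the last window starting at or before ts
        while k >= 0 and k * offset + length > ts:
            windows.setdefault(k, []).append(ts)
            k -= 1
    return windows


def session(timestamps: List[int], gap: int) -> List[List[int]]:
    """Start a new session whenever the pause since the previous event exceeds `gap`."""
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


events = [1, 3, 7, 11, 14, 30]  # hypothetical event times in seconds
print(tumbling(events, length=10))
print(sliding(events, length=10, offset=5))
print(session(events, gap=5))
```

With these sample events, the tumbling windows split the early activity at the 10-second boundary, the sliding windows place events such as 7 and 11 in two windows each, and the session window keeps the first five events together because no pause between them exceeds the gap.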

To explore windowing concepts in more depth, and to see how to apply them with the Python-native streaming framework Bytewax, check out our article "Streaming for data scientists part II". Let's now look at why you should consider windowing as part of your RAG toolkit for real-time applications.

Why is Windowing Important in RAG Applications?

When integrating external data into RAG systems, particularly in real-time applications, efficiently processing large volumes of data is a significant challenge. Windowing can help by:

  • Enhancing Efficiency: By processing data in smaller, manageable chunks, windowing reduces the computational load and improves the overall speed of the system.
  • Ensuring Relevance: Windowing helps the system focus on the most recent and relevant data, which is crucial for generating accurate responses.
  • Handling Real-Time Data: Windowing allows for the effective processing of real-time data streams, ensuring that late-arriving data is appropriately managed.
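
To make the late-arriving data point concrete, here is a simplified sketch of a watermark. It assumes the watermark trails the newest timestamp seen so far by a fixed wait, and that anything older than the watermark on arrival counts as late. The function name and sample arrivals are hypothetical, and a real event-time clock typically also advances the watermark as system time passes, which this sketch ignores.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple


def split_late(
    arrivals: List[datetime], wait: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Separate on-time items from late ones under a simplified watermark."""
    on_time: List[datetime] = []
    late: List[datetime] = []
    newest: Optional[datetime] = None
    for ts in arrivals:
        # The watermark is the newest timestamp seen so far minus the wait
        if newest is not None and ts < newest - wait:
            late.append(ts)
        else:
            on_time.append(ts)
        if newest is None or ts > newest:
            newest = ts
    return on_time, late


base = datetime(2024, 5, 29, tzinfo=timezone.utc)
# Items listed in arrival order; the last one shows up long after its event time
arrivals = [
    base,
    base + timedelta(seconds=10),
    base + timedelta(seconds=9, milliseconds=500),
    base + timedelta(seconds=3),
]
on_time, late = split_late(arrivals, wait=timedelta(seconds=1))
print(len(on_time), "on time,", len(late), "late")
```

A larger wait tolerates more disorder at the cost of closing windows later, which is the trade-off to weigh when choosing the wait duration for a real-time index.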

Let's explore how windowing can be integrated into an indexing pipeline using Python.

Incorporating Windowing into an Indexing Pipeline

We'll adapt an indexing pipeline that processes news articles, cleans the text, splits the documents, and then embeds them using a document embedder. We'll use windowing to manage the processing of large volumes of data more efficiently.

The indexing pipeline is built using Haystack by deepset. We build a Haystack custom component that parses the entries of a JSONL dataset of news articles, strips HTML tags and extra whitespace from every text field, and turns the text under the content key into a Haystack Document.


from haystack import Pipeline
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
from haystack.utils import Secret

from haystack import component, Document
from typing import Any, Dict, List, Optional, Union
from haystack.dataclasses import ByteStream

import json
from dotenv import load_dotenv
import os

import re
from bs4 import BeautifulSoup
from pathlib import Path

load_dotenv(".env")
open_ai_key = os.environ.get("OPENAI_API_KEY")

@component
class BenzingaNews:
    
    @component.output_types(documents=List[Document])
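    def run(self, sources: List[Dict[str, Any]]) -> Dict[str, List[Document]]:
             
        documents = []
        for source in sources:
        
            # Strip HTML tags and extra whitespace from every string field
            for key in source:
                if isinstance(source[key], str):
                    source[key] = self.clean_text(source[key])
                    
            # Skip articles that have no content left after cleaning
            if source['content'] == "":
                continue

            # Use the cleaned text as the document body; the full record
            # (including content) is kept as metadata
            content = source['content']
            document = Document(content=content, meta=source) 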
            
            documents.append(document)
         
        return {"documents": documents}
               
    def clean_text(self, text):
        # Remove HTML tags using BeautifulSoup
        soup = BeautifulSoup(text, "html.parser")
        text = soup.get_text()
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text).strip()
        return text
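
To show what the cleaning step does to a single record, the sketch below runs a hypothetical JSONL line through an HTML stripper. It deliberately swaps BeautifulSoup for the standard library's html.parser so that it runs without extra dependencies; the record values are invented, and the output is only expected to match clean_text for simple markup like this.

```python
import json
import re
from html.parser import HTMLParser
from typing import List


class TextExtractor(HTMLParser):
    """Collect only the text nodes of an HTML fragment."""

    def __init__(self) -> None:
        super().__init__()
        self.parts: List[str] = []

    def handle_data(self, data: str) -> None:
        self.parts.append(data)


def strip_html(text: str) -> str:
    """Drop tags, then collapse runs of whitespace into single spaces."""
    parser = TextExtractor()
    parser.feed(text)
    parser.close()
    return re.sub(r"\s+", " ", "".join(parser.parts)).strip()


# One hypothetical line of the JSONL file
line = json.dumps(
    {
        "id": 1,
        "headline": "Markets rally",
        "content": "<p>Stocks   rose <b>sharply</b>.</p>\n\n<p>Bonds fell.</p>",
    }
)
record = json.loads(line)
cleaned = strip_html(record["content"])
print(cleaned)
```

Note that the paragraph break between the two sentences disappears along with the tags, because all whitespace is collapsed to single spaces.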
    

We then wrap the custom component, together with Haystack's predefined cleaner, splitter, and embedder components, in a second component that connects them into an indexing pipeline:


@component
class BenzingaEmbeder:
    
    def __init__(self):
        get_news = BenzingaNews()
        
        document_cleaner = DocumentCleaner(
                            remove_empty_lines=True,
                            remove_extra_whitespaces=True,
                            remove_repeated_substrings=False
                        )
        document_splitter = DocumentSplitter(split_by="passage", split_length=5)
        
        embedding = OpenAIDocumentEmbedder(api_key=Secret.from_token(open_ai_key))

        self.pipeline = Pipeline()
        self.pipeline.add_component("get_news", get_news)
        self.pipeline.add_component("document_cleaner", document_cleaner)
        self.pipeline.add_component("document_splitter", document_splitter)
        self.pipeline.add_component("embedding", embedding)


        self.pipeline.connect("get_news", "document_cleaner")
        self.pipeline.connect("document_cleaner", "document_splitter")
        self.pipeline.connect("document_splitter", "embedding")

        
    @component.output_types(documents=List[Document])
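    def run(self, event: Dict[str, Any]):
        
        # BenzingaNews expects a list of records, so wrap the single article.
        # The result is the pipeline's output dict, keyed by component name.
        documents = self.pipeline.run({"get_news": {"sources": [event]}})
        
        # Save a diagram of the pipeline. This runs on every call; move it to
        # __init__ if the repeated rendering becomes a bottleneck.
        self.pipeline.draw("benzinga_pipeline.png")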
        return documents
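
The splitter configuration above (split_by="passage", split_length=5) groups passages into chunks of five. The sketch below imitates that behavior in plain Python, assuming a passage boundary is a blank line; it is an approximation for illustration, not Haystack's implementation, and the function name and sample article are hypothetical.

```python
import re
from typing import List


def split_passages(text: str, split_length: int = 5) -> List[str]:
    """Group blank-line-separated passages into chunks of `split_length`."""
    passages = [p for p in text.split("\n\n") if p.strip()]
    return [
        "\n\n".join(passages[i : i + split_length])
        for i in range(0, len(passages), split_length)
    ]


# A hypothetical article with 12 paragraphs
article = "\n\n".join(f"Paragraph {n}." for n in range(1, 13))
chunks = split_passages(article)
print(len(chunks))  # 12 passages in groups of 5 give chunks of 5, 5, and 2

# The same article after whitespace has been collapsed to single spaces
flattened = re.sub(r"\s+", " ", article).strip()
print(len(split_passages(flattened)))
```

One consequence worth checking in your own data: the clean_text step collapses all whitespace before the splitter runs, so under this assumption about passage boundaries each article would reach the embedder as a single chunk. If you want finer-grained chunks, splitting by sentence or word count may be a better fit.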

Let's explore how to execute this pipeline in the context of windowing.

Example 1: Tumbling Window for Document Embedding

In this example, we'll use a tumbling window to process and embed documents at regular intervals. This ensures that the embedding process is manageable and efficient, even when dealing with a continuous stream of news articles.


import time
from datetime import datetime, timedelta, timezone
import json
import logging
import re

import bytewax.operators as op
from bytewax.connectors.files import FileSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.operators import windowing as wop
from bytewax.operators.windowing import EventClock, TumblingWindower

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def safe_deserialize(data):
    """Safely deserialize JSON data, handling various formats."""
    try:
        parsed_data = json.loads(data)

        if isinstance(parsed_data, dict):
            event = parsed_data
        else:
            logger.info(f"Skipping unexpected data type: {data}")
            return None

        if 'link' in event:
            event['url'] = event.pop('link')

        if "url" in event:
            return event
        else:
            logger.info(f"Missing 'url' key in data: {data}")
            return None

    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error ({e}) for data: {data}")
        return None
    except Exception as e:
        logger.error(f"Error processing data ({e}): {data}")
        return None

def parse_time(parsed_data):
    """Convert time from string to datetime"""
    for item in ['created_at', 'updated_at']:
        # Replace the "T" separator with a space and drop the trailing "Z"
        time_min_t = re.sub("T", " ", parsed_data[item])
        time_min_ms = re.sub(r":*Z", "", time_min_t)

        # Parse directly into a timezone-aware UTC datetime
        parsed_data[item] = datetime.strptime(
            time_min_ms, "%Y-%m-%d %H:%M:%S"
        ).replace(tzinfo=timezone.utc)

    return parsed_data


embed_benzinga = BenzingaEmbeder()


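def process_event(event):
    """Embed the first article collected in a window.

    `event` is a (key, window_data) tuple, where window_data holds the
    window ID and the list of articles collected for that key.
    """
    event_id, event_data = event
    
    try: 
       
        for single_event in event_data:
            
            # Skip the window ID; only the list of collected articles is used
            if isinstance(single_event, list):
                
                # Only the first article in the list is embedded
                documents = embed_benzinga.run(single_event[0])
                return documents
    except Exception as e:
        logger.error(f"Error embedding window for key {event_id}: {e}")
        return None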

# Set up the dataflow
flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize", input_data, safe_deserialize)
transform_data_time = op.map("timeconversion", deserialize_data, parse_time)

# Map the tuple to ensure consistent structure
map_tuple = op.map(
    "tuple_map",
    transform_data_time,
    lambda reading_data: (str(reading_data["id"]), {
        "created_at": reading_data['created_at'],
        "updated_at": reading_data['updated_at'],
        "headline": reading_data['headline'],
        "content": reading_data['content']}
    ),
)

event_time_config = EventClock(ts_getter=lambda e: e['updated_at'], wait_for_system_duration=timedelta(seconds=1))
align_to = datetime(2024, 5, 29, tzinfo=timezone.utc)
clock_config = TumblingWindower(align_to=align_to, length=timedelta(seconds=19))

window = wop.collect_window(
    "windowed_data", map_tuple, clock=event_time_config, windower=clock_config
)

calc = op.filter_map("embed_content", window.down, process_event)
op.output("output", calc, StdOutSink())

In this setup, the TumblingWindower is used to process and embed news articles in 19-second intervals, ensuring that each window is handled independently, thereby optimizing resource usage.
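
The items that process_event receives can be confusing at first, so here is a small sketch of their shape. It assumes the window's down stream emits (key, (window_id, collected_values)) tuples and that tumbling window IDs count whole window lengths from align_to; both match my reading of recent Bytewax releases, but check the API reference for your version. The helper names and the sample item are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

WindowItem = Tuple[str, Tuple[int, List[Dict[str, Any]]]]


def unpack_window(item: WindowItem) -> Tuple[str, int, List[Dict[str, Any]]]:
    """Flatten a (key, (window_id, values)) item into its three parts."""
    key, (window_id, values) = item
    return key, window_id, values


def window_bounds(
    window_id: int, align_to: datetime, length: timedelta
) -> Tuple[datetime, datetime]:
    """Return the [start, end) interval covered by a tumbling window."""
    start = align_to + window_id * length
    return start, start + length


# Hypothetical item for article "42" collected in window 3
sample: WindowItem = (
    "42",
    (3, [{"headline": "Markets rally", "content": "Stocks rose."}]),
)
key, window_id, articles = unpack_window(sample)

align_to = datetime(2024, 5, 29, tzinfo=timezone.utc)
start, end = window_bounds(window_id, align_to, timedelta(seconds=19))
print(key, window_id, len(articles), start.time(), end.time())
```

Unpacking the tuple explicitly like this also makes it straightforward to loop over every article in the window instead of embedding only the first one.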

Example 2: Session Window for Processing News Bursts

Session windows are ideal when processing bursts of data, such as sudden spikes in news articles during significant events. This example demonstrates how to implement a session window to manage such scenarios.


# ... previous function definitions and imports stay the same
from bytewax.operators.windowing import SessionWindower

# Set up the dataflow
flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize", input_data, safe_deserialize)
transform_data_time = op.map("timeconversion", deserialize_data, parse_time)

# Map the tuple to ensure consistent structure
map_tuple = op.map(
    "tuple_map",
    transform_data_time,
    lambda reading_data: (str(reading_data["id"]), {
        "created_at": reading_data['created_at'],
        "updated_at": reading_data['updated_at'],
        "headline": reading_data['headline'],
        "content": reading_data['content']}
    ),
)

event_time_config = EventClock(ts_getter=lambda e: e['updated_at'], wait_for_system_duration=timedelta(seconds=1))
align_to = datetime(2024, 5, 29, tzinfo=timezone.utc)
clock_config = SessionWindower(gap=timedelta(seconds=5))

window = wop.collect_window(
    "windowed_data", map_tuple, clock=event_time_config, windower=clock_config
)

calc = op.filter_map("embed_content", window.down, process_event)
op.output("output", calc, StdOutSink())

In this example, the SessionWindower dynamically adjusts the window size based on the activity, making it perfect for handling data bursts during significant news events.
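
One detail to keep in mind is that windowing on a keyed stream is applied per key. Since the dataflow keys each item by article ID, a session here groups successive updates to the same article rather than a burst of different articles. The sketch below illustrates the difference in plain Python; the function name, the sample events, and the shared "ALL" key are assumptions for illustration and are not part of the original pipeline.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple


def sessions_by_key(
    events: List[Tuple[str, datetime]], gap: timedelta
) -> Dict[str, List[List[datetime]]]:
    """Build session windows independently for each key."""
    by_key: Dict[str, List[datetime]] = defaultdict(list)
    for key, ts in events:
        by_key[key].append(ts)

    result: Dict[str, List[List[datetime]]] = {}
    for key, stamps in by_key.items():
        sessions: List[List[datetime]] = []
        for ts in sorted(stamps):
            # Extend the current session unless the pause exceeds the gap
            if sessions and ts - sessions[-1][-1] <= gap:
                sessions[-1].append(ts)
            else:
                sessions.append([ts])
        result[key] = sessions
    return result


base = datetime(2024, 5, 29, tzinfo=timezone.utc)
# (article_id, updated_at) pairs, with offsets in seconds
raw = [("a", 0), ("b", 1), ("a", 3), ("c", 4), ("a", 20)]
events = [(key, base + timedelta(seconds=s)) for key, s in raw]
gap = timedelta(seconds=5)

per_article = sessions_by_key(events, gap)
shared = sessions_by_key([("ALL", ts) for _, ts in events], gap)
print({k: [len(s) for s in v] for k, v in per_article.items()})
print([len(s) for s in shared["ALL"]])
```

Keyed by article, the burst of activity in the first four seconds is spread over three separate sessions; with a single shared key it forms one session of four events. Which keying is appropriate depends on whether you want to track per-article updates or feed-wide bursts.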

Example 3: Sliding Window for Continuous Data Aggregation

Sliding windows allow for continuous aggregation of data, making them ideal for scenarios where ongoing updates are necessary. Here's how to implement a sliding window in the indexing pipeline:


# ... previous function definitions and imports stay the same

from bytewax.operators.windowing import SlidingWindower


# Set up the dataflow
flow = Dataflow("rag-pipeline")
input_data = op.input("input", flow, FileSource("data/news_out.jsonl"))
deserialize_data = op.filter_map("deserialize", input_data, safe_deserialize)
transform_data_time = op.map("timeconversion", deserialize_data, parse_time)

# Map the tuple to ensure consistent structure
map_tuple = op.map(
    "tuple_map",
    transform_data_time,
    lambda reading_data: (str(reading_data["id"]), {
        "created_at": reading_data['created_at'],
        "updated_at": reading_data['updated_at'],
        "headline": reading_data['headline'],
        "content": reading_data['content']}
    ),
)

event_time_config = EventClock(ts_getter=lambda e: e['updated_at'], wait_for_system_duration=timedelta(seconds=1))
align_to = datetime(2024, 5, 29, tzinfo=timezone.utc)
clock_config = SlidingWindower(length=timedelta(seconds=10), offset=timedelta(seconds=5), align_to=align_to)

window = wop.collect_window(
    "windowed_data", map_tuple, clock=event_time_config, windower=clock_config
)

calc = op.filter_map("embed_content", window.down, process_event)
op.output("output", calc, StdOutSink())

This example processes and aggregates news articles in overlapping windows, providing a continuously updated data view.
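
Because the windows are 10 seconds long and a new one starts every 5 seconds, each article falls into two windows and would be sent to the embedder twice. One possible way to avoid paying for the same embedding twice is to remember which article versions have already been processed. The class below is a hypothetical sketch of that idea using in-memory state; it is not part of the original pipeline, and its state would not survive a restart.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Set, Tuple


class SeenFilter:
    """Remember (article key, updated_at) pairs that were already handled."""

    def __init__(self) -> None:
        self._seen: Set[Tuple[str, str]] = set()

    def unseen(self, key: str, articles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return only the articles whose version has not been seen before."""
        fresh: List[Dict[str, Any]] = []
        for article in articles:
            marker = (key, str(article["updated_at"]))
            if marker not in self._seen:
                self._seen.add(marker)
                fresh.append(article)
        return fresh


seen = SeenFilter()
version_1 = {
    "updated_at": datetime(2024, 5, 29, 0, 0, 7, tzinfo=timezone.utc),
    "headline": "Markets rally",
    "content": "Stocks rose.",
}
# The same article version shows up in two overlapping windows
first_window = seen.unseen("42", [version_1])
second_window = seen.unseen("42", [version_1])

# A later update to the article is treated as new
version_2 = dict(version_1, updated_at=datetime(2024, 5, 29, 0, 0, 9, tzinfo=timezone.utc))
third_window = seen.unseen("42", [version_2])
print(len(first_window), len(second_window), len(third_window))
```

In process_event, a filter like this could be applied to the window's article list before calling the embedder, so only new or updated articles incur an embedding call.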

Conclusion

Windowing can improve both the efficiency and the relevance of Retrieval-Augmented Generation applications, especially when they ingest large volumes of real-time data. Tumbling, session, and sliding windows each bound how much data the indexing pipeline cleans, splits, and embeds at once, so you can pick the strategy that matches how your data arrives: fixed intervals, bursts of activity, or overlapping views of recent events. As AI-driven, real-time applications continue to evolve, windowing will remain a practical way to keep retrieved context current without overloading the indexing pipeline.

Laura Funderburk

Senior Developer Advocate
Laura Funderburk holds a B.Sc. in Mathematics from Simon Fraser University and has extensive work experience as a data scientist. She is passionate about leveraging open source for MLOps and DataOps and is dedicated to outreach and education.