Hacker News From Request to Stream: A Deep Dive into How to Use Bytewax to Poll HTTP Endpoints to Create a Real-Time Stream of Data

By Zander Matheson Required version: >= 0.17.1

Introduction

In the dynamic landscape of today’s business environment, the ability to access and respond to the most recent data is not just an advantage, it’s a necessity. Companies across the globe are vying for strategies and technologies that enable them to process and analyze data in real time, ensuring operations happen in a timely and effective manner. This demand propels us into the exploration of mechanisms like polling HTTP endpoints – a technique pivotal in the contemporary data ecosystem for retrieving the latest data directly from the source.

Polling HTTP endpoints refers to the automated process of sending requests to a specific HTTP URL at regular intervals. This procedure ensures that businesses can capture, process, and respond to the most recent and relevant data. In industries where data is continuously generated and updated—such as e-commerce, finance, and social media—this real-time data retrieval is integral for instant analytics, immediate decision-making, and timely response to market trends and customer behaviors.

poll.png

In this deep dive, we’ll show how you can use the periodic input available in bytewax since v0.17.1 mechanism in Bytewax to poll HTTP endpoints, unveiling the step-by-step process to poll, retrieve, and stream data in real-time.

Diving into Real-Time Data Polling with Bytewax

In this piece, we're focusing on how to effectively retrieve real-time data using Bytewax, demonstrated with a Python script for pulling data from the Hacker News API. We are going to walk through a datflow program that will continuously poll the Hacker News API, retrieve a set of ids, distribute those ids to different workers and then retrieve the metadata of the items and the root id of the item. You can see the entire dataflow program below, we will dissect the code below to understand its structure and functionality.

import requests
from datetime import datetime, timedelta
import time
from typing import Any, Optional

from bytewax.connectors.periodic import SimplePollingInput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HNInput(SimplePollingInput):
    def __init__(self, interval: timedelta, align_to: Optional[datetime] = None, init_item: Optional[int] = None):
        super().__init__(interval, align_to)
        logger.info(f"received starting id: {init_item}")
        self.max_id = init_item

    def next_item(self):
        '''
        Get all the items from hacker news API between 
        the last max id and the current max id
        '''
        if not self.max_id:
            self.max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
        new_max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
        logger.info(f"current id: {self.max_id}, new id: {new_max_id}")
        ids = [int(i) for i in range(self.max_id, new_max_id)]
        self.max_id = new_max_id
        return ids
    
def download_metadata(hn_id):
    # Given an hacker news id returned from the api, fetch metadata
    req = requests.get(
        f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
    )
    if not req.json():
        logger.warning(f"error getting payload from item {hn_id} trying again")
        time.sleep(0.5)
        return download_metadata(hn_id)
    return req.json()

def recurse_tree(metadata):
    try:
        parent_id = metadata["parent"]
        parent_metadata = download_metadata(parent_id)
        return recurse_tree(parent_metadata)
    except KeyError:
        return (metadata["id"], {**metadata, "key_id": metadata["id"]})

def key_on_parent(metadata: dict) -> tuple:
    key, metadata = recurse_tree(metadata)
    return (key, metadata)
    
def run_hn_flow(init_item): 
    flow = Dataflow()
    flow.input("in", HNInput(timedelta(seconds=15), None, init_item)) # skip the align_to argument
    flow.flat_map(lambda x: x)
    # If you run this dataflow with multiple workers, downloads in
    # the next `map` will be parallelized thanks to .redistribute()
    flow.redistribute()
    flow.map(download_metadata)
    flow.inspect(logger.info)

    # We want to keep related data together so let's build a 
    # traversal function to get the ultimate parent
    flow.map(key_on_parent)

    flow.output("std-out", StdOutput())
    return flow

Breaking it Down

Import Necessary Libraries and Modules

import requests
from datetime import datetime, timedelta
from typing import Any, Optional

from bytewax.connectors.periodic import SimplePollingInput
from bytewax.connectors.stdio import StdOutput
from bytewax.dataflow import Dataflow
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

We start by importing standard libraries and modules. The requests library is used for making HTTP requests to the Hacker News API. We’re also setting up logging to capture informational messages during the script’s execution, aiding in debugging and verification of the data retrieval process.

Create a Custom Input Class

class HNInput(SimplePollingInput):
    def __init__(self, interval: timedelta, align_to: Optional[datetime] = None, init_item: Optional[int] = None):
        super().__init__(interval, align_to)
        logger.info(f"received starting id: {init_item}")
        self.max_id = init_item

Here, a custom input class HNInput is defined, inheriting from Bytewax’s SimplePollingInput. The __init__ method is extended to log the initial item ID and set it as the max_id. This will be used as the starting point for retrieving new items from the API. If it is ignored by passing None, the dataflow will just start processing from the latest item ID.

Implement the Data Retrieval Method

    def next_item(self):
        '''
        Get all the items from hacker news API between
        the last max id and the current max id
        '''
        if not self.max_id:
            self.max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
        new_max_id = requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json()
        logger.info(f"current id: {self.max_id}, new id: {new_max_id}")
        ids = [int(i) for i in range(self.max_id, new_max_id)]
        self.max_id = new_max_id
        return ids

The next_item method is the core of the data retrieval process. It makes HTTP requests to the Hacker News API to get the current maximum item ID. It then calculates the range of new item IDs that have been added since the last retrieval and updates the max_id to the current maximum. This set of new item IDs is returned for further processing.

This script efficiently retrieves new item IDs from the Hacker News API at regular intervals, facilitated by the Bytewax library. It’s a practical example for Python developers looking to implement real-time data retrieval in their applications. The logged messages assist in monitoring the script’s progress and verifying its functionality, ensuring that it’s always clear which items are being retrieved and processed.

Extending the Data Retrieval Pipeline with Metadata Enrichment and Seeking the Root Item ID

With the real-time data retrieval mechanism in place, the next step is to extend this capability to include metadata extraction and to get the parent of the items. The items can be comments, polls, poll opts or storeis. We want to recurse to the ultimate parent of the comment and add that as a key. This will allow us to efficiently compute statistics about a story in real-time like how many comments it has had in the past 15 minutes (that is left to an exercise for the reader, but a good starting point can be found here). In this section, we’ll walk through the rest of the Python script that completes this process.

Extracting Metadata with Recursion

def download_metadata(hn_id):
    # Given an hacker news id returned from the api, fetch metadata
    req = requests.get(
        f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
    )
    if not req.json():
        logger.warning(f"error getting payload from item {hn_id} trying again")
        time.sleep(0.5)
        return download_metadata(hn_id)
    return req.json()

The download_metadata function is tasked with fetching detailed metadata for each Hacker News ID. It sends a request to the Hacker News API and retrieves the corresponding item’s JSON payload. If the request fails to retrieve data, the function retries after a short pause.

Traversing the Comment Thread

def recurse_tree(metadata):
    try:
        parent_id = metadata["parent"]
        parent_metadata = download_metadata(parent_id)
        return recurse_tree(parent_metadata)
    except KeyError:
        return (metadata["id"], {**metadata, "key_id": metadata["id"]})

def key_on_parent(metadata: dict) -> tuple:
    key, metadata = recurse_tree(metadata)
    return (key, metadata)

To fetch the entire comment thread related to a specific item, recurse_tree function recursively retrieves the parent comments until it reaches the root. This function helps in collecting the entire context of a conversation or comment thread.

Building and Running the Dataflow

def run_hn_flow(init_item):
    flow = Dataflow()
    flow.input("in", HNInput(timedelta(seconds=15), None, init_item))
    flow.flat_map(lambda x: x)
    flow.redistribute()
    flow.map(download_metadata)
    flow.inspect(logger.info)
    flow.map(key_on_parent)
    flow.output("std-out", StdOutput())
    return flow

The run_hn_flow function integrates all the previously defined functions and methods, creating a complete dataflow. The data is retrieved, redistributed among multiple workers for parallel processing (if available), enriched with metadata and keyed by the attributed parent item. Finally, the data is sent to the standard output, ready to be consumed, analyzed, or stored.

Running our Dataflow

We’ve implemented a parameterized dataflow by wrapping our flow up in a function. This allows us to efficiently update the starting item id so that we can backfill from a certain date. To run a parametrized dataflow, it is similar to the way we usually run a python module with the python -m pkg:module command familiar to Flask and Fast API users, but we wrapp the command in a string. You can see the full command below.

python -m bytewax.run "dataflow:run_hn_flow(item_id)"

Where item_id is the starting id number for the dataflow program.

Scaling Up

If you are backfilling and there are a lot of requests being made to get the parent id and the metadata, you may want to parallelize your dataflow. To do this, you can add the number of processes you would like to run with the -p argument.

python -m bytewax.run "dataflow:run_hn_flow(37771380)" -p3

Note that for this particular type of workflow where you cannot easily parallelize the ids, you will be limited to a single worker for the initial input and only once you redistribute will the workers share the load.

Wrapping Everything Up

This dataflow program represents a comprehensive approach to real-time data retrieval and processing. Starting with polling the Hacker News API for new items, it then distributes, enriches, and outputs the data efficiently. By leveraging Bytewax’s straightforward interface and Python’s expressive syntax, developers can adapt and extend this script to fit specific needs, integrating real-time data into applications and analytics workflows seamlessly.

Like what you see! Give us a ⭐ on the Bytewax GitHub repo.

Stay updated with our newsletter

Subscribe and never miss another blog post, announcement, or community event.

Previous post
Zander Matheson

Zander Matheson

CEO, Co-founder
Zander is a seasoned data engineer who has founded and currently helms Bytewax. Zander has worked in the data space since 2014 at Heroku, GitHub, and an NLP startup. Before that, he attended business school at the UT Austin and HEC Paris in Europe.
Next post