Stream Real-Time Stock Prices and Analyze them with Numpy

By Zander Matheson

In this post, you'll learn how to stream real-time stock prices from Yahoo! Finance using websockets and analyze them using Numpy. By creating a Bytewax dataflow, you can transform and react to market data in real-time, making it ideal for live data analysis.

Websockets are a powerful tool that allows you to quickly access up-to-date information on stock prices. With Numpy, you can efficiently analyze this data and create features that can be used in machine learning models or automated trading bots.

Follow our blog and learn how to extract valuable insights from real-time market data today with Python!

Diving into the world of websockets!

Have you ever noticed how stock prices on Yahoo! Finance update in real-time? It's all thanks to websockets, which happen to be a great source of data for Bytewax dataflows.

In general, if you come across a publicly available website that displays something in real-time, chances are it's powered by websockets. We can confirm this with some quick inspection using the Chrome or Firefox developer tools.

[Figure: inspecting the Yahoo! Finance websocket in the browser developer tools]

While inspecting the site, we'll see the websocket and the data being returned. It might be a little hard to decipher, but fear not! Others have done the detective work for us and have already determined that the messages are base64-encoded and use the protobuf format.

Now that we know the websocket URL and data encoding, we can build a Bytewax dataflow to get live stock data as input and analyze it to make informed trading decisions.

Dataflow

Input

A side note on data inputs: since we're working with live data, any dataflow crashes may result in data loss. To avoid this, it's recommended to use reliable data sources like Redpanda or Kafka. However, for the purpose of this demo, we'll proceed without worrying about failure.

To connect to the Yahoo! Finance websocket, we will set up a custom input source in Bytewax that uses the Python websocket-client module. The dataflow is set up to scale to multiple workers, and the stock ticker symbols are divided among them with the distribute function.

import base64
import json

from bytewax.dataflow import Dataflow
from bytewax.inputs import ManualInputConfig, distribute


from websocket import create_connection

## input
ticker_list = ['AMZN', 'MSFT']

def yf_input(worker_tickers, state):
    ws = create_connection("wss://streamer.finance.yahoo.com/")
    ws.send(json.dumps({"subscribe": worker_tickers}))
    while True:
        yield state, ws.recv()


def input_builder(worker_index, worker_count, resume_state):
    state = resume_state or None
    worker_tickers = list(distribute(ticker_list, worker_index, worker_count))
    print({"subscribing to": worker_tickers})
    return yf_input(worker_tickers, state)


flow = Dataflow()
flow.input("input", ManualInputConfig(input_builder))
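
To make the worker split concrete, here is a minimal sketch of one way a ticker list can be divided among workers. The round_robin helper and the extra ticker symbols are hypothetical and used only for illustration; Bytewax's distribute may be implemented differently.

```python
def round_robin(items, worker_index, worker_count):
    """Yield the items whose position maps to this worker (illustrative only)."""
    for position, item in enumerate(items):
        if position % worker_count == worker_index:
            yield item


# Illustrative ticker list; only AMZN and MSFT appear in the dataflow above.
tickers = ["AMZN", "MSFT", "GOOG", "AAPL", "TSLA"]

# Compute what each of two workers would subscribe to.
assignments = {
    worker: list(round_robin(tickers, worker, 2)) for worker in range(2)
}
print(assignments)
```

Each symbol lands on exactly one worker, so no ticker is subscribed to twice when the dataflow is scaled out.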

With the input source in place, our dataflow will receive base64-encoded protobuf messages from the Yahoo! Finance websocket as they arrive. In the next step, we'll need to deserialize these messages into usable Python objects.

Understanding Protobufs

Protobufs are Google's answer to platform-agnostic data serialization. They provide a standardized way to serialize structured data in a compact binary format. If you're new to Protobufs, you can check out their documentation here.

In our case, we're using Protobufs to serialize and deserialize our streaming stock data. To get started writing protocol buffers, you will need the protobuf tools installed, and then you need to define a protobuf schema.

You can skip this because ours is already defined for us in ticker.proto. With Protobufs, the schema is compiled by the protoc command line tool, which generates a Python serialization file. That generated file is what we import in the deserialization step.

from ticker_pb2 import Ticker


def deserialize(message):
    '''Use the imported Ticker class to deserialize 
    the protobuf message

    returns: ticker id and ticker object
    '''
    ticker_ = Ticker()
    message_bytes = base64.b64decode(message)
    ticker_.ParseFromString(message_bytes)
    return (ticker_.id, ticker_)
    
flow.map(deserialize)

At this point we have taken the websocket messages, decoded and deserialized them and then returned a tuple of the format (ticker_.id, ticker_). We have done this so that we can aggregate data by the ticker symbol downstream.
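
The decode-then-key pattern can be tried without the generated ticker_pb2 module. The sketch below is only an illustration under stated assumptions: FakeTicker and fake_deserialize are hypothetical names, the payload is made up, and JSON stands in for the Protobuf parsing step.

```python
import base64
import json
from dataclasses import dataclass


@dataclass
class FakeTicker:
    """Hypothetical stand-in for the generated Ticker class."""
    id: str
    price: float


def fake_deserialize(message):
    # Step 1: undo the base64 transport encoding to get raw bytes.
    raw = base64.b64decode(message)
    # Step 2: parse the bytes. JSON stands in for Protobuf parsing here.
    ticker = FakeTicker(**json.loads(raw))
    # Step 3: key by ticker symbol so downstream steps can group on it.
    return (ticker.id, ticker)


# Made-up payload, encoded the same way the sketch decodes it.
message = base64.b64encode(json.dumps({"id": "MSFT", "price": 250.25}).encode())
key, ticker = fake_deserialize(message)
print(key, ticker.price)
```

The shape of the return value is the part that matters: a (key, value) tuple lets later steps treat each symbol independently.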

Analyzing Real-Time Stock Prices

A necessary step when writing an automated trading bot is to build an understanding of what is happening in the market data. This could be via feature engineering in the case of machine learning, or technical analysis for a trend-based algorithm.

Regardless, a windowing operator will come in handy: it accumulates our data over a window of time so we can pull out some of the information "hidden" in it. To start, we are going to take our real-time data and, for each 1-minute interval, get the high, the low, the first and last price, and the traded volume.
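
As a rough illustration of how a tumbling window assigns events to fixed, non-overlapping intervals, the sketch below maps an event time to the window that contains it. The window_bounds helper is hypothetical, the timestamps are made up, and it assumes events arrive at or after the window start; it is not how Bytewax implements windowing internally.

```python
from datetime import datetime, timedelta, timezone


def window_bounds(event_time, start_at, length):
    """Return the [open, close) bounds of the tumbling window holding event_time."""
    index = (event_time - start_at) // length  # whole windows elapsed since start
    window_open = start_at + index * length
    return window_open, window_open + length


# Made-up start time aligned to the beginning of a minute, in UTC.
start_at = datetime(2023, 2, 28, 17, 45, tzinfo=timezone.utc)
length = timedelta(seconds=60)

for second_offset in (5, 59, 60, 125):
    event_time = start_at + timedelta(seconds=second_offset)
    window_open, window_close = window_bounds(event_time, start_at, length)
    print(event_time.time(), "->", window_open.time(), "to", window_close.time())
```

Events at 5 and 59 seconds share the first window, while the event at exactly 60 seconds opens the next one, which is what makes the windows non-overlapping.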

First, we'll accumulate the data in a Numpy array. Numpy is a popular library for data analysis due to its speed and versatility. There are other options with more expressive APIs, but Numpy provides the capabilities we need.

from datetime import datetime, timedelta, timezone

import numpy as np
# Import path assumes the same Bytewax release that provides ManualInputConfig.
from bytewax.window import EventClockConfig, TumblingWindowConfig


def build_array():
    return np.empty((0,3))


# This is the accumulator function. It prepends each tick as a row of
# (time, price, day volume), so the newest tick is always row 0.
def acc_values(np_array, ticker):
    return np.insert(np_array, 0, np.array((ticker.time, ticker.price, ticker.dayVolume)), 0)


# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
# Note that the datetime MUST be UTC. If the datetime is using a different
# representation, we would have to convert it here.
def get_event_time(ticker):
    # ticker.time is in milliseconds, so convert it to seconds first.
    return datetime.fromtimestamp(ticker.time / 1000, tz=timezone.utc)


# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_event_time, wait_for_system_duration=timedelta(seconds=10))

# And a 1 minute tumbling window, that starts at the beginning of the minute
start_at = datetime.now(timezone.utc)
start_at = start_at - timedelta(
    seconds=start_at.second, microseconds=start_at.microsecond
)
wc = TumblingWindowConfig(start_at=start_at, length=timedelta(seconds=60))
flow.fold_window("1_min", cc, wc, build_array, acc_values)

We have quite a bit going on here, so let's unpack this. First, we have two functions: a builder function (build_array) that creates the empty array for a new window, and an updater function (acc_values) that adds each incoming tick to it. These are the last two arguments to the fold_window operator. Next, we have a get_event_time function that converts the ticker's time field into the event time used for windowing; it is passed to EventClockConfig. Finally, we have the start_at time, which marks the start of the first window and is passed to TumblingWindowConfig along with the window length.
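
To see what the builder and accumulator produce, here is a small sketch that runs them outside of Bytewax, with functools.reduce standing in for the fold over a single window. The ticks are made-up values and SimpleNamespace is a stand-in for the Ticker objects.

```python
from functools import reduce
from types import SimpleNamespace

import numpy as np


def build_array():
    return np.empty((0, 3))


def acc_values(np_array, ticker):
    row = np.array((ticker.time, ticker.price, ticker.dayVolume))
    return np.insert(np_array, 0, row, 0)


# Made-up ticks in arrival order; SimpleNamespace stands in for Ticker.
ticks = [
    SimpleNamespace(time=1000.0, price=10.0, dayVolume=100.0),
    SimpleNamespace(time=2000.0, price=10.5, dayVolume=150.0),
    SimpleNamespace(time=3000.0, price=10.2, dayVolume=175.0),
]

# Fold the ticks into one array, starting from the builder's empty array.
window = reduce(acc_values, ticks, build_array())
print(window.shape)
print(window[0])   # newest tick, because each row is inserted at index 0
print(window[-1])  # oldest tick
```

Because rows are inserted at index 0, the array ends up newest-first, which matters when picking the first and last price later on.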

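The output of this step will be, for each ticker, an accumulated array of the (time, price, day volume) rows that we received over the minute. After the feature-extraction step that follows, each window is reduced to a record like this: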

('MSFT', {'time': 1677606304000.0, 'min': 250.24000549316406, 'max': 250.32000732421875, 'first_price': 250.24000549316406, 'last_price': 250.3000030517578, 'volume': 17517.0})

Now we can extract the features from this data that we would then use in a machine learning model or trading algorithm.

def calculate_features(ticker__data):
    ticker, data = ticker__data
    # Rows are newest-first because acc_values inserts at index 0.
    prices, volumes = data[:, 1], data[:, 2]
    return (ticker, {
        "time": data[-1][0], "min": np.amin(prices), "max": np.amax(prices),
        "first_price": prices[-1], "last_price": prices[0],
        "volume": volumes[0] - volumes[-1],
    })

flow.map(calculate_features)

The calculate_features function uses Numpy's built-in capabilities to efficiently calculate the minimum and maximum prices, first and last price, and volume for each ticker symbol over the given time window.
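
As a worked example, the same calculations can be applied to a small hand-written window. The numbers are made up, and the volume line assumes dayVolume is a running total for the day, which is what the subtraction in calculate_features implies.

```python
import numpy as np

# Made-up window, newest row first: columns are time (ms), price, day volume.
data = np.array([
    [3000.0, 10.2, 175.0],
    [2000.0, 10.5, 150.0],
    [1000.0, 10.0, 100.0],
])

prices, volumes = data[:, 1], data[:, 2]
features = {
    "time": data[-1][0],                 # timestamp of the oldest tick
    "min": np.amin(prices),              # low of the window
    "max": np.amax(prices),              # high of the window
    "first_price": prices[-1],           # oldest row is last
    "last_price": prices[0],             # newest row is first
    "volume": volumes[0] - volumes[-1],  # change in the running day volume
}
print(features)
```

Here the low is 10.0, the high is 10.5, the price moved from 10.0 to 10.2, and 75 units traded during the window.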

Printing Real-Time Finance Insights

After analyzing the stock prices, it's time to output the results. In this demo, we'll print them out to the console. In a real trading bot, you might want to use the insights to make trades or store them somewhere for further analysis. To print the results, we'll use StdOutputConfig from Bytewax outputs. Once we have the output set up, we can execute our dataflow using run_main from Bytewax execution.

from bytewax.execution import run_main
from bytewax.outputs import StdOutputConfig

flow.capture(StdOutputConfig())

if __name__ == "__main__":
    run_main(flow)

That's it! We're now ready to print out real-time finance insights by running our Python script:

python dataflow.py

Conclusion

In conclusion, we have shown you how to leverage websockets and Numpy for finance. We streamed real-time stock prices from Yahoo! Finance and analyzed them in a Bytewax dataflow. We demonstrated how to connect to the Yahoo! Finance websocket, deserialize protobuf messages, and calculate features like the 1-minute high, low, first and last price, and volume. While this example dataflow simply prints out the results, it could be extended to include trading logic or to write the results to another location for further analysis. We hope this guide has shown you how to extract valuable insights from real-time market data and inspired you to explore the capabilities of Python streaming with Bytewax for your own financial analysis and trading projects.

Zander Matheson

CEO, Co-founder
Zander is a seasoned data engineer who has founded and currently helms Bytewax. Zander has worked in the data space since 2014 at Heroku, GitHub, and an NLP startup. Before that, he attended business school at UT Austin and HEC Paris in Europe.