Simple Example

Let’s write our first Bytewax dataflow. Before you start, make sure you’ve installed Bytewax by following the instructions in Installing.

Imports

Our dataflow starts with a few imports, so let’s add those now.

import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

Dataflow

To begin, create a Dataflow instance in a variable named flow. This defines the empty dataflow we’ll add steps to.

flow = Dataflow("a_simple_example")

Input

Every dataflow requires input. For this example, we’ll use the input operator together with a TestingSource to produce a stream of ints.

stream = op.input("input", flow, TestingSource(range(10)))

The input operator returns a Stream of values. If your editor has a language server set up with type hints, you can see that the return type is Stream[int], a stream of integers.

Operators

Each operator returns a new Stream containing the results of its step, and you can pass that stream to further operators. Let’s use the map operator to double each number from our input stream.

def times_two(inp: int) -> int:
    return inp * 2


double = op.map("double", stream, times_two)
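
As a rough illustration of what this step computes, the same function can be applied to the same numbers with Python’s built-in map. This is only a plain-Python analogy and does not use the Bytewax runtime, which processes items as a stream rather than building a list. The name doubled_values is introduced here purely for illustration.

```python
def times_two(inp: int) -> int:
    return inp * 2


# Plain-Python analogy (not Bytewax): apply the function to each item in order.
doubled_values = list(map(times_two, range(10)))
print(doubled_values)
```

The printed list holds the same values, in the same order, as the output of the dataflow shown in the Running section.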

Output

Finally, let’s add an output step. Every dataflow requires at least one input step and one output step. We’ll send our output to standard out using the StdOutSink.

op.output("out", double, StdOutSink())

Running

When writing Bytewax dataflows for production use, you should run your dataflow using the bytewax.run module. Let’s see an example that does just that.

To begin, save the following code in a file called basic.py.

import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("a_simple_example")

stream = op.input("input", flow, TestingSource(range(10)))


def times_two(inp: int) -> int:
    return inp * 2


double = op.map("double", stream, times_two)

op.output("out", double, StdOutSink())

To run the dataflow, use the following command:

> python -m bytewax.run basic
0
2
4
6
8
10
12
14
16
18

The first argument passed to the bytewax.run module is a dataflow getter string. Because we saved our Dataflow definition in a variable named flow, we don’t need to supply the variable name when running our dataflow from the command line.
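
To make the idea of a getter string concrete, here is a minimal sketch of how such a string could be split into a module name and a variable name. It assumes the string has the form module_name or module_name:variable_name, with flow as the default variable. The helper split_getter is hypothetical and written only for illustration; it is not Bytewax’s own code.

```python
from typing import Tuple


def split_getter(getter: str, default_var: str = "flow") -> Tuple[str, str]:
    """Split "module" or "module:variable" into (module, variable).

    Hypothetical helper for illustration only, not part of Bytewax.
    """
    module_name, sep, var_name = getter.partition(":")
    # With no explicit variable name, fall back to the assumed default.
    return module_name, (var_name if sep and var_name else default_var)


print(split_getter("basic"))
print(split_getter("basic:flow"))
```

Under that assumption, basic and basic:flow would refer to the same dataflow object in basic.py.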

Note that just executing the Python file will not run the dataflow! You must use the bytewax.run script and its options so that it can set up the runtime correctly.
