Module bytewax.execution

How to execute your dataflows.

Run an instantiated Dataflow using one of the entry point functions in this module.

Epoch Configs

Epochs define the granularity of recovery in a bytewax dataflow. By default, we snapshot state for recovery every 10 seconds. You should only need to set this if you are testing the recovery system or are doing deep exactly-once integration work. Changing this does not change the semantics of any of the operators.
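
For example, to snapshot less often you could pass a PeriodicEpochConfig with a longer interval to an execution entry point. A minimal sketch, using the testing input and standard output configs shown in the examples below:

from datetime import timedelta

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main, PeriodicEpochConfig
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# Snapshot every 30 seconds instead of the default 10.
run_main(flow, epoch_config=PeriodicEpochConfig(timedelta(seconds=30)))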

Expand source code
"""How to execute your dataflows.

Run an instantiated `bytewax.dataflow.Dataflow` using one of the entry
point functions in this module.


Epoch Configs
-------------

Epochs define the granularity of recovery in a bytewax dataflow. By default, we
snapshot state for recovery every 10 seconds. You should only need to set this if
you are testing the recovery system or are doing deep exactly-once integration work.
Changing this does not change the semantics of any of the operators.


"""
from typing import Any, Iterable, List, Optional, Tuple

from multiprocess import get_context

from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig

from .bytewax import (  # noqa: F401
    cluster_main,
    EpochConfig,
    PeriodicEpochConfig,
    run_main,
    TestingEpochConfig,
)

# Due to our package structure, we need to define __all__
# in any submodule as pdoc will not find the documentation
# for functions imported here, but defined in another submodule.
# See https://pdoc3.github.io/pdoc/doc/pdoc/#what-objects-are-documented
# for more information.
__all__ = [
    "run_main",
    "cluster_main",
    "spawn_cluster",
    "EpochConfig",
    "PeriodicEpochConfig",
    "TestingEpochConfig",
]


def _gen_addresses(proc_count: int) -> Iterable[str]:
    return [f"localhost:{proc_id + 2101}" for proc_id in range(proc_count)]


def spawn_cluster(
    flow: Dataflow,
    *,
    epoch_config: Optional[EpochConfig] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
    mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:
    """Execute a dataflow as a cluster of processes on this machine.

    Blocks until execution is complete.

    Starts up cluster processes for you and handles connecting them
    together. You'd commonly use this for notebook analysis that needs
    parallelism and higher throughput, or simple stand-alone demo
    programs.

    >>> from bytewax.testing import doctest_ctx
    >>> from bytewax.dataflow import Dataflow
    >>> from bytewax.inputs import TestingInputConfig
    >>> from bytewax.outputs import StdOutputConfig
    >>> flow = Dataflow()
    >>> flow.input("inp", TestingInputConfig(range(3)))
    >>> flow.capture(StdOutputConfig())
    >>> spawn_cluster(
    ...     flow,
    ...     proc_count=2,
    ...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
    ... )  # doctest: +ELLIPSIS
    (...)

    See `bytewax.run_main()` for a way to test input and output
    builders without the complexity of starting a cluster.

    See `bytewax.cluster_main()` for starting one process in a cluster
    in a distributed situation.

    Args:

        flow: Dataflow to run.

        epoch_config: A custom epoch config. You probably don't need
            this. See `EpochConfig` for more info.

        recovery_config: State recovery config. See
            `bytewax.recovery`. If `None`, state will not be
            persisted.

        proc_count: Number of processes to start.

        worker_count_per_proc: Number of worker threads to start on
            each process.

        mp_ctx: `multiprocessing` context to use. Use this to
            configure starting up subprocesses via spawn or
            fork. Defaults to spawn.

    """
    addresses = _gen_addresses(proc_count)
    with mp_ctx.Pool(processes=proc_count) as pool:
        futures = [
            pool.apply_async(
                cluster_main,
                (flow,),
                {
                    "epoch_config": epoch_config,
                    "recovery_config": recovery_config,
                    "addresses": addresses,
                    "proc_id": proc_id,
                    "worker_count_per_proc": worker_count_per_proc,
                },
            )
            for proc_id in range(proc_count)
        ]
        pool.close()

        for future in futures:
            # Will re-raise exceptions from subprocesses.
            future.get()

        pool.join()

Functions

def cluster_main(flow, addresses, proc_id, *, epoch_config, recovery_config, worker_count_per_proc)

Execute a dataflow in the current process as part of a cluster.

You have to coordinate starting up all the processes in the cluster and ensure each is assigned a unique ID and knows the addresses of the other processes. You'd commonly use this for starting processes as part of a Kubernetes cluster.

Blocks until execution is complete.

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> addresses = []  # In a real example, you'd find the "host:port" of all other Bytewax workers.
>>> proc_id = 0  # In a real example, you'd assign each worker a distinct ID from 0..proc_count.
>>> cluster_main(flow, addresses, proc_id)
0
1
2
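
In a real deployment, for example one container per process in Kubernetes, you would typically derive proc_id and the address list from the environment. A rough sketch; the environment variable names and host naming scheme below are hypothetical, not something Bytewax provides:

import os

from bytewax.dataflow import Dataflow
from bytewax.execution import cluster_main
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# Hypothetical variables set by your orchestrator; every process must
# agree on the same ordered list of "host:port" addresses.
proc_count = int(os.environ.get("BYTEWAX_PROC_COUNT", "1"))
proc_id = int(os.environ.get("BYTEWAX_PROC_ID", "0"))
addresses = [f"bytewax-worker-{i}:2101" for i in range(proc_count)]

cluster_main(flow, addresses, proc_id)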

See bytewax.run_main() for a way to test input and output builders without the complexity of starting a cluster.

See bytewax.spawn_cluster() for starting a simple cluster locally on one machine.

Args

flow
    Dataflow to run.
addresses
    List of host/port addresses for all processes in this cluster (including this one).
proc_id
    Index of this process in cluster; starts from 0.
epoch_config
    A custom epoch config. You probably don't need this. See EpochConfig for more info.
recovery_config
    State recovery config. See bytewax.recovery. If None, state will not be persisted.
worker_count_per_proc
    Number of worker threads to start on each process.

def run_main(flow, *, epoch_config, recovery_config)

Execute a dataflow in the current thread.

Blocks until execution is complete.

You'd commonly use this for prototyping custom input and output builders with a single worker before using them in a cluster setting.

>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
0
1
2

See bytewax.spawn_cluster() for starting a cluster on this machine with full control over inputs and outputs.

Args

flow
    Dataflow to run.
epoch_config
    A custom epoch config. You probably don't need this. See EpochConfig for more info.
recovery_config
    State recovery config. See bytewax.recovery. If None, state will not be persisted.

def spawn_cluster(flow: Dataflow, *, epoch_config: Optional[EpochConfig] = None, recovery_config: Optional[RecoveryConfig] = None, proc_count: int = 1, worker_count_per_proc: int = 1, mp_ctx=<multiprocess.context.SpawnContext object>) -> List[Tuple[int, Any]]

Execute a dataflow as a cluster of processes on this machine.

Blocks until execution is complete.

Starts up cluster processes for you and handles connecting them together. You'd commonly use this for notebook analysis that needs parallelism and higher throughput, or simple stand-alone demo programs.

>>> from bytewax.testing import doctest_ctx
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> spawn_cluster(
...     flow,
...     proc_count=2,
...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
... )  # doctest: +ELLIPSIS
(...)

See bytewax.run_main() for a way to test input and output builders without the complexity of starting a cluster.

See bytewax.cluster_main() for starting one process in a cluster in a distributed situation.

Args

flow
    Dataflow to run.
epoch_config
    A custom epoch config. You probably don't need this. See EpochConfig for more info.
recovery_config
    State recovery config. See bytewax.recovery. If None, state will not be persisted.
proc_count
    Number of processes to start.
worker_count_per_proc
    Number of worker threads to start on each process.
mp_ctx
    multiprocessing context to use. Use this to configure starting up subprocesses via spawn or fork. Defaults to spawn.

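For instance, to run six workers on this machine you could combine processes and worker threads. A minimal sketch; mp_ctx is left at its spawn default, though you could pass get_context("fork") from multiprocess on platforms that support it:

from bytewax.dataflow import Dataflow
from bytewax.execution import spawn_cluster
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(12)))
flow.capture(StdOutputConfig())

# 2 processes x 3 worker threads each = 6 workers total.
spawn_cluster(flow, proc_count=2, worker_count_per_proc=3)
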
Expand source code
def spawn_cluster(
    flow: Dataflow,
    *,
    epoch_config: Optional[EpochConfig] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
    mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:
    """Execute a dataflow as a cluster of processes on this machine.

    Blocks until execution is complete.

    Starts up cluster processes for you and handles connecting them
    together. You'd commonly use this for notebook analysis that needs
    parallelism and higher throughput, or simple stand-alone demo
    programs.

    >>> from bytewax.testing import doctest_ctx
    >>> from bytewax.dataflow import Dataflow
    >>> from bytewax.inputs import TestingInputConfig
    >>> from bytewax.outputs import StdOutputConfig
    >>> flow = Dataflow()
    >>> flow.input("inp", TestingInputConfig(range(3)))
    >>> flow.capture(StdOutputConfig())
    >>> spawn_cluster(
    ...     flow,
    ...     proc_count=2,
    ...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
    ... )  # doctest: +ELLIPSIS
    (...)

    See `bytewax.run_main()` for a way to test input and output
    builders without the complexity of starting a cluster.

    See `bytewax.cluster_main()` for starting one process in a cluster
    in a distributed situation.

    Args:

        flow: Dataflow to run.

        epoch_config: A custom epoch config. You probably don't need
            this. See `EpochConfig` for more info.

        recovery_config: State recovery config. See
            `bytewax.recovery`. If `None`, state will not be
            persisted.

        proc_count: Number of processes to start.

        worker_count_per_proc: Number of worker threads to start on
            each process.

        mp_ctx: `multiprocessing` context to use. Use this to
            configure starting up subprocesses via spawn or
            fork. Defaults to spawn.

    """
    addresses = _gen_addresses(proc_count)
    with mp_ctx.Pool(processes=proc_count) as pool:
        futures = [
            pool.apply_async(
                cluster_main,
                (flow,),
                {
                    "epoch_config": epoch_config,
                    "recovery_config": recovery_config,
                    "addresses": addresses,
                    "proc_id": proc_id,
                    "worker_count_per_proc": worker_count_per_proc,
                },
            )
            for proc_id in range(proc_count)
        ]
        pool.close()

        for future in futures:
            # Will re-raise exceptions from subprocesses.
            future.get()

        pool.join()

Classes

class EpochConfig

Base class for an epoch config.

These define how epochs are assigned to source input data. You should only need to set this if you are testing the recovery system or are doing deep exactly-once integration work. Changing this does not change the semantics of any of the operators.

Use a specific subclass of this for the epoch definition you need.

Subclasses

PeriodicEpochConfig
TestingEpochConfig

class PeriodicEpochConfig (epoch_length)

Increment epochs at regular system time intervals.

This is the default with 10 second epoch intervals if no epoch_config is passed to your execution entry point.

Args

epoch_length : datetime.timedelta
    System time length of each epoch.

Returns

Config object. Pass this as the epoch_config parameter of your execution entry point.

Ancestors

EpochConfig

Instance variables

var epoch_length

System time length of each epoch.

class TestingEpochConfig

Use for deterministic epochs in tests. Increment epoch by 1 after each item.

This requires all workers to have exactly the same number of input items! Otherwise the dataflow will hang!

You almost assuredly do not want to use this unless you are writing tests of the recovery system.

Returns

Config object. Pass this as the epoch_config parameter of your execution entry point.
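
For example, a recovery test might run a flow with one epoch per item. A minimal sketch, assuming the default no-argument constructor:

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main, TestingEpochConfig
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# One epoch per input item, so epochs are deterministic across runs.
run_main(flow, epoch_config=TestingEpochConfig())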

Ancestors

EpochConfig