Module bytewax.execution
How to execute your dataflows.
Run an instantiated Dataflow using one of the entry point functions in this module.
Epoch Configs
Epochs define the granularity of recovery in a bytewax dataflow. By default, we snapshot recovery every 10 seconds. You should only need to set this if you are testing the recovery system or are doing deep exactly-once integration work. Changing this does not change the semantics of any of the operators.
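For example, here is a minimal sketch of overriding the default period by passing a PeriodicEpochConfig to an entry point. The 30-second interval and the testing input/output configs (borrowed from the examples further down this page) are purely illustrative:

from datetime import timedelta

from bytewax.dataflow import Dataflow
from bytewax.execution import PeriodicEpochConfig, run_main
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# Snapshot recovery state every 30 seconds instead of the default 10.
run_main(flow, epoch_config=PeriodicEpochConfig(timedelta(seconds=30)))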
Expand source code
"""How to execute your dataflows.
Run an instantiated `bytewax.dataflow.Dataflow` using one of the entry
point functions in this module.
Epoch Configs
-------------
Epochs define the granularity of recovery in a bytewax dataflow. By default, we
snapshot recovery every 10 seconds. You should only need to set this if you are
testing the recovery system or are doing deep exactly-once integration work. Changing
this does not change the semantics of any of the operators.
"""
from typing import Any, Iterable, List, Optional, Tuple
from multiprocess import get_context
from bytewax.dataflow import Dataflow
from bytewax.recovery import RecoveryConfig
from .bytewax import (  # noqa: F401
    cluster_main,
    EpochConfig,
    PeriodicEpochConfig,
    run_main,
    TestingEpochConfig,
)
# Due to our package structure, we need to define __all__
# in any submodule as pdoc will not find the documentation
# for functions imported here, but defined in another submodule.
# See https://pdoc3.github.io/pdoc/doc/pdoc/#what-objects-are-documented
# for more information.
__all__ = [
    "run_main",
    "cluster_main",
    "spawn_cluster",
    "EpochConfig",
    "PeriodicEpochConfig",
    "TestingEpochConfig",
]


def _gen_addresses(proc_count: int) -> Iterable[str]:
    return [f"localhost:{proc_id + 2101}" for proc_id in range(proc_count)]
def spawn_cluster(
    flow: Dataflow,
    *,
    epoch_config: Optional[EpochConfig] = None,
    recovery_config: Optional[RecoveryConfig] = None,
    proc_count: int = 1,
    worker_count_per_proc: int = 1,
    mp_ctx=get_context("spawn"),
) -> List[Tuple[int, Any]]:
    """Execute a dataflow as a cluster of processes on this machine.

    Blocks until execution is complete.

    Starts up cluster processes for you and handles connecting them
    together. You'd commonly use this for notebook analysis that needs
    parallelism and higher throughput, or simple stand-alone demo
    programs.

    >>> from bytewax.testing import doctest_ctx
    >>> from bytewax.dataflow import Dataflow
    >>> from bytewax.inputs import TestingInputConfig
    >>> from bytewax.outputs import StdOutputConfig
    >>> flow = Dataflow()
    >>> flow.input("inp", TestingInputConfig(range(3)))
    >>> flow.capture(StdOutputConfig())
    >>> spawn_cluster(
    ...     flow,
    ...     proc_count=2,
    ...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
    ... )  # doctest: +ELLIPSIS
    (...)

    See `bytewax.run_main()` for a way to test input and output
    builders without the complexity of starting a cluster.

    See `bytewax.cluster_main()` for starting one process in a cluster
    in a distributed situation.

    Args:

        flow: Dataflow to run.

        epoch_config: A custom epoch config. You probably don't need
            this. See `EpochConfig` for more info.

        recovery_config: State recovery config. See
            `bytewax.recovery`. If `None`, state will not be
            persisted.

        proc_count: Number of processes to start.

        worker_count_per_proc: Number of worker threads to start on
            each process.

        mp_ctx: `multiprocessing` context to use. Use this to
            configure starting up subprocesses via spawn or
            fork. Defaults to spawn.

    """
    addresses = _gen_addresses(proc_count)

    with mp_ctx.Pool(processes=proc_count) as pool:
        futures = [
            pool.apply_async(
                cluster_main,
                (flow,),
                {
                    "epoch_config": epoch_config,
                    "recovery_config": recovery_config,
                    "addresses": addresses,
                    "proc_id": proc_id,
                    "worker_count_per_proc": worker_count_per_proc,
                },
            )
            for proc_id in range(proc_count)
        ]

        pool.close()

        for future in futures:
            # Will re-raise exceptions from subprocesses.
            future.get()

        pool.join()
Functions
def cluster_main(flow, addresses, proc_id, *, epoch_config, recovery_config, worker_count_per_proc)
-
Execute a dataflow in the current process as part of a cluster.
You have to coordinate starting up all the processes in the cluster and ensuring they each are assigned a unique ID and know the addresses of other processes. You'd commonly use this for starting processes as part of a Kubernetes cluster.
Blocks until execution is complete.
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> addresses = []  # In a real example, you'd find the "host:port" of all other Bytewax workers.
>>> proc_id = 0  # In a real example, you'd assign each worker a distinct ID from 0..proc_count.
>>> cluster_main(flow, addresses, proc_id)
0
1
2
See bytewax.run_main() for a way to test input and output builders without the complexity of starting a cluster.
See bytewax.spawn_cluster() for starting a simple cluster locally on one machine.
Args
flow
- Dataflow to run.
addresses
- List of host/port addresses for all processes in this cluster (including this one).
proc_id
- Index of this process in cluster; starts from 0.
epoch_config
- A custom epoch config. You probably don't need this. See EpochConfig for more info.
recovery_config
- State recovery config. See bytewax.recovery. If None, state will not be persisted.
worker_count_per_proc
- Number of worker threads to start on each process.
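As a sketch of the distributed case the doctest glosses over: every process must see the same address list and get its own unique proc_id. How those values reach each process is up to your orchestrator; the environment variable names below are hypothetical, not something Bytewax reads for you.

import os

from bytewax.dataflow import Dataflow
from bytewax.execution import cluster_main
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))  # Your real input config here.
flow.capture(StdOutputConfig())

# Hypothetical env vars: e.g. a Kubernetes StatefulSet could template the
# address list and derive the process index from the pod ordinal.
addresses = os.environ["BYTEWAX_ADDRESSES"].split(";")  # "host:port" of every process
proc_id = int(os.environ["BYTEWAX_PROC_ID"])  # unique index in 0..proc_count-1

cluster_main(flow, addresses, proc_id, worker_count_per_proc=2)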
def run_main(flow, *, epoch_config, recovery_config)
-
Execute a dataflow in the current thread.
Blocks until execution is complete.
You'd commonly use this for prototyping custom input and output builders with a single worker before using them in a cluster setting.
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> run_main(flow)
0
1
2
See bytewax.spawn_cluster() for starting a cluster on this machine with full control over inputs and outputs.
Args
flow
- Dataflow to run.
epoch_config
- A custom epoch config. You probably don't need this. See EpochConfig for more info.
recovery_config
- State recovery config. See bytewax.recovery. If None, state will not be persisted.
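If you do want state persisted across runs, pass a recovery config. A rough sketch, assuming the SqliteRecoveryConfig in bytewax.recovery; check that module for the exact constructor arguments your version expects:

from bytewax.dataflow import Dataflow
from bytewax.execution import run_main
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig
from bytewax.recovery import SqliteRecoveryConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# Assumed constructor: a directory for the SQLite state DB(s). Verify
# against your version of bytewax.recovery before relying on this.
run_main(flow, recovery_config=SqliteRecoveryConfig("./recovery_db/"))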
def spawn_cluster(flow: Dataflow, *, epoch_config: Optional[EpochConfig] = None, recovery_config: Optional[RecoveryConfig] = None, proc_count: int = 1, worker_count_per_proc: int = 1, mp_ctx=get_context("spawn")) -> List[Tuple[int, Any]]
-
Execute a dataflow as a cluster of processes on this machine.
Blocks until execution is complete.
Starts up cluster processes for you and handles connecting them together. You'd commonly use this for notebook analysis that needs parallelism and higher throughput, or simple stand-alone demo programs.
>>> from bytewax.testing import doctest_ctx
>>> from bytewax.dataflow import Dataflow
>>> from bytewax.inputs import TestingInputConfig
>>> from bytewax.outputs import StdOutputConfig
>>> flow = Dataflow()
>>> flow.input("inp", TestingInputConfig(range(3)))
>>> flow.capture(StdOutputConfig())
>>> spawn_cluster(
...     flow,
...     proc_count=2,
...     mp_ctx=doctest_ctx,  # Outside a doctest, you'd skip this.
... )  # doctest: +ELLIPSIS
(...)
See bytewax.run_main() for a way to test input and output builders without the complexity of starting a cluster.
See bytewax.cluster_main() for starting one process in a cluster in a distributed situation.
Args
flow
- Dataflow to run.
epoch_config
- A custom epoch config. You probably don't need this. See EpochConfig for more info.
recovery_config
- State recovery config. See bytewax.recovery. If None, state will not be persisted.
proc_count
- Number of processes to start.
worker_count_per_proc
- Number of worker threads to start on each process.
mp_ctx
- multiprocessing context to use. Use this to configure starting up subprocesses via spawn or fork. Defaults to spawn.
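For instance, a short sketch of forking the subprocesses instead of spawning them. Note that the context should come from the multiprocess package this module itself uses (see the imports in the source above), and that fork is only available on Unix-like platforms:

from multiprocess import get_context

from bytewax.dataflow import Dataflow
from bytewax.execution import spawn_cluster
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# Fork two processes with two worker threads each; spawn is the default.
spawn_cluster(
    flow,
    proc_count=2,
    worker_count_per_proc=2,
    mp_ctx=get_context("fork"),
)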
Classes
class EpochConfig
-
Base class for an epoch config.
These define how epochs are assigned on source input data. You should only need to set this if you are testing the recovery system or are doing deep exactly-once integration work. Changing this does not change the semantics of any of the operators.
Use a specific subclass of this for the epoch definition you need.
Subclasses
PeriodicEpochConfig
TestingEpochConfig
class PeriodicEpochConfig (epoch_length)
-
Increment epochs at regular system time intervals.
This is the default with 10 second epoch intervals if no epoch_config is passed to your execution entry point.
Args
epoch_length : datetime.timedelta
- System time length of each epoch.
Returns
Config object. Pass this as the epoch_config parameter of your execution entry point.
Ancestors
EpochConfig
Instance variables
var epoch_length
-
System time length of each epoch.
class TestingEpochConfig
-
Use for deterministic epochs in tests. Increment epoch by 1 after each item.
This requires all workers to have exactly the same number of input items! Otherwise the dataflow will hang!
You almost assuredly do not want to use this unless you are writing tests of the recovery system.
Returns
Config object. Pass this as the epoch_config parameter of your execution entry point.
Ancestors
EpochConfig
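As a sketch of the kind of recovery test this is intended for: with a single worker and TestingEpochConfig (which appears to take no constructor arguments), each input item gets its own epoch, so snapshot boundaries are deterministic.

from bytewax.dataflow import Dataflow
from bytewax.execution import TestingEpochConfig, run_main
from bytewax.inputs import TestingInputConfig
from bytewax.outputs import StdOutputConfig

flow = Dataflow()
flow.input("inp", TestingInputConfig(range(3)))
flow.capture(StdOutputConfig())

# One epoch per item; a single worker avoids the uneven-input hang
# described above.
run_main(flow, epoch_config=TestingEpochConfig())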