bytewax.outputs

Low-level output interfaces.

If you want pre-built connectors for various external systems, see bytewax.connectors. That is also a rich source of examples.

Data

X: TypeVar

Type consumed by a Sink.

S: TypeVar

Type of state snapshots.

Classes

class Sink

A destination to write output items.

Base class for all output sinks. Do not subclass this.

If you want to implement a custom connector, instead subclass one of the specific sink sub-types below in this module.

class StatefulSinkPartition

Output partition that maintains state of its position.

abstract write_batch(values: List[X]) → None

Write a batch of output values.

Called with a list of the values from the (key, value) pairs routed to this partition at this point in the dataflow.

See FixedPartitionedSink.part_fn for how keys are mapped to partitions.

Parameters:

values – Values in the dataflow. Non-deterministically batched.

abstract snapshot() → S

Snapshot the position of the next write of this partition.

This will be returned to you via the resume_state parameter of FixedPartitionedSink.build_part.

Be careful of “off by one” errors in resume state. This should return a state that, when built into a partition, resumes writing after the last written item rather than overwriting it.

This is guaranteed to never be called after close.

Returns:

Resume state.

close() → None

Clean up this partition when the dataflow completes.

This is not guaranteed to be called. It will only be called when the dataflow finishes on finite input; it will not be called during an abrupt shutdown or an abort.

class FixedPartitionedSink

An output sink with a fixed number of independent partitions.

Will maintain the state of each partition and re-build each partition from that state during resume. If the sink supports seeking and overwriting, this output can support exactly-once processing.

abstract list_parts() → List[str]

List all local partitions this worker has access to.

You do not need to list all partitions globally.

Returns:

Local partition keys.

part_fn(item_key: str) → int

Route incoming (key, value) pairs to partitions.

Defaults to zlib.adler32 as a simple consistent function.

This must be globally consistent across workers and executions and return the same hash on every call.

A specific partition is chosen by wrapped (modulo) indexing this value into the ordered global set of partitions, not just the partitions local to this worker.

Caution

Do not use Python’s built-in hash function here! It is not consistent between processes by default, and using it will cause incorrect partitioning in cluster executions.

Parameters:

item_key – Key for the value that is about to be written.

Returns:

Integer hash value used to assign the partition.
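
For illustration, a minimal sketch of this routing under the documented default (zlib.adler32). The global_parts list and the modulo step are illustrative stand-ins for what the runtime does internally, not API you call yourself:

```python
import zlib
from typing import List


def part_fn(item_key: str) -> int:
    # Matches the documented default: a hash that is consistent
    # across processes and executions, unlike built-in hash().
    return zlib.adler32(item_key.encode())


# Hypothetical stand-in for the ordered *global* set of partition keys;
# the runtime wraps (modulo) the hash into this list to pick one.
global_parts: List[str] = ["part-0", "part-1", "part-2"]
assigned = global_parts[part_fn("user-42") % len(global_parts)]
```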

abstract build_part(
step_id: str,
for_part: str,
resume_state: Optional[S],
) → StatefulSinkPartition[X, S]

Build anew or resume an output partition.

Will be called once per execution for each partition key on a worker that reported that partition was local in list_parts.

Do not pre-build state about a partition in the constructor. All state must be derived from resume_state for recovery to work properly.

Parameters:
  • step_id – The step_id of the output operator.

  • for_part – Which partition to build. Will always be one of the keys returned by list_parts on this worker.

  • resume_state – State data describing where in the output stream this partition should begin writing during this execution.

Returns:

The built partition.
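
To make the partition lifecycle concrete, here is a minimal sketch pairing FixedPartitionedSink with StatefulSinkPartition as a directory-backed sink. Everything here is hypothetical and simplified: the DirSink and _FilePartition names, the part_* file layout, and the byte-offset resume state are this sketch’s own choices, and error handling is omitted.

```python
from pathlib import Path
from typing import List, Optional

from bytewax.outputs import FixedPartitionedSink, StatefulSinkPartition


class _FilePartition(StatefulSinkPartition[str, int]):
    """Appends one line per value; its snapshot is the next byte offset."""

    def __init__(self, path: Path, resume_offset: int):
        path.touch(exist_ok=True)
        self._f = open(path, "r+")
        # Seek to the snapshotted offset and drop anything after it, so a
        # resumed partition starts *after* the last snapshotted item
        # (avoiding the "off by one" overwrite warned about above) while
        # overwriting any duplicate writes left by a failed execution.
        self._f.seek(resume_offset)
        self._f.truncate()

    def write_batch(self, values: List[str]) -> None:
        for value in values:
            self._f.write(value + "\n")
        self._f.flush()

    def snapshot(self) -> int:
        # Offset of the *next* write, not of the last completed one.
        return self._f.tell()

    def close(self) -> None:
        self._f.close()


class DirSink(FixedPartitionedSink[str, int]):
    """One partition per pre-existing part_* file in a directory."""

    def __init__(self, dir_path: Path):
        self._dir = dir_path

    def list_parts(self) -> List[str]:
        # Only the partitions this worker can see locally.
        return sorted(p.name for p in self._dir.glob("part_*"))

    def build_part(
        self,
        step_id: str,
        for_part: str,
        resume_state: Optional[int],
    ) -> _FilePartition:
        # All resume information must come from resume_state, never from
        # state pre-built in the constructor.
        return _FilePartition(self._dir / for_part, resume_state or 0)
```

Because the partition can seek and overwrite, replayed items after recovery land on top of any unacknowledged writes, which is the seek-and-overwrite property the class docstring above ties to exactly-once processing.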

class StatelessSinkPartition

Output partition that is stateless.

abstract write_batch(items: List[X]) → None

Write a batch of output items.

Called multiple times whenever new items are seen at this point in the dataflow.

Parameters:

items – Items in the dataflow. Non-deterministically batched.

close() → None

Clean up this partition when the dataflow completes.

This is not guaranteed to be called. It will only be called when the dataflow finishes on finite input; it will not be called during an abrupt shutdown or an abort.

class DynamicSink

An output sink where all workers write items concurrently.

Does not support storing any resume state. Thus this kind of output can naively support only at-least-once processing.

abstract build(
step_id: str,
worker_index: int,
worker_count: int,
) → StatelessSinkPartition[X]

Build an output partition for a worker.

Will be called once on each worker.

Parameters:
  • step_id – The step_id of the output operator.

  • worker_index – Index of this worker.

  • worker_count – Total number of workers.

Returns:

The built partition.
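
As a minimal sketch of the dynamic pattern: every worker builds its own stateless partition and writes concurrently. The PrintSink and _StdOutPartition names are hypothetical; bytewax.connectors ships production stdout and other connectors.

```python
import sys
from typing import List

from bytewax.outputs import DynamicSink, StatelessSinkPartition


class _StdOutPartition(StatelessSinkPartition[str]):
    """Writes each item on its own line, tagged with the writing worker."""

    def __init__(self, worker_index: int):
        self._prefix = f"worker {worker_index}: "

    def write_batch(self, items: List[str]) -> None:
        for item in items:
            sys.stdout.write(self._prefix + item + "\n")
        sys.stdout.flush()


class PrintSink(DynamicSink[str]):
    """Every worker concurrently prints the items it processes."""

    def build(
        self,
        step_id: str,
        worker_index: int,
        worker_count: int,
    ) -> _StdOutPartition:
        # Called once on each worker; no resume state is available here.
        return _StdOutPartition(worker_index)
```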
