Merge pull request #19 from bytewax/bytewax-v0.16
Bytewax to 0.16.1 and allow disabling the dataflow API
miccioest authored May 18, 2023
2 parents c1d6566 + 83eadd6 commit e381cc3
Showing 17 changed files with 194 additions and 171 deletions.
4 changes: 3 additions & 1 deletion README.md
@@ -46,7 +46,7 @@ The command removes all the Kubernetes components associated with the chart and
| Parameter | Description | Default |
|-------------------------------------------|-----------------------------------------------|---------------------------------------------------------|
| `image.repository` | Image repository | `bytewax.docker.scarf.sh/bytewax/bytewax` |
-| `image.tag` | Image tag | `0.15.1-python3.9` |
+| `image.tag` | Image tag | `0.16.1-python3.9` |
| `image.pullPolicy` | Image pull policy | `Always` |
| `imagePullSecrets` | Image pull secrets | `[]` |
| `serviceAccount.create` | Create service account | `true` |
@@ -59,7 +59,9 @@ The command removes all the Kubernetes components associated with the chart and
| `containerName` | Statefulset/Job application container name | `process`|
| `securityContext` | Statefulset/Job containers securityContext | `{"allowPrivilegeEscalation": false, "capabilities": {"drop": ["ALL"], "add": ["NET_BIND_SERVICE"]}, "readOnlyRootFilesystem": true }`|
| `service.port` | Kubernetes port where internal bytewax service is exposed | `9999` |
+| `api.enabled` | Create resources related to dataflow api | `true` |
| `api.port` | Kubernetes port where dataflow api service is exposed | `3030` |
+| `api.cacheport` | Kubernetes port where dataflow api cache service is exposed | `3033` |
| `resources` | CPU/Memory resource requests/limits | `{}` |
| `nodeSelector` | Node labels for pod assignment | `{}` |
| `tolerations` | Toleration labels for pod assignment | `[]` |
4 changes: 2 additions & 2 deletions charts/bytewax/Chart.yaml
@@ -2,8 +2,8 @@ apiVersion: v2
name: bytewax
description: A Helm chart for Bytewax
type: application
-version: 0.5.2
-appVersion: "0.15.1"
+version: 0.6.0
+appVersion: "0.16.1"
icon: https://bytewax.io/assets/img/favicon.png
dependencies:
- condition: opentelemetry-collector.enabled
Binary file modified charts/bytewax/examples.tar
Binary file not shown.
40 changes: 22 additions & 18 deletions charts/bytewax/examples/k8s_basic.py
@@ -1,25 +1,29 @@
 from bytewax.dataflow import Dataflow
-from bytewax.execution import cluster_main
-from bytewax.inputs import ManualInputConfig
-from bytewax.outputs import ManualOutputConfig
-from bytewax import parse
+from bytewax.connectors.stdio import StdOutput
+from bytewax.inputs import StatelessSource, DynamicInput
 import time

-def input_builder(worker_index, worker_count, resume_epoch):
-    # Ignore state recovery here
-    state = None
-    for i in range(100):
+class NumberSource(StatelessSource):
+    def __init__(self, max, worker_index):
+        self.worker_index = worker_index
+        self.iterator = iter(range(max))
+
+    def next(self):
         time.sleep(1)
-        yield state, i
+        return f"Worker: {self.worker_index} - {next(self.iterator)}"

-def output_builder(worker_index, worker_count):
-    def output_handler(item):
-        print(f"worker: {worker_index} - item: {item}")
-    return output_handler
+    def close(self):
+        pass

-flow = Dataflow()
-flow.input("inp", ManualInputConfig(input_builder))
-flow.capture(ManualOutputConfig(output_builder))
-
-if __name__ == "__main__":
-    cluster_main(flow, **parse.proc_env())
+class NumberInput(DynamicInput):
+    def __init__(self, max):
+        self.max = max
+
+    def build(self, worker_index, worker_count):
+        return NumberSource(self.max, worker_index)
+
+
+flow = Dataflow()
+flow.input("inp", NumberInput(100))
+flow.output("out", StdOutput())
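
The rewritten example uses Bytewax 0.16's `DynamicInput`/`StatelessSource` API in place of `ManualInputConfig`, `ManualOutputConfig`, and `cluster_main()`. For a quick local sanity check of a 0.16-style flow, something along these lines should work; it is only a sketch and assumes the 0.16 `bytewax.testing` helpers (`TestingInput`, `TestingOutput`, `run_main`), which are not part of this chart:

# Sketch: exercise the 0.16 input/output API in-process, independent of the
# NumberSource/NumberInput classes above.
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput, TestingOutput, run_main

out = []
flow = Dataflow()
flow.input("inp", TestingInput(range(5)))   # bounded test input
flow.map(lambda n: f"item: {n}")
flow.output("out", TestingOutput(out))      # collect results into `out`
run_main(flow)                              # run a single worker in this process
assert out == [f"item: {n}" for n in range(5)]

In 0.16 a module-level flow like `k8s_basic.py` is normally launched with `python -m bytewax.run k8s_basic:flow` rather than by calling `cluster_main()` yourself.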
103 changes: 28 additions & 75 deletions charts/bytewax/examples/k8s_cluster.py
@@ -1,94 +1,47 @@
 import os
 from pathlib import Path

-from bytewax import parse
 from bytewax.dataflow import Dataflow
-from bytewax.execution import cluster_main
-from bytewax.inputs import distribute, ManualInputConfig
-from bytewax.outputs import ManualOutputConfig
-from bytewax.recovery import SqliteRecoveryConfig
+from bytewax.connectors.stdio import StdOutput
+from bytewax.connectors.files import DirInput, DirOutput, FileInput, FileOutput

-input_dir = Path("./examples/sample_data/cluster/")
+input_dir = Path("./sample_data/cluster/")
 output_dir = Path("./cluster_out/")
-recovery_dir = Path("./cluster_recovery/")
-
-# to see more on recovery with this example, see "examples/manual_cluster.py"
-recovery_dir.mkdir(exist_ok=True)
-recovery_config = SqliteRecoveryConfig(recovery_dir)


-def input_builder(worker_index, worker_count, resume_state):
-    print(f"Worker {worker_index} resuming with state: {resume_state}")
-    # Fill in a default resume state if we have None. The resume state
-    # will be a dict from path to line number to start reading at.
-    state = resume_state or {}
-    # List all the input partitions in the reading directory.
-    all_partitions = input_dir.glob("*.txt")
-    # Then have this worker only read every `n` files so each worker
-    # will read a disjoint set.
-    this_worker_partitions = distribute(all_partitions, worker_index, worker_count)
-    # Open all the ones that this worker should read.
-    for path in this_worker_partitions:
-        with open(path) as f:
-            for i, line in enumerate(f):
-                # If we're resuming, skip ahead to the line for this
-                # file in the state.
-                if i < state.get(path, 0):
-                    continue
-                # Since the file has just read the current line as
-                # part of the for loop, note that on resume we should
-                # start reading from the next line.
-                state[path] = i + 1
-                # Now send them into the dataflow on this worker.
-                yield state, line.strip()
-    print(f"Worker {worker_index} input state: {state}")
-
-
-def output_builder(worker_index, worker_count):
-    output_dir.mkdir(exist_ok=True)
-    # Open a file that just this worker will write to.
-    write_to = open(output_dir / f"worker{worker_index}.out", "a")
-    # Build a function that can be called for each captured output.
-    def write(item):
-        write_to.write(f"{item}\n")
-
-    # Return it so Bytewax will run it whenever an item is seen by a
-    # capture operator.
-    return write
+def to_tuple(x):
+    return tuple(map(str, x.split(',')))
+

 flow = Dataflow()
-flow.input("inp", ManualInputConfig(input_builder))
+flow.input("inp", DirInput(input_dir))
 flow.map(str.upper)
-flow.capture(ManualOutputConfig(output_builder))
+flow.map(to_tuple)
+flow.output("out", DirOutput(output_dir, 5, assign_file=int))


-if __name__ == "__main__":
-    # We are going to use Waxctl, you can download it from https://bytewax.io/downloads
-    # Run these commands in your terminal to run a cluster of two containers:
+# We are going to use Waxctl, you can download it from https://bytewax.io/downloads
+# Run these commands in your terminal to run a cluster of two containers:

-    # $ tar -C ./ -cvf cluster.tar examples
-    # $ waxctl dataflow deploy ./cluster.tar --name k8s-cluster --python-file-name examples/k8s_cluster.py -p2
+# $ tar -C ./ -cvf cluster.tar examples
+# $ waxctl dataflow deploy ./cluster.tar --name k8s-cluster --python-file-name examples/k8s_cluster.py -p2

-    # Each worker will read the files in
-    # ./examples/sample_data/cluster/*.txt which have lines like
-    # `one1`.
+# Each worker will read the files in
+# ./examples/sample_data/cluster/*.txt which have lines like
+# `one1`.

-    # They will then both finish and you'll see ./cluster_out/0.out
-    # and ./cluster_out/1.out with the data that each process in the
-    # cluster wrote with the lines uppercased.
+# They will then both finish and you'll see ./cluster_out/part_0
+# and ./cluster_out/part_1 with the data that each process in the
+# cluster wrote with the lines uppercased.

-    # To see that files in each container you can run these commands:
+# To see the files in each container you can run these commands:

-    # kubectl exec -it k8s-cluster-0 -cprocess -- cat /var/bytewax/cluster_out/0.out
-    # kubectl exec -it k8s-cluster-1 -cprocess -- cat /var/bytewax/cluster_out/1.out
+# kubectl exec -it k8s-cluster-0 -cprocess -- cat /var/bytewax/cluster_out/part_0.out
+# kubectl exec -it k8s-cluster-1 -cprocess -- cat /var/bytewax/cluster_out/part_1.out

-    # You could imagine reading from / writing to separate Kafka
-    # partitions, S3 blobs, etc.
+# You could imagine reading from / writing to separate Kafka
+# partitions, S3 blobs, etc.

-    # When using `cluster_main()` you have to coordinate ensuring each
-    # process knows the address of all other processes in the cluster
-    # and their unique process ID. You can address that easily by deploying your
-    # dataflow program using Waxctl or installing the Bytewax Helm Chart
-    cluster_main(flow, recovery_config=recovery_config, **parse.proc_env())
+# When using `cluster_main()` you have to coordinate ensuring each
+# process knows the address of all other processes in the cluster
+# and their unique process ID. You can address that easily by deploying your
+# dataflow program using Waxctl or installing the Bytewax Helm Chart
+# cluster_main(flow, recovery_config=recovery_config, **parse.proc_env())
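
One note on the new sink: `DirOutput(output_dir, 5, assign_file=int)` spreads the `(key, value)` tuples produced by `to_tuple` across five `part_*` files, which is presumably why the sample data files gain a numeric first field in this commit. The snippet below illustrates how that routing behaves as I read the 0.16 connector (assign_file applied to the key, then taken modulo the file count); it is not code from this PR:

# Illustration only: where a line like "3,three3" ends up after str.upper and
# to_tuple, assuming DirOutput computes assign_file(key) % file_count.
def assign_partition(item, file_count=5, assign_file=int):
    key, _value = item                    # items reach the sink as (key, value)
    return assign_file(key) % file_count

print(assign_partition(("3", "THREE3")))  # 3 -> part_3
print(assign_partition(("6", "ONE6")))    # from "6,one6": 6 % 5 = 1 -> part_1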
12 changes: 6 additions & 6 deletions charts/bytewax/examples/sample_data/cluster/partition-1.txt
@@ -1,6 +1,6 @@
-one1
-one2
-one3
-one4
-one5
-one6
+1,one1
+2,one2
+3,one3
+4,one4
+5,one5
+6,one6
12 changes: 6 additions & 6 deletions charts/bytewax/examples/sample_data/cluster/partition-2.txt
@@ -1,6 +1,6 @@
-two1
-two2
-two3
-two4
-two5
-two6
+1,two1
+2,two2
+3,two3
+4,two4
+5,two5
+6,two6
12 changes: 6 additions & 6 deletions charts/bytewax/examples/sample_data/cluster/partition-3.txt
@@ -1,6 +1,6 @@
-three1
-three2
-three3
-three4
-three5
-three6
+1,three1
+2,three2
+3,three3
+4,three4
+5,three5
+6,three6
12 changes: 6 additions & 6 deletions charts/bytewax/examples/sample_data/cluster/partition-4.txt
@@ -1,6 +1,6 @@
-four1
-four2
-four3
-four4
-four5
-four6
+1,four1
+2,four2
+3,four3
+4,four4
+5,four5
+6,four6
12 changes: 6 additions & 6 deletions charts/bytewax/examples/sample_data/cluster/partition-5.txt
@@ -1,6 +1,6 @@
-five1
-five2
-five3
-five4
-five5
-five6
+1,five1
+2,five2
+3,five3
+4,five4
+5,five5
+6,five6
72 changes: 30 additions & 42 deletions charts/bytewax/examples/wikistream.py
@@ -1,47 +1,47 @@
 import json
 import os
 import operator
 from datetime import timedelta

 # pip install sseclient-py urllib3
 import sseclient
 import urllib3

-from bytewax import parse
+from bytewax.connectors.stdio import StdOutput
 from bytewax.dataflow import Dataflow
-from bytewax.execution import cluster_main
-from bytewax.inputs import ManualInputConfig
-from bytewax.outputs import StdOutputConfig
-from bytewax.recovery import SqliteRecoveryConfig
-from bytewax.window import SystemClockConfig, TumblingWindowConfig
+from bytewax.inputs import PartitionedInput, StatefulSource
 from bytewax.tracing import setup_tracing, OtlpTracingConfig
+from bytewax.window import SessionWindow, SystemClockConfig

 tracer = setup_tracing(
     tracing_config=OtlpTracingConfig(
         url=os.getenv("BYTEWAX_OTLP_URL", "grpc://127.0.0.1:4317"),
         service_name="Tracing-example",
     ),
     log_level="TRACE",
 )


-def input_builder(worker_index, worker_count, resume_state):
-    # Multiple SSE connections will duplicate the streams, so only
-    # have the first worker generate input.
-    if worker_index == 0:
+class WikiSource(StatefulSource):
+    def __init__(self):
         pool = urllib3.PoolManager()
         resp = pool.request(
             "GET",
             "https://stream.wikimedia.org/v2/stream/recentchange/",
             preload_content=False,
             headers={"Accept": "text/event-stream"},
         )
-        client = sseclient.SSEClient(resp)
+        self.client = sseclient.SSEClient(resp)
+        self.events = self.client.events()
+
+    def next(self):
+        return next(self.events).data
+
+    def snapshot(self):
+        return None
+
+    def close(self):
+        self.client.close()

-        # Since there is no way to replay missed SSE data, we're going
-        # to drop missed data. That's fine as long as we know to
-        # interpret the results with that in mind.
-        for event in client.events():
-            yield (None, event.data)

+class WikiStreamInput(PartitionedInput):
+    def list_parts(self):
+        return {"single-part"}
+
+    def build_part(self, for_key, resume_state):
+        assert for_key == "single-part"
+        assert resume_state is None
+        return WikiSource()


 def initial_count(data_dict):
@@ -54,7 +54,7 @@ def keep_max(max_count, new_count):


 flow = Dataflow()
-flow.input("inp", ManualInputConfig(input_builder))
+flow.input("inp", WikiStreamInput())
 # "event_json"
 flow.map(json.loads)
 # {"server_name": "server.name", ...}
@@ -63,22 +63,10 @@ def keep_max(max_count, new_count):
 flow.reduce_window(
     "sum",
     SystemClockConfig(),
-    TumblingWindowConfig(length=timedelta(seconds=2)),
+    SessionWindow(gap=timedelta(seconds=2)),
     operator.add,
 )
 # ("server.name", sum_per_window)
-flow.stateful_map(
-    "keep_max",
-    lambda: 0,
-    keep_max,
-)
+flow.stateful_map("keep_max", lambda: 0, keep_max)
 # ("server.name", max_per_window)
-flow.capture(StdOutputConfig())
-
-
-if __name__ == "__main__":
-    cluster_main(
-        flow,
-        recovery_config=SqliteRecoveryConfig("."),
-        **parse.proc_env()
-    )
+flow.output("out", StdOutput())
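
`WikiSource.snapshot()` returns `None` because missed SSE events cannot be replayed, so there is nothing useful to persist. For contrast, here is a sketch (not part of this PR, assuming the 0.16 `PartitionedInput`/`StatefulSource` contract) of a source that does use `snapshot()`: the value returned there is handed back as `resume_state` when the partition is rebuilt after a restart:

# Hypothetical resumable input, for illustration only: the snapshot value is a
# read cursor that recovery passes back into build_part().
from bytewax.inputs import PartitionedInput, StatefulSource


class CounterSource(StatefulSource):
    def __init__(self, resume_state):
        self._next_n = resume_state if resume_state is not None else 0

    def next(self):
        n = self._next_n
        self._next_n += 1
        return n

    def snapshot(self):
        return self._next_n  # persisted by Bytewax's recovery machinery

    def close(self):
        pass


class CounterInput(PartitionedInput):
    def list_parts(self):
        return {"only-part"}

    def build_part(self, for_key, resume_state):
        assert for_key == "only-part"
        return CounterSource(resume_state)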