Skip to content

Commit

Permalink
Merge pull request #31 from bytewax/hostpath
Browse files Browse the repository at this point in the history
HostPath and Bytewax version 0.19.0
  • Loading branch information
miccioest authored Mar 22, 2024
2 parents 08b55fa + 47710eb commit 386a5a0
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 19 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ The command removes all the Kubernetes components associated with the chart and

| Parameter | Description | Default |
|-------------------------------------------|-----------------------------------------------|---------------------------------------------------------|
| `image.repository` | Image repository | `bytewax.docker.scarf.sh/bytewax/bytewax` |
| `image.tag` | Image tag | `0.18.1-python3.9` |
| `image.repository` | Image repository | `bytewax/bytewax` |
| `image.tag` | Image tag | `0.19.0-python3.9` |
| `image.pullPolicy` | Image pull policy | `Always` |
| `imagePullSecrets` | Image pull secrets | `[]` |
| `serviceAccount.create` | Create service account | `true` |
Expand Down Expand Up @@ -92,6 +92,8 @@ The command removes all the Kubernetes components associated with the chart and
| `configuration.recovery.persistence.finalizers` | PersistentVolumeClaim finalizers | `[ "kubernetes.io/pvc-protection" ]` |
| `configuration.recovery.persistence.extraPvcLabels` | Extra labels to apply to the PVC | `{}` |
| `configuration.recovery.persistence.storageClassName` | Type of persistent volume claim | `nil` |
| `configuration.recovery.persistence.hostPath.enabled` | Use hostPath instead of PersistentVolumeClaim | `false` |
| `configuration.recovery.persistence.hostPath.path` | Absolute path on the host to store recovery files | `` |
| `customOtlpUrl` | OTLP Endpoint URL | `` |
| `opentelemetry-collector.enabled` | Install OpenTelemetry Collector Helm Chart | `false` |
| `jaeger.enabled` | Install Jaeger Helm Chart | `false` |
Expand Down
4 changes: 2 additions & 2 deletions charts/bytewax/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: v2
name: bytewax
description: A Helm chart for Bytewax
type: application
version: 0.8.0
appVersion: "0.18.1"
version: 0.8.1
appVersion: "0.19.0"
icon: https://bytewax.io/assets/img/favicon.png
dependencies:
- condition: opentelemetry-collector.enabled
Expand Down
Binary file modified charts/bytewax/examples.tar
Binary file not shown.
2 changes: 1 addition & 1 deletion charts/bytewax/examples/k8s_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self, max, worker_index):
self.worker_index = worker_index
self.iterator = iter(range(max))

def next_batch(self, worker_index):
def next_batch(self):
time.sleep(1)
return [f"Worker: {self.worker_index} - {next(self.iterator)}"]

Expand Down
14 changes: 12 additions & 2 deletions charts/bytewax/examples/k8s_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,25 @@ def to_tuple(x):
# flow.output("out", DirOutput(output_dir, 5, assign_file=int))


# We are going to use Waxctl, you can download it from https://bytewax.io/downloads
# Run these commands in your terminal to run a cluster of two containers:
# To run this example using Helm, run the following command:
# helm upgrade --install k8s-cluster \
# bytewax/bytewax \
# --set configuration.pythonFileName=k8s_cluster.py \
# --set configuration.processesCount=2 \
# --set configuration.configMap.files.tarName=examples.tar \
# --set configuration.keepAlive=true

# Alternatively, you can use Waxctl to run the example. Download it from https://bytewax.io/downloads
# and then run these commands in your terminal to start a cluster of two containers:

# $ tar -C ./ -cvf cluster.tar examples
# $ waxctl dataflow deploy ./cluster.tar \
# --name k8s-cluster \
# --python-file-name examples/k8s_cluster.py \
# -p2 --debug --keep-alive=true --yes

# Regardless of how you ran the example (Helm CLI or Waxctl):

# Each worker will read the files in
# ./examples/sample_data/cluster/*.txt which have lines like
# `ONE1`.
Expand Down
15 changes: 8 additions & 7 deletions charts/bytewax/examples/wikistream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async
from bytewax.operators.window import SystemClockConfig, TumblingWindow
from bytewax.operators.window import SystemClockConfig, TumblingWindow, WindowMetadata


async def _sse_agen(url):
Expand All @@ -25,7 +25,7 @@ def __init__(self):
# Gather up to 0.25 sec of items or 1000 items.
self._batcher = batch_async(agen, timedelta(seconds=0.25), 1000)

def next_batch(self, _sched: datetime) -> List[str]:
def next_batch(self) -> List[str]:
return next(self._batcher)

def snapshot(self) -> None:
Expand All @@ -36,7 +36,7 @@ class WikiSource(FixedPartitionedSource[str, None]):
def list_parts(self):
return ["single-part"]

def build_part(self, _now, _for_key, _resume_state):
def build_part(self, step_id, for_key, _resume_state):
return WikiPartition()


Expand All @@ -62,7 +62,10 @@ def get_server_name(data_dict):
# ("server.name", count_per_window)


def keep_max(max_count: Optional[int], new_count: int) -> Tuple[int, int]:
def keep_max(
max_count: Optional[int], new_window_count: Tuple[WindowMetadata, int]
) -> Tuple[Optional[int], int]:
_metadata, new_count = new_window_count
if max_count is None:
new_max = new_count
else:
Expand All @@ -71,9 +74,7 @@ def keep_max(max_count: Optional[int], new_count: int) -> Tuple[int, int]:
return (new_max, new_max)


max_count_per_window = op.stateful_map(
"keep_max", server_counts, lambda: None, keep_max
)
max_count_per_window = op.stateful_map("keep_max", server_counts, keep_max)
# ("server.name", max_per_window)


Expand Down
10 changes: 8 additions & 2 deletions charts/bytewax/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ spec:
- name: BYTEWAX_RECOVERY_DIRECTORY
value: /var/recovery
- name: BYTEWAX_SNAPSHOT_INTERVAL
value: {{ .Values.configuration.recovery.snapshotInterval }}
value: "{{ .Values.configuration.recovery.snapshotInterval }}"
- name: BYTEWAX_RECOVERY_BACKUP_INTERVAL
value: {{ .Values.configuration.recovery.backupInterval }}
value: "{{ .Values.configuration.recovery.backupInterval }}"
{{- end }}
{{- range $key, $value := .Values.env }}
- name: "{{ tpl $key $ }}"
Expand Down Expand Up @@ -236,7 +236,13 @@ spec:
{{- end }}
{{- if .Values.configuration.recovery.enabled }}
- name: recovery
{{- if .Values.configuration.recovery.persistence.hostPath.enabled }}
hostPath:
path: {{ .Values.configuration.recovery.persistence.hostPath.path | quote }}
type: DirectoryOrCreate
{{- else }}
persistentVolumeClaim:
claimName: {{ .Release.Name }}-recovery
{{- end }}
{{- end }}
{{- end }}
2 changes: 1 addition & 1 deletion charts/bytewax/templates/pvc.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{{- if .Values.configuration.recovery.enabled}}
{{- if and .Values.configuration.recovery.enabled (not .Values.configuration.recovery.persistence.hostPath.enabled) }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
Expand Down
12 changes: 12 additions & 0 deletions charts/bytewax/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ spec:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "bytewax.serviceAccountName" . }}
{{- if not .Values.configuration.recovery.persistence.hostPath.enabled }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
{{- end }}
{{- if .Values.nodeSelector }}
nodeSelector:
{{ toYaml .Values.nodeSelector | indent 8 }}
Expand All @@ -45,6 +47,7 @@ spec:
initContainers:
{{- if .Values.configuration.recovery.enabled }}
- name: init-recovery
{{- if not .Values.configuration.recovery.persistence.hostPath.enabled }}
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand All @@ -53,6 +56,7 @@ spec:
drop:
- ALL
readOnlyRootFilesystem: true
{{- end }}
env:
- name: BYTEWAX_RECOVERY_PARTS
value: "{{ .Values.configuration.recovery.partsCount }}"
Expand Down Expand Up @@ -206,8 +210,10 @@ spec:
{{- end }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- if not .Values.configuration.recovery.persistence.hostPath.enabled }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
{{- end }}
volumeMounts:
{{- if .Values.api.enabled }}
- name: api-cache
Expand Down Expand Up @@ -276,7 +282,13 @@ spec:
{{- end }}
{{- if .Values.configuration.recovery.enabled }}
- name: recovery
{{- if .Values.configuration.recovery.persistence.hostPath.enabled }}
hostPath:
path: {{ .Values.configuration.recovery.persistence.hostPath.path | quote }}
type: DirectoryOrCreate
{{- else }}
persistentVolumeClaim:
claimName: {{ .Release.Name }}-recovery
{{- end }}
{{- end }}
{{- end }}
7 changes: 5 additions & 2 deletions charts/bytewax/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
# Declare variables to be passed into your templates.

image:
repository: bytewax.docker.scarf.sh/bytewax/bytewax
repository: bytewax/bytewax
pullPolicy: Always
# Overrides the image tag whose default is the chart appVersion.
tag: "0.18.1-python3.9"
tag: "0.19.0-python3.9"

imagePullSecrets: []

Expand Down Expand Up @@ -136,6 +136,9 @@ configuration:
## ref: http://kubernetes.io/docs/user-guide/persistent-volumes/
##
persistence:
hostPath:
enabled: false
path: ""
# storageClassName: default
accessModes:
- ReadWriteOnce
Expand Down

0 comments on commit 386a5a0

Please sign in to comment.