Merge pull request #29 from bytewax/metrics
Adding kube-prometheus-stack and some fixes
miccioest authored Feb 1, 2024
2 parents a28ef58 + 3837b97 commit 08b55fa
Showing 14 changed files with 284 additions and 107 deletions.
71 changes: 67 additions & 4 deletions README.md
@@ -28,8 +28,9 @@ These are the dependencies which are disabled by default:

- [open-telemetry/opentelemetry-collector](https://github.com/open-telemetry/opentelemetry-helm-charts/tree/main/charts/opentelemetry-collector)
- [jaegertracing/jaeger](https://github.com/jaegertracing/helm-charts/tree/main/charts/jaeger)
- [prometheus-community/kube-prometheus-stack](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack)

For more details about this, read [Telemetry](#telemetry) section.
For more details about this, read the [Observability](#observability) section.

## Uninstalling the Chart

@@ -46,7 +47,7 @@ The command removes all the Kubernetes components associated with the chart and
| Parameter | Description | Default |
|-------------------------------------------|-----------------------------------------------|---------------------------------------------------------|
| `image.repository` | Image repository | `bytewax.docker.scarf.sh/bytewax/bytewax` |
| `image.tag` | Image tag | `0.17.0-python3.9` |
| `image.tag` | Image tag | `0.18.1-python3.9` |
| `image.pullPolicy` | Image pull policy | `Always` |
| `imagePullSecrets` | Image pull secrets | `[]` |
| `serviceAccount.create` | Create service account | `true` |
@@ -83,6 +84,8 @@ The command removes all the Kubernetes components associated with the chart and
| `configuration.configMap.files.tarName` | Tar file to store in the ConfigMap to be created | `` |
| `configuration.recovery.enabled` | Enable Recovery | `false` |
| `configuration.recovery.partsCount` | Number of recovery parts | `1` |
| `configuration.recovery.snapshotInterval` | System time duration in seconds to snapshot state for recovery | `30` |
| `configuration.recovery.backupInterval` | System time duration in seconds to keep extra state snapshots around | `30` |
| `configuration.recovery.persistence.accessModes` | Persistence access modes | `[ReadWriteOnce]` |
| `configuration.recovery.persistence.size` | Size of persistent volume claim | `10Gi` |
| `configuration.recovery.persistence.annotations` | PersistentVolumeClaim annotations | `{}` |
@@ -92,6 +95,10 @@ The command removes all the Kubernetes components associated with the chart and
| `customOtlpUrl` | OTLP Endpoint URL | `` |
| `opentelemetry-collector.enabled` | Install OpenTelemetry Collector Helm Chart | `false` |
| `jaeger.enabled` | Install Jaeger Helm Chart | `false` |
| `kubePrometheusStack.enabled` | Install Prometheus Operator, Kube-Metrics, and Grafana | `false` |
| `podMonitor.enabled` | Use an existing Prometheus Operator instead of installing a new one with `kubePrometheusStack.enabled` | `false` |
| `podMonitor.selector` | Labels to apply to the PodMonitor resource so the existing Prometheus Operator processes it | `release: my-prometheus` |


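For reference, here is a minimal `values.yaml` sketch combining the new recovery and metrics parameters above; the interval values are simply the defaults from the table, and the keys are assumed to map one-to-one to the parameter names listed:

```yaml
configuration:
  recovery:
    enabled: true
    snapshotInterval: 30
    backupInterval: 30

kubePrometheusStack:
  enabled: true
```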
### Example running basic.py obtained from a Configmap created by Helm

@@ -191,9 +198,11 @@ $ helm upgrade --install my-dataflow ./bytewax \
--set configuration.configMap.customName=my-configmap
```

## Tracing
## Observability

Bytewax is instrumented to offer observability of your dataflow. You can read more about it [here](https://bytewax.io/docs/deployment/metrics).

Bytewax is instrumented to offer observability of your dataflow. You can read more about it [here](https://bytewax.io/docs/getting-started/observability).
### Tracing

The Bytewax helm chart can install OpenTelemetry Collector and Jaeger, both configured to work together with your dataflow traces.

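For example, a minimal values sketch that enables both bundled sub-charts, using the keys listed in the parameters table:

```yaml
opentelemetry-collector:
  enabled: true
jaeger:
  enabled: true
```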
@@ -259,6 +268,60 @@ customOtlpUrl: https://otlpcollector.myorganization.com:4317

In that case, keep `opentelemetry-collector.enabled` and `jaeger.enabled` at their default value of `false` because they are unnecessary.

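A sketch of that scenario, reusing the endpoint shown above as a placeholder:

```yaml
# Point traces at an existing collector; the bundled sub-charts stay at their default of false.
customOtlpUrl: https://otlpcollector.myorganization.com:4317
opentelemetry-collector:
  enabled: false
jaeger:
  enabled: false
```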
### Metrics

The Bytewax helm chart can install the Prometheus Operator, Kube-Metrics, and Grafana, all configured to work together with your dataflow metrics.

With this simple configuration, your dataflow metrics are stored in the Prometheus database and you can browse them in the Grafana UI:

```yaml
kubePrometheusStack:
  enabled: true
```

Following that example, to see the dataflow metrics in the Grafana UI, run this command and then open `http://localhost:3000` in a web browser:

```bash
kubectl port-forward svc/<YOUR_RELEASE_NAME>-grafana 3000:80
```

You can change the configuration of the Prometheus Operator, Kube-Metrics, and Grafana sub-charts by nesting their values under `kube-prometheus-stack`, `kube-prometheus-stack.kube-state-metrics`, and `kube-prometheus-stack.grafana` respectively. For example:

```yaml
kubePrometheusStack:
  enabled: true
kube-prometheus-stack:
  grafana:
    replicas: 2
```

If you want an existing Prometheus Operator to scrape the dataflow metrics instead, set `podMonitor.enabled` to `true` and configure the labels in `podMonitor.selector` in your values.

For example, if the existing Prometheus Operator looks for `PodMonitors` with the label `release=my-prometheus`, your settings should look like this:

```yaml
podMonitor:
  enabled: true
  selector:
    release: my-prometheus
```

To check which labels your Prometheus Operator actually looks for, run this command:

```bash
kubectl get prometheuses.monitoring.coreos.com --all-namespaces -o jsonpath="{.items[*].spec.podMonitorSelector}"
```

The output will be something similar to this:

```
{"matchLabels":{"release":"my-prometheus"}}
```

If you use `podMonitor.enabled=true`, keep `kubePrometheusStack.enabled` at its default value of `false` because it is unnecessary.

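Putting both settings together, a values sketch for reusing an existing Prometheus Operator; the `release: my-prometheus` label is the example value from above:

```yaml
kubePrometheusStack:
  enabled: false  # default; the bundled stack is not installed
podMonitor:
  enabled: true
  selector:
    release: my-prometheus
```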

## How to securely reference secrets in your code

In my-workload.py:
9 changes: 6 additions & 3 deletions charts/bytewax/Chart.lock
@@ -1,9 +1,12 @@
dependencies:
- name: opentelemetry-collector
  repository: https://open-telemetry.github.io/opentelemetry-helm-charts
  version: 0.36.2
  version: 0.36.3
- name: jaeger
  repository: https://jaegertracing.github.io/helm-charts
  version: 0.62.1
digest: sha256:e802d990a09bd88e4ea4f30126c25439ffea95ec5c70af4c4eaf76f4eeece3b5
generated: "2022-10-17T15:11:11.549316385-03:00"
- name: kube-prometheus-stack
  repository: https://prometheus-community.github.io/helm-charts
  version: 56.2.1
digest: sha256:c930a7e7afc57cb42aa0d2cc589d1523b910e7e83cc6b0e445130d5689bac272
generated: "2024-01-30T08:27:06.827785793-03:00"
10 changes: 7 additions & 3 deletions charts/bytewax/Chart.yaml
@@ -2,8 +2,8 @@ apiVersion: v2
name: bytewax
description: A Helm chart for Bytewax
type: application
version: 0.7.1
appVersion: "0.17.0"
version: 0.8.0
appVersion: "0.18.1"
icon: https://bytewax.io/assets/img/favicon.png
dependencies:
- condition: opentelemetry-collector.enabled
@@ -13,4 +13,8 @@ dependencies:
- condition: jaeger.enabled
  name: jaeger
  repository: https://jaegertracing.github.io/helm-charts
  version: 0.62.*
  version: 0.62.*
- condition: kubePrometheusStack.enabled
  name: kube-prometheus-stack
  repository: https://prometheus-community.github.io/helm-charts
  version: 56.2.*
Binary file modified charts/bytewax/examples.tar
29 changes: 16 additions & 13 deletions charts/bytewax/examples/k8s_basic.py
@@ -1,29 +1,32 @@
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.inputs import StatelessSource, DynamicInput
from bytewax.connectors.stdio import StdOutSink
from bytewax.inputs import (
    DynamicSource,
    StatelessSourcePartition,
)
import time

class NumberSource(StatelessSource):
class NumberSource(StatelessSourcePartition):
    def __init__(self, max, worker_index):
        self.worker_index = worker_index
        self.iterator = iter(range(max))

    def next(self):
    def next_batch(self, worker_index):
        time.sleep(1)
        return f"Worker: {self.worker_index} - {next(self.iterator)}"
        return [f"Worker: {self.worker_index} - {next(self.iterator)}"]

    def close(self):
        pass


class NumberInput(DynamicInput):
class NumberInput(DynamicSource):
    def __init__(self, max):
        self.max = max

    def build(self, worker_index, worker_count):
        return NumberSource(self.max, worker_index)

    def build(self, _now, worker_index, worker_count):
        return NumberSource(max=self.max, worker_index=worker_index)

flow = Dataflow()
flow.input("inp", NumberInput(100))
flow.output("out", StdOutput())
flow = Dataflow("k8s_basic")
out = op.input("inp1", flow, NumberInput(100))
op.output("out", out, StdOutSink())
59 changes: 39 additions & 20 deletions charts/bytewax/examples/k8s_cluster.py
@@ -1,47 +1,66 @@
import os
from pathlib import Path

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import DirInput, DirOutput, FileInput, FileOutput

input_dir = Path("./sample_data/cluster/")
output_dir = Path("./cluster_out/")
import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.connectors.files import (
    DirSink,
    DirSource,
)

def to_tuple(x):
    return tuple(map(str, x.split(',')))

flow = Dataflow()
flow.input("inp", DirInput(input_dir))
flow.map(str.upper)
flow.map(to_tuple)
flow.output("out", DirOutput(output_dir, 5, assign_file=int))
flow = Dataflow("k8s_cluster")
inp1 = op.input("inp", flow, DirSource(Path("./sample_data/cluster")))
inp2 = op.map("upper", inp1, str.upper)
out = op.map("tuple", inp2, to_tuple)
op.output("out1", out, DirSink(Path("./cluster_out/"), 2, assign_file=int))
op.output("out2", out, StdOutSink())


# import os
# from pathlib import Path

# from bytewax.dataflow import Dataflow
# from bytewax.connectors.stdio import StdOutput
# from bytewax.connectors.files import DirInput, DirOutput, FileInput, FileOutput

# input_dir = Path("./sample_data/cluster/")
# output_dir = Path("./cluster_out/")

# def to_tuple(x):
# return tuple(map(str, x.split(',')))

# flow = Dataflow()
# flow.input("inp", DirInput(input_dir))
# flow.map(str.upper)
# flow.map(to_tuple)
# flow.output("out", DirOutput(output_dir, 5, assign_file=int))


# We are going to use Waxctl, you can download it from https://bytewax.io/downloads
# Run these commands in your terminal to run a cluster of two containers:

# $ tar -C ./ -cvf cluster.tar examples
# $ waxctl dataflow deploy ./cluster.tar --name k8s-cluster --python-file-name examples/k8s_cluster.py -p2
# $ waxctl dataflow deploy ./cluster.tar \
# --name k8s-cluster \
# --python-file-name examples/k8s_cluster.py \
# -p2 --debug --keep-alive=true --yes

# Each worker will read the files in
# ./examples/sample_data/cluster/*.txt which have lines like
# `one1`.
# `ONE1`.

# They will then both finish and you'll see ./cluster_out/part_0
# and ./cluster_out/part_1 with the data that each process in the
# cluster wrote with the lines uppercased.

# To see that files in each container you can run these commands:

# kubectl exec -it k8s-cluster-0 -cprocess -- cat /var/bytewax/cluster_out/part_0.out
# kubectl exec -it k8s-cluster-1 -cprocess -- cat /var/bytewax/cluster_out/part_1.out
# kubectl exec -it k8s-cluster-0 -cprocess -- cat /var/bytewax/cluster_out/part_0
# kubectl exec -it k8s-cluster-1 -cprocess -- cat /var/bytewax/cluster_out/part_1

# You could imagine reading from / writing to separate Kafka
# partitions, S3 blobs, etc.

# When using `cluster_main()` you have to coordinate ensuring each
# process knows the address of all other processes in the cluster
# and their unique process ID. You can address that easily by deploying your
# dataflow program using Waxctl or installing the Bytewax Helm Chart
# cluster_main(flow, recovery_config=recovery_config, **parse.proc_env())
13 changes: 7 additions & 6 deletions charts/bytewax/examples/simple.py
@@ -1,9 +1,10 @@
# ./simple.py
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow()
flow.input("inp", TestingInput(range(99999999)))
flow.map(lambda item: item + 1)
flow.output("out", StdOutput())
flow = Dataflow("simple")

out = op.input("inp1", flow, TestingSource(range(99999999)))
op.output("out", out, StdOutSink())
18 changes: 12 additions & 6 deletions charts/bytewax/examples/simple_slow.py
@@ -1,14 +1,20 @@
# ./simple.py
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingInput
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.stdio import StdOutSink
import time

def slow_inc(x):
    time.sleep(5)
    return x + 1

flow = Dataflow()
flow.input("inp", TestingInput(range(99999999)))
flow.map(slow_inc)
flow.output("out", StdOutput())
flow = Dataflow("simple")

inp = op.input("inp1", flow, TestingSource(range(99999999)))
out = op.map("slow", inp, slow_inc)
op.output("out", out, StdOutSink())



