Skip to content

Commit

Permalink
Merge pull request #32 from bytewax/install-dependencies
Browse files Browse the repository at this point in the history
Adding dependencies init-container
  • Loading branch information
miccioest authored Apr 30, 2024
2 parents 386a5a0 + 3c7dbab commit c0a615a
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 4 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ The command removes all the Kubernetes components associated with the chart and
| Parameter | Description | Default |
|-------------------------------------------|-----------------------------------------------|---------------------------------------------------------|
| `image.repository` | Image repository | `bytewax/bytewax` |
| `image.tag` | Image tag | `0.19.0-python3.9` |
| `image.tag` | Image tag | `0.19.1-python3.9` |
| `image.pullPolicy` | Image pull policy | `Always` |
| `imagePullSecrets` | Image pull secrets | `[]` |
| `serviceAccount.create` | Create service account | `true` |
Expand All @@ -74,6 +74,7 @@ The command removes all the Kubernetes components associated with the chart and
| `extraSecretMounts` | Secret mounts to get secrets from files instead of env vars | `[]` |
| `extraVolumeMounts` | Additional volume mounts | `[]` |
| `configuration.pythonFileName` | Path of the python file to run | `simple.py` |
| `configuration.dependencies ` | List of the python dependencies needed | `[]` |
| `configuration.processesCount` | Number of concurrent processes to run | `1` |
| `configuration.workersPerProcess` | Number of workers per process | `1` |
| `configuration.jobMode` | Create a kubernetes Job resource instead of a Statefulset (use this for batch processing) - Kubernetes version required: 1.24 or superior | `false` |
Expand Down
4 changes: 2 additions & 2 deletions charts/bytewax/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: v2
name: bytewax
description: A Helm chart for Bytewax
type: application
version: 0.8.1
appVersion: "0.19.0"
version: 0.9.0
appVersion: "0.19.1"
icon: https://bytewax.io/assets/img/favicon.png
dependencies:
- condition: opentelemetry-collector.enabled
Expand Down
145 changes: 145 additions & 0 deletions charts/bytewax/examples/orderbook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import json
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Dict, List, Optional


import websockets
from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async


async def _ws_agen(product_id):
url = "wss://ws-feed.exchange.coinbase.com"
async with websockets.connect(url) as websocket:
msg = json.dumps(
{
"type": "subscribe",
"product_ids": [product_id],
"channels": ["level2_batch"],
}
)
await websocket.send(msg)
# The first msg is just a confirmation that we have subscribed.
await websocket.recv()

while True:
msg = await websocket.recv()
yield (product_id, json.loads(msg))


class CoinbasePartition(StatefulSourcePartition):
def __init__(self, product_id):
agen = _ws_agen(product_id)
self._batcher = batch_async(agen, timedelta(seconds=0.5), 100)

def next_batch(self):
return next(self._batcher)

def snapshot(self):
return None


@dataclass
class CoinbaseSource(FixedPartitionedSource):
product_ids: List[str]

def list_parts(self):
return self.product_ids

def build_part(self, step_id, for_key, _resume_state):
return CoinbasePartition(for_key)


@dataclass(frozen=True)
class OrderBookSummary:
bid_price: float
bid_size: float
ask_price: float
ask_size: float
spread: float


@dataclass
class OrderBookState:
bids: Dict[float, float] = field(default_factory=dict)
asks: Dict[float, float] = field(default_factory=dict)
bid_price: Optional[float] = None
ask_price: Optional[float] = None

def update(self, data):
# Initialize bids and asks if they're empty
if not self.bids:
self.bids = {float(price): float(size) for price, size in data["bids"]}
self.bid_price = max(self.bids.keys(), default=None)
if not self.asks:
self.asks = {float(price): float(size) for price, size in data["asks"]}
self.ask_price = min(self.asks.keys(), default=None)

# Process updates from the "changes" field in the data
for change in data.get("changes", []):
side, price_str, size_str = change
price, size = float(price_str), float(size_str)

target_dict = self.asks if side == "sell" else self.bids

# If size is zero, remove the price level; otherwise, update/add the price level
if size == 0.0:
target_dict.pop(price, None)
else:
target_dict[price] = size

# After update, recalculate the best bid and ask prices
if side == "sell":
self.ask_price = min(self.asks.keys(), default=None)
else:
self.bid_price = max(self.bids.keys(), default=None)


def spread(self) -> float:
return self.ask_price - self.bid_price # type: ignore

def summarize(self):
return OrderBookSummary(
bid_price=self.bid_price,
bid_size=self.bids[self.bid_price],
ask_price=self.ask_price,
ask_size=self.asks[self.ask_price],
spread=self.spread(),
)


flow = Dataflow("orderbook")
inp = op.input(
"input", flow, CoinbaseSource(["BTC-USD", "ETH-USD", "BTC-EUR", "ETH-EUR"])
)
# ('BTC-USD', {
# 'type': 'l2update',
# 'product_id': 'BTC-USD',
# 'changes': [['buy', '36905.39', '0.00334873']],
# 'time': '2022-05-05T17:25:09.072519Z',
# })


def mapper(state, value):
if state is None:
state = OrderBookState()

state.update(value)
return (state, state.summarize())


stats = op.stateful_map("orderbook", inp, mapper)
# ('BTC-USD', (36905.39, 0.00334873, 36905.4, 1.6e-05, 0.010000000002037268))


# # filter on 0.1% spread as a per
def just_large_spread(prod_summary):
product, summary = prod_summary
return summary.spread / summary.ask_price > 0.0001


state = op.filter("big_spread", stats, just_large_spread)
op.output("out", stats, StdOutSink())
32 changes: 32 additions & 0 deletions charts/bytewax/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,34 @@ spec:
mountPath: {{ .mountPath }}
subPath: {{ .subPath | default "" }}
readOnly: {{ .readOnly }}
{{- end }}
{{- if .Values.configuration.dependencies }}
- name: init-dependencies
command:
- sh
- -c
- |
{{- range .Values.configuration.dependencies }}
/venv/bin/pip install {{ . }} -t /var/bytewax
{{- end }}
image: bytewax/bytewax:latest
imagePullPolicy: Always
resources: {}
securityContext:
allowPrivilegeEscalation: false
capabilities:
add:
- NET_BIND_SERVICE
drop:
- ALL
readOnlyRootFilesystem: true
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/bytewax/
name: working-directory
- mountPath: /tmp
name: pip-tmp
{{- end }}
terminationGracePeriodSeconds: 10
containers:
Expand Down Expand Up @@ -197,6 +225,10 @@ spec:
mountPath: /var/recovery
{{- end }}
volumes:
{{- if .Values.configuration.dependencies }}
- name: pip-tmp
emptyDir: {}
{{- end }}
- name: hostfile
emptyDir: {}
- name: python-files
Expand Down
32 changes: 32 additions & 0 deletions charts/bytewax/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,34 @@ spec:
mountPath: {{ .mountPath }}
subPath: {{ .subPath | default "" }}
readOnly: {{ .readOnly }}
{{- end }}
{{- if .Values.configuration.dependencies }}
- name: init-dependencies
command:
- sh
- -c
- |
{{- range .Values.configuration.dependencies }}
/venv/bin/pip install {{ . }} -t /var/bytewax
{{- end }}
image: bytewax/bytewax:latest
imagePullPolicy: Always
resources: {}
securityContext:
allowPrivilegeEscalation: false
capabilities:
add:
- NET_BIND_SERVICE
drop:
- ALL
readOnlyRootFilesystem: true
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/bytewax/
name: working-directory
- mountPath: /tmp
name: pip-tmp
{{- end }}
terminationGracePeriodSeconds: 10
containers:
Expand Down Expand Up @@ -234,6 +262,10 @@ spec:
readOnly: {{ .readOnly }}
{{- end }}
volumes:
{{- if .Values.configuration.dependencies }}
- name: pip-tmp
emptyDir: {}
{{- end }}
{{- if .Values.api.enabled }}
- name: api-cache
emptyDir: {}
Expand Down
3 changes: 2 additions & 1 deletion charts/bytewax/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ image:
repository: bytewax/bytewax
pullPolicy: Always
# Overrides the image tag whose default is the chart appVersion.
tag: "0.19.0-python3.9"
tag: "0.19.1-python3.9"

imagePullSecrets: []

Expand Down Expand Up @@ -117,6 +117,7 @@ extraVolumeMounts: []

configuration:
pythonFileName: "simple.py"
dependencies: []
processesCount: 1
workersPerProcess: 1
keepAlive: false
Expand Down

0 comments on commit c0a615a

Please sign in to comment.