From c184e53e440d2c9f327a2860de9a5b28379d8dbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C5=A0koda?= Date: Thu, 2 May 2024 10:04:40 +0200 Subject: [PATCH] add order book updater --- lakeapi/orderbook.py | 51 ++++++++++++++++++++++++++++++++++++++ tests/test_orderbook.py | 55 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 lakeapi/orderbook.py create mode 100644 tests/test_orderbook.py diff --git a/lakeapi/orderbook.py b/lakeapi/orderbook.py new file mode 100644 index 0000000..7515f62 --- /dev/null +++ b/lakeapi/orderbook.py @@ -0,0 +1,51 @@ +from typing import TYPE_CHECKING, Optional, Tuple +from numba import float64, njit +from numba.typed import Dict + +if TYPE_CHECKING: + import pandas as pd + + +class OrderBookUpdater: + ''' Maintains order book snapshot while iterating over a dataframe with order book deltas. ''' + def __init__(self, df: 'pd.DataFrame'): + self.bid = Dict.empty(key_type = float64, value_type = float64) + self.ask = Dict.empty(key_type = float64, value_type = float64) + self.current_index = 0 + self.received_timestamp = None + self.np_arr = df[['bids', 'asks']].to_numpy() + self.received_times = df['received_time'].to_numpy() + + @staticmethod + @njit(cache = False) + def _update(bids, asks, bid_book, ask_book): + if len(bids): + for price, size in bids: + if size == 0: + if price in bid_book: + del bid_book[price] + else: + bid_book[price] = size + if len(asks) > 0: + for price, size in asks: + if size == 0: + if price in ask_book: + del ask_book[price] + else: + ask_book[price] = size + + def process_next_row(self, row: Optional[int] = None) -> None: + ''' row in df contains received_time, bid and ask columns with numpy list of price-quantity pairs''' + if self.current_index >= self.np_arr.shape[0]: + # return + raise StopIteration + if row is not None: + self.current_index = row + + self._update(*self.np_arr[self.current_index], self.bid, self.ask) + self.received_timestamp = self.received_times[self.current_index] + self.current_index += 1 + + def get_bests(self) -> Tuple[float, float]: + # TODO speed up + return max(self.bid), min(self.ask) diff --git a/tests/test_orderbook.py b/tests/test_orderbook.py new file mode 100644 index 0000000..e962e6a --- /dev/null +++ b/tests/test_orderbook.py @@ -0,0 +1,55 @@ +import pandas as pd +import numpy as np +import pytest + +from lakeapi.orderbook import OrderBookUpdater + +@pytest.fixture +def order_book_updater(example_data): + df = pd.DataFrame(example_data) + df['bids'] = df['bids'].apply(lambda x: np.array(x)) + df['asks'] = df['asks'].apply(lambda x: np.array(x)) + return OrderBookUpdater(df) + +@pytest.fixture +def example_data(): + return [ + { + 'received_time': 1, + 'bids': [(1, 10), (2, 20)], + 'asks': [(3, 10), (4, 20)] + }, + { + 'received_time': 2, + 'bids': [(1, 5)], + 'asks': [(3, 5)] + }, + { + 'received_time': 3, + 'bids': [(2, 0)], + 'asks': [(4, 0)] + } + ] + +def test_process_next_row(order_book_updater, example_data): + order_book_updater.process_next_row() + assert order_book_updater.bid == dict(example_data[0]['bids']) + assert order_book_updater.ask == dict(example_data[0]['asks']) + assert order_book_updater.received_timestamp == example_data[0]['received_time'] + + order_book_updater.process_next_row() + assert order_book_updater.bid[1] == 5 + assert order_book_updater.bid[2] == 20 + + order_book_updater.process_next_row() + assert order_book_updater.ask == {3: 5} + assert order_book_updater.received_timestamp == example_data[-1]['received_time'] + + +def test_get_bests(order_book_updater): + order_book_updater.process_next_row() + assert order_book_updater.get_bests() == (2, 3) + +@pytest.mark.benchmark(group='process_next_row') +def test_process_next_row_benchmark(order_book_updater, benchmark): + benchmark.pedantic(order_book_updater.process_next_row, args = (0,), warmup_rounds=100, iterations=1000, rounds=10)