From 3c38f35590e3d6b67df467f10d1efb1d7b429998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20=C5=A0koda?= Date: Thu, 2 May 2024 12:25:20 +0200 Subject: [PATCH] update order book updater to v2 --- lakeapi/orderbook.py | 59 +++++++++++++++++++---------- tests/test_orderbook.py | 83 +++++++++++++++++++++++++++++++---------- 2 files changed, 104 insertions(+), 38 deletions(-) diff --git a/lakeapi/orderbook.py b/lakeapi/orderbook.py index 7515f62..a1658de 100644 --- a/lakeapi/orderbook.py +++ b/lakeapi/orderbook.py @@ -1,6 +1,6 @@ from typing import TYPE_CHECKING, Optional, Tuple from numba import float64, njit -from numba.typed import Dict +from numba.typed import Dict, List if TYPE_CHECKING: import pandas as pd @@ -13,39 +13,60 @@ def __init__(self, df: 'pd.DataFrame'): self.ask = Dict.empty(key_type = float64, value_type = float64) self.current_index = 0 self.received_timestamp = None - self.np_arr = df[['bids', 'asks']].to_numpy() - self.received_times = df['received_time'].to_numpy() + self.sequence_number = None + self.int_arr = df[['received_time', 'sequence_number']].astype('int64').values + self.np_arr = df[['side_is_bid', 'price', 'size']].astype('float64').values + self._bests_cache = List() + self._bests_cache.append(0.) + self._bests_cache.append(0.) @staticmethod - @njit(cache = False) - def _update(bids, asks, bid_book, ask_book): - if len(bids): - for price, size in bids: + @njit(cache = True) + def _update_more(side_is_bid, prices, sizes, received_time, sequence_number, current_index, bid_book, ask_book, bests_cache): + starting_received_time = received_time[current_index] + while received_time[current_index] == starting_received_time: + price = prices[current_index] + size = sizes[current_index] + if side_is_bid[current_index]: if size == 0: if price in bid_book: del bid_book[price] + if bests_cache[0] == price: + bests_cache[0] = 0. else: bid_book[price] = size - if len(asks) > 0: - for price, size in asks: + if price > bests_cache[0]: + bests_cache[0] = price + else: if size == 0: if price in ask_book: del ask_book[price] + if bests_cache[1] == price: + bests_cache[1] = 0. else: ask_book[price] = size + if price < bests_cache[1]: + bests_cache[1] = price + current_index += 1 + if current_index >= prices.shape[0]: + break + return current_index, sequence_number[current_index-1], received_time[current_index-1] - def process_next_row(self, row: Optional[int] = None) -> None: + def process_next_update(self, starting_row: Optional[int] = None) -> int: ''' row in df contains received_time, bid and ask columns with numpy list of price-quantity pairs''' if self.current_index >= self.np_arr.shape[0]: - # return - raise StopIteration - if row is not None: - self.current_index = row + return 0 + if starting_row is not None: + self.current_index = starting_row - self._update(*self.np_arr[self.current_index], self.bid, self.ask) - self.received_timestamp = self.received_times[self.current_index] - self.current_index += 1 + self.current_index, self.sequence_number, self.received_timestamp = \ + self._update_more(*self.np_arr.T, *self.int_arr.T, self.current_index, self.bid, self.ask, self._bests_cache) + + return self.current_index def get_bests(self) -> Tuple[float, float]: - # TODO speed up - return max(self.bid), min(self.ask) + if not self._bests_cache[0]: + self._bests_cache[0] = max(self.bid) + if not self._bests_cache[1]: + self._bests_cache[1] = min(self.ask) + return tuple(self._bests_cache) diff --git a/tests/test_orderbook.py b/tests/test_orderbook.py index e962e6a..b1caceb 100644 --- a/tests/test_orderbook.py +++ b/tests/test_orderbook.py @@ -7,8 +7,10 @@ @pytest.fixture def order_book_updater(example_data): df = pd.DataFrame(example_data) - df['bids'] = df['bids'].apply(lambda x: np.array(x)) - df['asks'] = df['asks'].apply(lambda x: np.array(x)) + df['side_is_bid'] = df['side_is_bid'].apply(lambda x: np.array(x)) + df['price'] = df['price'].apply(lambda x: np.array(x)) + df['size'] = df['size'].apply(lambda x: np.array(x)) + df['received_time'] = df['received_time'].apply(lambda x: np.array(x)) return OrderBookUpdater(df) @pytest.fixture @@ -16,40 +18,83 @@ def example_data(): return [ { 'received_time': 1, - 'bids': [(1, 10), (2, 20)], - 'asks': [(3, 10), (4, 20)] + 'sequence_number': 1, + 'side_is_bid': True, + 'price': 1, + 'size': 10, + }, + { + 'received_time': 1, + 'sequence_number': 1, + 'side_is_bid': True, + 'price': 2, + 'size': 20, + }, + { + 'received_time': 1, + 'sequence_number': 1, + 'side_is_bid': False, + 'price': 3, + 'size': 10, + }, + { + 'received_time': 1, + 'sequence_number': 1, + 'side_is_bid': False, + 'price': 4, + 'size': 20, + }, + { + 'received_time': 2, + 'sequence_number': 2, + 'side_is_bid': True, + 'price': 1, + 'size': 5, }, { 'received_time': 2, - 'bids': [(1, 5)], - 'asks': [(3, 5)] + 'sequence_number': 2, + 'side_is_bid': False, + 'price': 3, + 'size': 5, }, { 'received_time': 3, - 'bids': [(2, 0)], - 'asks': [(4, 0)] - } + 'sequence_number': 3, + 'side_is_bid': True, + 'price': 2, + 'size': 0, + }, + { + 'received_time': 3, + 'sequence_number': 3, + 'side_is_bid': False, + 'price': 4, + 'size': 0, + }, ] -def test_process_next_row(order_book_updater, example_data): - order_book_updater.process_next_row() - assert order_book_updater.bid == dict(example_data[0]['bids']) - assert order_book_updater.ask == dict(example_data[0]['asks']) +def test_process_next_update(order_book_updater, example_data): + order_book_updater.process_next_update() + assert order_book_updater.bid == {1: 10, 2:20} + assert order_book_updater.ask == {3: 10, 4:20} assert order_book_updater.received_timestamp == example_data[0]['received_time'] + assert order_book_updater.sequence_number == example_data[0]['sequence_number'] - order_book_updater.process_next_row() + order_book_updater.process_next_update() assert order_book_updater.bid[1] == 5 assert order_book_updater.bid[2] == 20 - order_book_updater.process_next_row() + order_book_updater.process_next_update() assert order_book_updater.ask == {3: 5} assert order_book_updater.received_timestamp == example_data[-1]['received_time'] + assert order_book_updater.sequence_number == example_data[-1]['sequence_number'] def test_get_bests(order_book_updater): - order_book_updater.process_next_row() + order_book_updater.process_next_update() assert order_book_updater.get_bests() == (2, 3) -@pytest.mark.benchmark(group='process_next_row') -def test_process_next_row_benchmark(order_book_updater, benchmark): - benchmark.pedantic(order_book_updater.process_next_row, args = (0,), warmup_rounds=100, iterations=1000, rounds=10) +@pytest.mark.benchmark(group='process_next_update') +def test_process_next_update_benchmark(order_book_updater, benchmark): + benchmark.pedantic(order_book_updater.process_next_update, args = (0,), warmup_rounds=100, iterations=1000, rounds=10)