
Read event data in parallel to backtest #124

Draft · wants to merge 3 commits into base: master
Changes from 1 commit
1 change: 1 addition & 0 deletions hftbacktest/Cargo.toml
@@ -49,6 +49,7 @@ hmac = { version = "0.13.0-pre.3", optional = true }
rand = { version = "0.8.5", optional = true }
uuid = { version = "1.8.0", features = ["v4"], optional = true }
nom = { version = "7.1.3", optional = true }
bus = { version = "2.4" }
Contributor @bohblue2 commented on Aug 17, 2024

NOTE: `bus` sometimes busy-waits in the current implementation, which may cause increased CPU usage; see jonhoo/bus#23. Quoting from the linked issue:

"There is at least one case where the readers race with the writer and may not successfully wake it up, so the writer has to park with a timeout. I would love to get rid of this, but haven't had a chance to dig into it, and no longer use this library actively myself. If you want to take a look, I'd be happy to help out!"

It probably won't cause any problems here (the race happens in SPMC use, and we're an SPSC structure), but I think I'll put up some docs anyway.
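The park-with-a-timeout workaround described in the issue can be sketched in isolation with `std::thread` (hypothetical names; this is not the `bus` crate's actual internals, just the general pattern of a bounded park covering a lost wakeup):

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

// A writer waiting for free space must not park indefinitely: a racing
// reader may fail to observe it sleeping and never deliver the unpark.
// Parking with a timeout and re-checking the condition covers that case.
fn wait_for_space(has_space: &AtomicBool) {
    while !has_space.load(Ordering::Acquire) {
        // Bounded park: even if the reader's unpark is lost, we retry.
        thread::park_timeout(Duration::from_millis(1));
    }
}

fn main() {
    let has_space = Arc::new(AtomicBool::new(false));
    let writer = {
        let has_space = Arc::clone(&has_space);
        thread::spawn(move || wait_for_space(&has_space))
    };
    // The reader frees a slot and wakes the writer; if this unpark is
    // lost, the timeout above guarantees progress anyway.
    has_space.store(true, Ordering::Release);
    writer.thread().unpark();
    writer.join().unwrap();
    println!("writer woke up");
}
```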

hftbacktest-derive = { path = "../hftbacktest-derive", optional = true, version = "0.1.0" }

[dev-dependencies]
106 changes: 106 additions & 0 deletions hftbacktest/src/backtest/data/bus.rs
@@ -0,0 +1,106 @@
use std::{io, io::ErrorKind};
use std::iter::Peekable;
use bus::{Bus, BusIntoIter, BusReader};
use tracing::{error, info, info_span};

use crate::backtest::{
data::{read_npy_file, read_npz_file, Data, NpyDTyped},
BacktestError,
};

#[derive(Copy, Clone)]
pub enum EventBusMessage<EventT: Clone> {
Item(EventT),
EndOfData,
}

pub struct EventBusReader<EventT: Clone + Send + Sync> {
reader: Peekable<BusIntoIter<EventBusMessage<EventT>>>,
}

impl<EventT: Clone + Send + Sync> EventBusReader<EventT> {
pub fn new(reader: BusReader<EventBusMessage<EventT>>) -> Self {
Self {
reader: reader.into_iter().peekable()
}
}

pub fn peek(&mut self) -> Option<&EventT> {
self.reader.peek().and_then(|ev| match ev {
EventBusMessage::Item(item) => Some(item),
EventBusMessage::EndOfData => None,
})
}

pub fn next(&mut self) -> Option<EventT> {
self.reader.next().and_then(|ev| match ev {
EventBusMessage::Item(item) => Some(item),
EventBusMessage::EndOfData => None,
})
}
}

pub trait TimestampedEventQueue<EventT> {
fn next_event(&mut self) -> Option<EventT>;

fn peek_event(&mut self) -> Option<&EventT>;

fn event_time(value: &EventT) -> i64;
}

pub trait EventConsumer<EventT> {
fn is_event_relevant(event: &EventT) -> bool;

fn process_event(&mut self, event: EventT) -> Result<(), BacktestError>;
}

fn load_data<EventT: NpyDTyped + Clone + Send>(
filepath: String,
) -> Result<Data<EventT>, BacktestError> {
let data = if filepath.ends_with(".npy") {
read_npy_file(&filepath)?
} else if filepath.ends_with(".npz") {
read_npz_file(&filepath, "data")?
} else {
return Err(BacktestError::DataError(io::Error::new(
ErrorKind::InvalidData,
"unsupported data type",
)));
};

Ok(data)
}

#[tracing::instrument(skip_all)]
pub fn replay_events_to_bus<EventT: NpyDTyped + Clone + Send + 'static>(
mut bus: Bus<EventBusMessage<EventT>>,
mut sources: Vec<String>,
) {
for source in sources.drain(..) {
let source_load_span = info_span!("load_data", source = &source);
let _source_load_span = source_load_span.entered();

let data = load_data::<EventT>(source);

match data {
Ok(data) => {
info!(
records = data.len(),
"found {} events in data source",
data.len()
);

for row in 0..data.len() {
bus.broadcast(EventBusMessage::Item(data[row].clone()));
}
}
Err(e) => {
error!("encountered error loading data source: {}", e);
// TODO: handle as an error.
break;
}
}
}

bus.broadcast(EventBusMessage::EndOfData);
}
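The end-of-data sentinel pattern in `bus.rs` above can be sketched in isolation with `std::sync::mpsc` (used here as a dependency-free stand-in for the `bus` crate):

```rust
use std::sync::mpsc;
use std::thread;

// Mirrors EventBusMessage: either a payload or an end-of-stream marker.
enum Message<T> {
    Item(T),
    EndOfData,
}

// Drains items until EndOfData, mirroring how EventBusReader::next maps
// the sentinel to None so consumers see a plain Option stream.
fn drain(rx: mpsc::Receiver<Message<i64>>) -> Vec<i64> {
    let mut out = Vec::new();
    for msg in rx {
        match msg {
            Message::Item(v) => out.push(v),
            Message::EndOfData => break,
        }
    }
    out
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Like replay_events_to_bus: stream all rows, then the sentinel.
    let producer = thread::spawn(move || {
        for v in [1, 2, 3] {
            tx.send(Message::Item(v)).unwrap();
        }
        tx.send(Message::EndOfData).unwrap();
    });
    let items = drain(rx);
    producer.join().unwrap();
    assert_eq!(items, vec![1, 2, 3]);
}
```

The sentinel matters because a reader otherwise cannot distinguish "no data yet" from "no data ever"; mapping it to `None` lets downstream code use ordinary `Option` handling.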
4 changes: 4 additions & 0 deletions hftbacktest/src/backtest/data/mod.rs
@@ -1,3 +1,4 @@
mod bus;
mod npy;
mod reader;

@@ -10,6 +11,7 @@ use std::{
slice::SliceIndex,
};

pub use bus::{replay_events_to_bus, EventBusMessage, EventBusReader, EventConsumer, TimestampedEventQueue};
pub use npy::{read_npy_file, read_npz_file, write_npy, Field, NpyDTyped, NpyHeader};
pub use reader::{Cache, DataSource, Reader};

@@ -107,6 +109,8 @@ where
}
}

unsafe impl Send for DataPtr {}

#[derive(Debug)]
pub struct DataPtr {
ptr: *mut [u8],
3 changes: 2 additions & 1 deletion hftbacktest/src/backtest/data/reader.rs
@@ -3,6 +3,7 @@ use std::{
collections::HashMap,
io::{Error as IoError, ErrorKind},
rc::Rc,
sync::Arc,
};

use uuid::Uuid;
@@ -60,7 +61,7 @@
/// Provides a data cache that allows both the local processor and exchange processor to access the
/// same or different data based on their timestamps without the need for reloading.
#[derive(Clone, Debug)]
pub struct Cache<D>(Rc<RefCell<HashMap<String, CachedData<D>>>>)
pub struct Cache<D>(Arc<RefCell<HashMap<String, CachedData<D>>>>)
Contributor (Author) commented:

Leftover from the initial prototype; needs to be reverted.

where
D: POD + Clone;

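As the author's comment notes, swapping `Rc` for `Arc` here is not enough on its own: `Arc<RefCell<_>>` is still not `Sync`, because `RefCell` has no thread-safe interior mutability, so the cache still could not be shared with the replay thread. A thread-safe variant would need a lock instead; a minimal sketch (not the crate's actual API):

```rust
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    thread,
};

// Mutex provides the Sync interior mutability that RefCell cannot:
// Arc<RefCell<_>> fails to compile when sent across threads, while
// Arc<Mutex<_>> is Send + Sync and clones share one underlying map.
#[derive(Clone, Default)]
struct SharedCache(Arc<Mutex<HashMap<String, usize>>>);

impl SharedCache {
    fn insert(&self, key: &str, len: usize) {
        self.0.lock().unwrap().insert(key.to_string(), len);
    }
    fn get(&self, key: &str) -> Option<usize> {
        self.0.lock().unwrap().get(key).copied()
    }
}

fn main() {
    let cache = SharedCache::default();
    // A worker thread populates the cache through its own clone.
    let worker = {
        let cache = cache.clone();
        thread::spawn(move || cache.insert("ticks.npz", 1024))
    };
    worker.join().unwrap();
    assert_eq!(cache.get("ticks.npz"), Some(1024));
}
```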
49 changes: 34 additions & 15 deletions hftbacktest/src/backtest/mod.rs
@@ -1,5 +1,6 @@
use std::{collections::HashMap, io::Error as IoError, marker::PhantomData};
use std::{collections::HashMap, io::Error as IoError, marker::PhantomData, sync::mpsc::Receiver};

use bus::Bus;
pub use data::DataSource;
use data::{Cache, Reader};
use models::FeeModel;
@@ -13,10 +14,18 @@ pub use crate::backtest::{
use crate::{
backtest::{
assettype::AssetType,
data::replay_events_to_bus,
evs::{EventIntentKind, EventSet},
models::{LatencyModel, QueueModel},
order::OrderBus,
proc::{Local, LocalProcessor, NoPartialFillExchange, PartialFillExchange, Processor},
proc::{
Local,
LocalProcessor,
NoPartialFillExchange,
OrderConsumer,
PartialFillExchange,
Processor,
},
state::State,
},
depth::{HashMapMarketDepth, L2MarketDepth, L3MarketDepth, MarketDepth},
@@ -34,6 +43,7 @@
},
types::{BuildError, Event},
};
use crate::backtest::data::EventBusReader;

/// Provides asset types.
pub mod assettype;
@@ -113,11 +123,11 @@ pub enum ExchangeKind {

/// A builder for `Asset`.
pub struct AssetBuilder<LM, AT, QM, MD, FM> {
data_sources: Vec<String>,
latency_model: Option<LM>,
asset_type: Option<AT>,
queue_model: Option<QM>,
depth_builder: Option<Box<dyn Fn() -> MD>>,
reader: Reader<Event>,
fee_model: Option<FM>,
exch_kind: ExchangeKind,
last_trades_cap: usize,
@@ -133,18 +143,15 @@
{
/// Constructs an instance of `AssetBuilder`.
pub fn new() -> Self {
let cache = Cache::new();
let reader = Reader::new(cache);

Self {
latency_model: None,
asset_type: None,
queue_model: None,
depth_builder: None,
reader,
fee_model: None,
exch_kind: ExchangeKind::NoPartialFillExchange,
last_trades_cap: 0,
data_sources: vec![],
}
}

@@ -153,10 +160,10 @@
for item in data {
match item {
DataSource::File(filename) => {
self.reader.add_file(filename);
self.data_sources.push(filename);
}
DataSource::Data(data) => {
self.reader.add_data(data);
DataSource::Data(_) => {
todo!("involves a copy");
}
}
}
@@ -242,8 +249,16 @@ where
.clone()
.ok_or(BuildError::BuilderIncomplete("fee_model"))?;

let mut bus = Bus::new(10_000);
let exch_bus = bus.add_rx();
let local_bus = bus.add_rx();

std::thread::spawn(move || {
replay_events_to_bus(bus, self.data_sources);
});

let local = Local::new(
self.reader.clone(),
EventBusReader::new(local_bus),
create_depth(),
State::new(asset_type, fee_model),
order_latency,
@@ -271,7 +286,7 @@ where
match self.exch_kind {
ExchangeKind::NoPartialFillExchange => {
let exch = NoPartialFillExchange::new(
self.reader.clone(),
EventBusReader::new(exch_bus),
create_depth(),
State::new(asset_type, fee_model),
order_latency,
@@ -287,7 +302,7 @@
}
ExchangeKind::PartialFillExchange => {
let exch = PartialFillExchange::new(
self.reader.clone(),
EventBusReader::new(exch_bus),
create_depth(),
State::new(asset_type, fee_model),
order_latency,
@@ -330,8 +345,12 @@
.clone()
.ok_or(BuildError::BuilderIncomplete("fee_model"))?;

let mut bus = Bus::new(1000);
let local_reader = EventBusReader::new(bus.add_rx());
let exch_reader = EventBusReader::new(bus.add_rx());

let local = Local::new(
self.reader.clone(),
local_reader,
create_depth(),
State::new(asset_type, fee_model),
order_latency,
@@ -356,7 +375,7 @@
.clone()
.ok_or(BuildError::BuilderIncomplete("fee_model"))?;
let exch = NoPartialFillExchange::new(
self.reader.clone(),
exch_reader,
create_depth(),
State::new(asset_type, fee_model),
order_latency,
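The builder wiring above (one `Bus`, one receiver each for the local and exchange processors, replay on a background thread) can be emulated without the `bus` crate by cloning every item into per-reader channels; a sketch, with `Broadcast` standing in for `Bus`:

```rust
use std::sync::mpsc;
use std::thread;

// A minimal single-producer, multi-consumer broadcast built from mpsc
// channels: each add_rx() gets its own queue and broadcast() clones the
// item into all of them, so every reader observes the full stream.
struct Broadcast<T: Clone> {
    txs: Vec<mpsc::Sender<T>>,
}

impl<T: Clone> Broadcast<T> {
    fn new() -> Self {
        Broadcast { txs: Vec::new() }
    }
    fn add_rx(&mut self) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.txs.push(tx);
        rx
    }
    fn broadcast(&mut self, item: T) {
        for tx in &self.txs {
            // Ignore readers that have hung up.
            let _ = tx.send(item.clone());
        }
    }
}

fn main() {
    let mut bus = Broadcast::new();
    // Mirrors exch_bus / local_bus: receivers are taken before the
    // feeder thread starts, so no reader misses events.
    let local_rx = bus.add_rx();
    let exch_rx = bus.add_rx();
    let feeder = thread::spawn(move || {
        for ev in [10, 20, 30] {
            bus.broadcast(ev);
        }
        // bus (and its senders) dropped here, closing both receivers.
    });
    feeder.join().unwrap();
    // Both processors see the identical event stream.
    assert_eq!(local_rx.iter().collect::<Vec<_>>(), vec![10, 20, 30]);
    assert_eq!(exch_rx.iter().collect::<Vec<_>>(), vec![10, 20, 30]);
}
```

Unlike the `bus` crate, this sketch buffers without bound and clones per reader; `Bus::new(10_000)` instead uses one fixed-size ring shared by all readers, which is why the PR prefers it for large event streams.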
13 changes: 10 additions & 3 deletions hftbacktest/src/backtest/proc/l3_local.rs
@@ -9,7 +9,7 @@ use crate::{
data::{Data, Reader},
models::{FeeModel, LatencyModel},
order::OrderBus,
proc::{LocalProcessor, Processor},
proc::{LocalProcessor, OrderConsumer, Processor},
state::State,
BacktestError,
},
@@ -312,7 +312,16 @@

Ok((next_ts, i64::MAX))
}
}

impl<AT, LM, MD, FM> OrderConsumer for L3Local<AT, LM, MD, FM>
where
AT: AssetType,
LM: LatencyModel,
MD: L3MarketDepth,
FM: FeeModel,
BacktestError: From<<MD as L3MarketDepth>::Error>,
{
fn process_recv_order(
&mut self,
timestamp: i64,
@@ -347,11 +356,9 @@
}
Ok(wait_resp_order_received)
}

fn earliest_recv_order_timestamp(&self) -> i64 {
self.orders_from.earliest_timestamp().unwrap_or(i64::MAX)
}

fn earliest_send_order_timestamp(&self) -> i64 {
self.orders_to.earliest_timestamp().unwrap_or(i64::MAX)
}