Skip to content

Commit

Permalink
Merge pull request #5 from dsec-capital/de/fxcm-testing
Browse files Browse the repository at this point in the history
De/fxcm testing
  • Loading branch information
degloff authored Jun 6, 2024
2 parents 68ef42e + c9a4af7 commit b20abc1
Show file tree
Hide file tree
Showing 15 changed files with 639 additions and 369 deletions.
41 changes: 30 additions & 11 deletions common/blocking_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,23 @@ namespace common {
pushed_cond.notify_all();
}

bool pop(T& item)
{
std::unique_lock<std::mutex> ul(mutex);
if (queue.empty()) {
return false;
}
item = std::move(queue.front());
queue.pop();
return true;
}

template<class R, class P>
bool pop(T& item, const std::chrono::duration<R, P>& timeout)
{
std::unique_lock<std::mutex> ul(mutex);
if (queue.empty()) {
if (!pushed_cond.wait_for(ul, timeout, [this]() {return !this->queue.empty(); })) {
if (!pushed_cond.wait_for(ul, timeout, [this]() { return !this->queue.empty(); })) {
return false;
}
}
Expand All @@ -149,15 +160,23 @@ namespace common {
return true;
}

bool pop(T& item)
template<class Op, class R, class P>
bool pop_until(Op op, const std::chrono::duration<R, P>& timeout)
{
std::unique_lock<std::mutex> ul(mutex);
if (queue.empty()) {
return false;
}
item = std::move(queue.front());
queue.pop();
return true;
std::unique_lock<std::mutex> ul(mutex);
auto now = std::chrono::steady_clock::now();
auto until = now + timeout;
auto done = false;
while (!done) {
if (queue.empty()) {
if (!pushed_cond.wait_until(ul, until, [this]() { return !this->queue.empty(); })) {
return false;
}
}
done = op(queue.front());
queue.pop();
}
return true;
}

template<class Op>
Expand Down Expand Up @@ -198,7 +217,7 @@ namespace common {
{
std::unique_lock<std::mutex> ul(mutex);
if (queue.size() >= max_size) {
if (!popped_cond.wait_for(ul, timeout, [this]() {return this->queue.size() < this->max_size; }))
if (!popped_cond.wait_for(ul, timeout, [this]() { return this->queue.size() < this->max_size; }))
return false;
}
queue.push(std::move(item));
Expand All @@ -211,7 +230,7 @@ namespace common {
{
std::unique_lock<std::mutex> ul(mutex);
if (queue.empty()) {
if (!pushed_cond.wait_for(ul, timeout, [this]() {return !this->queue.empty(); }))
if (!pushed_cond.wait_for(ul, timeout, [this]() { return !this->queue.empty(); }))
return false;
}
item = std::move(queue.front());
Expand Down
12 changes: 12 additions & 0 deletions common/exec_report.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ namespace common {
return "TRADE";
case FIX::ExecType_REJECTED:
return "REJECTED";
case FIX::ExecType_STOPPED:
return "STOPPED";
case FIX::ExecType_SUSPENDED:
return "SUSPENDED";
case FIX::ExecType_DONE_FOR_DAY:
return "DONE_FOR_DAY";
default:
return "UNKNOWN";
}
Expand Down Expand Up @@ -67,6 +73,12 @@ namespace common {
return "REPLACED";
case FIX::OrdStatus_REJECTED:
return "REJECTED";
case FIX::OrdStatus_STOPPED:
return "STOPPED";
case FIX::OrdStatus_SUSPENDED:
return "SUSPENDED";
case FIX::OrdStatus_DONE_FOR_DAY:
return "DONE_FOR_DAY";
default:
return "UNKNOWN";
}
Expand Down
12 changes: 12 additions & 0 deletions common/fix.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include "quickfix/FixFields.h"
#include "quickfix/FixValues.h"
#include "quickfix/Message.h"

namespace common::fix {

Expand All @@ -17,6 +18,17 @@ namespace common::fix {
REQUEST_REJECTED = 6
};


inline bool is_market_data_message(const FIX::Message& message) {
const auto& msg_type = message.getHeader().getField(FIX::FIELD::MsgType);
return msg_type == FIX::MsgType_MarketDataSnapshotFullRefresh || msg_type == FIX::MsgType_MarketDataIncrementalRefresh;
}

inline bool is_exec_report_message(const FIX::Message& message) {
const auto& msg_type = message.getHeader().getField(FIX::FIELD::MsgType);
return msg_type == FIX::MsgType_ExecutionReport;
}

}

#endif
45 changes: 37 additions & 8 deletions common/order_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ namespace common {
, leaves_qty(leaves_qty)
{}


bool OrderReport::is_sell() const {
return side == FIX::Side_SELL;
}

bool OrderReport::is_buy() const {
return side == FIX::Side_BUY;
}

bool OrderReport::is_filled() const {
return leaves_qty == 0;
}
Expand Down Expand Up @@ -149,13 +158,16 @@ namespace common {
return false;
}

spdlog::debug("OrderTracker::process: processing report {}", report.to_string());
spdlog::debug("OrderTracker[{}]::process: processing report {}", account, report.to_string());

switch (report.exec_type) {
case FIX::ExecType_PENDING_NEW: {
auto [_, inserted] = pending_orders_by_cl_ord_id.emplace(report.cl_ord_id, std::move(OrderReport(report)));
if (!inserted) {
spdlog::error("OrderTracker::process[FIX::ExecType_PENDING_NEW]: failed to insert cl_ord_id={} report={}", report.cl_ord_id, report.to_string());
spdlog::error(
"OrderTracker[{}]::process[FIX::ExecType_PENDING_NEW]: failed to insert cl_ord_id={} report={}",
account, report.cl_ord_id, report.to_string()
);
return false;
}
return true;
Expand All @@ -165,7 +177,10 @@ namespace common {
pending_orders_by_cl_ord_id.erase(report.cl_ord_id);
auto [_, inserted] = orders_by_ord_id.emplace(report.ord_id, std::move(OrderReport(report)));
if (!inserted) {
spdlog::error("OrderTracker::process[FIX::ExecType_NEW]: failed to insert ord_id={} report={}", report.ord_id, report.to_string());
spdlog::error(
"OrderTracker[{}]::process[FIX::ExecType_NEW]: failed to insert ord_id={} report={}",
account, report.ord_id, report.to_string()
);
return false;
}
return true;
Expand All @@ -178,12 +193,18 @@ namespace common {
if (report.side == FIX::Side_BUY) {
auto prev = position.qty;
position.qty += report.last_qty;
spdlog::debug("OrderTracker::process[FIX::ExecType_TRADE]: buy fill updated net positon from {} to {}", prev, position.qty);
spdlog::debug(
"OrderTracker[{}]::process[FIX::ExecType_TRADE]: buy fill updated net positon from {} to {}",
account, prev, position.qty
);
}
else if (report.side == FIX::Side_SELL) {
auto prev = position.qty;
position.qty -= report.last_qty;
spdlog::debug("OrderTracker::process[FIX::ExecType_TRADE]: sell fill updated net positon from {} to {}", prev, position.qty);
spdlog::debug(
"OrderTracker[{}]::process[FIX::ExecType_TRADE]: sell fill updated net positon from {} to {}",
account, prev, position.qty
);
}
position.avg_px = report.avg_px;
}
Expand All @@ -206,20 +227,28 @@ namespace common {
}

case FIX::ExecType_REJECTED: {
spdlog::warn("OrderTracker::process[FIX::ExecType_REJECTED]: rejected {}", report.to_string());
spdlog::warn("OrderTracker[{}]::process[FIX::ExecType_REJECTED]: rejected {}", account, report.to_string());
return true;
}

default: {
spdlog::error("OrderTracker::process: invalid FIX::ExecType {}", report.exec_type);
spdlog::error("OrderTracker[{}]::process: invalid FIX::ExecType {}", account, report.exec_type);
return false;
}
}
}

void OrderTracker::set_account(const std::string& new_account) {
account = new_account;
}

const std::string& OrderTracker::get_account() const {
return account;
}

std::string OrderTracker::to_string() const {
std::string rows;
rows += "OrderTracker[\n";
rows += std::format("OrderTracker[{}][\n", account);
rows += " pending orders:\n";
for (auto& [cl_ord_id, order] : pending_orders_by_cl_ord_id) {
rows += std::format(" cl_ord_id={} order={}\n", cl_ord_id, order.to_string());
Expand Down
8 changes: 8 additions & 0 deletions common/order_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ namespace common {
double cum_qty{ 0 };
double leaves_qty{ 0 };

bool is_buy() const;

bool is_sell() const;

bool is_filled() const;

bool is_cancelled() const;
Expand Down Expand Up @@ -109,6 +113,10 @@ namespace common {

OrderTracker(const std::string& account);

void set_account(const std::string& account);

const std::string& get_account() const;

NetPosition& net_position(const std::string& symbol);

std::pair<typename OrderTracker::const_iterator, bool> get_pending_order(const std::string& ord_id) const;
Expand Down
4 changes: 4 additions & 0 deletions test_fxcm_market_data/test_fxcm_market_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

namespace zorro {
int(__cdecl* BrokerError)(const char* txt);

namespace log {
std::size_t logging_verbosity = 2;
}
}

int show(const char* txt) {
Expand Down
4 changes: 2 additions & 2 deletions zorro_common/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

namespace zorro {

int logging_verbosity = 2;

namespace log {

extern std::size_t logging_verbosity;

namespace {
inline void _show(const std::string& msg) {
if (!BrokerError) return;
Expand Down
Loading

0 comments on commit b20abc1

Please sign in to comment.