diff --git a/src/Engine.cpp b/src/Engine.cpp index ea01f0e..6253317 100644 --- a/src/Engine.cpp +++ b/src/Engine.cpp @@ -138,7 +138,7 @@ void Engine::end_pub_transaction() void Engine::pub_close() { auto self = sg4::Actor::self(); - XBT_DEBUG("Publisher '%s' is closing the engine", self->get_cname()); + XBT_DEBUG("Publisher '%s' is closing the engine '%s'", self->get_cname(), get_cname()); if (not pub_closing_) { // I'm the first to close pub_closing_ = true; @@ -221,7 +221,7 @@ void Engine::end_sub_transaction() void Engine::sub_close() { auto self = sg4::Actor::self(); - XBT_DEBUG("Subscriber '%s' is closing the engine", self->get_cname()); + XBT_DEBUG("Subscriber '%s' is closing the engine '%s'", self->get_cname(), get_cname()); if (not sub_closing_) { // I'm the first to close sub_closing_ = true; diff --git a/src/Stream.cpp b/src/Stream.cpp index 071b259..f7852f2 100644 --- a/src/Stream.cpp +++ b/src/Stream.cpp @@ -133,6 +133,7 @@ std::shared_ptr Stream::open(const std::string& name, Mode mode) dtl_->lock(); if (not engine_) { + XBT_DEBUG("Create Engine"); if (engine_type_ == Engine::Type::Staging) { engine_ = std::make_shared(name, this); } else if (engine_type_ == Engine::Type::File) { @@ -145,7 +146,7 @@ std::shared_ptr Stream::open(const std::string& name, Mode mode) dtl_->unlock(); while (not engine_) - sg4::this_actor::sleep_for(0.05); + sg4::this_actor::sleep_for(0.01); // Then we register the actors calling Stream::open as publishers or subscribers in the newly created Engine. if (mode == dtlmod::Stream::Mode::Publish) { @@ -154,9 +155,11 @@ std::shared_ptr Stream::open(const std::string& name, Mode mode) engine_->add_subscriber(sg4::Actor::self(), rendez_vous_); } - XBT_DEBUG("Stream '%s' uses engine '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), get_engine_type_str(), - get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers()); + XBT_DEBUG("Stream '%s' uses engine '%s' of type '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), engine_->get_cname(), + get_engine_type_str(), get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers()); + sg4::this_actor::sleep_for(0.05); + return engine_; } diff --git a/test/dtl_file_engine.cpp b/test/dtl_file_engine.cpp index 419233f..6409420 100644 --- a/test/dtl_file_engine.cpp +++ b/test/dtl_file_engine.cpp @@ -234,7 +234,7 @@ TEST_F(DTLFileEngineTest, MultiplePubSingleSubSharedStorage) XBT_INFO("Start a Transaction"); ASSERT_NO_THROW(engine->begin_transaction()); XBT_INFO("Transition can start as publishers have finished writing"); - ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.651431842993127); + ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.701431842993127); XBT_INFO("Get the entire Variable 'var' from the DTL"); ASSERT_NO_THROW(engine->get(var_sub)); XBT_INFO("End a Transaction"); diff --git a/test/dtl_stream.cpp b/test/dtl_stream.cpp index 0e17fd8..513a1a0 100644 --- a/test/dtl_stream.cpp +++ b/test/dtl_stream.cpp @@ -138,7 +138,7 @@ TEST_F(DTLStreamTest, PublishFileMultipleOpen) XBT_INFO("Open the Stream in Stream::Mode::Publish mode"); ASSERT_NO_THROW(engine = stream->open("zone:fs:/pfs/file", dtlmod::Stream::Mode::Publish)); XBT_INFO("Check current number of publishers and subscribers"); - ASSERT_EQ(stream->get_num_publishers(), 1); + ASSERT_EQ(stream->get_num_publishers(), 2); ASSERT_EQ(stream->get_num_subscribers(), 0); ASSERT_NO_THROW(sg4::this_actor::sleep_for(1)); XBT_INFO("Close the engine"); @@ -194,8 +194,8 @@ TEST_F(DTLStreamTest, OpenWithRendezVous) ASSERT_NO_THROW(stream->set_rendez_vous()); XBT_INFO("Open the Stream in Stream::Mode::Publish mode"); ASSERT_NO_THROW(engine = stream->open("foo", dtlmod::Stream::Mode::Publish)); - XBT_INFO("Open complete. Clock should be at 10s"); - ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.0); + XBT_INFO("Open complete. Clock should be at 10.05s"); + ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.05); XBT_INFO("Let actor %s sleep for 1 second", sg4::this_actor::get_cname()); ASSERT_NO_THROW(sg4::this_actor::sleep_for(1)); XBT_INFO("Close the engine");