Skip to content

Commit

Permalink
add tiny delay at the end of Stream open to avoid race condition + te…
Browse files Browse the repository at this point in the history
…st revalidation
  • Loading branch information
frs69wq committed Sep 23, 2024
1 parent 06cd22b commit 864c921
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void Engine::end_pub_transaction()
void Engine::pub_close()
{
auto self = sg4::Actor::self();
XBT_DEBUG("Publisher '%s' is closing the engine", self->get_cname());
XBT_DEBUG("Publisher '%s' is closing the engine '%s'", self->get_cname(), get_cname());
if (not pub_closing_) {
// I'm the first to close
pub_closing_ = true;
Expand Down Expand Up @@ -221,7 +221,7 @@ void Engine::end_sub_transaction()
void Engine::sub_close()
{
auto self = sg4::Actor::self();
XBT_DEBUG("Subscriber '%s' is closing the engine", self->get_cname());
XBT_DEBUG("Subscriber '%s' is closing the engine '%s'", self->get_cname(), get_cname());
if (not sub_closing_) {
// I'm the first to close
sub_closing_ = true;
Expand Down
9 changes: 6 additions & 3 deletions src/Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
dtl_->lock();

if (not engine_) {
XBT_DEBUG("Create Engine");
if (engine_type_ == Engine::Type::Staging) {
engine_ = std::make_shared<StagingEngine>(name, this);
} else if (engine_type_ == Engine::Type::File) {
Expand All @@ -145,7 +146,7 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
dtl_->unlock();

while (not engine_)
sg4::this_actor::sleep_for(0.05);
sg4::this_actor::sleep_for(0.01);

// Then we register the actors calling Stream::open as publishers or subscribers in the newly created Engine.
if (mode == dtlmod::Stream::Mode::Publish) {
Expand All @@ -154,9 +155,11 @@ std::shared_ptr<Engine> Stream::open(const std::string& name, Mode mode)
engine_->add_subscriber(sg4::Actor::self(), rendez_vous_);
}

XBT_DEBUG("Stream '%s' uses engine '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), get_engine_type_str(),
get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers());
XBT_DEBUG("Stream '%s' uses engine '%s' of type '%s' and transport '%s' (%zu Pub. / %zu Sub.)", get_cname(), engine_->get_cname(),
get_engine_type_str(), get_transport_method_str(), engine_->get_num_publishers(), engine_->get_num_subscribers());

sg4::this_actor::sleep_for(0.05);

return engine_;
}

Expand Down
2 changes: 1 addition & 1 deletion test/dtl_file_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ TEST_F(DTLFileEngineTest, MultiplePubSingleSubSharedStorage)
XBT_INFO("Start a Transaction");
ASSERT_NO_THROW(engine->begin_transaction());
XBT_INFO("Transition can start as publishers have finished writing");
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.651431842993127);
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 15.701431842993127);
XBT_INFO("Get the entire Variable 'var' from the DTL");
ASSERT_NO_THROW(engine->get(var_sub));
XBT_INFO("End a Transaction");
Expand Down
6 changes: 3 additions & 3 deletions test/dtl_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ TEST_F(DTLStreamTest, PublishFileMultipleOpen)
XBT_INFO("Open the Stream in Stream::Mode::Publish mode");
ASSERT_NO_THROW(engine = stream->open("zone:fs:/pfs/file", dtlmod::Stream::Mode::Publish));
XBT_INFO("Check current number of publishers and subscribers");
ASSERT_EQ(stream->get_num_publishers(), 1);
ASSERT_EQ(stream->get_num_publishers(), 2);
ASSERT_EQ(stream->get_num_subscribers(), 0);
ASSERT_NO_THROW(sg4::this_actor::sleep_for(1));
XBT_INFO("Close the engine");
Expand Down Expand Up @@ -194,8 +194,8 @@ TEST_F(DTLStreamTest, OpenWithRendezVous)
ASSERT_NO_THROW(stream->set_rendez_vous());
XBT_INFO("Open the Stream in Stream::Mode::Publish mode");
ASSERT_NO_THROW(engine = stream->open("foo", dtlmod::Stream::Mode::Publish));
XBT_INFO("Open complete. Clock should be at 10s");
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.0);
XBT_INFO("Open complete. Clock should be at 10.05s");
ASSERT_DOUBLE_EQ(sg4::Engine::get_clock(), 10.05);
XBT_INFO("Let actor %s sleep for 1 second", sg4::this_actor::get_cname());
ASSERT_NO_THROW(sg4::this_actor::sleep_for(1));
XBT_INFO("Close the engine");
Expand Down

0 comments on commit 864c921

Please sign in to comment.