Skip to content

Commit

Permalink
Refs #22648: corrected failing CI
Browse files Browse the repository at this point in the history
Signed-off-by: Juanjo Garcia <[email protected]>
  • Loading branch information
juanjo4936 committed Jan 29, 2025
1 parent a972051 commit e2cee93
Showing 1 changed file with 49 additions and 47 deletions.
96 changes: 49 additions & 47 deletions test/blackbox/common/DDSBlackboxTestsListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3449,54 +3449,56 @@ TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call)
{
auto test_transport = std::make_shared<test_UDPv4TransportDescriptor>();
test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool
{
static std::vector<std::pair<eprosima::fastdds::rtps::SequenceNumber_t, std::chrono::steady_clock::time_point>> delayed_messages;

uint32_t old_pos = msg.pos;

// Parse writer ID and sequence number
msg.pos += 2; // flags
msg.pos += 2; // inline QoS
msg.pos += 4; // reader ID
auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
eprosima::fastdds::rtps::SequenceNumber_t sn;
sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);

// Restore buffer position
msg.pos = old_pos;

// Delay logic for user endpoints only
if ((writerID.value[3] & 0xC0) == 0) // only user endpoints
{
auto now = std::chrono::steady_clock::now();
auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(),
[&sn](const auto& pair) {
return pair.first == sn;
});

if (it == delayed_messages.end())
{
// If the sequence number is encountered for the first time, start the delay
delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay
return true; // Start dropping this message
}
else if (now < it->second)
{
// If the delay period has not elapsed, keep dropping the message
return true;
}
else
{
// Once the delay has elapsed, allow the message to proceed
delayed_messages.erase(it);
}
}
return false; // Allow message to proceed
};

static std::vector<std::pair<eprosima::fastdds::rtps::SequenceNumber_t,
std::chrono::steady_clock::time_point>> delayed_messages;

uint32_t old_pos = msg.pos;

// Parse writer ID and sequence number
msg.pos += 2; // flags
msg.pos += 2; // inline QoS
msg.pos += 4; // reader ID
auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
eprosima::fastdds::rtps::SequenceNumber_t sn;
sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);
msg.pos += 4;
sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]);

// Restore buffer position
msg.pos = old_pos;

// Delay logic for user endpoints only
if ((writerID.value[3] & 0xC0) == 0) // only user endpoints
{
auto now = std::chrono::steady_clock::now();
auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(),
[&sn](const auto& pair)
{
return pair.first == sn;
});

if (it == delayed_messages.end())
{
// If the sequence number is encountered for the first time, start the delay
delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay
return true; // Start dropping this message
}
else if (now < it->second)
{
// If the delay period has not elapsed, keep dropping the message
return true;
}
else
{
// Once the delay has elapsed, allow the message to proceed
delayed_messages.erase(it);
}
}
return false; // Allow message to proceed
};

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

Expand Down

0 comments on commit e2cee93

Please sign in to comment.