Skip to content

Commit

Permalink
Small improvement to address seq_num/time_stamp overflow. MacOS CI is…
Browse files Browse the repository at this point in the history
… back, needs fix.
  • Loading branch information
aous72 committed Apr 30, 2024
1 parent ee914d0 commit 4065291
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 29 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ccp-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
matrix:
include: [
# { system: MacOS, runner: macos-latest },
{ system: MacOS, runner: macos-latest },
{ system: Ubuntu-20, runner: ubuntu-20.04 },
{ system: Ubuntu-latest, runner: ubuntu-latest },
]
Expand All @@ -27,7 +27,7 @@ jobs:
strategy:
matrix:
include: [
# { system: MacOS, runner: macos-latest },
{ system: MacOS, runner: macos-latest },
{ system: Ubuntu-latest, runner: ubuntu-latest },
]
name: ${{ matrix.system }} Test
Expand Down
96 changes: 72 additions & 24 deletions src/apps/ojph_stream_expand/stream_expand_support.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,53 @@ namespace ojph
namespace stex
{

///////////////////////////////////////////////////////////////////////////////
//
//
// static comparison functions
//
//
///////////////////////////////////////////////////////////////////////////////

///////////////////////////////////////////////////////////////////////////////
// Compares two 32 bit values, A with B, with the possibility A or B has
// undergone overflow. This problem has no proper solution, but here we
// assume that the value B approximately divides the space into two regions,
// a region larger than B and a region smaller than B. This leaves one
// undetermined value that lies at the opposite end of B, a case we just
// ignore -- it is part of smaller.
// NB: This is my current thinking -- I might be wrong
static inline bool is_greater32(ui32 a, ui32 b)
{
ui32 c = a - b;
return (c > 0u && c <= 0x7FFFFFFFu);
}

///////////////////////////////////////////////////////////////////////////////
// Compares two 32 bit values, A with B, with the possibility A or B has
// undergone overflow. This problem has no proper solution, but here we
// assume that the value B approximately divides the space into two regions,
// a region larger than B and a region smaller than B. This leaves one
// undetermined value that lies at the opposite end of B, a case we just
// ignore -- it is part of smaller.
// NB: This is my current thinking -- I might be wrong
static inline bool is_smaller32(ui32 a, ui32 b)
{
ui32 c = a - b;
return (c >= 0x80000000u && c <= 0xFFFFFFFFu);
}

///////////////////////////////////////////////////////////////////////////////
static inline bool is_greater24(ui32 a, ui32 b)
{ return is_greater32(a << 8, b << 8); }

///////////////////////////////////////////////////////////////////////////////
static inline bool is_smaller24(ui32 a, ui32 b)
{ return is_smaller32(a << 8, b << 8); }

///////////////////////////////////////////////////////////////////////////////
static inline ui32 clip_seq_num(ui32 n) { return (n & 0xFFFFFF); }

///////////////////////////////////////////////////////////////////////////////
//
//
Expand Down Expand Up @@ -79,20 +126,20 @@ rtp_packet* packets_handler::exchange(rtp_packet* p)
return p;

if (last_seq_num == 0) // initialization
last_seq_num = p->get_sequence_number() - 1;
last_seq_num = clip_seq_num(p->get_seq_num() - 1);

// packet is old, and is ignored -- no need to included it in the
// lost packets, because this packet was considered lost previously.
// This also captures the case where the previous packet and this packet
// has the same sequence number, which is rather weird but possible
// if some intermediate network unit retransmits packets.
if (p->get_sequence_number() < last_seq_num + 1)
if (is_smaller24(p->get_seq_num(), clip_seq_num(last_seq_num + 1)))
return p;
else if (p->get_sequence_number() == last_seq_num + 1)
else if (p->get_seq_num() == clip_seq_num(last_seq_num + 1))
{
consume_packet();
// see if we can push one packet from the top of the buffer
if (in_use && in_use->get_sequence_number() == last_seq_num + 1)
if (in_use && in_use->get_seq_num() == clip_seq_num(last_seq_num + 1))
consume_packet();
}
else // sequence larger than expected
Expand All @@ -104,12 +151,11 @@ rtp_packet* packets_handler::exchange(rtp_packet* p)
if (in_use->next != NULL) // we have more than 1 packet in queue
{
rtp_packet* t = in_use;
while (t->next != NULL &&
p->get_sequence_number() > t->next->get_sequence_number())
while (t->next != NULL &&
is_greater24(p->get_seq_num(), t->next->get_seq_num()))
t = t->next;

if (t->next != NULL &&
p->get_sequence_number() == t->next->get_sequence_number())
if (t->next != NULL && p->get_seq_num() == t->next->get_seq_num())
{ // this is a repeated packet and must be removed
in_use = in_use->next;
p->next = avail;
Expand All @@ -136,13 +182,15 @@ rtp_packet* packets_handler::exchange(rtp_packet* p)
// queue.
// If avail != NULL, we push one packet from the top of the buffer,
// if it has the correct sequence number.
if (avail == NULL || in_use->get_sequence_number() == last_seq_num + 1)
if (avail == NULL ||
in_use->get_seq_num() == clip_seq_num(last_seq_num + 1))
{
if (avail == NULL)
lost_packets += in_use->get_sequence_number() - last_seq_num - 1;
lost_packets +=
in_use->get_seq_num() - clip_seq_num(last_seq_num + 1);
consume_packet();
if (in_use && in_use->get_sequence_number() == last_seq_num + 1)
consume_packet();
if (in_use && in_use->get_seq_num() == clip_seq_num(last_seq_num + 1))
consume_packet();
}
}
}
Expand Down Expand Up @@ -172,7 +220,7 @@ void packets_handler::flush()
///////////////////////////////////////////////////////////////////////////////
void packets_handler::consume_packet()
{
last_seq_num = in_use->get_sequence_number();
last_seq_num = in_use->get_seq_num();
frames->push(in_use);
// move pack from in_use to avail; the packet must be equal to in_use
rtp_packet* p = in_use;
Expand Down Expand Up @@ -242,9 +290,9 @@ void frames_handler::init(bool quiet, const char *target_name,
///////////////////////////////////////////////////////////////////////////////
void frames_handler::push(rtp_packet* p)
{
assert(p->get_time_stamp() >= last_time_stamp);
assert(p->get_sequence_number() >= last_seq_number);
last_seq_number = p->get_sequence_number();
assert(!is_smaller32(p->get_time_stamp(), last_time_stamp));
assert(!is_smaller24(p->get_seq_num(), last_seq_number));
last_seq_number = p->get_seq_num();

// check if any of the frames processed in other threads are done
check_files_in_processing();
Expand All @@ -269,8 +317,8 @@ void frames_handler::push(rtp_packet* p)
in_use->next = NULL;

assert(in_use->done.load(std::memory_order_acquire) == 0);
in_use->timestamp = p->get_time_stamp();
in_use->last_seen_seq = p->get_sequence_number();
in_use->time_stamp = p->get_time_stamp();
in_use->last_seen_seq = p->get_seq_num();
in_use->frame_idx = total_frames;
in_use->f.open();
in_use->f.write(p->get_data(), p->get_data_size());
Expand All @@ -285,11 +333,11 @@ void frames_handler::push(rtp_packet* p)
{ // body packet payload
if (in_use != NULL)
{
if (p->get_time_stamp() == in_use->timestamp)
if (p->get_time_stamp() == in_use->time_stamp)
{ // this is a continuation of a previous frame
if (p->get_sequence_number() == in_use->last_seen_seq + 1)
if (p->get_seq_num() == clip_seq_num(in_use->last_seen_seq + 1))
{
in_use->last_seen_seq = p->get_sequence_number();
in_use->last_seen_seq = p->get_seq_num();
in_use->f.write(p->get_data(), p->get_data_size());
if (p->is_marked())
send_to_processing();
Expand All @@ -307,7 +355,7 @@ void frames_handler::push(rtp_packet* p)
++trunc_frames;
send_to_processing();

if (p->get_time_stamp() > last_time_stamp)
if (is_greater32(p->get_time_stamp(), last_time_stamp))
{
++total_frames;
last_time_stamp = p->get_time_stamp();
Expand All @@ -316,7 +364,7 @@ void frames_handler::push(rtp_packet* p)
}
else // no frame is being written
{
if (p->get_time_stamp() > last_time_stamp)
if (is_greater32(p->get_time_stamp(), last_time_stamp))
{
++total_frames;
last_time_stamp = p->get_time_stamp();
Expand Down Expand Up @@ -368,7 +416,7 @@ void frames_handler::check_files_in_processing()
if (f->done.load(std::memory_order_acquire) == 0)
{
// move f from processing to avail
f->timestamp = 0;
f->time_stamp = 0;
f->last_seen_seq = 0;
f->frame_idx = 0;
if (f == processing)
Expand Down
6 changes: 3 additions & 3 deletions src/apps/ojph_stream_expand/stream_expand_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ struct rtp_packet
ui32 get_csrc_count() { return (ui32)(data[0]) & 0xF; }
bool is_marked() { return (data[1] & 0x80) != 0; }
ui32 get_payload_type() { return (ui32)(data[1]) & 0x7F; }
ui32 get_sequence_number() {
ui32 get_seq_num() {
ui32 result = ntohs(*(ui16*)(data + 2));
result |= ((ui32)data[15]) << 16; // extended sequence (ESEQ)
return result;
Expand Down Expand Up @@ -357,7 +357,7 @@ struct stex_file {
*/
stex_file()
{
timestamp = last_seen_seq = 0;
time_stamp = last_seen_seq = 0;
done.store(0, std::memory_order_relaxed);
frame_idx = 0;
parent = NULL;
Expand Down Expand Up @@ -399,7 +399,7 @@ struct stex_file {

public:
ojph::mem_outfile f; //!<holds in-memory j2k codestream
ui32 timestamp; //!<time stamp at which this file must be displayed
ui32 time_stamp; //!<time stamp at which this file must be displayed
ui32 last_seen_seq; //!<the last seen RTP sequence number
std::atomic_int done; //!<saving is completed when 0 is reached
ui32 frame_idx; //!<frame number in the sequence
Expand Down

0 comments on commit 4065291

Please sign in to comment.