Skip to content

Commit

Permalink
MT#57550 Demount call_subscription concept
Browse files Browse the repository at this point in the history
From now on the `call_subscription` concept gets
deprecated, and instead of it the `media_subscriptions`
concept gets applied.

Benefits of this change is:
- ability to subscribe one-to-multiple medias (different monologues)
- media level manipulations, without affecting whole SDP session
- no need to use medias offset, to detect proper subscription's media
- there is no need of particular medias order, they can be
  subscribed to each other in any possible way
  (even though RFC still requires to always have proper ordering)

Deprecated objects:
- `struct call_subscription`
- `GQueue subscriptions`
- `GQueue subscribers`
- `GHashTable * subscriptions_ht`
- `GHashTable * subscribers_ht`

Deprecated functionality:
- `__unsubscribe_one()`
- `__unsubscribe_all_offer_answer_subscribers()`
- `__unsubscribe_from_all()`
- `__subscribe_offer_answer_both_ways()`
- `__add_subscription()`
- `__unsubscribe_one_link()`
- `call_get_call_subscription()`
- `call_subscriptions_clear()`
- `call_subscriptions_free()`

Offtopic: additionally this commit adds helper func:
- `call_media_subscribed_to_monologue()`

Change-Id: Ifb44f7a1ba5b483b1472882b1b8d06444dba1727
  • Loading branch information
zenichev committed Nov 13, 2023
1 parent 8ac455d commit 6a792f2
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 212 deletions.
200 changes: 43 additions & 157 deletions daemon/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ void free_sink_handler(void *p) {
}

/**
* A transfer of flags from the subscription (call_subscription) to the sink handlers (sink_handler) is done
* A transfer of flags from the subscription to the sink handlers (sink_handler) is done
* using the __init_streams() through __add_sink_handler().
*/
void __add_sink_handler(GQueue *q, struct packet_stream *sink, const struct sink_attrs *attrs) {
Expand Down Expand Up @@ -2984,11 +2984,6 @@ int monologue_offer_answer(struct call_monologue *monologues[2], GQueue *streams
return ERROR_NO_FREE_LOGS;
}


void call_subscriptions_clear(GQueue *q) {
g_queue_clear_full(q, call_subscription_free);
}

void media_subscriptions_clear(GQueue *q) {
g_queue_clear_full(q, media_subscription_free);
}
Expand All @@ -3013,20 +3008,6 @@ static void __unsubscribe_media_link(struct call_media * which, GList * which_cm
g_slice_free1(sizeof(*ms), ms);
g_slice_free1(sizeof(*rev_ms), rev_ms);
}
static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs_link) {
struct call_subscription *cs = which_cs_link->data;
struct call_subscription *rev_cs = cs->link->data;
struct call_monologue *from = cs->monologue;
ilog(LOG_DEBUG, "Unsubscribing '" STR_FORMAT_M "' from '" STR_FORMAT_M "'",
STR_FMT_M(&which->tag),
STR_FMT_M(&from->tag));
g_queue_delete_link(&from->subscribers, cs->link);
g_queue_delete_link(&which->subscriptions, which_cs_link);
g_hash_table_remove(which->subscriptions_ht, cs->monologue);
g_hash_table_remove(from->subscribers_ht, rev_cs->monologue);
g_slice_free1(sizeof(*cs), cs);
g_slice_free1(sizeof(*rev_cs), rev_cs);
}
/**
* Unsubscribe one particular media subscriber from this call media.
*/
Expand All @@ -3047,31 +3028,6 @@ static bool __unsubscribe_media(struct call_media * which, struct call_media * f
__unsubscribe_media_link(which, l);
return true;
}
static bool __unsubscribe_one(struct call_monologue *which, struct call_monologue *from) {
GList *l = g_hash_table_lookup(which->subscriptions_ht, from);
if (!l) {
ilog(LOG_DEBUG, "Tag '" STR_FORMAT_M "' is not subscribed to '" STR_FORMAT_M "'",
STR_FMT_M(&which->tag),
STR_FMT_M(&from->tag));
return false;
}
__unsubscribe_one_link(which, l);
return true;
}
static void __unsubscribe_all_offer_answer_subscribers(struct call_monologue *ml) {
for (GList *l = ml->subscribers.head; l; ) {
struct call_subscription *cs = l->data;
if (!cs->attrs.offer_answer) {
l = l->next;
continue;
}
GList *next = l->next;
struct call_monologue *other_ml = cs->monologue;
__unsubscribe_one(other_ml, ml);
__unsubscribe_one(ml, other_ml);
l = next;
}
}
/**
* Deletes all offer/answer media subscriptions.
*/
Expand All @@ -3093,13 +3049,6 @@ static void __unsubscribe_all_offer_answer_medias(struct call_media * cm) {
l = next;
}
}
static void __unsubscribe_from_all(struct call_monologue *ml) {
for (GList *l = ml->subscriptions.head; l; ) {
GList *next = l->next;
__unsubscribe_one_link(ml, l);
l = next;
}
}
static void __unsubscribe_medias_from_all(struct call_monologue *ml) {
for (int i = 0; i < ml->medias->len; i++)
{
Expand Down Expand Up @@ -3190,45 +3139,6 @@ void __add_media_subscription(struct call_media * which, struct call_media * to,
g_hash_table_insert(which->media_subscriptions_ht, to, to_rev_ms->link);
g_hash_table_insert(to->media_subscribers_ht, which, which_ms->link);
}
void __add_subscription(struct call_monologue *which, struct call_monologue *to,
unsigned int offset, const struct sink_attrs *attrs)
{
if (g_hash_table_lookup(which->subscriptions_ht, to)) {
ilog(LOG_DEBUG, "Tag '" STR_FORMAT_M "' is already subscribed to '" STR_FORMAT_M "'",
STR_FMT_M(&which->tag),
STR_FMT_M(&to->tag));
return;
}
ilog(LOG_DEBUG, "Subscribing '" STR_FORMAT_M "' to '" STR_FORMAT_M "'",
STR_FMT_M(&which->tag),
STR_FMT_M(&to->tag));
struct call_subscription *which_cs = g_slice_alloc0(sizeof(*which_cs));
struct call_subscription *to_rev_cs = g_slice_alloc0(sizeof(*to_rev_cs));
which_cs->monologue = to;
to_rev_cs->monologue = which;
which_cs->media_offset = offset;
to_rev_cs->media_offset = offset;
// preserve attributes if they were present previously
if (attrs) {
which_cs->attrs = *attrs;
to_rev_cs->attrs = *attrs;
}
// keep offer-answer subscriptions first in the list
if (!attrs || !attrs->offer_answer) {
g_queue_push_tail(&which->subscriptions, which_cs);
g_queue_push_tail(&to->subscribers, to_rev_cs);
which_cs->link = to->subscribers.tail;
to_rev_cs->link = which->subscriptions.tail;
}
else {
g_queue_push_head(&which->subscriptions, which_cs);
g_queue_push_head(&to->subscribers, to_rev_cs);
which_cs->link = to->subscribers.head;
to_rev_cs->link = which->subscriptions.head;
}
g_hash_table_insert(which->subscriptions_ht, to, to_rev_cs->link);
g_hash_table_insert(to->subscribers_ht, which, which_cs->link);
}
/**
* Subscribe medias to each other.
*/
Expand Down Expand Up @@ -3263,28 +3173,6 @@ static void __subscribe_medias_both_ways(struct call_media * a, struct call_medi
__add_media_subscription(a, b, &a_attrs);
__add_media_subscription(b, a, &b_attrs);
}
static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct call_monologue *b) {
// retrieve previous subscriptions to retain attributes
struct call_subscription *a_cs = call_get_call_subscription(a->subscriptions_ht, b);
struct call_subscription *b_cs = call_get_call_subscription(b->subscriptions_ht, a);
// copy out attributes
struct sink_attrs a_attrs = {0,};
struct sink_attrs b_attrs = {0,};
if (a_cs)
a_attrs = a_cs->attrs;
if (b_cs)
b_attrs = b_cs->attrs;
// override/reset some attributes
a_attrs.offer_answer = b_attrs.offer_answer = true;
a_attrs.egress = b_attrs.egress = false;
a_attrs.rtcp_only = b_attrs.rtcp_only = false;
// delete existing subscriptions
__unsubscribe_all_offer_answer_subscribers(a);
__unsubscribe_all_offer_answer_subscribers(b);
// (re)create, preserving existing attributes if there were any
__add_subscription(a, b, 0, &a_attrs);
__add_subscription(b, a, 0, &b_attrs);
}

/**
* Subscribe media lines to each other respecting the given order in the SDP offer/answer.
Expand Down Expand Up @@ -3341,12 +3229,6 @@ struct media_subscription *call_get_media_subscription(GHashTable *ht, struct ca
return NULL;
return l->data;
}
struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml) {
GList *l = g_hash_table_lookup(ht, ml);
if (!l)
return NULL;
return l->data;
}

/* called with call->master_lock held in W */
__attribute__((nonnull(1, 2, 3)))
Expand Down Expand Up @@ -3617,32 +3499,31 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, struct sdp_ng_flag
/* called with call->master_lock held in W */
__attribute__((nonnull(1, 2)))
int monologue_unsubscribe(struct call_monologue *dst_ml, struct sdp_ng_flags *flags) {
for (GList *l = dst_ml->subscriptions.head; l; ) {
GList *next = l->next;
struct call_subscription *cs = l->data;
struct call_monologue *src_ml = cs->monologue;
for (unsigned int i = 0; i < dst_ml->medias->len; i++)
{
struct call_media *media = dst_ml->medias->pdata[i];
if (!media)
continue;

__unsubscribe_one_link(dst_ml, l);
__media_unconfirm(media, "media unsubscribe");
__update_init_subscribers(media, NULL, NULL, flags->opmode);

for (unsigned int i = 0; i < dst_ml->medias->len; i++)
/* TODO: should we care about subscribers as well? */
for (GList *l = media->media_subscriptions.head; l; )
{
struct call_media *media = dst_ml->medias->pdata[i];
if (!media)
continue;
__update_init_subscribers(media, NULL, NULL, flags->opmode);
}
for (unsigned int i = 0; i < src_ml->medias->len; i++)
{
struct call_media *media = src_ml->medias->pdata[i];
if (!media)
GList *next = l->next;
struct media_subscription * ms = l->data;
struct call_media * src_media = ms->media;

if (!src_media)
continue;
__update_init_subscribers(media, NULL, NULL, flags->opmode);
}

dialogue_unconfirm(src_ml, "monologue unsubscribe");
dialogue_unconfirm(dst_ml, "monologue unsubscribe");
__media_unconfirm(src_media, "media unsubscribe");
__update_init_subscribers(src_media, NULL, NULL, flags->opmode);
__unsubscribe_media_link(media, l);

l = next;
l = next;
}
}

return 0;
Expand Down Expand Up @@ -4038,10 +3919,6 @@ void call_media_free(struct call_media **mdp) {
*mdp = NULL;
}

void call_subscription_free(void *p) {
g_slice_free1(sizeof(struct call_subscription), p);
}

void __monologue_free(struct call_monologue *m) {
g_ptr_array_free(m->medias, true);
g_hash_table_destroy(m->associated_tags);
Expand All @@ -4053,10 +3930,6 @@ void __monologue_free(struct call_monologue *m) {
sdp_free(&m->last_in_sdp_parsed);
g_queue_clear_full(&m->sdp_attributes, free);
sdp_streams_free(&m->last_in_sdp_streams);
g_hash_table_destroy(m->subscribers_ht);
g_hash_table_destroy(m->subscriptions_ht);
g_queue_clear_full(&m->subscribers, call_subscription_free);
g_queue_clear_full(&m->subscriptions, call_subscription_free);
g_slice_free1(sizeof(*m), m);
}

Expand Down Expand Up @@ -4264,8 +4137,6 @@ struct call_monologue *__monologue_create(struct call *call) {
ret->medias = g_ptr_array_new();
ret->media_ids = g_hash_table_new(str_hash, str_equal);
ret->ssrc_hash = create_ssrc_hash_call();
ret->subscribers_ht = g_hash_table_new(g_direct_hash, g_direct_equal);
ret->subscriptions_ht = g_hash_table_new(g_direct_hash, g_direct_equal);

gettimeofday(&ret->started, NULL);

Expand Down Expand Up @@ -4540,6 +4411,29 @@ static bool call_monologues_associations_left(struct call * c) {
return false;
}

/**
* Check whether given media is subscribed to any of given monologue medias.
* Returns: media subscription or NULL.
*/
struct media_subscription * call_media_subscribed_to_monologue(const struct call_media * media,
const struct call_monologue * monologue)
{
if (!media || !monologue)
return false;

for (int i = 0; i < monologue->medias->len; i++)
{
struct call_media * sub_media = monologue->medias->pdata[i];
if (!sub_media)
continue;

GList * l = g_hash_table_lookup(sub_media->media_subscribers_ht, media);
if (l)
return l->data; /* found */
}
return NULL;
}

/**
* Check whether given totag is already subscribed to the given monologue medias.
* Returns: true - subscribed, false - not subscribed.
Expand Down Expand Up @@ -4668,9 +4562,6 @@ static int call_get_monologue_new(struct call_monologue *monologues[2], struct c
/* if monologue doesn't exist, then nothing to subscribe yet */
if (!monologue)
goto new_branch;

__subscribe_offer_answer_both_ways(ret, monologue); /* TODO: deprecate */

/* susbcribe existing medias */
__subscribe_matched_medias(ret, monologue);
}
Expand All @@ -4688,8 +4579,6 @@ static int call_get_monologue_new(struct call_monologue *monologues[2], struct c
if (os) {
/* previously seen branch. use it */
__monologue_unconfirm(os, "dialogue/branch association changed");
__subscribe_offer_answer_both_ways(ret, os); /* TODO: deprecate */

/* susbcribe medias to medias */
__subscribe_matched_medias(ret, os);
goto monologues_intact;
Expand All @@ -4701,9 +4590,7 @@ static int call_get_monologue_new(struct call_monologue *monologues[2], struct c
new_branch:
__C_DBG("create new \"other side\" monologue for viabranch "STR_FORMAT, STR_FMT0(viabranch));
os = __monologue_create(call);
__subscribe_offer_answer_both_ways(ret, os); /* TODO: deprecate */
__monologue_viabranch(os, viabranch);

/* susbcribe medias to medias */
__subscribe_matched_medias(ret, os);

Expand Down Expand Up @@ -4821,7 +4708,6 @@ static int call_get_dialogue(struct call_monologue *monologues[2], struct call *

dialogue_unconfirm(ft, "dialogue signalling event");
dialogue_unconfirm(tt, "dialogue signalling event");
__subscribe_offer_answer_both_ways(ft, tt);

/* susbcribe medias to medias */
__subscribe_matched_medias(ft, tt);
Expand Down
Loading

0 comments on commit 6a792f2

Please sign in to comment.