Skip to content

Commit

Permalink
Merge pull request #47 from lsst-ts/unique_initial_state_request
Browse files Browse the repository at this point in the history
Initial_state subscription request
  • Loading branch information
jgalazm authored Dec 17, 2019
2 parents 8139d25 + a3b4c98 commit e07cc34
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
35 changes: 17 additions & 18 deletions manager/subscription/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,6 @@ async def handle_subscription_message(self, message):
'data': 'Successfully subscribed to %s-%s-%s-%s' % (category, csc, salindex, stream)
})

# If subscribed to an event, send request for initial-state
if category == 'event':
await self.channel_layer.group_send(
'initial_state-all-all-all',
{
'type': 'subscription_all_data',
'category': 'initial_state',
'data': [{
"csc": csc,
"salindex": salindex,
"data": {
"event_name": stream
}
}]
}
)
return

if option == 'unsubscribe':
# Unsubscribe and send confirmation
csc = message['csc']
Expand Down Expand Up @@ -236,6 +218,23 @@ async def _join_group(self, category, csc, salindex, stream):
self.channel_name
)

# If subscribing to an event, send the initial_state
if category == 'event':
await self.channel_layer.group_send(
'initial_state-all-all-all',
{
'type': 'subscription_all_data',
'category': 'initial_state',
'data': [{
"csc": csc,
"salindex": int(salindex),
"data": {
"event_name": stream
}
}]
}
)

async def _leave_group(self, category, csc, salindex, stream):
"""Leave a group in order to receive messages from it.
Expand Down
13 changes: 11 additions & 2 deletions manager/subscription/tests/test_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,24 +362,33 @@ async def test_request_initial_state_when_subscribing_to_event(self):
"stream": combination["stream"],
"category": combination["category"]
}

# Subscribe the first time
await client_communicator.send_json_to(msg)
producer_consumer_response = await producer_communicator.receive_json_from()

# Assert
# Assert first subscription
print('\n producer_consumer_response:')
pprint.pprint(producer_consumer_response)
assert producer_consumer_response == {
'category': 'initial_state',
'data': [{
'csc': combination["csc"],
'salindex': combination["salindex"],
'stream': {
'data': {
'event_name': combination["stream"]
},
}],
'subscription': 'initial_state-all-all-all'

}

# Assert second subscription doesn't produce a message
await client_communicator.send_json_to(msg)

with pytest.raises(asyncio.TimeoutError):
producer_consumer_response = await asyncio.wait_for(producer_communicator.receive_json_from(), timeout=self.no_reception_timeout)
print('adsfadsf')

await client_communicator.disconnect()
await producer_communicator.disconnect()

0 comments on commit e07cc34

Please sign in to comment.