-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfuture_queue.py
281 lines (223 loc) · 8.94 KB
/
future_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
import asyncio
import aiormq
import logging
from enum import Enum
from .basic_queue import BasicQueue, QueueMessage
log = logging.getLogger(__name__)
class MessageFuture(asyncio.Future):
"""
A Future that represents criteria for a message
It will be set once a message is passed to it that satisfies the criteria
:param custom: a callable that takes a message and returns True or False
"""
# TODO: match any / all
def __init__(self, match = {}, headers = {}, headers_eq = {}, corr_id=None, custom=None, *args, **kwargs):
super().__init__(*args, **kwargs)
assert isinstance(match, dict)
assert isinstance(headers, dict)
assert isinstance(headers_eq, dict)
assert custom is None or callable(custom)
self._match = match
self._headers = headers
self._headers_eq = headers_eq
self._corr_id = corr_id
self._custom = custom
self._result_value = None
def is_match(self, msg : QueueMessage):
if self._match:
if isinstance(msg.content, dict):
if not msg.match_dict(self._match):
return False
if self._headers:
if not msg.match_headers_dict(self._headers):
return False
if self._headers_eq:
if msg.headers != self._headers_eq:
return False
if self._corr_id:
if msg.corr_id != self._corr_id:
return False
if self._custom:
if not self._custom(msg):
return False
return True
def process(self, msg : QueueMessage):
if self.done():
return False
assert not self.cancelled(), "Can't pass a message to a cancelled MessageFuture"
assert not self.done(), f"Tried to process {msg} on a MessageFuture that is already set to {self._result_value}"
if self.is_match(msg):
self.set_result(msg)
self._result_value = msg
return True
else:
return False
def cancel(self, *args, **kwargs):
#log.debug(f"Cancel {self!r}")
return super().cancel(*args, **kwargs)
def __str__(self):
tmp = {
'corr_id' : self._corr_id,
'headers_match' : self._headers,
'headers_eq': self._headers_eq
}
attr = [repr(self._match)] + [f"{k}={tmp[k]}" for k in sorted(tmp.keys()) if tmp[k]]
return f"<{', '.join(attr)}>"
def __repr__(self):
return str(self)
class FutureQueueMode(Enum):
"""
What do we do with messages for which no Future has been found?
DROP: drop unmatched messages
STRICT: raise an error if a message can't be matched
WAIT: wait (indefinitely) until a matching Future has been passed via receive; if the buffer is full, messages will be dropped
CIRCULAR: Like WAIT, only that old messages are dropped if the buffer is full and they haven't been used (buffer overflow safe!)
"""
DROP = 'drop'
STRICT = 'strict'
WAIT = 'wait'
CIRCULAR = 'circular'
class FutureQueueDropReason(Enum):
"""
Represents the reason why a message was dropped. Passed to on_drop callback
NO_FUTURE_FOUND: If a matching Future has not been found (for DROP)
BUFFER_FULL: If the buffer is full (WAIT, CIRCULAR)
NOT_REGISTERED: The message has not been registered (used by ManagedQueue)
"""
NO_FUTURE_FOUND = 'no_future_found'
FULL_BUFFER = 'full_buffer'
NOT_REGISTERED = 'not_registered'
class FutureQueueException(Exception):
pass
class FutureQueue(BasicQueue):
# TODO: use Queue instead of linked list
"""
:param mode: Specifies behavior for unexpected messages, either a FutureQueueMode or a function that takes the unexpected message and returns a FutureQueueMode
:param on_drop: a callable of the form func(msg: QueueMessage, reason: FutureQueueDropReason). It is called every time a message is dropped in the DROP, WAIT and CIRCULAR modes
"""
def __init__(self, *args, mode : FutureQueueMode = FutureQueueMode.DROP, on_drop = None, buffer_size = 1000, **kwargs):
super().__init__(*args, **kwargs)
self._receive = []
self._wait = []
self._mode = mode
self._on_drop = on_drop or self._on_drop_default
self._buffer_size = buffer_size
async def _process_message(self, message):
log.debug(f"Process message {message}")
found = False
n = 0
for f in self._receive:
if f.process(message):
found = True
n += 1
if found:
log.debug(f"{n} Future(s) found for {message}")
if not found:
log.debug(f"No future found for {message}")
selected_mode = None
if isinstance(self._mode, FutureQueueMode):
selected_mode = self._mode
elif callable(self._mode):
selected_mode = self._mode(message)
assert isinstance(selected_mode, FutureQueueMode)
else:
assert False
if selected_mode is FutureQueueMode.STRICT:
raise FutureQueueException("Couldn't find a matching future for a message")
if selected_mode is FutureQueueMode.WAIT:
if len(self._wait) < self._buffer_size:
log.debug(f"Add to queue: {message}")
self._wait.append(message)
else:
log.warning(f"Message buffer full: Drop message: {message!r}")
self._on_drop(message, FutureQueueDropReason.FULL_BUFFER)
if selected_mode is FutureQueueMode.DROP:
log.warning(f"Drop message: {message!r}")
self._on_drop(message, FutureQueueDropReason.NO_FUTURE_FOUND)
if selected_mode is FutureQueueMode.CIRCULAR:
# At no point are two messages added, so == is enough
if len(self._wait) == self._buffer_size:
old_message = self._wait.pop(0)
log.warning(f"Dropped old message: {old_message!r}")
self._on_drop(old_message, FutureQueueDropReason.FULL_BUFFER)
# Sanity check (could fail if you have multiple threads)
assert len(self._wait) < self._buffer_size
log.debug(f"Add to queue: {message}")
self._wait.append(message)
async def start_consume(self, **kwargs):
await super().start_consume(self._process_message, **kwargs)
def receive_future(self, *args, timeout=None, **kwargs):
"""
Works like receive, but returns a future
TODO: Not tested
"""
future = MessageFuture(*args, **kwargs)
log.debug('Wait for: %s', future)
# Try to find a message in the waiting queue first
# If we find a suitable one, we don't need the done callback since we processed it immediately and didn't add it to `_receive`
for message in self._wait:
if future.process(message):
log.debug(f"Future found for: {message!r}")
self._wait.remove(message)
return future
self._receive.append(future)
# Remove a MessageFuture from the list when it is done
# Removes if when a result is set
# Ensures that a cancelled future is removed
# This can happen if the user decides they no longer need a message
def on_done(fut):
if fut.cancelled():
log.debug(f"on_done:MessageFuture cancelled: {fut!r}")
self._receive.remove(fut)
future.add_done_callback(on_done)
if not timeout:
return future
else:
return asyncio.create_task(asyncio.wait_for(future, timeout=timeout))
async def receive(self, *args, timeout=None, **kwargs):
"""
Raises asyncio.TimeoutError if a timeout was set and elapsed before the message arrived
"""
future = MessageFuture(*args, **kwargs)
log.debug('Wait for: %s', future)
# Try to find a message in the waiting queue first
# If we find a suitable one, we don't need the done callback since we processed it immediately and didn't add it to `_receive`
for message in self._wait:
if future.process(message):
log.debug(f"Future found for: {message!r}")
self._wait.remove(message)
return future.result()
self._receive.append(future)
# Remove a MessageFuture from the list when it is done
# Removes if when a result is set
# Ensures that a cancelled future is removed
# This can happen if the user decides they no longer need a message
def on_done(fut):
if fut.cancelled():
log.debug(f"on_done:MessageFuture cancelled: {fut!r}")
self._receive.remove(fut)
future.add_done_callback(on_done)
if not timeout:
return await future
else:
try:
return await asyncio.wait_for(future, timeout=timeout)
except asyncio.CancelledError:
raise
except asyncio.TimeoutError as ex:
log.debug(f'Message timeout: {future}')
raise
# TODO: Alternative is to have a normal routine that returns a Future
# if not timeout:
# return future
# else:
# t = asyncio.create_task(asyncio.wait_for(future, timeout=timeout))
# t.add_done_callback(react_to_timeout)
# return t
def _on_drop_default(self, msg, reason):
if reason is FutureQueueDropReason.FULL_BUFFER:
log.warning(f"Buffer full. Drop: {msg.short_str()}")
elif reason is FutureQueueDropReason.NOT_REGISTERED:
log.warning(f"Unexpected message. Drop: {msg.short_str()}")
elif reason is FutureQueueDropReason.NO_FUTURE_FOUND:
log.warning(f"No future found. Drop: {msg.short_str()}")