forked from UBC-Thunderbots/Software
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread_safe_buffer.py
114 lines (89 loc) · 4.19 KB
/
thread_safe_buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import queue
from software.logger.logger import createLogger
from typing import Type, Optional
from google.protobuf.message import Message
class ThreadSafeBuffer:
    """Multiple producer, multiple consumer buffer.

               │              buffer_size                 │
               ├──────────────────────────────────────────┤
               │                                          │

               ┌──────┬──────┬──────┬──────┬──────┬───────┐
         put() │      │      │      │      │      │       │  get()
               └──────┴──────┴──────┴──────┴──────┴───────┘
                            ThreadSafeBuffer

    Messages are stored in a bounded ``queue.Queue``. The most recently
    retrieved message is cached so that ``get`` can keep returning it while
    the queue is empty.

    NOTE(review): the queue itself is synchronized, but ``cached_msg`` and the
    dropped-proto counters are updated without a lock -- confirm that is
    acceptable for the callers that share one buffer across threads.
    """

    # Overrun warnings are suppressed until more than this many protos
    # have been dropped in total.
    MIN_DROPPED_BEFORE_LOG = 20

    def __init__(
        self, buffer_size: int, protobuf_type: Type[Message], log_overrun: bool = False
    ) -> None:
        """Create a buffer to hold data to be consumed.

        :param buffer_size: Maximum number of messages the underlying
            queue holds (passed to ``queue.Queue`` as ``maxsize``)
        :param protobuf_type: The protobuf class this buffer stores; a
            default-constructed instance is used as the initial cached message
        :param log_overrun: If True, ``get`` logs a warning when protos
            have been dropped because the buffer was full
        """
        self.logger = createLogger(protobuf_type.DESCRIPTOR.name + " Buffer")
        self.queue = queue.Queue(buffer_size)
        self.protobuf_type = protobuf_type
        self.log_overrun = log_overrun
        # Returned by get() until the first real message has been retrieved.
        self.cached_msg = protobuf_type()
        # Total number of protos dropped by non-blocking put() calls.
        self.protos_dropped = 0
        # Value of protos_dropped at the time of the last overrun warning.
        self.last_logged_protos_dropped = 0

    def _log_overrun(self) -> None:
        """Warn about dropped protos if new drops occurred since the last warning."""
        if not self.log_overrun:
            return

        dropped = self.protos_dropped
        if (
            dropped > self.last_logged_protos_dropped
            and dropped > self.MIN_DROPPED_BEFORE_LOG
        ):
            self.logger.warning(
                "packets dropped; thunderscope did not show {} protos".format(dropped)
            )
            self.last_logged_protos_dropped = dropped

    def get(
        self,
        block: bool = False,
        timeout: Optional[float] = None,
        return_cached: bool = True,
    ) -> Optional[Message]:
        """Get data from the buffer.

        If the buffer is empty:
            - If block is True
                - wait until a new msg is received.
                - If a timeout is supplied, wait for timeout seconds
                - Then throw an error, or return cached message if
                  return_cached is True
            - If block is False
                - Return None if return_cached is False
                - Return cached message if return_cached is True

        :param block: If block is True, then block until a new message
            comes through, or return the cached msg if return_cached = True
        :param timeout: If block is True, then wait for this many seconds before
            throwing an error or returning cached
        :param return_cached: If queue is empty, decides whether to
            return cached value (True) or return None / throw an error (False)
        :return: protobuf (cached if there is no data in the buffer and
            return_cached is True)
        :raises queue.Empty: if block is True, return_cached is False and no
            message arrived before the timeout expired
        """
        self._log_overrun()

        try:
            if block:
                self.cached_msg = self.queue.get(timeout=timeout)
            else:
                self.cached_msg = self.queue.get_nowait()
        except queue.Empty:
            if not return_cached:
                if block:
                    # Re-raise the original exception with its traceback intact.
                    raise
                return None
            # Otherwise fall through and hand back the last message we saw.

        return self.cached_msg

    def put(
        self, proto: Message, block: bool = False, timeout: Optional[float] = None
    ) -> None:
        """Put data into the buffer.

        If block is False and the buffer is full, the proto is dropped and
        the dropped-proto counter is incremented.

        :param proto: The proto to place in the buffer
        :param block: Should block until there is space in the buffer
        :param timeout: If block is True, then wait for this many seconds
        :raises queue.Full: if block is True and no space became available
            before the timeout expired
        """
        if block:
            self.queue.put(proto, block, timeout)
            return

        try:
            self.queue.put_nowait(proto)
        except queue.Full:
            self.protos_dropped += 1