-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathzstd_reader.py
78 lines (64 loc) · 2.59 KB
/
zstd_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
from io import BufferedReader
import io
import zstandard as zstd
from utilities import Direction
class ZstdFrameReader:
    """Buffered exact-count reader over an asyncio stream with optional zstd decoding.

    Bytes pulled from the underlying ``asyncio.StreamReader`` are staged in
    ``outputbuffer``. Until ``enable_zstd()`` is called they are stored
    verbatim; afterwards every chunk is pushed through a streaming zstd
    decompressor that writes its output into the same buffer.
    """

    def __init__(self, reader: asyncio.StreamReader, direction: Direction):
        """Wrap *reader*; *direction* is stored as-is and not otherwise used by the code shown here."""
        self.raw_reader = reader
        self.direction = direction
        # Compression starts disabled; see enable_zstd().
        self.zstd_enabled = False
        # Staging area that readexactly() serves from.
        self.outputbuffer = NonSeekableMemoryStream()
        # Streaming decompressor whose output sink is the staging buffer.
        self.decompressor = zstd.ZstdDecompressor().stream_writer(self.outputbuffer)

    def enable_zstd(self):
        """Route all subsequently received chunks through the zstd decompressor."""
        self.zstd_enabled = True

    async def readexactly(self, count):
        """Return exactly *count* bytes, reading from the network until enough are buffered.

        Raises:
            asyncio.CancelledError: propagated from ``read_from_network`` when
                the connection closes or the compressed stream is invalid.
        """
        # Keep refilling until the staging buffer can satisfy the request.
        while self.outputbuffer.remaining() < count:
            await self.read_from_network(count)
        return self.outputbuffer.read(count)

    async def read_from_network(self, target_count):
        """Fill the staging buffer until it holds at least *target_count* bytes.

        Raises:
            asyncio.CancelledError: when the stream yields an empty chunk
                (treated as a closed connection) or decompression fails.
        """
        while self.outputbuffer.remaining() < target_count:
            # Ask for up to 32 KiB; the stream hands back whatever is available.
            data = await self.raw_reader.read(32768)
            if not data:
                raise asyncio.CancelledError("Connection closed")

            if not self.zstd_enabled:
                # Plain mode: stage the bytes untouched.
                self.outputbuffer.write(data)
                continue

            # Compressed mode: the decompressor writes its output to outputbuffer.
            try:
                self.decompressor.write(data)
            except zstd.ZstdError:
                print("Zstd error, dropping connection")
                raise asyncio.CancelledError("Error in compressed data stream!")
class NonSeekableMemoryStream(io.RawIOBase):
    """In-memory FIFO byte stream: ``write`` appends, ``read`` consumes from the front.

    Unread data lives in ``buffer[read_pos:write_pos]``. Bytes that have been
    read are dropped from ``buffer`` once the consumed prefix outweighs the
    unread tail, so memory stays proportional to the unread data even when
    the stream is never fully drained.
    """

    def __init__(self):
        super().__init__()
        self.buffer = bytearray()
        # Offsets into ``buffer``; bytes in [read_pos, write_pos) are unread.
        self.read_pos = 0
        self.write_pos = 0

    def write(self, b):
        """Append *b* to the stream.

        Returns:
            The number of bytes actually appended to the buffer.
        """
        before = len(self.buffer)
        self.buffer.extend(b)
        # Measure growth instead of trusting len(b), which counts items
        # rather than bytes for some buffer types.
        written = len(self.buffer) - before
        self.write_pos += written
        return written

    def read(self, size=-1):
        """Consume and return up to *size* bytes from the front of the stream.

        Args:
            size: Maximum number of bytes to return. ``None`` or any negative
                value means "everything currently buffered".

        Returns:
            A ``bytes`` object; empty when nothing is buffered or *size* is 0.
        """
        available = self.write_pos - self.read_pos
        # None / negative follow the io convention of "read all"; oversize
        # requests are clamped to what is available.
        if size is None or size < 0 or size > available:
            size = available
        if size == 0:
            return b''

        end = self.read_pos + size
        data = bytes(self.buffer[self.read_pos:end])
        self.read_pos = end

        unread = self.write_pos - self.read_pos
        if unread == 0:
            # Fully drained: start over with a fresh buffer.
            self.buffer = bytearray()
            self.read_pos = 0
            self.write_pos = 0
        elif self.read_pos > unread:
            # Consumed prefix is larger than the unread tail: drop it so the
            # buffer cannot grow without bound. Compacting only at this point
            # keeps the amortized cost per byte constant.
            del self.buffer[:self.read_pos]
            self.read_pos = 0
            self.write_pos = unread
        return data

    def remaining(self):
        """Return the number of bytes written but not yet read."""
        return self.write_pos - self.read_pos

    def readable(self):
        """Report that the stream supports reading."""
        return True

    def writable(self):
        """Report that the stream supports writing."""
        return True