-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
133 lines (109 loc) · 3.58 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
const EventEmitter = require('events')
const createHash = require('./src/create-hash')
const chunker = require('./src/chunker')
const zlib = require('./src/zlib')
/**
 * @typedef {object} Chunk
 * @property {string} id Id of the entire data this chunk is part of.
 * @property {number} total Total number of chunks the entire data was split into.
 * @property {number} index Zero-based position of this chunk within the entire data.
 * @property {string} data Stringified array using JSON.stringify of the buffer part for this chunk.
 */
/**
* Create an array of Chunk objects from from the given String or Buffer
* @param {object} options
* @param {Buffer} options.content Content that needs to be chunked.
* @param {number} options.chunkSize Maximum size in bytes for each chunk.
* @param {boolean} [options.compress=true] If the content should be compressed using gzip
* @returns {Promise<Chunk[]>} An array containing all the chunks objects.
*/
exports.createChunks = async ({
content,
chunkSize,
compress = true
}) => {
if (!Buffer.isBuffer(content)) {
throw new Error('options.content must be a buffer')
}
const id = createHash(content)
const compressed = compress
? await zlib.compress(content)
: content
const chunks = chunker.split(compressed, chunkSize)
const total = chunks.length
const result = chunks.map((data, index) => ({
id,
total,
index,
data
}))
return result
}
/**
* Joins the given array of Chunk objects into a string.
* @param {Chunk[]} chunks array of chnks generated using the createChunks fn
* @returns {Promise<string>}
*/
exports.joinChunks = async (chunks) => {
if (!Array.isArray(chunks) || chunks.length === 0) {
throw new Error('Invalid chunks param')
}
const compressed = chunker.join(chunks.map((chunk) => chunk.data))
const buff = await zlib.decompress(compressed)
return buff.toString()
}
/**
* @typedef {object} ChunksReceiver
* @property {function} addChunk
*/
/**
* Create a ChunksReader object from a String or a Buffer
* @param {object} [options]
* @param {number} [options.timeout=60000] Milliseconds before emitting a TIMEOUT error
* @returns {EventEmitter & ChunksReceiver}
*/
exports.createReceiver = ({ timeout = 60000 } = {}) => {
const receivers = new Map()
const chunksReceiver = new EventEmitter()
/**
* @param {Chunk} chunk Received item chunk to add to the final data.
*/
chunksReceiver.addChunk = async (chunk) => {
if (!chunk) throw new Error('Missing chunk to add')
const { id, total } = chunk
if (!receivers.has(id)) {
const receiver = {
total,
received: 0,
chunks: [],
timeoutId: setTimeout(() => {
receivers.delete(id)
const err = new Error(`Timeout when receiving chunks "${id}"`)
chunksReceiver.emit('error', err)
}, timeout)
}
receivers.set(id, receiver)
}
const receiver = receivers.get(id)
if (receiver.total <= receiver.received) {
throw new Error('All chunks already received')
}
if (!Number.isSafeInteger(chunk.index) || chunk.index < 0 || chunk.index >= total) {
throw new Error('Invalid chunk.index value')
}
receiver.chunks[chunk.index] = chunk
receiver.received++
// The message has been completely received
if (receiver.total === receiver.received) {
clearTimeout(receiver.timeoutId)
receivers.delete(id)
try {
const result = await exports.joinChunks(receiver.chunks)
chunksReceiver.emit('message', result)
} catch (err) {
chunksReceiver.emit('error', err)
}
}
}
return chunksReceiver
}