-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbulk-query.js
156 lines (143 loc) · 5.49 KB
/
bulk-query.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
const { Request: TDS_Request, BulkLoad: TDS_BulkLoad } = require('tedious');
const EventTracker = require('./event-tracker.js');
const Connection = require('./connection.js');
const Query = require('./query.js');
const Result = require('./result.js');
const tarn = require('tarn');
/**
* @typedef {BulkQuery | Promise.<Result>} PromiseBulkQuery
*/
/**
* @typedef BulkQuery.Options
* @property {Boolean} checkConstraints - Honors constraints during bulk load, using T-SQL CHECK_CONSTRAINTS.
* @property {Boolean} fireTriggers - Honors insert triggers during bulk load, using the T-SQL FIRE_TRIGGERS.
* @property {Boolean} keepNulls - Honors null value passed, ignores the default values set on table, using T-SQL KEEP_NULLS.
* @property {Boolean} tableLock - Places a bulk update(BU) lock on table while performing bulk load, using T-SQL TABLOCK.
* @property {Number} timeout - The number of milliseconds before the bulk load is considered failed, or 0 for no timeout.
*/
/**
* Provides promise extensions to a `BulkQuery` object and allows it to be executed on an acquired connection.
*/
class BulkQuery {
    /**
     * Creates a new instance of a `BulkQuery`.
     * @param {String} tableName - The name of the table to perform the bulk insert.
     * @param {BulkQuery.Options} options - Options to pass to the bulk query.
     * @param {tarn.Pool} pool - The connection pool to utilize for acquiring the connection.
     * @throws {Error} When `pool` is not provided.
     */
    constructor(tableName, options, pool) {
        //validate
        if (!pool) {
            throw new Error('The parameter "pool" argument is required.');
        }
        /**
         * @type {String}
         */
        this.tableName = tableName || null;
        /**
         * @type {BulkQuery.Options}
         */
        this.options = options || {};
        /**
         * The `tarn.Pool` instance linked to this query.
         * @type {tarn.Pool}
         */
        this.pool = pool;
        /**
         * Tracks the state of the bulk-query & connection objects and promises.
         * @private
         */
        this.state = {
            /** @type {TDS_BulkLoad} */
            bulk: null,
            /** @type {Array} */
            rows: [],
            /** @type {Promise.<Number>} */
            done: null,
            /** @type {Connection} */
            connection: null,
            /**
             * The in-flight connection acquisition, shared by concurrent `aquire` calls.
             * @type {Promise.<void>}
             */
            acquiring: null
        };
    }

    /**
     * Establishes a connection to begin a bulk-load operation.
     * This is called automatically upon `column`, `add` or `execute`, so you generally *do not* need to call it
     * explicitly.
     *
     * Concurrent calls share a single acquisition, so only one pooled connection is taken per bulk query. If
     * setting up the bulk-load fails, the connection is handed back to the pool and the error is rethrown; a later
     * call may try again.
     * @throws {Error} When the `pool` property has been cleared.
     * @returns {BulkQuery}
     */
    async aquire() {
        if (!this.pool) {
            throw new Error('The "pool" property is required.');
        }
        if (this.state.bulk) {
            return this;
        }
        if (!this.state.acquiring) {
            const attempt = (async () => {
                //note to self: avoid using async on thennables... it creates... oddities.
                const connection = await this.pool.acquire().promise;
                try {
                    //create the bulk callback wrapped in a promise
                    let settle = null;
                    const completed = new Promise((resolve, reject) => {
                        settle = { resolve, reject };
                    });
                    //the callback must be passed explicitly: with only two arguments tedious interprets the
                    //second one as the callback, which would discard the bulk-load options.
                    const bulk = connection._tdsConnection.newBulkLoad(this.tableName, this.options, (err, rowCount) => {
                        if (err) {
                            settle.reject(err);
                        } else {
                            settle.resolve(rowCount);
                        }
                    });
                    if (typeof this.options.timeout === 'number') {
                        bulk.setTimeout(this.options.timeout);
                    }
                    this.state.connection = connection;
                    this.state.bulk = bulk;
                    this.state.done = completed.finally(() => {
                        //release at most once; the reference is cleared so a released connection is never reused.
                        const held = this.state.connection;
                        if (held) {
                            this.state.connection = null;
                            this.pool.release(held);
                        }
                    });
                } catch (err) {
                    //do not leak the pooled connection when the bulk-load could not be set up.
                    this.pool.release(connection);
                    throw err;
                }
            })();
            this.state.acquiring = attempt;
            //forget the attempt once it settles so that a failed acquisition can be retried.
            const forget = () => {
                if (this.state.acquiring === attempt) {
                    this.state.acquiring = null;
                }
            };
            attempt.then(forget, forget);
        }
        await this.state.acquiring;
        return this;
    }

    /**
     * Fire and complete the bulk-load.
     *
     * The connection is handed back to the pool once the bulk-load callback reports completion (success or
     * failure), or immediately when starting the bulk-load throws.
     * @throws {Error} When the bulk query no longer holds a connection (for example, it was already executed).
     * @returns {Promise.<Number>} Resolves with the row count reported by the bulk-load callback.
     */
    async execute() {
        await this.aquire();
        const { bulk, connection, rows, done } = this.state;
        if (!connection) {
            throw new Error('The bulk query has no active connection. It may have already been executed.');
        }
        try {
            connection._tdsConnection.execBulkLoad(bulk, rows);
        } catch (err) {
            //the completion callback will never fire in this case, so release here instead.
            this.state.connection = null;
            this.pool.release(connection);
            throw err;
        }
        return await done;
    }

    /**
     * Adds a column to the bulk query.
     * @param {String} name - The column name.
     * @param {QueryTypes} type - The TDS type of the column.
     * @param {*} options - column options.
     * @returns {BulkQuery}
     */
    async column(name, type, options) {
        await this.aquire();
        this.state.bulk.addColumn(name, type, options);
        return this;
    }

    /**
     * Adds a row to the bulk query.
     *
     * Any `row` argument that is `null` or `undefined` is ignored (skipped).
     * All rows are validated before a connection is acquired or any row is stored, so an invalid argument leaves
     * the bulk query unchanged.
     * @throws Error when the row is non-null, non-undefined, and not an object.
     * @param {...Object} rows - A spread of row objects. If an object it should have key/value pairs representing
     * column name and value. If an array then it should represent the values of each column in the same order which
     * they were added to the `BulkQuery` object.
     * @returns {BulkQuery}
     */
    async add(...rows) {
        if (rows.length === 0) {
            return this;
        }
        const accepted = [];
        for (const [index, row] of rows.entries()) {
            if (row === null || row === undefined) {
                continue;
            }
            const rtype = typeof row;
            if (rtype !== 'object') {
                throw new Error(`Invalid row in bulk-query rows argument at index ${index}. Row must be an object, but instead found "${rtype}".`);
            }
            accepted.push(row);
        }
        await this.aquire();
        this.state.rows.push(...accepted);
        return this;
    }
}
// Expose the `BulkQuery` class as this module's export.
module.exports = BulkQuery;