-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathconsolidation.js
191 lines (179 loc) · 6.92 KB
/
consolidation.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/*jslint node: true */
"use strict";
var constants = require('ocore/constants.js');
var db = require('ocore/db.js');
var mutex = require('ocore/mutex.js');
const ValidationUtils = require('ocore/validation_utils');
// Size estimate for one author entry; consolidate() multiplies it by the number
// of candidate addresses when checking whether the inputs cover the costs.
// NOTE(review): the numeric parts look like encoded field lengths — confirm.
const AUTHOR_SIZE = [
	3, // "sig"
	44, // pubkey
	88, // signature
	32, // address
	// keys
	"pubkey".length,
	"definition".length,
	"r".length,
	"authentifiers".length,
	"address".length
].reduce((total, part) => total + part, 0);

// Size estimate for one transfer input; consolidate() multiplies it by the
// number of inputs being spent.
const TRANSFER_INPUT_SIZE = [
	0, // type: "transfer" omitted
	44, // unit
	8, // message_index
	8, // output_index
	// keys
	"unit".length,
	"message_index".length,
	"output_index".length
].reduce((total, part) => total + part, 0);
/**
 * Finds the addresses whose outputs are candidates for consolidation.
 *
 * When `wallet` is itself a valid address, it is handed back as the only
 * candidate and no query is run. Otherwise the wallet's addresses are grouped
 * by their total of stable, good-sequence, unspent outputs of the given asset
 * and at most 15 addresses with the smallest totals are returned. Addresses
 * that authored a not yet stable unit with a non-null definition_chash are
 * excluded.
 *
 * @param {?string} asset - asset id; a falsy value selects rows where asset IS NULL
 * @param {string} wallet - wallet id or a single address
 * @param {function(string[])} handleFundedAddresses - receives the array of addresses
 */
function readLeastFundedAddresses(asset, wallet, handleFundedAddresses){
	if (ValidationUtils.isValidAddress(wallet))
		return handleFundedAddresses([wallet]);
	const assetCondition = asset ? "asset="+db.escape(asset) : "asset IS NULL";
	const sql =
		"SELECT address, SUM(amount) AS total \n" +
		"FROM my_addresses CROSS JOIN outputs USING(address) \n" +
		"CROSS JOIN units USING(unit) \n" +
		"WHERE wallet=? AND is_stable=1 AND sequence='good' AND is_spent=0 AND " + assetCondition + " \n" +
		"AND NOT EXISTS ( \n" +
		"SELECT * FROM units CROSS JOIN unit_authors USING(unit) \n" +
		"WHERE is_stable=0 AND unit_authors.address=outputs.address AND definition_chash IS NOT NULL \n" +
		") \n" +
		"GROUP BY address ORDER BY SUM(amount) LIMIT 15";
	db.query(sql, [wallet], function(rows){
		const arrAddresses = [];
		for (const row of rows)
			arrAddresses.push(row.address);
		handleFundedAddresses(arrAddresses);
	});
}
/**
 * Counts the stable, good-sequence, unspent outputs of the given asset that
 * belong to a wallet or to a single address.
 *
 * @param {?string} asset - asset id; a falsy value selects rows where asset IS NULL
 * @param {string} wallet - wallet id, or an address (then the filter is on `address`)
 * @param {function(number)} handleCount - receives the `count` column of the first row
 */
function determineCountOfOutputs(asset, wallet, handleCount){
	let ownerFilter;
	if (ValidationUtils.isValidAddress(wallet))
		ownerFilter = "address=?";
	else
		ownerFilter = "wallet=?";
	const assetCondition = asset ? "asset="+db.escape(asset) : "asset IS NULL";
	const sql =
		"SELECT COUNT(*) AS count FROM my_addresses CROSS JOIN outputs USING(address) JOIN units USING(unit) \n" +
		"WHERE " + ownerFilter + " AND is_spent=0 AND " + assetCondition + " AND is_stable=1 AND sequence='good'";
	db.query(sql, [wallet], function(rows){
		const firstRow = rows[0];
		handleCount(firstRow.count);
	});
}
/**
 * Determines the address that receives the consolidated funds.
 *
 * A valid address passed as `wallet` is used as is. For a wallet id, the first
 * row of my_addresses ordered by is_change DESC, address_index ASC is used.
 *
 * @param {string} wallet - wallet id or a single address
 * @param {function(string)} handleAddress - receives the destination address
 * @throws {Error} 'no dest address' (from inside the query callback) when the wallet has no addresses
 */
function readDestinationAddress(wallet, handleAddress){
	if (ValidationUtils.isValidAddress(wallet))
		return handleAddress(wallet);
	const sql = "SELECT address FROM my_addresses WHERE wallet=? ORDER BY is_change DESC, address_index ASC LIMIT 1";
	function onRows(rows){
		if (rows.length === 0)
			throw Error('no dest address');
		const firstRow = rows[0];
		handleAddress(firstRow.address);
	}
	db.query(sql, [wallet], onRows);
}
function consolidate(wallet, signer, maxUnspentOutputs){
if (!maxUnspentOutputs)
throw Error("no maxUnspentOutputs");
const network = require('ocore/network.js');
if (network.isCatchingUp())
return;
var asset = null;
mutex.lock(['consolidate'], unlock => {
determineCountOfOutputs(asset, wallet, count => {
console.log(count+' unspent outputs');
if (count <= maxUnspentOutputs)
return unlock();
let count_to_spend = Math.min(count - maxUnspentOutputs + 1, constants.MAX_INPUTS_PER_PAYMENT_MESSAGE - 1);
readLeastFundedAddresses(asset, wallet, arrAddresses => {
db.query(
"SELECT address, unit, message_index, output_index, amount \n\
FROM outputs \n\
CROSS JOIN units USING(unit) \n\
WHERE address IN(?) AND is_stable=1 AND sequence='good' AND is_spent=0 AND "+(asset ? "asset="+db.escape(asset) : "asset IS NULL")+" \n\
AND NOT EXISTS ( \n\
SELECT * FROM units CROSS JOIN unit_authors USING(unit) \n\
WHERE is_stable=0 AND unit_authors.address=outputs.address AND definition_chash IS NOT NULL \n\
) \n\
ORDER BY amount LIMIT ?",
[arrAddresses, count_to_spend],
function(rows){
// if all inputs are so small that they don't pay even for fees, add one more large input
function addLargeInputIfNecessary(onDone){
var target_amount = 1000 + TRANSFER_INPUT_SIZE*rows.length + AUTHOR_SIZE*arrAddresses.length;
if (input_amount > target_amount)
return onDone();
target_amount += TRANSFER_INPUT_SIZE + AUTHOR_SIZE;
let filter = ValidationUtils.isValidAddress(wallet) ? "address=?" : "wallet=?";
db.query(
"SELECT address, unit, message_index, output_index, amount \n\
FROM my_addresses \n\
CROSS JOIN outputs USING(address) \n\
CROSS JOIN units USING(unit) \n\
WHERE "+filter+" AND is_stable=1 AND sequence='good' \n\
AND is_spent=0 AND "+(asset ? "asset="+db.escape(asset) : "asset IS NULL")+" \n\
AND NOT EXISTS ( \n\
SELECT * FROM units CROSS JOIN unit_authors USING(unit) \n\
WHERE is_stable=0 AND unit_authors.address=outputs.address AND definition_chash IS NOT NULL \n\
) \n\
AND amount>? AND unit NOT IN(?) \n\
LIMIT 1",
[wallet, target_amount - input_amount, Object.keys(assocUsedUnits)],
large_rows => {
if (large_rows.length === 0)
return onDone("no large input found");
let row = large_rows[0];
assocUsedAddresses[row.address] = true;
input_amount += row.amount;
arrInputs.push({
unit: row.unit,
message_index: row.message_index,
output_index: row.output_index
});
onDone();
}
);
}
var assocUsedAddresses = {};
var assocUsedUnits = {};
var input_amount = 0;
var arrInputs = rows.map(row => {
assocUsedAddresses[row.address] = true;
assocUsedUnits[row.unit] = true;
input_amount += row.amount;
return {
unit: row.unit,
message_index: row.message_index,
output_index: row.output_index
};
});
addLargeInputIfNecessary(err => {
if (err){
console.log("consolidation failed: "+err);
return unlock();
}
let arrUsedAddresses = Object.keys(assocUsedAddresses);
readDestinationAddress(wallet, dest_address => {
var composer = require('ocore/composer.js');
composer.composeJoint({
paying_addresses: arrUsedAddresses,
outputs: [{address: dest_address, amount: 0}],
inputs: arrInputs,
input_amount: input_amount,
earned_headers_commission_recipients: [{address: dest_address, earned_headers_commission_share: 100}],
callbacks: composer.getSavingCallbacks({
ifOk: function(objJoint){
network.broadcastJoint(objJoint);
unlock();
consolidate(wallet, signer, maxUnspentOutputs); // do more if something's left
},
ifError: function(err){
console.log('failed to compose consolidation transaction: '+err);
unlock();
},
ifNotEnoughFunds: function(err){
throw Error('not enough funds to compose consolidation transaction: '+err);
}
}),
signer: signer
});
});
});
}
);
});
});
});
}
/**
 * Runs consolidate() periodically for the given wallet.
 *
 * Does nothing when either maxUnspentOutputs or consolidationInterval is falsy.
 * Otherwise consolidate() is called every consolidationInterval milliseconds
 * and additionally once 300 seconds after scheduling.
 *
 * @param {string} wallet - forwarded to consolidate()
 * @param {Object} signer - forwarded to consolidate()
 * @param {number} maxUnspentOutputs - forwarded to consolidate()
 * @param {number} consolidationInterval - period of the repeating timer, in ms
 */
function scheduleConsolidation(wallet, signer, maxUnspentOutputs, consolidationInterval){
	const isConfigured = Boolean(maxUnspentOutputs) && Boolean(consolidationInterval);
	if (!isConfigured)
		return;
	const doConsolidate = () => {
		consolidate(wallet, signer, maxUnspentOutputs);
	};
	setInterval(doConsolidate, consolidationInterval);
	// first run, 300 seconds from now
	setTimeout(doConsolidate, 300*1000);
}
// Public API: one-off consolidation and its timer-driven variant.
exports.consolidate = consolidate;
exports.scheduleConsolidation = scheduleConsolidation;