-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscript.js
85 lines (77 loc) · 2.63 KB
/
script.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
var fs = require('fs');
var request = require('request');
var parse = require('csv-parse');
var transform = require('stream-transform');
var iconv = require('iconv-lite');
// Local filesystem locations. dataDir is handed to downloadAndUnzip as its
// third argument; uoFile is the CSV that is existence-checked and then read;
// uoZipFile is handed to downloadAndUnzip as its second argument.
var dataDir = '/data/edr/';
var uoFile = '/data/edr/uo.csv';
var uoZipFile = '/data/edr/15-UFOP.zip';

// Remote archive URL, handed to downloadAndUnzip as its first argument.
var uoZipUrl = 'http://old.minjust.gov.ua/downloads/15-UFOP.zip';
var downloadAndUnzip = require('./downloadAndUnzip');
/**
 * Ensures the CSV at `uoFile` is present, then invokes `done`.
 *
 * - If `uoFile` already exists, `done` is called synchronously.
 * - Otherwise `downloadAndUnzip(uoZipUrl, uoZipFile, dataDir)` is called and
 *   `done` runs once the returned promise resolves.
 * - If that promise rejects, the error is logged, `process.exitCode` is set
 *   to 1 and `done` is NOT called (previously this was an unhandled
 *   rejection).
 *
 * @param {function(): void} done - Continuation invoked with no arguments.
 */
function downloadThen(done) {
  // existsSync yields false whenever the path cannot be stat'ed, which is the
  // same outcome the old try/catch around statSync produced.
  if (fs.existsSync(uoFile)) {
    console.log('File already exists');
    done();
    return;
  }

  // The existence check is on the CSV, so that is the path worth reporting.
  console.log(uoFile + ' doesn\'t exist, downloading...');
  downloadAndUnzip(uoZipUrl, uoZipFile, dataDir).then(
    function() {
      console.log('File unzipped');
      done();
    },
    // Supplied as then()'s second argument rather than a trailing catch() so
    // that an exception thrown inside done() is not misreported as a
    // download failure.
    function(e) {
      console.log('Error while downloading file: ' + (e && e.message ? e.message : e));
      process.exitCode = 1;
    }
  );
}
// Entry point: once downloadThen signals that the CSV is available, stream it
// through read -> win1251 decode -> CSV parse -> field mapping -> writer.
downloadThen(function() {
  var myWriter = require('./postgres')();
  console.log('Reading ' + uoFile);

  // Logs a failure together with a description of the stage it came from.
  function handleError(e, stage) {
    console.log("Error while " + stage + ": " + e.message);
  }

  // Returns an 'error' listener that reports under the given stage label.
  function reportFailure(stage) {
    return function(e) {
      handleError(e, stage);
    };
  }

  // Returns a listener that prints a fixed message and ignores event arguments.
  function say(message) {
    return function() {
      console.log(message);
    };
  }

  var input = fs.createReadStream(uoFile);
  var parser = parse({delimiter: ';', columns: true});

  // [output property, CSV column header] pairs, in output property order.
  var columnByField = [
    ['officialName', "Найменування"],
    ['name', "Скорочена назва"],
    ['edrpou', "Код ЄДРПОУ"],
    ['address', "Місцезнаходження"],
    ['mainPerson', "ПІБ керівника"],
    ['occupation', "Основний вид діяльності"],
    ['status', "Стан"]
  ];

  // NOTE(review): the handler deliberately keeps exactly one parameter, as in
  // the original; stream-transform is believed to pick sync vs. callback mode
  // from the handler's arity - confirm before changing the signature.
  var transformer = transform(function(record) {
    var mapped = {};
    columnByField.forEach(function(pair) {
      mapped[pair[0]] = record[pair[1]];
    });
    return mapped;
  }, {parallel: 10, highWaterMark: 2});

  input.on('error', reportFailure('reading file'));
  input.on('end', say("reading file stream finished"));

  var decoder = iconv.decodeStream('win1251');
  decoder.on('error', reportFailure('decoding file'));
  decoder.on('end', say("decoding stream finished"));

  parser.on('error', reportFailure('parsing csv'));
  parser.on('end', say("csv parser stream finished"));

  transformer.on('error', reportFailure('transforming csv to object'));
  transformer.on('unpipe', say("Transformer: UNPIPE"));
  transformer.on('finish', say("Transformer: finish"));
  transformer.on('end', say("Transformer: end"));

  myWriter
    .on('error', function(e) {
      handleError(e, 'writing to db');
      // Keep going after a write failure: Node's pipe() detaches a
      // destination that emits 'error', so attach the writer again.
      transformer.pipe(myWriter);
    })
    // NOTE(review): if myWriter is a plain Writable it would emit 'finish'
    // rather than 'end'; './postgres' is not visible here - confirm.
    .on('end', say("writer stream finished"));

  // Assemble the pipeline one hop at a time; each pipe() call returns the
  // value the next hop is attached to, exactly as in a chained call.
  var decoded = input.pipe(decoder);
  var parsed = decoded.pipe(parser);
  var mapped = parsed.pipe(transformer);
  mapped.pipe(myWriter);
});