Skip to content

Commit

Permalink
New Read URL trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
A3a3e1 authored May 21, 2024
1 parent 7a4d92a commit ad3c290
Show file tree
Hide file tree
Showing 8 changed files with 1,960 additions and 1,276 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.3.0 (May 21, 2024)
* Added new trigger - `Read CSV file from URL`

## 3.2.0 (May 02, 2023)
* Added new config fields to `Read CSV attachment` action:
* `Skip empty lines`
Expand Down
49 changes: 48 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* [Read CSV attachment](#read-CSV-attachment)
* [Create CSV From Message Stream](#create-CSV-from-message-stream)
* [Create CSV From JSON Array](#create-CSV-from-JSON-array)
* [Triggers](#triggers)
* [Read CSV file from URL](#read-csv-file-from-url)
* [Limitations](#limitations)

## Description
Expand Down Expand Up @@ -183,6 +185,51 @@ This action will convert an incoming array into a CSV file
* `attachmentExpiryTime` - When the attachment is set to expire
* `contentType` - Always set to `text/csv`
## Triggers
### Read CSV file from URL
This trigger reads the CSV file from the URL provided in the configuration fields and outputs the result as a JSON object.
The trigger works pretty much the same as the [Read CSV attachment action](#read-CSV-attachment). The difference is that all the settings are provided in the configuration fields rather than in the message body, because triggers do not have input messages.
#### Config Fields
* `Emit Behavior` (dropdown, required) - this selector configures output behavior of the component.
* `Fetch All` - the component emits an array of messages;
* `Emit Individually` - the component emits a message per row;
* `Emit Batch` - the component produces a series of messages, where each message has an array of max length equal to the `Batch Size`;
* `Skip empty lines` (checkbox, optional) - by default, empty lines are parsed; if checked, they will be skipped
* `Comment char` (string, optional) - if specified, skips lines starting with this string
#### Input Metadata
* `URL` (string, required) - URL of the CSV file to parse
* `Contains headers` (boolean, optional) - If true, the first row of parsed data will be interpreted as field names, false by default.
* `Delimiter` (string, optional) - The delimiting character. Leave blank to auto-detect from a list of most common delimiters or provide your own
<details><summary>Example</summary>
if you use "$" as Delimiter, this CSV:
```
a$b$c$d
```
can be parsed into this JSON
``` json
{
"column0": "a",
"column1": "b",
"column2": "c",
"column3": "d"
}
```
</details>
* `Convert Data types` (boolean, optional) - Numeric data and boolean data will be converted to their type instead of remaining strings, false by default.
#### Output Metadata
- For `Fetch All` and `Emit Batch`: An object with key ***result*** that has an array as its value
- For `Emit Individually`: Each object fills the entire message
## Limitations
### General
Expand All @@ -191,4 +238,4 @@ This action will convert an incoming array into a CSV file
`EIO_REQUIRED_RAM_MB` environment variable with an appropriate value (e.g. value `1024` means that 1024 MB will be allocated) for the component in this case.
* Maximal possible size for an attachment is 10 MB.
* Attachments mechanism does not work with [Local Agent Installation](https://docs.elastic.io/getting-started/local-agent.html)
* Inbound message in `Message Stream` and each element of `JSON Array` should be a plain Object, if value not a primitive type it will be set as `[object Object]`
* Inbound message in `Message Stream` and each element of `JSON Array` should be a plain Object, if value not a primitive type it will be set as `[object Object]`
97 changes: 91 additions & 6 deletions component.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
{
"title": "CSV",
"title": "CSV v3",
"description": "A comma-separated values (CSV) file stores tabular data (numbers and text) in plain-text form",
"docsUrl": "https://github.com/elasticio/csv-component",
"version": "3.2.0",
"version": "3.3.0",
"actions": {
"read_action": {
"main": "./lib/actions/read.js",
"title": "Read CSV attachment",
"help": {
"description": "This action will read the CSV attachment of the incoming message or from the specified URL and output a JSON object.",
"link": "/components/csv/actions#read-csv-attachment"
"link": "/components/csv/actions.html#read-csv-attachment"
},
"fields": {
"emitAll": {
Expand Down Expand Up @@ -50,7 +50,7 @@
"title": "Create CSV From Message Stream",
"help": {
"description": "Multiple incoming events can be combined into one CSV file with the write CSV action.",
"link": "/components/csv/actions#write-csv-attachment"
"link": "/components/csv/actions.html#write-csv-attachment"
},
"fields": {
"uploadToAttachment": {
Expand Down Expand Up @@ -101,7 +101,7 @@
"title": "Create CSV From JSON Array",
"help": {
"description": "Incoming array can be converted into one CSV file",
"link": "/components/csv/actions#write-csv-attachment"
"link": "/components/csv/actions.html#write-csv-attachment"
},
"fields": {
"uploadToAttachment": {
Expand Down Expand Up @@ -147,5 +147,90 @@
},
"dynamicMetadata": true
}
},
"triggers": {
"read_trigger": {
"type": "polling",
"main": "./lib/triggers/read.js",
"title": "Read CSV file from URL",
"help": {
"description": "This trigger will read the CSV file from the specified URL and output a JSON object.",
"link": "/components/csv/triggers.html#read-csv-file"
},
"fields": {
"url": {
"label": "URL of the CSV file",
"order": 90,
"required": true,
"viewClass": "TextFieldView"
},
"header": {
"label": "Contains headers",
"order": 80,
"required": false,
"viewClass": "CheckBoxView",
"help": {
"description": "If true, the first row of parsed data will be interpreted as field names, false by default"
}
},
"delimiter": {
"label": "Delimiter",
"order": 70,
"required": false,
"viewClass": "TextFieldView",
"help": {
"description": "The delimiting character. Leave blank to auto-detect from a list of most common delimiters or provide your own"
}
},
"dynamicTyping": {
"label": "Convert Data types",
"order": 60,
"required": false,
"viewClass": "CheckBoxView",
"help": {
"description": "Numeric data and boolean data will be converted to their type instead of remaining strings, false by default"
}
},
"emitBehavior": {
"label": "Emit Behavior",
"required": true,
"order": 50,
"viewClass": "SelectView",
"model": {
"fetchAll": "Fetch All",
"emitIndividually": "Emit Individually",
"emitBatch": "Emit Batch"
},
"prompt": "Select Emit Behavior"
},
"batchSize": {
"label": "Batch Size",
"required": false,
"order": 45,
"viewClass": "TextFieldView",
"help": {
"description": "Enter batch size if the 'Emit Behavior' field is set to 'Emit Batch'"
}
},
"skipEmptyLines": {
"label": "Skip empty lines",
"order": 40,
"required": false,
"viewClass": "CheckBoxView",
"help": {
"description": "By default, empty lines are parsed if checked they will be skipped"
}
},
"comments": {
"label": "Comment char",
"order": 30,
"required": false,
"viewClass": "TextFieldView",
"help": {
"description": "If specified, skips lines starting with this string"
}
}
}
}
}
}
}
118 changes: 118 additions & 0 deletions lib/triggers/read.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/* eslint-disable no-restricted-syntax,semi,comma-dangle,class-methods-use-this */

const { AttachmentProcessor } = require('@elastic.io/component-commons-library')
const { Writable } = require('stream');
const { messages } = require('elasticio-node')
const stream = require('stream')
const util = require('util')
const papa = require('papaparse')
const { getUserAgent } = require('../util');

// Promise-returning wrapper around the callback-style `stream.pipeline`,
// so the whole download -> parse -> write chain can be awaited.
const pipeline = util.promisify(stream.pipeline);

/**
 * Converts a header-less CSV row into an object keyed by column position.
 *
 * Example: ['aa', 'bb', 'cc'] => { column0: 'aa', column1: 'bb', column2: 'cc' }
 *
 * @param {Array} arr - cell values of a single row
 * @returns {Object} object with one `column<index>` property per cell
 */
function arrayToObj(arr) {
  // Assign in place instead of re-spreading the accumulator on every cell,
  // which copied all previously collected columns each iteration.
  const columns = {};
  arr.forEach((value, index) => {
    columns[`column${index}`] = value;
  });
  return columns;
}

/**
 * Trigger entry point: downloads the CSV file from `cfg.url`, parses it as a
 * stream and emits the rows according to `cfg.emitBehavior`:
 *  - `fetchAll`         - one message `{ result: [...all rows] }` after parsing;
 *  - `emitIndividually` - one message per row (the legacy values `false` /
 *                         `'false'` are treated the same way);
 *  - `emitBatch`        - messages `{ result: [...] }` holding at most
 *                         `cfg.batchSize` rows each; the remainder is flushed
 *                         once parsing is finished.
 *
 * Must be invoked with a `this` that provides `logger` and `emit`.
 * Download and parse failures are logged and reported through
 * `emit('error', ...)` instead of being thrown.
 *
 * @param {Object} msg - incoming message; only `msg.id` is read
 * @param {Object} cfg - configuration: `url`, `header`, `delimiter`,
 *   `dynamicTyping`, `emitBehavior`, `batchSize`, `skipEmptyLines`, `comments`
 * @returns {Promise<void>}
 * @throws {Error} when `batchSize` is not a positive integer in `emitBatch`
 *   mode, or when `url` is missing
 */
async function readCSV(msg, cfg) {
  const {
    url,
    header,
    delimiter,
    dynamicTyping,
    emitBehavior,
    batchSize,
    skipEmptyLines,
    comments,
  } = cfg;

  const isBatchMode = emitBehavior === 'emitBatch';
  const emitsPerRow = emitBehavior === 'emitIndividually' || emitBehavior === false || emitBehavior === 'false';

  // Parse the batch size once and use the parsed number everywhere below, so
  // the value that passed validation is the same value that drives batching
  // (the raw config value may be a string such as "10" or "10abc").
  const maxBatchLength = parseInt(batchSize, 10);
  if (isBatchMode && !isPositiveInteger(maxBatchLength)) {
    throw new Error("'batchSize' must be a positive integer!");
  }
  if (!url) throw new Error('URL of the CSV is missing');

  const parseOptions = {
    header,
    dynamicTyping,
    delimiter,
    skipEmptyLines,
    comments,
  };

  // Rows collected for "Fetch All" (also used for any unrecognized behavior,
  // in which case they are collected but never emitted).
  const result = [];
  // Rows waiting to be emitted as the next batch in "Emit Batch" mode.
  const pendingBatch = [];

  const parseStream = papa.parse(papa.NODE_STREAM_INPUT, parseOptions);

  let dataStream;
  const attachmentProcessor = new AttachmentProcessor(getUserAgent(), msg.id);
  try {
    dataStream = await attachmentProcessor.getAttachment(url, 'stream');
    this.logger.info('File received, trying to parse CSV');
  } catch (err) {
    this.logger.error(`URL - "${url}" unreachable: ${err}`);
    await this.emit('error', `URL - "${url}" unreachable: ${err}`);
    return;
  }

  // Handles a single parsed row according to the selected emit behavior.
  const handleRow = async (row) => {
    const data = header ? row : arrayToObj(row);
    if (isBatchMode) {
      pendingBatch.push(data);
      if (pendingBatch.length >= maxBatchLength) {
        await this.emit('data', messages.newMessageWithBody({ result: pendingBatch.splice(0, maxBatchLength) }));
      }
    } else if (emitsPerRow) {
      await this.emit('data', messages.newMessageWithBody(data));
    } else {
      result.push(data);
    }
  };

  // Object-mode sink implemented through the write callback: the next row is
  // delivered only after the previous one has been handled, the stream does
  // not finish while an emit is still pending, and a failing emit is passed
  // to the callback so it rejects the pipeline instead of being an unhandled
  // promise rejection.
  const writerStream = new Writable({
    objectMode: true,
    write(row, _encoding, callback) {
      handleRow(row).then(() => callback(), callback);
    },
  });

  try {
    await pipeline(
      dataStream.data,
      parseStream,
      writerStream,
    );
    this.logger.info('File parsed successfully');
  } catch (err) {
    this.logger.error(`error during file parse: ${err}`);
    await this.emit('error', `error during file parse: ${err}`);
    return;
  }

  if (emitBehavior === 'fetchAll') {
    await this.emit('data', messages.newMessageWithBody({ result }));
  } else if (isBatchMode && pendingBatch.length > 0) {
    // Flush the last, possibly incomplete batch.
    await this.emit('data', messages.newMessageWithBody({ result: pendingBatch }));
  }
  this.logger.info(`Complete, memory used: ${process.memoryUsage().heapUsed / 1024 / 1024} Mb`);
}

// Expose readCSV as this module's `process` export.
module.exports.process = readCSV;

/**
 * Tells whether the given value is a safe integer strictly greater than zero.
 *
 * @param {*} input - value to check (no type coercion is applied)
 * @returns {boolean} true only for safe integers above zero
 */
function isPositiveInteger(input) {
  if (!Number.isSafeInteger(input)) {
    return false;
  }
  return input > 0;
}
Loading

0 comments on commit ad3c290

Please sign in to comment.