🐛 throttle concurrent file reads to prevent exceptions when there are many files (#2)

ctcpip authored Apr 30, 2024
1 parent 729ac87 commit 51fa060
Showing 10 changed files with 154 additions and 1,046 deletions.
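
The change caps how many `fs.readFile` calls `Adapter.find()` keeps in flight at once (default `128`, configurable via the new `concurrentReads` option). Below is a standalone sketch of the same idea, written as a small worker pool rather than the commit's polling loop; `readAllThrottled` and its parameters are illustrative only, not the adapter's API.

```js
const fs = require('fs/promises')
const path = require('path')

// Read every file in `dir`, keeping at most `limit` reads in flight.
async function readAllThrottled (dir, files, limit = 128) {
  const results = []
  let next = 0

  // Each worker pulls the next unread file until none remain.
  async function worker () {
    while (next < files.length) {
      const file = files[next++]
      try {
        results.push(await fs.readFile(path.join(dir, file)))
      } catch (error) {
        // like the adapter, skip files that vanished between listing and reading
        if (error.code !== 'ENOENT') throw error
      }
    }
  }

  await Promise.all(Array.from({ length: Math.min(limit, files.length) }, worker))
  return results
}
```

Either approach bounds the number of simultaneously open file descriptors to the limit, which is what prevents `EMFILE`.
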
15 changes: 15 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,15 @@
name: 'test'

on: [pull_request, push]

permissions:
contents: read

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: ljharb/actions/node/install@main
name: 'nvm install lts/* && npm install'
- run: npm test
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
node_modules/
db/
*.log
.vscode
1 change: 1 addition & 0 deletions .npmrc
@@ -0,0 +1 @@
package-lock=false
10 changes: 0 additions & 10 deletions .travis.yml

This file was deleted.

29 changes: 29 additions & 0 deletions README.md
@@ -27,6 +27,35 @@ const store = fortune(recordTypes, {
```


## Options


| Option | Default | Description |
| --- | --- | --- |
| `concurrentReads` | `128` | Limits how many files can be read concurrently by `Adapter.find()` |

### Options Example

```js
const path = require('path')
const fortune = require('fortune')
const fsAdapter = require('fortune-fs')

const store = fortune(recordTypes, {
adapter: [ fsAdapter, {
// Absolute path to database directory.
path: path.join(__dirname, 'db'),
concurrentReads: 32
} ]
})
```


## Troubleshooting

If you have a large number of records (files), you may encounter `Error: EMFILE: too many open files`. Set the `concurrentReads` [option](#options) to a lower value to resolve this.
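
For example, reusing the setup from the Options Example above (the value `16` is only an illustration; tune it to your system's open-file limit):

```js
const store = fortune(recordTypes, {
  adapter: [ fsAdapter, {
    path: path.join(__dirname, 'db'),
    concurrentReads: 16 // lower than the default of 128
  } ]
})
```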


## License

This software is licensed under the [MIT license](https://raw.githubusercontent.com/fortunejs/fortune-fs/master/LICENSE).
25 changes: 25 additions & 0 deletions eslint.config.js
@@ -0,0 +1,25 @@
const js = require('@eslint/js')
const stylisticJS = require('@stylistic/eslint-plugin-js')

module.exports = [
js.configs.recommended,
{
languageOptions: {
ecmaVersion: 'latest',
globals: {
__dirname: true,
console: true,
module: true,
process: true,
require: true,
setTimeout: true,
},
},
plugins: { '@stylistic/js': stylisticJS },
rules: {
"no-trailing-spaces": "error",
semi: ["error", "never"],
'@stylistic/js/indent': ['error', 2],
}
}
]
89 changes: 63 additions & 26 deletions index.js
@@ -3,9 +3,14 @@
var path = require('path')
var fs = require('fs')
var msgpack = require('msgpack-lite')
var mkdirp = require('mkdirp')
var { mkdirp } = require('mkdirp')
var lockFile = require('lockfile')

// in benchmarking tests with 124732 records,
// a concurrency limit of 128 was the sweet spot.
// lower limits took longer.
// higher limits caused performance degradation.
let concurrentReads = 128

/**
* File system adapter. Available options:
@@ -25,6 +30,13 @@ module.exports = function (Adapter) {
// No LRU, allow as many records as possible.
delete this.options.recordsPerType

if(Number.isInteger(this.options.concurrentReads)) {
if(this.options.concurrentReads < 1) {
throw new RangeError("concurrentReads must be > 0")
}
concurrentReads = this.options.concurrentReads
}

primaryKey = properties.common.constants.primary
map = properties.common.map
}
@@ -45,10 +57,7 @@ module.exports = function (Adapter) {
return Promise.all(map(Object.keys(self.recordTypes), function (type) {
return new Promise(function (resolve, reject) {
var typeDir = path.join(dbPath, type)

mkdirp(typeDir, function (error) {
return error ? reject(error) : resolve()
})
mkdirp(typeDir).then(resolve,reject)
})
}))
})
@@ -85,27 +94,43 @@ module.exports = function (Adapter) {
return error ? reject(error) : resolve(files)
})
})
).then(function (files) {
return Promise.all(map(files, function (file) {
return new Promise(function (resolve, reject) {
var filePath = path.join(dbPath, type, '' + file)

fs.readFile(filePath, function (error, buffer) {
var record

if (error)
return error.code === 'ENOENT' ? resolve() : reject(error)

record = msgpack.decode(buffer)

if (!(type in self.db)) self.db[type] = {}

self.db[type][record[primaryKey]] = record

return resolve()
})
})
}))
).then(async function (files) {
const allThePromises = []
let readsInFlight = 0
const iterator = files[Symbol.iterator]()

while (allThePromises.length < files.length) {
if(readsInFlight >= concurrentReads) {
// back off
await pause(0)
}
else {
allThePromises.push(
new Promise(function (resolve, reject) {
var filePath = path.join(dbPath, type, '' + iterator.next().value)
readsInFlight +=1

fs.readFile(filePath, function (error, buffer) {
var record

if (error)
return error.code === 'ENOENT' ? resolve() : reject(error)

record = msgpack.decode(buffer)

if (!(type in self.db)) self.db[type] = {}

self.db[type][record[primaryKey]] = record

return resolve()
})
}).finally(()=>{
readsInFlight -=1
}))
}
}

return Promise.all(allThePromises)
}).then(function () {
return DefaultAdapter.prototype.find.call(self, type, ids, options)
})
@@ -203,3 +228,15 @@ module.exports = function (Adapter) {
})
}
}

/**
* Pause for an amount of time
* @param {number} ms milliseconds to pause
* @returns {promise}
* @instance
* @example
* await pause(1 * 1000); // pause for 1 second
*/
function pause(ms) {
return new Promise(resolve => { return setTimeout(resolve, ms) })
}