Skip to content

Commit

Permalink
handle ECONNRESET from Twitter API; refs #202
Browse files Browse the repository at this point in the history
  • Loading branch information
edsu committed Mar 12, 2022
1 parent d4ea470 commit db5f3f9
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 152 deletions.
211 changes: 123 additions & 88 deletions dist/server/search-loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,121 +102,85 @@ var SearchLoader = /*#__PURE__*/function () {

_this.twtr.search(opts, /*#__PURE__*/function () {
var _ref = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee(err, tweets, nextToken) {
var activeStream, query, _iterator, _step, j;

return _regenerator["default"].wrap(function _callee$(_context) {
while (1) {
switch (_context.prev = _context.next) {
case 0:
if (!err) {
_context.next = 5;
if (!(err && err.match(/ECONNRESET/))) {
_context.next = 6;
break;
}

_logger["default"].error(err);
_logger["default"].error("caught ECONNRESET: ".concat(err));

_context.next = 4;
return (0, _utils.timer)(3000);
_logger["default"].info("requeueing ".concat(job.id));

case 4:
return _context.abrupt("return");
_this.db.redis.lpushAsync(_redis.startSearchJobKey, job.id);

_context.next = 26;
break;

case 5:
if (!(tweets == 0)) {
_context.next = 7;
case 6:
if (!err) {
_context.next = 11;
break;
}

return _context.abrupt("return");
_logger["default"].error("unexpected error for ".concat(job.id, " stopping"));

_this.stopJob(job);

_context.next = 26;
break;

case 7:
case 11:
if (!_this.active) {
_context.next = 32;
_context.next = 25;
break;
}

_context.next = 10;
if (!(tweets.length > 0)) {
_context.next = 15;
break;
}

_context.next = 15;
return _this.db.loadTweets(job.query.search, tweets);

case 10:
case 15:
if (!nextToken) {
_context.next = 17;
_context.next = 22;
break;
}

_logger["default"].info("queueing next search job ".concat(job.id));

_context.next = 14;
_context.next = 19;
return _this.db.updateSearchJob({
id: job.id,
nextToken: nextToken
});

case 14:
case 19:
_this.db.redis.lpushAsync(_redis.startSearchJobKey, job.id);

_context.next = 30;
_context.next = 23;
break;

case 17:
_logger["default"].info("no more search results for search job ".concat(job.id));

_context.next = 20;
return _this.db.updateSearchJob({
id: job.id,
ended: new Date()
});

case 20:
// determine if there's an active stream job for this search
activeStream = false;
_context.next = 23;
return _this.db.getQuery(job.query.id);
case 22:
_this.stopJob(job);

case 23:
query = _context.sent;
_iterator = _createForOfIteratorHelper(query.searchJobs);

try {
for (_iterator.s(); !(_step = _iterator.n()).done;) {
j = _step.value;

if (j.type == 'stream' && !j.ended) {
activeStream = true;
}
} // set search to inactive if there's not an active stream

} catch (err) {
_iterator.e(err);
} finally {
_iterator.f();
}

if (activeStream) {
_context.next = 30;
break;
}

_context.next = 29;
return _this.db.updateSearch({
id: query.search.id,
active: false
});

case 29:
_logger["default"].info("flagging search ".concat(query.search.id, " as inactive"));

case 30:
_context.next = 33;
_context.next = 26;
break;

case 32:
case 25:
_logger["default"].warn('search loader callback received tweets when no longer active');

case 33:
case 26:
return _context.abrupt("return", false);

case 34:
case 27:
case "end":
return _context.stop();
}
Expand Down Expand Up @@ -321,12 +285,83 @@ var SearchLoader = /*#__PURE__*/function () {
return fetchSearchJob;
}()
}, {
key: "stop",
key: "stopJob",
value: function () {
var _stop = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee4() {
var _stopJob = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee4(job) {
var activeStream, query, _iterator, _step, j;

return _regenerator["default"].wrap(function _callee4$(_context5) {
while (1) {
switch (_context5.prev = _context5.next) {
case 0:
_logger["default"].info("no more search results for search job ".concat(job.id));

_context5.next = 3;
return this.db.updateSearchJob({
id: job.id,
ended: new Date()
});

case 3:
// determine if there's an active stream job for this search
activeStream = false;
_context5.next = 6;
return this.db.getQuery(job.query.id);

case 6:
query = _context5.sent;
_iterator = _createForOfIteratorHelper(query.searchJobs);

try {
for (_iterator.s(); !(_step = _iterator.n()).done;) {
j = _step.value;

if (j.type == 'stream' && !j.ended) {
activeStream = true;
}
} // set search to inactive if there's not an active stream

} catch (err) {
_iterator.e(err);
} finally {
_iterator.f();
}

if (activeStream) {
_context5.next = 13;
break;
}

_context5.next = 12;
return this.db.updateSearch({
id: query.search.id,
active: false
});

case 12:
_logger["default"].info("flagging search ".concat(query.search.id, " as inactive"));

case 13:
case "end":
return _context5.stop();
}
}
}, _callee4, this);
}));

function stopJob(_x4) {
return _stopJob.apply(this, arguments);
}

return stopJob;
}()
}, {
key: "stop",
value: function () {
var _stop = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee5() {
return _regenerator["default"].wrap(function _callee5$(_context6) {
while (1) {
switch (_context6.prev = _context6.next) {
case 0:
this.active = false;
this.db.close();
Expand All @@ -336,10 +371,10 @@ var SearchLoader = /*#__PURE__*/function () {

case 4:
case "end":
return _context5.stop();
return _context6.stop();
}
}
}, _callee4, this);
}, _callee5, this);
}));

function stop() {
Expand All @@ -351,49 +386,49 @@ var SearchLoader = /*#__PURE__*/function () {
}, {
key: "setupTwitterClient",
value: function () {
var _setupTwitterClient = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee5() {
return _regenerator["default"].wrap(function _callee5$(_context6) {
var _setupTwitterClient = (0, _asyncToGenerator2["default"])( /*#__PURE__*/_regenerator["default"].mark(function _callee6() {
return _regenerator["default"].wrap(function _callee6$(_context7) {
while (1) {
switch (_context6.prev = _context6.next) {
switch (_context7.prev = _context7.next) {
case 0:
if (!(this.twtr === null)) {
_context6.next = 13;
_context7.next = 13;
break;
}

_logger["default"].info('attempting to get twitter client');

_context6.next = 4;
_context7.next = 4;
return this.db.getTwitterClientForApp();

case 4:
this.twtr = _context6.sent;
this.twtr = _context7.sent;

if (this.twtr) {
_context6.next = 10;
_context7.next = 10;
break;
}

_context6.next = 8;
_context7.next = 8;
return (0, _utils.timer)(20000);

case 8:
_context6.next = 11;
_context7.next = 11;
break;

case 10:
_logger["default"].info('got twitter client!');

case 11:
_context6.next = 0;
_context7.next = 0;
break;

case 13:
case "end":
return _context6.stop();
return _context7.stop();
}
}
}, _callee5, this);
}, _callee6, this);
}));

function setupTwitterClient() {
Expand Down
Loading

0 comments on commit db5f3f9

Please sign in to comment.