node.js - Choose proper async method for batch processing
I need to call an external API in a loop with a delay, to avoid the 'User Rate Limit Exceeded' restriction.
The Google Maps Geocoding API is sensitive to 'req/sec', allowing 10 req/sec. I need to geocode hundreds of my contacts, so such a delay is required. So I need to have 10 async geocoding functions, each with a post-delay of 1 sec. I collect all the contacts in an array, and then loop through the array in an async manner.
Generally, I need to have n simultaneous threads, with a delay of d msecs at the end of each thread. The entire loop iterates over an array of user entities. Each thread processes a single entity, as usual.
I suppose I would have code like:
    const n = 10;   // threads count
    const d = 1000; // delay after each execution, in ms

    var processUser = function(user, callback) {
        someBusinessLogicProc(user, function(err) {
            // wait d ms after each item before reporting completion
            setTimeout(function() {
                return callback(err);
            }, d);
        });
    };

    var async = require('async');
    var people = new Array(900);

    // batchMethod is pseudocode, not an actual function of the async library
    async.batchMethod(people, processUser, n, finalCallback);
In this pseudocode, batchMethod is the method I am asking for.
Putting a delay on the results is not really what you want. Instead, you want to keep track of what you've sent and when you sent it, so that as soon as you fall under the requests-per-second boundary, you can send another request.
Here's a general concept for a function that will control rate limiting to a fixed number of requests per second. It uses promises and requires that you supply a request function that returns a promise (if you aren't using promises now, you just need to wrap your request function in a promise).
    function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
        return new Promise(function(resolve, reject) {
            var index = 0;
            var inFlightCntr = 0;
            var doneCntr = 0;
            var launchTimes = [];
            var results = new Array(array.length);

            // calculate num requests in last second
            function calcRequestsInLastSecond() {
                var now = Date.now();
                // look backwards in launchTimes to see how many were launched within the last second
                var cnt = 0;
                for (var i = launchTimes.length - 1; i >= 0; i--) {
                    if (now - launchTimes[i] < 1000) {
                        ++cnt;
                    } else {
                        break;
                    }
                }
                return cnt;
            }

            function runMore() {
                while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                    (function(i) {
                        ++inFlightCntr;
                        launchTimes.push(Date.now());
                        fn(array[i]).then(function(val) {
                            results[i] = val;
                            --inFlightCntr;
                            ++doneCntr;
                            runMore();
                        }, reject);
                    })(index);
                    ++index;
                }
                // see if we're done
                if (doneCntr === array.length) {
                    resolve(results);
                } else if (launchTimes.length >= requestsPerSec) {
                    // >= rather than >: a timer is also needed when exactly
                    // requestsPerSec requests have launched and all finished early
                    // calc how long we have to wait before sending more
                    var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                    if (delta > 0) {
                        setTimeout(runMore, delta);
                    }
                }
            }
            runMore();
        });
    }
Example usage:
    rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
        // process array of results here
    }, function(err) {
        // process error here
    });
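If your request function is callback-based, like the one in the question, a wrapper along the following lines would turn it into one that returns a promise. This is only a sketch: `geocodeWithCallback` is a hypothetical stand-in for the real request, and the wrapper assumes the usual Node.js `callback(err, result)` convention.

```javascript
// Hypothetical callback-style request function (stand-in for a real geocoding call).
// Assumes the Node.js convention: callback(err, result).
function geocodeWithCallback(contact, callback) {
    setTimeout(function() {
        if (!contact || !contact.address) {
            return callback(new Error("missing address"));
        }
        callback(null, { name: contact.name, geocoded: true });
    }, 10);
}

// Wrap any (arg, callback) function so that it returns a promise instead.
function promisifyRequest(fn) {
    return function(arg) {
        return new Promise(function(resolve, reject) {
            fn(arg, function(err, result) {
                if (err) {
                    reject(err);
                } else {
                    resolve(result);
                }
            });
        });
    };
}

var myRequestFunc = promisifyRequest(geocodeWithCallback);
```

The wrapped function can then be passed as the last argument of the rate-limiting function. In Node.js, the built-in `util.promisify` does the same job for functions that follow this callback convention.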
The general idea behind this code is this:
- You pass in an array to iterate through.
- It returns a promise whose resolved value is an array of results (in order).
- You pass the maximum number of requests per second to ever hit (requestsPerSec).
- You pass the maximum number of requests in flight at the same time (maxInFlight).
- You pass a function that will be passed an element from the array being iterated and must return a promise.
- It keeps an array of timestamps recording when each request was sent.
- To see if a new request can be sent, it looks backwards in that array and counts how many requests were sent in the last second.
- If that number is lower than the threshold, it sends another one.
- If that number meets the threshold, it calculates how long you have to wait before sending another one and sets a timer for that amount of time.
- Upon completion of each request, it checks to see if it can send more.
- If any request rejects its promise, the returned promise rejects immediately. If you don't want it to stop upon the first error, modify your passed-in function so that it does not reject, but instead resolves with some value that you can identify as a failed request later when processing the results.
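For the last point, one option is to wrap the request function so that it never rejects. The sketch below is just one way to do that; `neverReject`, `flakyRequest`, and the `{ ok, value, error, item }` result shape are made-up names for illustration, not part of the code above.

```javascript
// Wrap a promise-returning function so that failures become ordinary results.
function neverReject(fn) {
    return function(item) {
        // Starting from Promise.resolve() also captures synchronous throws from fn.
        return Promise.resolve().then(function() {
            return fn(item);
        }).then(function(value) {
            return { ok: true, value: value };
        }, function(error) {
            return { ok: false, error: error, item: item };
        });
    };
}

// Hypothetical request function that fails for odd numbers.
function flakyRequest(n) {
    return n % 2 ? Promise.reject(new Error("failed: " + n)) : Promise.resolve(n * 10);
}

var safeRequest = neverReject(flakyRequest);

Promise.all([1, 2, 3, 4].map(safeRequest)).then(function(results) {
    var failed = results.filter(function(r) { return !r.ok; });
    console.log(failed.length); // 2
});
```

Passing a wrapped function like `safeRequest` as the request function means the returned promise resolves with every outcome, so failed items can be retried or reported afterwards.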
Here's a working simulation: https://jsfiddle.net/jfriend00/3gr0tq7k/
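The timestamp bookkeeping described in the list above can also be isolated into two small pure functions, which makes it easy to check without real timers. This is a sketch rather than the code from the answer: the names `countInWindow` and `msUntilNextSlot` and the explicit `now` parameter are assumptions introduced here, and `limit` is assumed to be at least 1.

```javascript
// Count how many launches happened within the last `windowMs` milliseconds.
// `launchTimes` must be in ascending order (timestamps are pushed as requests are sent).
function countInWindow(launchTimes, now, windowMs) {
    var cnt = 0;
    for (var i = launchTimes.length - 1; i >= 0; i--) {
        if (now - launchTimes[i] < windowMs) {
            ++cnt;
        } else {
            break; // everything earlier is older still
        }
    }
    return cnt;
}

// How long to wait before another request may be sent without exceeding
// `limit` requests per `windowMs`. Returns 0 if a request can be sent now.
function msUntilNextSlot(launchTimes, now, limit, windowMs) {
    if (countInWindow(launchTimes, now, windowMs) < limit) {
        return 0;
    }
    // The limit-th most recent launch has to age out of the window first.
    var oldestRelevant = launchTimes[launchTimes.length - limit];
    return windowMs - (now - oldestRelevant);
}

// Three requests sent at t = 0, 100 and 200 ms, with a limit of 3 per second.
var times = [0, 100, 200];
console.log(countInWindow(times, 500, 1000));       // 3
console.log(msUntilNextSlot(times, 500, 3, 1000));  // 500 (a slot frees up at t = 1000)
console.log(msUntilNextSlot(times, 1050, 3, 1000)); // 0
```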
Note: If the maxInFlight value you pass in is higher than the requestsPerSec value, then this function will basically just send requestsPerSec requests and then, one second later, send another requestsPerSec requests, since that's the quickest way to stay under the requestsPerSec boundary. If the maxInFlight value is the same as or lower than requestsPerSec, then it will send up to maxInFlight requests and then, as each request finishes, it will see if it can send another one.