73 lines
1.5 KiB
JavaScript
73 lines
1.5 KiB
JavaScript
|
'use strict';
|
||
|
|
||
|
var RingBuffer = require('./ring_buffer');
|
||
|
|
||
|
var Functor = function(session, method) {
|
||
|
this._session = session;
|
||
|
this._method = method;
|
||
|
this._queue = new RingBuffer(Functor.QUEUE_SIZE);
|
||
|
this._stopped = false;
|
||
|
this.pending = 0;
|
||
|
};
|
||
|
|
||
|
Functor.QUEUE_SIZE = 8;
|
||
|
|
||
|
Functor.prototype.call = function(error, message, callback, context) {
|
||
|
if (this._stopped) return;
|
||
|
|
||
|
var record = {error: error, message: message, callback: callback, context: context, done: false},
|
||
|
called = false,
|
||
|
self = this;
|
||
|
|
||
|
this._queue.push(record);
|
||
|
|
||
|
if (record.error) {
|
||
|
record.done = true;
|
||
|
this._stop();
|
||
|
return this._flushQueue();
|
||
|
}
|
||
|
|
||
|
var handler = function(err, msg) {
|
||
|
if (!(called ^ (called = true))) return;
|
||
|
|
||
|
if (err) {
|
||
|
self._stop();
|
||
|
record.error = err;
|
||
|
record.message = null;
|
||
|
} else {
|
||
|
record.message = msg;
|
||
|
}
|
||
|
|
||
|
record.done = true;
|
||
|
self._flushQueue();
|
||
|
};
|
||
|
|
||
|
try {
|
||
|
this._session[this._method](message, handler);
|
||
|
} catch (err) {
|
||
|
handler(err);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
Functor.prototype._stop = function() {
|
||
|
this.pending = this._queue.length;
|
||
|
this._stopped = true;
|
||
|
};
|
||
|
|
||
|
Functor.prototype._flushQueue = function() {
|
||
|
var queue = this._queue, record;
|
||
|
|
||
|
while (queue.length > 0 && queue.peek().done) {
|
||
|
record = queue.shift();
|
||
|
if (record.error) {
|
||
|
this.pending = 0;
|
||
|
queue.clear();
|
||
|
} else {
|
||
|
this.pending -= 1;
|
||
|
}
|
||
|
record.callback.call(record.context, record.error, record.message);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
module.exports = Functor;
|