78 lines
2.4 KiB
JavaScript
78 lines
2.4 KiB
JavaScript
|
import { Subscriber } from '../Subscriber';
|
||
|
/**
 * Builds an operator function that lifts a `BufferCountOperator` onto a source.
 *
 * @param {number} bufferSize - Number of values collected into each emitted buffer.
 * @param {?number} [startBufferEvery=null] - Interval (in source values) at which a new
 *   buffer is started. When falsy or equal to `bufferSize`, the operator selects the
 *   simple non-overlapping subscriber.
 * @returns {function(*): *} A function that takes `source` and returns the result of
 *   `source.lift(...)` with a freshly created `BufferCountOperator`.
 */
export function bufferCount(bufferSize, startBufferEvery = null) {
  return function bufferCountOperatorFunction(source) {
    return source.lift(new BufferCountOperator(bufferSize, startBufferEvery));
  };
}
|
||
|
/**
 * Operator object passed to `source.lift(...)`; on `call` it subscribes to the
 * source with the subscriber implementation chosen at construction time.
 */
class BufferCountOperator {
  /**
   * @param {number} bufferSize - Size of each emitted buffer.
   * @param {?number} startBufferEvery - Interval at which new buffers are started.
   */
  constructor(bufferSize, startBufferEvery) {
    this.bufferSize = bufferSize;
    this.startBufferEvery = startBufferEvery;
    // A falsy interval (null, undefined, 0) or one equal to the buffer size means
    // buffers never overlap and never skip values, so the single-buffer
    // subscriber is sufficient.
    if (!startBufferEvery || bufferSize === startBufferEvery) {
      this.subscriberClass = BufferCountSubscriber;
    } else {
      this.subscriberClass = BufferSkipCountSubscriber;
    }
  }

  /**
   * Subscribes to `source` with a new instance of the selected subscriber class.
   *
   * @param {*} subscriber - Downstream subscriber that receives the buffers.
   * @param {*} source - Object exposing `subscribe(subscriber)`.
   * @returns {*} Whatever `source.subscribe(...)` returns.
   */
  call(subscriber, source) {
    const SubscriberClass = this.subscriberClass;
    return source.subscribe(
      new SubscriberClass(subscriber, this.bufferSize, this.startBufferEvery)
    );
  }
}
|
||
|
/**
 * Subscriber that collects incoming values into one buffer at a time and
 * forwards the buffer to `destination` whenever it reaches `bufferSize`.
 * Any partially filled buffer is forwarded on completion.
 */
class BufferCountSubscriber extends Subscriber {
  /**
   * @param {*} destination - Downstream receiver; passed to the `Subscriber` constructor.
   * @param {number} bufferSize - Number of values per emitted buffer.
   */
  constructor(destination, bufferSize) {
    super(destination);
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  /**
   * Appends `value` to the current buffer and emits the buffer once it is full.
   *
   * @param {*} value - The incoming value.
   */
  _next(value) {
    const buffer = this.buffer;
    buffer.push(value);
    // Loose equality is kept on purpose: a numeric-string bufferSize (e.g. '2')
    // still matches the numeric length here.
    if (buffer.length == this.bufferSize) {
      this.destination.next(buffer);
      // Start a fresh array so the emitted one is never mutated afterwards.
      this.buffer = [];
    }
  }

  /**
   * Emits the remaining values (if any) and then delegates to the base
   * class's `_complete`.
   */
  _complete() {
    const buffer = this.buffer;
    if (buffer.length > 0) {
      this.destination.next(buffer);
    }
    super._complete();
  }
}
|
||
|
/**
 * Subscriber that maintains several buffers at once: a new buffer is opened
 * every `startBufferEvery` values, every open buffer receives each value, and a
 * buffer is forwarded to `destination` as soon as it holds `bufferSize` values.
 * Buffers still open on completion are forwarded if they are non-empty.
 */
class BufferSkipCountSubscriber extends Subscriber {
  /**
   * @param {*} destination - Downstream receiver; passed to the `Subscriber` constructor.
   * @param {number} bufferSize - Number of values per emitted buffer.
   * @param {number} startBufferEvery - Interval (in values) at which a new buffer opens.
   */
  constructor(destination, bufferSize, startBufferEvery) {
    super(destination);
    this.bufferSize = bufferSize;
    this.startBufferEvery = startBufferEvery;
    this.buffers = [];
    this.count = 0;
  }

  /**
   * Opens a new buffer when due, appends `value` to every open buffer, and
   * emits (and removes) any buffer that has reached `bufferSize`.
   *
   * @param {*} value - The incoming value.
   */
  _next(value) {
    const { bufferSize, startBufferEvery, buffers } = this;
    // The decision to open a buffer uses the count *before* this value is tallied.
    const count = this.count++;
    if (count % startBufferEvery === 0) {
      buffers.push([]);
    }
    // Walk backwards so that splicing out a full buffer does not shift the
    // indices of the entries still to be visited.
    for (let i = buffers.length - 1; i >= 0; i--) {
      const buffer = buffers[i];
      buffer.push(value);
      if (buffer.length === bufferSize) {
        buffers.splice(i, 1);
        this.destination.next(buffer);
      }
    }
  }

  /**
   * Drains the open buffers oldest-first, emitting the non-empty ones, and
   * then delegates to the base class's `_complete`.
   */
  _complete() {
    const { buffers, destination } = this;
    while (buffers.length > 0) {
      const buffer = buffers.shift();
      if (buffer.length > 0) {
        destination.next(buffer);
      }
    }
    super._complete();
  }
}
|
||
|
//# sourceMappingURL=bufferCount.js.map
|