This repository was archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathmergeconcat.js
84 lines (71 loc) · 2.89 KB
/
mergeconcat.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
var MergeObservable = (function (__super__) {
inherits(MergeObservable, __super__);
function MergeObservable(source, maxConcurrent) {
this.source = source;
this.maxConcurrent = maxConcurrent;
__super__.call(this);
}
MergeObservable.prototype.subscribeCore = function(observer) {
var g = new CompositeDisposable();
g.add(this.source.subscribe(new MergeObserver(observer, this.maxConcurrent, g)));
return g;
};
return MergeObservable;
}(ObservableBase));
var MergeObserver = (function (__super__) {
function MergeObserver(o, max, g) {
this.o = o;
this.max = max;
this.g = g;
this.done = false;
this.q = [];
this.activeCount = 0;
__super__.call(this);
}
inherits(MergeObserver, __super__);
MergeObserver.prototype.handleSubscribe = function (xs) {
var sad = new SingleAssignmentDisposable();
this.g.add(sad);
isPromise(xs) && (xs = observableFromPromise(xs));
sad.setDisposable(xs.subscribe(new InnerObserver(this, sad)));
};
MergeObserver.prototype.next = function (innerSource) {
if(this.activeCount < this.max) {
this.activeCount++;
this.handleSubscribe(innerSource);
} else {
this.q.push(innerSource);
}
};
MergeObserver.prototype.error = function (e) { this.o.onError(e); };
MergeObserver.prototype.completed = function () { this.done = true; this.activeCount === 0 && this.o.onCompleted(); };
function InnerObserver(parent, sad) {
this.parent = parent;
this.sad = sad;
__super__.call(this);
}
inherits(InnerObserver, __super__);
InnerObserver.prototype.next = function (x) { this.parent.o.onNext(x); };
InnerObserver.prototype.error = function (e) { this.parent.o.onError(e); };
InnerObserver.prototype.completed = function () {
this.parent.g.remove(this.sad);
if (this.parent.q.length > 0) {
this.parent.handleSubscribe(this.parent.q.shift());
} else {
this.parent.activeCount--;
this.parent.done && this.parent.activeCount === 0 && this.parent.o.onCompleted();
}
};
return MergeObserver;
}(AbstractObserver));
/**
* Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences.
* Or merges two observable sequences into a single observable sequence.
* @param {Mixed} [maxConcurrentOrOther] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence.
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
*/
observableProto.merge = function (maxConcurrentOrOther) {
return typeof maxConcurrentOrOther !== 'number' ?
observableMerge(this, maxConcurrentOrOther) :
new MergeObservable(this, maxConcurrentOrOther);
};