Skip to content

Commit

Permalink
After thinking about ReactiveX#2575 I added a note to all the places …
Browse files Browse the repository at this point in the history
…where a Subscription is lost from a Scheduler.
  • Loading branch information
abersnaze committed Jan 31, 2015
1 parent fe3cc75 commit 4ee0983
Show file tree
Hide file tree
Showing 16 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void call(final Subscriber<? super T> s) {
final Worker worker = scheduler.createWorker();
s.add(worker);

// FIXME should subscription returned be added to the s composite
worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public void setProducer(Producer producer) {
}));

// subscribe to the restarts observable to know when to schedule the next redo.
// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {
@Override
public void call() {
Expand All @@ -304,6 +305,7 @@ public void onError(Throwable e) {
public void onNext(Object t) {
if (!isLocked.get() && !child.isUnsubscribed()) {
if (consumerCapacity.get() > 0) {
// FIXME should subscription returned be added to the child composite
worker.schedule(subscribeToSource);
} else {
resumeBoundary.compareAndSet(false, true);
Expand All @@ -329,6 +331,7 @@ public void request(final long n) {
producer.request(n);
} else
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
// FIXME should subscription returned be added to the child composite
worker.schedule(subscribeToSource);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public OnSubscribeTimerOnce(long time, TimeUnit unit, Scheduler scheduler) {
public void call(final Subscriber<? super Long> child) {
Worker worker = scheduler.createWorker();
child.add(worker);
// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit uni
public void call(final Subscriber<? super Long> child) {
final Worker worker = scheduler.createWorker();
child.add(worker);
// FIXME should subscription returned be added to the child composite
worker.schedulePeriodically(new Action0() {
long counter;
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public void onCompleted() {
unsubscribe();
}
void scheduleChunk() {
// FIXME should subscription returned be added to the child composite
inner.schedulePeriodically(new Action0() {
@Override
public void call() {
Expand All @@ -181,6 +182,7 @@ void startNewChunk() {
}
chunks.add(chunk);
}
// FIXME should subscription returned be added to the child composite
inner.schedule(new Action0() {
@Override
public void call() {
Expand Down Expand Up @@ -280,6 +282,7 @@ public void onCompleted() {
unsubscribe();
}
void scheduleExact() {
// FIXME should subscription returned be added to the child composite
inner.schedulePeriodically(new Action0() {
@Override
public void call() {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/rx/internal/operators/OperatorDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {

@Override
public void onCompleted() {
// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {

@Override
Expand All @@ -69,6 +70,7 @@ public void onError(Throwable e) {

@Override
public void onNext(final T t) {
// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public void onError(final Throwable e) {

protected void schedule() {
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
// FIXME should subscription returned be added to the child composite
recursiveScheduler.schedule(new Action0() {

@Override
Expand Down Expand Up @@ -229,6 +230,7 @@ public boolean isUnsubscribed() {
@Override
public void unsubscribe() {
if (ONCE_UPDATER.getAndSet(this, 1) == 0) {
// FIXME should subscription returned be added to something
worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void onError(Throwable e) {

@Override
public void onNext(final Observable<T> o) {
// FIXME should subscription returned be added to the child composite
inner.schedule(new Action0() {

@Override
Expand All @@ -94,6 +95,7 @@ public void onCompleted() {
public void onError(Throwable e) {
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
// retry again
// FIXME should subscription returned be added to the child composite
inner.schedule(_self);
} else {
// give up and pass the failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {

SamplerSubscriber<T> sampler = new SamplerSubscriber<T>(s);
child.add(sampler);
// FIXME should subscription returned be added to the child composite
worker.schedulePeriodically(sampler, time, time, unit);

return sampler;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/rx/internal/operators/OperatorSkipTimed.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Worker worker = scheduler.createWorker();
child.add(worker);
final AtomicBoolean gate = new AtomicBoolean();
// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {
@Override
public void call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void request(final long n) {
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
// FIXME should subscription returned be added to the subscriber composite
inner.schedule(new Action0() {

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/rx/internal/operators/OperatorTakeTimed.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
child.add(worker);

TakeSubscriber<T> ts = new TakeSubscriber<T>(new SerializedSubscriber<T>(child));
// FIXME should subscription returned be added to the child composite
worker.schedule(ts, time, unit);
return ts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void onNext(T t) {
@Override
public void call() {
final Scheduler.Worker inner = scheduler.createWorker();
// FIXME should subscription returned be added to the subscriber composite
inner.schedule(new Action0() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public void onCompleted() {
}

void scheduleExact() {
// FIXME should subscription returned be added to the child composite
worker.schedulePeriodically(new Action0() {

@Override
Expand Down Expand Up @@ -420,6 +421,7 @@ public void onCompleted() {
child.onCompleted();
}
void scheduleChunk() {
// FIXME should subscription returned be added to the child composite
worker.schedulePeriodically(new Action0() {

@Override
Expand All @@ -444,6 +446,7 @@ void startNewChunk() {
return;
}

// FIXME should subscription returned be added to the child composite
worker.schedule(new Action0() {

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/rx/internal/util/ObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private ObjectPool(final int min, final int max, final long validationInterval)
initialize(min);

schedulerWorker = Schedulers.computation().createWorker();
// FIXME should subscription returned be added to some composite
schedulerWorker.schedulePeriodically(new Action0() {

@Override
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/rx/subjects/TestSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private void _onCompleted() {
* the number of milliseconds in the future relative to "now()" at which to call {@code onCompleted}
*/
public void onCompleted(long timeInMilliseconds) {
// FIXME should subscription returned be added to some composite
innerScheduler.schedule(new Action0() {

@Override
Expand Down Expand Up @@ -125,6 +126,7 @@ private void _onError(final Throwable e) {
* the number of milliseconds in the future relative to "now()" at which to call {@code onError}
*/
public void onError(final Throwable e, long timeInMilliseconds) {
// FIXME should subscription returned be added to some composite
innerScheduler.schedule(new Action0() {

@Override
Expand Down Expand Up @@ -158,6 +160,7 @@ private void _onNext(T v) {
* the number of milliseconds in the future relative to "now()" at which to call {@code onNext}
*/
public void onNext(final T v, long timeInMilliseconds) {
// FIXME should subscription returned be added to some composite
innerScheduler.schedule(new Action0() {

@Override
Expand Down

0 comments on commit 4ee0983

Please sign in to comment.