Skip to content

Commit

Permalink
Merge pull request #609 from akarnokd/OperationTimer3
Browse files Browse the repository at this point in the history
Operation Timer 3.0
  • Loading branch information
benjchristensen committed Dec 12, 2013
2 parents ccf921b + 2bb345b commit 034cf47
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 60 deletions.
43 changes: 37 additions & 6 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2001,12 +2001,12 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.png">
*
* @param interval interval size in time units
* @param delay the initial delay before emitting a single 0L
* @param unit time units to use for the interval size
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#timer">RxJava wiki: timer()</a>
*/
public static Observable<Void> timer(long interval, TimeUnit unit) {
return create(OperationTimer.timer(interval, unit));
public static Observable<Long> timer(long delay, TimeUnit unit) {
return timer(delay, unit, Schedulers.threadPoolForComputation());
}

/**
Expand All @@ -2015,13 +2015,44 @@ public static Observable<Void> timer(long interval, TimeUnit unit) {
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timer.s.png">
*
* @param interval interval size in time units
* @param delay the initial delay before emitting a single 0L
* @param unit time units to use for the interval size
* @param scheduler the scheduler to use for scheduling the item
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#timer">RxJava wiki: timer()</a>
*/
public static Observable<Void> timer(long interval, TimeUnit unit, Scheduler scheduler) {
return create(OperationTimer.timer(interval, unit, scheduler));
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
return create(new OperationTimer.TimerOnce(delay, unit, scheduler));
}

/**
* Return an Observable which emits a 0L after the initialDelay and ever increasing
* numbers after each period.
*
* @param initialDelay the initial delay time to wait before emitting the first value of 0L
* @param period the time period after emitting the subsequent numbers
* @param unit the time unit for both <code>initialDelay</code> and <code>period</code>
* @return an Observable which emits a 0L after the initialDelay and ever increasing
* numbers after each period
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229435.aspx'>MSDN: Observable.Timer</a>
*/
public static Observable<Long> timer(long initialDelay, long period, TimeUnit unit) {
return timer(initialDelay, period, unit, Schedulers.threadPoolForComputation());
}

/**
* Return an Observable which emits a 0L after the initialDelay and ever increasing
* numbers after each period while running on the given scheduler.
*
* @param initialDelay the initial delay time to wait before emitting the first value of 0L
* @param period the time period after emitting the subsequent numbers
* @param unit the time unit for both <code>initialDelay</code> and <code>period</code>
* @param scheduler the scheduler where the waiting happens and value emissions run.
* @return an Observable which emits a 0L after the initialDelay and ever increasing
* numbers after each period while running on the given scheduler
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229652.aspx'>MSDN: Observable.Timer</a>
*/
public static Observable<Long> timer(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
return create(new OperationTimer.TimerPeriodically(initialDelay, period, unit, scheduler));
}

/**
Expand Down
5 changes: 3 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperationDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public static <T> Observable<T> delay(Observable<T> observable, final long delay
// observable.map(x => Observable.timer(t).map(_ => x).startItAlreadyNow()).concat()
Observable<Observable<T>> seqs = observable.map(new Func1<T, Observable<T>>() {
public Observable<T> call(final T x) {
ConnectableObservable<T> co = Observable.timer(delay, unit, scheduler).map(new Func1<Void, T>() {
public T call(Void ignored) {
ConnectableObservable<T> co = Observable.timer(delay, unit, scheduler).map(new Func1<Long, T>() {
@Override
public T call(Long ignored) {
return x;
}
}).replay();
Expand Down
116 changes: 64 additions & 52 deletions rxjava-core/src/main/java/rx/operators/OperationTimer.java
Original file line number Diff line number Diff line change
@@ -1,74 +1,86 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.concurrent.TimeUnit;

import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;

/**
* Operation Timer with several overloads.
*
* @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.timer.aspx'>MSDN Observable.Timer</a>
*/
public final class OperationTimer {

public static OnSubscribeFunc<Void> timer(long interval, TimeUnit unit) {
return timer(interval, unit, Schedulers.threadPoolForComputation());
}

public static OnSubscribeFunc<Void> timer(final long delay, final TimeUnit unit, final Scheduler scheduler) {
return new OnSubscribeFunc<Void>() {
@Override
public Subscription onSubscribe(Observer<? super Void> observer) {
return new Timer(delay, unit, scheduler, observer).start();
}
};
}

private static class Timer {
private final long period;
private final TimeUnit unit;
private OperationTimer() { throw new IllegalStateException("No instances!"); }

/**
* Emit a single 0L after the specified time elapses.
*/
public static class TimerOnce implements OnSubscribeFunc<Long> {
private final Scheduler scheduler;
private final Observer<? super Void> observer;

private Timer(long period, TimeUnit unit, Scheduler scheduler, Observer<? super Void> observer) {
this.period = period;
this.unit = unit;
private final long dueTime;
private final TimeUnit dueUnit;
public TimerOnce(long dueTime, TimeUnit unit, Scheduler scheduler) {
this.scheduler = scheduler;
this.observer = observer;
this.dueTime = dueTime;
this.dueUnit = unit;
}

public Subscription start() {
final Subscription s = scheduler.schedule(new Action0() {

@Override
public Subscription onSubscribe(final Observer<? super Long> t1) {
return scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onNext(null);
observer.onCompleted();
t1.onNext(0L);
t1.onCompleted();
}
}, period, unit);

return Subscriptions.create(new Action0() {

}, dueTime, dueUnit);
}
}
/**
* Emit 0L after the initial period and ever increasing number after each period.
*/
public static class TimerPeriodically implements OnSubscribeFunc<Long> {
private final Scheduler scheduler;
private final long initialDelay;
private final long period;
private final TimeUnit unit;
public TimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.scheduler = scheduler;
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
}

@Override
public Subscription onSubscribe(final Observer<? super Long> t1) {
return scheduler.schedulePeriodically(new Action0() {
long count;
@Override
public void call() {
s.unsubscribe();
t1.onNext(count++);
}
});
},
initialDelay, period, unit
);
}
}

}
72 changes: 72 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationTimerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import static org.mockito.Mockito.*;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.TestScheduler;

public class OperationTimerTest {
@Mock
Observer<Object> observer;
TestScheduler s;
@Before
public void before() {
MockitoAnnotations.initMocks(this);
s = new TestScheduler();
}
@Test
public void testTimerOnce() {
Observable.timer(100, TimeUnit.MILLISECONDS, s).subscribe(observer);
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);

verify(observer, times(1)).onNext(0L);
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testTimerPeriodically() {
Subscription c = Observable.timer(100, 100, TimeUnit.MILLISECONDS, s).subscribe(observer);
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(0L);

s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext(1L);

s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext(2L);

s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext(3L);

c.unsubscribe();
s.advanceTimeBy(100, TimeUnit.MILLISECONDS);
inOrder.verify(observer, never()).onNext(any());

verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
}

0 comments on commit 034cf47

Please sign in to comment.