Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ExecutorScheduler - New ComputationScheduler #1048

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Observable<Integer> call() throws Exception {
}
};

Observable<Integer> result = Async.deferFuture(func, Schedulers.threadPoolForComputation());
Observable<Integer> result = Async.deferFuture(func, Schedulers.computation());

final Observer<Integer> observer = mock(Observer.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void testSimple() {

try {
Observable<Integer> source = Observable.from(1, 2, 3)
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -93,7 +93,7 @@ public void testSimpleThrowing() {

try {
Observable<Integer> source = Observable.<Integer>error(new CustomException())
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -128,7 +128,7 @@ public void call(Integer t1) {
@Test
public void testSimpleScheduled() {
Observable<Integer> source = Observable.from(1, 2, 3)
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down Expand Up @@ -158,7 +158,7 @@ public void call(Integer t1) {
public void testSimpleScheduledThrowing() {

Observable<Integer> source = Observable.<Integer>error(new CustomException())
.subscribeOn(Schedulers.threadPoolForComputation());
.subscribeOn(Schedulers.computation());

final AtomicInteger sum = new AtomicInteger();
Action1<Integer> add = new Action1<Integer>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Integer call() throws Exception {
}
};

Observable<Integer> result = Async.startFuture(func, Schedulers.threadPoolForComputation());
Observable<Integer> result = Async.startFuture(func, Schedulers.computation());

final Observer<Integer> observer = mock(Observer.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public void testWhileDoZeroTimes() {

@Test
public void testWhileDoManyTimes() {
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread());
Observable<Integer> source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.trampoline());

List<Integer> expected = new ArrayList<Integer>(numRecursion * 3);
for (int i = 0; i < numRecursion; i++) {
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit) {
return buffer(source, timespan, unit, Schedulers.threadPoolForComputation());
return buffer(source, timespan, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -259,7 +259,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, TimeUnit unit, int count) {
return buffer(source, timespan, unit, count, Schedulers.threadPoolForComputation());
return buffer(source, timespan, unit, count, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -325,7 +325,7 @@ public Subscription onSubscribe(final Observer<? super List<T>> observer) {
* the {@link Func1} object representing the specified buffer operation
*/
public static <T> OnSubscribeFunc<List<T>> buffer(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
return buffer(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
return buffer(source, timespan, timeshift, unit, Schedulers.computation());
}

/**
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationWindow.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
return window(source, timespan, unit, Schedulers.threadPoolForComputation());
return window(source, timespan, unit, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -255,7 +255,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
return window(source, timespan, unit, count, Schedulers.threadPoolForComputation());
return window(source, timespan, unit, count, Schedulers.computation());
}

/**
Expand Down Expand Up @@ -318,7 +318,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
* the {@link rx.functions.Func1} object representing the specified window operation
*/
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
return window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
return window(source, timespan, timeshift, unit, Schedulers.computation());
}

/**
Expand Down
109 changes: 109 additions & 0 deletions rxjava-core/src/main/java/rx/schedulers/ComputationScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package rx.schedulers;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.schedulers.NewThreadScheduler.OnActionComplete;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/* package */class ComputationScheduler extends Scheduler {

private static class ComputationSchedulerPool {
final int cores = Runtime.getRuntime().availableProcessors();
final ThreadFactory factory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet());
t.setDaemon(true);
return t;
}
};

final EventLoopScheduler[] eventLoops;

ComputationSchedulerPool() {
// initialize event loops
eventLoops = new EventLoopScheduler[cores];
for (int i = 0; i < cores; i++) {
eventLoops[i] = new EventLoopScheduler(factory);
}
}

private static ComputationSchedulerPool INSTANCE = new ComputationSchedulerPool();

long n = 0;

public EventLoopScheduler getEventLoop() {
// round-robin selection (improvements to come)
return eventLoops[(int) (n++ % cores)];
}

}

@Override
public Inner createInner() {
return new EventLoop();
}

private static class EventLoop extends Scheduler.Inner {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final EventLoopScheduler pooledEventLoop;
private final OnActionComplete onComplete;

EventLoop() {
pooledEventLoop = ComputationSchedulerPool.INSTANCE.getEventLoop();
onComplete = new OnActionComplete() {

@Override
public void complete(Subscription s) {
innerSubscription.remove(s);
}

};
}

@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}

@Override
public Subscription schedule(Action0 action) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}
return pooledEventLoop.schedule(action, onComplete);
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}

return pooledEventLoop.schedule(action, delayTime, unit, onComplete);
}

}

private static class EventLoopScheduler extends NewThreadScheduler.EventLoopScheduler {
EventLoopScheduler(ThreadFactory threadFactory) {
super(threadFactory);
}
}

}
Loading