Skip to content

Commit

Permalink
Merge pull request #1042 from MarioAriasC/master
Browse files Browse the repository at this point in the history
Kotlin M7 and full compatibility with 0.17.0
  • Loading branch information
benjchristensen committed Apr 16, 2014
2 parents cc17cbd + a26a6d0 commit 187eddb
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 57 deletions.
32 changes: 22 additions & 10 deletions language-adaptors/rxjava-kotlin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,37 @@
Kotlin has support for SAM (Single Abstract Method) Interfaces as Functions (i.e. Java 8 Lambdas). So you could use Kotlin in RxJava whitout this adaptor

```kotlin
Observable.create(OnSubscribeFunc<String> {
it!!.onNext("Hello")
it.onCompleted()
Observable.create(OnSubscribeFunc<String> { observer ->
observer!!.onNext("Hello")
observer.onCompleted()
Subscriptions.empty()
})!!.subscribe { result ->
a!!.received(result)
}
```

This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage
In RxJava [0.17.0](https://github.com/Netflix/RxJava/releases/tag/0.17.0) version a new Subscriber type was included

```kotlin
import rx.lang.kotlin.*
Observable.create(object:OnSubscribe<String> {
override fun call(subscriber: Subscriber<in String>?) {
subscriber!!.onNext("Hello")
subscriber.onCompleted()
}
})!!.subscribe { result ->
a!!.received(result)
}
```

{(observer: Observer<in String>) ->
observer.onNext("Hello")
observer.onCompleted()
Subscriptions.empty()!!
}.asObservableFunc().subscribe { result ->
(Due to a [bug in Kotlin's compiler](http://youtrack.jetbrains.com/issue/KT-4753) you can't use SAM with OnSubscribe)

This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage

```kotlin
{(subscriber: Subscriber<in String>) ->
subscriber.onNext("Hello")
subscriber.onCompleted()
}.asObservable().subscribe { result ->
a!!.received(result)
}
```
Expand Down
4 changes: 2 additions & 2 deletions language-adaptors/rxjava-kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ buildscript {
}

dependencies {
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.6.1673'
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.7.270'
}
}

Expand All @@ -13,7 +13,7 @@ apply plugin: 'osgi'

dependencies {
compile project(':rxjava-core')
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.6.1673'
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.7.270'
provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ import rx.Observable
import rx.Observable.OnSubscribe
import rx.Subscription
import rx.Observable.OnSubscribeFunc
import rx.Subscriber


public fun<T> Function1<Observer<in T>, Unit>.asObservable(): Observable<T> {
return Observable.create(OnSubscribe<T>{ t1 ->
this(t1!!)
public fun<T> Function1<Subscriber<in T>, Unit>.asObservable(): Observable<T> {
return Observable.create(object:OnSubscribe<T> {
override fun call(t1: Subscriber<in T>?) {
this@asObservable(t1!!)
}

})!!
}

[deprecated("Use Function1<Subscriber<in T>, Unit>.asObservable()")]
public fun<T> Function1<Observer<in T>, Subscription>.asObservableFunc(): Observable<T> {
return Observable.create(OnSubscribeFunc<T>{ op ->
this(op!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package rx.lang.kotlin

import org.mockito.Mock
import rx.Observable
import org.junit.Before
import org.mockito.MockitoAnnotations
import org.junit.Test
import rx.subscriptions.Subscriptions
import org.mockito.Mockito.*
Expand All @@ -31,21 +28,23 @@ import rx.Subscription
import kotlin.concurrent.thread
import rx.Observable.OnSubscribeFunc
import rx.lang.kotlin.BasicKotlinTests.AsyncObservable
import rx.Observable.OnSubscribe
import rx.Subscriber

/**
* This class use plain Kotlin without extensions from the language adaptor
*/
public class BasicKotlinTests:KotlinTests() {

public class BasicKotlinTests : KotlinTests() {


[Test]
public fun testCreate() {

Observable.create(OnSubscribeFunc<String> {
it!!.onNext("Hello")
it.onCompleted()
Subscriptions.empty()
Observable.create(object:OnSubscribe<String> {
override fun call(subscriber: Subscriber<in String>?) {
subscriber!!.onNext("Hello")
subscriber.onCompleted()
}
})!!.subscribe { result ->
a!!.received(result)
}
Expand Down Expand Up @@ -310,7 +309,7 @@ public class BasicKotlinTests:KotlinTests() {



public class TestFactory(){
public class TestFactory() {
var counter = 1

val numbers: Observable<Int>
Expand All @@ -330,24 +329,23 @@ public class BasicKotlinTests:KotlinTests() {

}

class AsyncObservable : OnSubscribeFunc<Int>{
override fun onSubscribe(op: Observer<in Int>?): Subscription? {
class AsyncObservable : OnSubscribe<Int> {
override fun call(op: Subscriber<in Int>?) {
thread {
Thread.sleep(50)
op!!.onNext(1)
op.onNext(2)
op.onNext(3)
op.onCompleted()
}
return Subscriptions.empty()

}
}

class TestOnSubscribe(val count: Int) : OnSubscribeFunc<String>{
override fun onSubscribe(op: Observer<in String>?): Subscription? {
class TestOnSubscribe(val count: Int) : OnSubscribe<String> {
override fun call(op: Subscriber<in String>?) {
op!!.onNext("hello_$count")
op.onCompleted()
return Subscriptions.empty()!!
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@

package rx.lang.kotlin

import org.mockito.Mock
import rx.Observable
import org.junit.Before
import org.mockito.MockitoAnnotations
import org.junit.Test
import rx.subscriptions.Subscriptions
import org.mockito.Mockito.*
import org.mockito.Matchers.*
import rx.Observer
import org.junit.Assert.*
import rx.Notification
import rx.Subscription
import kotlin.concurrent.thread
import rx.Subscriber

/**
* This class contains tests using the extension functions provided by the language adaptor.
Expand All @@ -39,11 +34,10 @@ public class ExtensionTests : KotlinTests() {
[Test]
public fun testCreate() {

{(observer: Observer<in String>) ->
observer.onNext("Hello")
observer.onCompleted()
Subscriptions.empty()!!
}.asObservableFunc().subscribe { result ->
{(subscriber: Subscriber<in String>) ->
subscriber.onNext("Hello")
subscriber.onCompleted()
}.asObservable().subscribe { result ->
a!!.received(result)
}

Expand Down Expand Up @@ -216,15 +210,15 @@ public class ExtensionTests : KotlinTests() {

[Test]
public fun testForEach() {
asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach(received())
asyncObservable.asObservable().toBlockingObservable()!!.forEach(received())
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
verify(a, times(1))!!.received(3)
}

[Test(expected = javaClass<RuntimeException>())]
public fun testForEachWithError() {
asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach { throw RuntimeException("err") }
asyncObservable.asObservable().toBlockingObservable()!!.forEach { throw RuntimeException("err") }
fail("we expect an exception to be thrown")
}

Expand Down Expand Up @@ -259,21 +253,19 @@ public class ExtensionTests : KotlinTests() {
assertEquals(listOf(3, 6, 9), values[2])
}

val funOnSubscribe: (Int, Observer<in String>) -> Subscription = { counter, observer ->
observer.onNext("hello_$counter")
observer.onCompleted()
Subscriptions.empty()!!
val funOnSubscribe: (Int, Subscriber<in String>) -> Unit = { counter, subscriber ->
subscriber.onNext("hello_$counter")
subscriber.onCompleted()
}

val asyncObservable: (Observer<in Int>) -> Subscription = { observer ->
val asyncObservable: (Subscriber<in Int>) -> Unit = { subscriber ->
thread {
Thread.sleep(50)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
subscriber.onNext(1)
subscriber.onNext(2)
subscriber.onNext(3)
subscriber.onCompleted()
}
Subscriptions.empty()!!
}

/**
Expand All @@ -283,22 +275,22 @@ public class ExtensionTests : KotlinTests() {
return {(p2: P2) -> this(p1, p2) }
}

inner public class TestFactory(){
inner public class TestFactory() {
var counter = 1

val numbers: Observable<Int>
get(){
return listOf(1, 3, 2, 5, 4).asObservable()
}

val onSubscribe: (Observer<in String>) -> Subscription
val onSubscribe: (Subscriber<in String>) -> Unit
get(){
return funOnSubscribe.partially1(counter++)
}

val observable: Observable<String>
get(){
return onSubscribe.asObservableFunc()
return onSubscribe.asObservable()
}

}
Expand Down

0 comments on commit 187eddb

Please sign in to comment.