RxKotlin
Kotlin Extensions for RxJava
RxKotlin is a lightweight library that adds convenient extension functions to RxJava. You can use RxJava with Kotlin out-of-the-box, but Kotlin has language features (such as extension functions) that can streamline usage of RxJava even more. RxKotlin aims to conservatively collect these conveniences in one centralized library, and standardize conventions for using RxJava with Kotlin.
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

fun main(args: Array<String>) {
    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    list.toObservable() // extension function for Iterables
        .filter { it.length >= 5 }
        .subscribeBy( // named arguments for lambda Subscribers
            onNext = { println(it) },
            onError = { it.printStackTrace() },
            onComplete = { println("Done!") }
        )
}
Resources
Learning RxJava Packt Book
Chapter 12 of Learning RxJava covers RxKotlin and Kotlin idioms with RxJava.
Kotlin Slack Channel
Join us on the #rx channel in Kotlin Slack!
https://kotlinlang.slack.com/messages/rx
Support for RxJava 1.x and RxJava 2.x
Use RxKotlin 1.x versions to target RxJava 1.x.
Use RxKotlin 2.x versions to target RxJava 2.x.
The maintainers do not update the RxJava dependency version for every RxJava release, so you should explicitly add the desired RxJava dependency version to your pom.xml or build.gradle.
Build
Binaries
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.
RxKotlin 1.x
Example for Maven:
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>1.x.y</version>
</dependency>
and for Gradle:
compile 'io.reactivex:rxkotlin:x.y.z'
RxKotlin 2.x
Example for Maven:
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>2.x.y</version>
</dependency>
and for Gradle:
compile 'io.reactivex.rxjava2:rxkotlin:x.y.z'
Building with JitPack
You can also use Gradle or Maven with JitPack to build directly off a snapshot, branch, or commit of this repository.
For example, to build off the 1.x branch, use this setup for Gradle:
repositories {
    maven { url 'https://jitpack.io' }
}

dependencies {
    compile 'com.github.ReactiveX:RxKotlin:1.x-SNAPSHOT'
}
Use this setup for Maven:
<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

<dependency>
    <groupId>com.github.ReactiveX</groupId>
    <artifactId>RxKotlin</artifactId>
    <version>1.x-SNAPSHOT</version>
</dependency>
Learn more about building this project with JitPack at https://jitpack.io.
Extensions
Target Type | Method | Return Type | Description |
---|---|---|---|
BooleanArray | toObservable() | Observable | Turns a Boolean array into an Observable |
ByteArray | toObservable() | Observable | Turns a Byte array into an Observable |
ShortArray | toObservable() | Observable | Turns a Short array into an Observable |
IntArray | toObservable() | Observable | Turns an Int array into an Observable |
LongArray | toObservable() | Observable | Turns a Long array into an Observable |
FloatArray | toObservable() | Observable | Turns a Float array into an Observable |
DoubleArray | toObservable() | Observable | Turns a Double array into an Observable |
Array | toObservable() | Observable | Turns a T array into an Observable |
IntProgression | toObservable() | Observable | Turns an IntProgression into an Observable |
Iterable | toObservable() | Observable | Turns an Iterable<T> into an Observable |
Iterator | toObservable() | Observable | Turns an Iterator<T> into an Observable |
Observable | flatMapSequence() | Observable | Flat maps each T emission to a Sequence<R> |
Observable<Pair<A,B>> | toMap() | Single | Collects Pair<A,B> emissions into a Map<A,B> |
Observable<Pair<A,B>> | toMultimap() | Single | Collects Pair<A,B> emissions into a Map<A,List<B>> |
Observable<Observable> | mergeAll() | Observable | Merges all Observables emitted from an Observable |
Observable<Observable> | concatAll() | Observable | Concatenates all Observables emitted from an Observable |
Observable<Observable> | switchLatest() | Observable | Emits from the last emitted Observable |
Observable<*> | cast() | Observable | Casts all emissions to the reified type |
Observable<*> | ofType() | Observable | Filters all emissions to only the reified type |
Iterable<Observable> | merge() | Observable | Merges an Iterable of Observables into a single Observable |
Iterable<Observable> | mergeDelayError() | Observable | Merges an Iterable of Observables into a single Observable, but delays any error |
BooleanArray | toFlowable() | Flowable | Turns a Boolean array into a Flowable |
ByteArray | toFlowable() | Flowable | Turns a Byte array into a Flowable |
ShortArray | toFlowable() | Flowable | Turns a Short array into a Flowable |
IntArray | toFlowable() | Flowable | Turns an Int array into a Flowable |
LongArray | toFlowable() | Flowable | Turns a Long array into a Flowable |
FloatArray | toFlowable() | Flowable | Turns a Float array into a Flowable |
DoubleArray | toFlowable() | Flowable | Turns a Double array into a Flowable |
Array | toFlowable() | Flowable | Turns a T array into a Flowable |
IntProgression | toFlowable() | Flowable | Turns an IntProgression into a Flowable |
Iterable | toFlowable() | Flowable | Turns an Iterable<T> into a Flowable |
Iterator | toFlowable() | Flowable | Turns an Iterator<T> into a Flowable |
Flowable | flatMapSequence() | Flowable | Flat maps each T emission to a Sequence<R> |
Flowable<Pair<A,B>> | toMap() | Single | Collects Pair<A,B> emissions into a Map<A,B> |
Flowable<Pair<A,B>> | toMultimap() | Single | Collects Pair<A,B> emissions into a Map<A,List<B>> |
Flowable<Flowable> | mergeAll() | Flowable | Merges all Flowables emitted from a Flowable |
Flowable<Flowable> | concatAll() | Flowable | Concatenates all Flowables emitted from a Flowable |
Flowable<Flowable> | switchLatest() | Flowable | Emits from the last emitted Flowable |
Flowable | cast() | Flowable | Casts all emissions to the reified type |
Flowable | ofType() | Flowable | Filters all emissions to only the reified type |
Iterable<Flowable> | merge() | Flowable | Merges an Iterable of Flowables into a single Flowable |
Iterable<Flowable> | mergeDelayError() | Flowable | Merges an Iterable of Flowables into a single Flowable, but delays any error |
T | toSingle() | Single | Turns any T item into a Single<T> |
Future | toSingle() | Single | Turns a Future<T> into a Single<T> |
Callable | toSingle() | Single | Turns a Callable<T> into a Single<T> |
() -> T | toSingle() | Single | Turns a () -> T into a Single<T> |
Single | cast() | Single | Casts all emissions to the reified type |
Observable<Single> | mergeAllSingles() | Observable | Merges all Singles emitted from an Observable |
Flowable<Single> | mergeAllSingles() | Flowable | Merges all Singles emitted from a Flowable |
T? | toMaybe() | Maybe | Turns a nullable T value into a Maybe<T> that will only emit if not null |
Future | toMaybe() | Maybe | Turns a Future<T> into a Maybe<T> |
Callable | toMaybe() | Maybe | Turns a Callable<T> into a Maybe<T> |
() -> T | toMaybe() | Maybe | Turns a () -> T into a Maybe<T> |
Maybe | cast() | Maybe | Casts any emissions to the reified type |
Maybe | ofType() | Maybe | Filters any emission that is the reified type |
Observable<Maybe> | mergeAllMaybes() | Observable | Merges all emitted Maybes |
Flowable<Maybe> | mergeAllMaybes() | Flowable | Merges all emitted Maybes |
Action | toCompletable() | Completable | Turns an Action into a Completable |
Callable | toCompletable() | Completable | Turns a Callable into a Completable |
Future | toCompletable() | Completable | Turns a Future into a Completable |
(() -> Any) | toCompletable() | Completable | Turns a (() -> Any) into a Completable |
Observable<Completable> | mergeAllCompletables() | Completable | Merges all emitted Completables |
Flowable<Completable> | mergeAllCompletables() | Completable | Merges all emitted Completables |
Observable | subscribeBy() | Disposable | Allows named arguments to construct an Observer |
Flowable | subscribeBy() | Disposable | Allows named arguments to construct a Subscriber |
Single | subscribeBy() | Disposable | Allows named arguments to construct a SingleObserver |
Maybe | subscribeBy() | Disposable | Allows named arguments to construct a MaybeObserver |
Completable | subscribeBy() | Disposable | Allows named arguments to construct a CompletableObserver |
Observable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Observer |
Flowable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Subscriber |
Disposable | addTo() | Disposable | Adds a Disposable to the specified CompositeDisposable |
CompositeDisposable | plusAssign() | Unit | Operator function (+=) to add a Disposable to this CompositeDisposable |
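
As a rough illustration of how a few of the extensions in the table combine, here is a minimal sketch. It assumes RxKotlin 2.x and RxJava 2.x are on the classpath and that the extensions live in the `io.reactivex.rxkotlin` package. The helper names `lengthsByName` and `onlyStrings` are hypothetical and exist only for this example.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.rxkotlin.addTo
import io.reactivex.rxkotlin.ofType
import io.reactivex.rxkotlin.plusAssign
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toMap
import io.reactivex.rxkotlin.toObservable

// Hypothetical helper: maps each name to a (name, length) Pair and
// collects the pairs into a Map with the toMap() extension.
fun lengthsByName(names: List<String>): Map<String, Int> =
    names.toObservable()             // Iterable<T>.toObservable()
        .map { it to it.length }     // Observable<Pair<String, Int>>
        .toMap()                     // Single wrapping the resulting Map
        .blockingGet()

// Hypothetical helper: keeps only the String items using the
// reified ofType() extension.
fun onlyStrings(items: List<Any>): List<String> =
    items.toObservable()
        .ofType<String>()
        .toList()
        .blockingGet()

fun main() {
    println(lengthsByName(listOf("Alpha", "Beta")))
    println(onlyStrings(listOf(1, "two", 3)))

    // Two ways to register a Disposable with a CompositeDisposable.
    val disposables = CompositeDisposable()
    Observable.just("Gamma")
        .subscribeBy(onNext = { println(it) })
        .addTo(disposables)
    disposables += Observable.just("Delta")
        .subscribeBy(onNext = { println(it) })
    disposables.dispose()
}
```

The blocking calls are used here only to keep the sketch synchronous and easy to follow. In application code you would more likely subscribe with `subscribeBy()` and manage the resulting `Disposable`.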
SAM Helpers
To help cope with the SAM ambiguity issue when using RxJava 2.x with Kotlin, there are a number of helper factories and extension functions to work around the affected operators.
Observables.zip()
Observables.combineLatest()
Observable#zipWith()
Observable#withLatestFrom()
Flowables.zip()
Flowables.combineLatest()
Flowable#zipWith()
Flowable#withLatestFrom()
Singles.zip()
Single#zipWith()
Maybes.zip()
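
As a hedged sketch of one of these helpers, the example below uses `Observables.zip()` with a plain Kotlin lambda instead of an explicit `BiFunction`. It assumes RxKotlin 2.x, where the `Observables` object lives in `io.reactivex.rxkotlin`. The function name `zipLabels` is hypothetical and used only for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

// Hypothetical helper: pairs each letter with a number and joins them.
// Observables.zip() accepts a Kotlin lambda, so no BiFunction wrapper
// is needed at the call site.
fun zipLabels(letters: List<String>, numbers: List<Int>): List<String> =
    Observables.zip(
        Observable.fromIterable(letters),
        Observable.fromIterable(numbers)
    ) { letter, number -> "$letter$number" }
        .toList()
        .blockingGet()

fun main() {
    // zip completes when the shorter source completes.
    println(zipLabels(listOf("A", "B", "C"), listOf(1, 2)))
}
```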
Usage with Other Rx Libraries
RxKotlin can be used in conjunction with other Rx and Kotlin libraries, such as RxAndroid, RxBinding, and TornadoFX/RxKotlinFX. These libraries and RxKotlin are modular, and RxKotlin is merely a set of extension functions to RxJava that can be used with these other libraries. There should be no overlap or dependency issues.
Contributing
We welcome contributions and discussion for new features. We recommend filing an issue first to avoid unnecessary effort, but feel free to submit pull requests. The vision is to keep this library lightweight, with a tight and focused scope applicable to all platforms (including Android, server, and desktop). Anything specific to a particular domain (for example, JavaFX or JDBC) might be better suited to a separate project. Feel free to open a discussion and we will help figure out where your functionality may belong.