Это конспект-перевод книги Томаса Нилда "Learning RxJava", 2017
Для того, чтобы упростить навигацию по материалу, была сохранена оригинальная структура книги: все оглавления и заголовки остались в родном виде.
-
Chapter 2: Observables and Subscribers
- The Observable
- How Observables work
- Using Observable.create()
- Using Observable.just()
- The Observer interface
- Implementing and subscribing to an Observer
- Shorthand Observers with lambdas
- Cold versus hot Observables
- Cold Observables
- Hot Observables
- ConnectableObservable
- Other Observable sources
- Observable.range()
- Observable.interval()
- Observable.future()
- Observable.empty()
- Observable.never()
- Observable.error()
- Observable.defer()
- Observable.fromCallable()
- Single, Completable, and Maybe
- Single
- Maybe
- Completable
- Disposing
- Handling a Disposable within an Observer
- Using CompositeDisposable
- Handling Disposal with Observable.create()
- The Observable
-
- Suppressing operators
- filter()
- take()
- skip()
- takeWhile() and skipWhile()
- distinct()
- distinctUntilChanged()
- elementAt()
- Transforming operators
- map()
- cast()
- startWith()
- defaultIfEmpty()
- switchIfEmpty()
- sorted()
- delay()
- repeat()
- scan()
- Reducing operators
- count()
- reduce()
- all()
- any()
- contains()
- Collection operators
- toList()
- toSortedList()
- toMap() and toMultiMap()
- collect()
- Error recovery operators
- onErrorReturn() and onErrorReturnItem()
- onErrorResumeNext()
- retry()
- Action operators
- doOnNext(), doOnComplete(), and doOnError()
- doOnSubscribe() and doOnDispose()
- doOnSuccess()
- Suppressing operators
-
Chapter 4: Combining Observables
- Merging
- Observable.merge() and mergeWith()
- flatMap()
- Concatenation
- Observable.concat() and concatWith()
- concatMap()
- Ambiguous
- Zipping
- Combine latest
- withLatestFrom()
- Grouping
- Merging
-
Chapter 5: Multicasting, Replaying, and Caching
- Understanding multicasting
- Multicasting with operators
- When to multicast
- Automatic connection
- autoConnect()
- refCount() and share()
- Replaying and caching
- Replaying
- Caching
- Subjects
- PublishSubject
- When to use Subjects
- When Subjects go wrong
- Serializing Subjects
- BehaviorSubject
- ReplaySubject
- AsyncSubject
- UnicastSubject
- Understanding multicasting
-
Chapter 6: Concurrency and Parallelization
- Why concurrency is necessary
- Concurrency in a nutshell
- Understanding parallelization
- Introducing RxJava concurrency
- Keeping an application alive
- Understanding Schedulers
- Computation
- IO
- New thread
- Single
- Trampoline
- ExecutorService
- Starting and shutting down Schedulers
- Understanding subscribeOn()
- Nuances of subscribeOn()
- Understanding observeOn()
- Using observeOn() for UI event threads
- Nuances of observeOn()
- Parallelization
- unsubscribeOn()
-
Chapter 7: Switching, Throttling, Windowing, and Buffering
- Buffering
- Fixed-size buffering
- Time-based buffering
- Boundary-based buffering
- Windowing
- Fixed-size windowing
- Time-based windowing
- Boundary-based windowing
- Throttling
- throttleLast() / sample()
- throttleFirst()
- throttleWithTimeout() / debounce()
- Switching
- Grouping keystrokes
- Buffering
-
Chapter 8: Flowables and Backpressure
- Understanding backpressure
- An example that needs backpressure
- Introducing the Flowable
- When to use Flowables and backpressure
- Use an Observable If...
- Use a Flowable If...
- Understanding the Flowable and Subscriber The Subscriber
- Creating a Flowable Using Flowable.create() and BackpressureStrategy Turning an Observable into a Flowable (and vice-versa)
- Using onBackpressureXXX() operators
- onBackPressureBuffer()
- onBackPressureLatest()
- onBackPressureDrop()
- [Using Flowable.generate()]
- Understanding backpressure
-
Chapter 9: Transformers and Custom Operators
- Transformers
- ObservableTransformer
- FlowableTransformer
- Avoiding shared state with Transformers
- Using to() for fluent conversion
- Operators
- Implementing an ObservableOperator
- FlowableOperator
- Custom Transformers and operators for Singles, Maybes, and Completables
- Using RxJava2-Extras and RxJava2Extensions
- Transformers
-
Chapter 10: Testing and Debugging
- Configuring JUnit
- Blocking subscribers
- Blocking operators
- blockingFirst()
- blockingGet()
- blockingLast()
- blockingIterable()
- blockingForEach()
- blockingNext()
- blockingLatest()
- blockingMostRecent()
- Using TestObserver and TestSubscriber
- Manipulating time with the TestScheduler
- Debugging RxJava code
Фундаментальная идея в основе реактивного программирования: события - это данные, данные - это события.
onNext()- передаёт каждый элемент вниз доObserver.onComplete()- свидетельствует о том, что никакихonNext()больше не будет.onError()- отправляет ошибку вниз по цепочке вплоть до самогоObserver, где , как правило, определена обработка ошибок. Если не использован операторretry()или один из методов видаonErrorResumeNext(), то по контракту больше никаких элементов не заэмитится.
Observable.create() - фабрика, позволяющая создать Observable и обозначить эмиттер. Нужна для хуков вокруг нереактивных источников данных/событий. В первой RxJava это был Observable.fromEmitter(). Стоит обратить внимание, что Observable одновременно может эмитить только один элемент. map() и filter() возвращают новый Observable.
Во второй RxJava нельзя эмиттить Null.
Методы onNext(), onComplete(), onError() определяют тип обзёрвера. В первой эрыксджаве это был Subscriber. Во второй Subscriber появляется только когда речь идёт о Flowables. Если при выпуске элементов в Observable произойдёт ошибка, то она упадёт в Observer.onError().
| Имя | single-abstract-method | Описание |
|---|---|---|
Action |
run() |
Запускает операцию, похоже на Runnable |
Callable<T> |
get() |
Возвращает элемент типа T |
Consumer<T> |
accept() |
Что-то делает над T, ничего не возвращает |
Function<T,R> |
apply() |
Принимает T, возвращает R |
Predicate<T> |
test() |
Принимает Т, возвращает boolean |
BiConsumer<T1,T2> |
accept() |
см.Consumer<T> |
BiFunction<T1,T2,R> |
apply() |
Принимает T1, T2, возвращает R |
BiPredicate<T1,T2> |
test() |
см.Predicate<T> |
Function3<T1,T2,T3,R> |
apply() |
Принимает три аргумента, возвращает R |
BooleanSupplier |
getAsBoolean() |
Возвращает булю |
LongConsumer |
accept() |
Что-то делает с входным лонгом и всё |
IntFunction |
apply() |
Принимает int и возвращает T |
Обсерваблы, которые эмитят конечные данные чаще всего cold (отрабатывают тогда, когда на них подписались). Горячие похожи на broadcast, вещают всем подписчикам. Работает, как радио. Если пропустил трэк, его уже не услышишь. Горячие чаще всего отображают события, нежели какие-то данные.
ConnectableObservable - полезная форма горячего обзёрвабла. Он берёт любой Observable и делает из него hot, чтобы он эмитил всем подписчикам одновременно.
Просто вызови publish() у любого обзёрвабла и он вернёт ConnectableObservable.
Но subscribe на такой обзёрвабл не вызовет эмиты. У него надо вызвать connect(), чтобы тот начал пулять. То есть можно подписать несколько на ConnectableObservable, потом ткнуть source.connect(), и только тогда все подписчики получат свои данные. Такая фиговина более известна, как Multicasting.
-
Observable.range()- для создания последовательности интов. Создаётся до тех пор, пока не будет достигнут конечный инт. Все инты передаются черезonNext(), что неудивительно. -
Observable.interval()- time-based обзёрвабл, который эмитит с заданным интервалом. Бесконечный. Cold. -
Observable.future()- обёртка надFuture. -
Observable.empty()- ничего не эмитит и вызываетonComplete(). Вместо null'ов. -
Observable.never()- похож наempty(), только никогда не вызываетonComplete(). -
Observable.error()- вызываетonError(). -
Observable.defer()- какая-то мощная фиговина, так как может создавать separate state(отдельное состояние) для каждогоObservable. Ну например нам нужно подтягивать изменившееся состояние в обзёрвабл. -
Observable.fromCallable()- если нам надо произвести какое-то вычисление или операцию, а затем заэмитить его/её, можно просто вызватьObservable.just(). Но иногда надо это проделать лениво. Также, если эта операция вызывает ошибку, лучше бы она её выкидывала только на момент выполнения чейна. Более того, если вObservable.just()случится ошибка, то она не будет передана вObserver. Короче говоря, если то, что вы оборачиваете вObservableможет вызывать ошибку, то оборачивать надо вObservable.fromCallable().
Есть три разных типа обзёрваблов, которые эмитят ноль или один элемент.
-
Single- эмитит, вот это поворот, один элемент.onSuccess()включает в себяonNext()иonComplete(). В лямбде, соответственно, ловитсяonSuccess()иonError(). -
Maybe- если количество элементов от 0 до 1. -
Completable- придуман для того, чтобы выполнять какое-то действие. Ничего не получает. ЕстьonError()иonComplete().
Когда мы вызываем subscribe(), создаётся поток событий и обрабатывает emissions в цепочке. Для этого выделяются определённые ресурсы. Слава Богу, Observable высвобождает эти ресурсы как только отрабатывает onComplete(). Но в случае, если у нас бесконечный или турбо долгий(его выполнение занимает много времени) поток, нам может понадобится конкретный dispose.
Короче, нельзя доверять GC чистку толстых потоков, надо диспозить самому, чтобы избежать мемори ликов
Disposable - это связующее звено между Observable и активным Observer. Можно вызывать его dispose() для того, чтобы прекратить эмитить элементы и высвободить память, затраченную на их выпуск.
В onSubscribe(Disposable d) у обзёрвера передан Disposable. Это нужно для того, чтобы обзёрвер мог контролировать подписку и у него был вариант отписаться в любой момент. Disposable передаётся по всей цепочке обзёрваблов. Вообще, передача Observer'a в subscribe() не вернёт Disposable (rxJava сама захэндлит это вот всё), но если очень надо, то можно юзануть subscribeWith(Disposable), тем самым получив дефолтный Disposable.
Использование CompositeDisposable. НужнО в случае, если у нас несколько подписок, а мы хотим манагить их (отписаться от всех разом, к примеру).
Тут важно вкурить, что операторы сами являются Observer'ом для своего Observable выше по цепочке.
Эти операторы просто тупо не вызывают onNext(), если не проходит какое-то условие. Соответственно, элемент не проходит вниз по цепочке.
-
filter()- принимаетPredicate<T>для обзёрваблаT. Каждый эмишн мапится в булю, которая говорит - подходит данный эмишн к условию или нет. ЭмишеныFalseдальше не проходят. Если вообще нет подходящих вариантов, то вернётся пустойObservable, ферштейн? -
take()- у него несколько реализаций, тривиально. -
skip()- противоположноtake(). -
takeWhile(),skipWhile()- принимает/пропускает, пока соответствует условию. -
takeUntil(),skipUntil()- принимают другойObservableв качестве параметра. Принимают/пропускают, пока другой обзёрвабл продолжает эмитить. -
distinct()- будет эмитить каждый уникальный элемент, но подавлять повторяющиеся. Сравнение работает наhashcode()/equals()выпускаемых объектов. Надо иметь в виду, что если если у нас дохрена уникальных элементов, то этот оператор будет есть память. Типа как если бы каждая подписка создавалаHashSet, отслеживающий предыдущие выпущенные элементы. Можно также кинуть вdistinct()лямбду, по которой будет отобран уникальный ключ. -
distinctUntilChanged()- полезная штуковина. Эмитит элементы, если входящие значения изменились. То есть она игнорит последовательно повторяющиеся элементы.2,2,3,3,3,1,1 -> 2,3,1. Также принимает лямбду на вход, по которой собственно сравнивает элементы. -
elementAt()- можно получить специфичный эмишн по его id'шнику (longот 0 доLong.MAX_VALUE). ВозвращаетMaybe<T>. У него есть разные реализации:elementAtOrError()- вернётSingleилиError(в случае, если по данному индексу нифига нет).singleElement()- вернёт Observable, обёрнутый в MaybefirstElement()иlastElement().
-
map()- для заданногоObservable<T>меняетTнаR, используя функциюFunction<T,R>. Конвертирует один-к-одному каждый эмишн. Если нужно конвертить один-к-нескольким эмишнов, то надо использоватьflatMap()илиconcatMap(). -
cast()- map-like оператор, который кастует каждый эмишн к заданному типу. Некий такой брут-форс для приведения типов. Лучше правильно юзать дженерики, разумеется. -
startWith()- позволяет нам впихнуть какой-нибудь эмишн, с которого начинаются другие эмишны. Например, у нас естьObservable<String>,выпускающий элементы, которые мы хотим напечатать в менюшке. Можно использоватьstartWith(), чтобы сначала написать заголовок менюшки.
Observable.just("Чай", "Кофе", "Лимонад")
.startWith("НАПИТКИ")
.subscribe(System.out::println)
>output:
НАПИТКИ
Чай
Кофе
Лимонад
Если нам нужно больше, чем один элемент для старта, можно бахнуть
-
startWithArray(), тогда отработают сначала элементы массива. Если нужно ждать выпуск всех эмишнов другогоObsaervable, стоит использоватьObservable.concat()илиconcatWith(). -
defaultIfEmpty()- подставляет дефолт, если пусто. Вот это да. -
switchIfEmpty()- переключается на другойObservable, если предыдущий не заэмитил ни одного элемента. -
sorted()- если у нас есть конечныйObservable<T>, который выпускает элементыComparable<T>, можно юзать этот оператор, чтобы сортировать эмишены. Под капотом он соберёт все эмишены, а потом перевыпустит их в заданном порядке. Если применить к бесконечномуObservable, получимOutOfMemory, естесна. Можно податьComparatorв качестве аргумента. -
delay()- собирает все элементы и выпускает затем один за одним с заданным интервалом. Так как оперирует на другом шедулере, то надо это иметь в виду и не тупить в посках выполненной операции. Она могла не успеть выполниться до завершения main-потока. Для применения продвинутогоdelay()можно подать в него другойObservable, и этот дилей отложит свои эмишены на время, пока данный обзёрвабл чё-нить не выпустит. Есть также такая штука, какdelaySubscription(), которая откладывает подписку, а не выпуск эмишнов. -
repeat()- повторит подписку послеonComplete()заданное количество раз. Есть ещёrepeatUntil(), который принимаетBooleanSupplierи продолжает повторения до тех пор, пока супплаер не выдастtrue. -
scan()- rolling аггрегатор. Аккумулирует каждый эмишн и добавляет его к следующему.
Observable.just(5, 3, 7, 10)
.scan((accumulator, next) -> accumulator + next)
.subscribe(System.out::println)
>output: 5 8 15 25Это не обязательно лепить для интеджеров, можно аккумулировать всё что угодно. Отличается от reduce() тем, что тот берёт один эмишн, когда отрабатывает onComplete(). Может использоваться в бесконечных обзёрваблах.
Будут случаи, когда необходимо взять серии эмишенов и сконсолидировать их в один элемент (зачастую какой-нибудь Single). Для этого есть несколько операторов. Заметьте, что эти операторы работают только с конечным Observable, который вызывает onComplete.
-
count()- простейший оператор для объединения множества эмишенов. Он подсчитает количество элементов и выпуститSingle, когда отработаетonComplete(). Если нужно посчитать эмишены на бесконечном обзёрвабле, юзатьscan(). -
reduce()- синтаксически похож наscan(), но только выдаёт последнее саккумулированное значение. Может выдатьSingleилиMaybeв зависимости от вашей имплементации. Например, если нужон общий интеджер:
Observable.just(5, 3, 7, 10, 2, 14)
.reduce((total, next) -> total + next)
.subscribe(System.out::println)
>output: 41all()- эта хреновена проверяет, что все элементы подпадают под определённый критерий и возвращаетSingle<Boolean>. Как только один из элементов не совпадает, сразу возвращаетFalse.
Не тупи и запомни: если вызвать
all()к пустому обзёрваблу, он выдастTrue.
any()- соответственно, если есть хотя бы один подходящий эмишн.
Кстати, если вызвать его у пустого обзёрвабла, то он выдаст
False.
contains()- тут всё просто: вырнёт булю, если в эмишеннах есть данный элемент. Работает наhashCode()/equals().
Аккумулируют все эмишены и собирают в одну коллекцию, типа листа или мапы. Затем выплёвывают эту коллекцию. Являются на самом деле ещё одной формой reducing operators, но достойны отдельного параграфа. Кстати да, не стоит злоупотреблять этими операторами, так как это может навредить вашей реактивщине: лучше обрабатывать ивенты один-за-другим, так же, как они эмитятся, нежели скидывать всё в кучу и потом разгребать её. Смысл эрыкса тогда?
-
toList()- собирает все эмишены изObservable<T>вList<T>и выдаёт его в видеSingle<List<T>>. После того, какObservableотстреливаетonComplete(), лист пушится в обзёрвер. По дефолту, этот лист будетArrayList'ом, можно передать capacity и задать таким образом ограничение на количество элементов. -
toSortedList()- собирает в сортированный лист (работает наComparator'ах). Соответственно, можно передать компаратор в качестве аргумента. -
toMap()- для заданногоObservable<T>соберёт вMap<K,T>, гдеK- ключ, полученный из лямбдыFunction<T,K>.
Observable.just("Раз", "Два", "Три")
.toMap(s -> s.charAt(0))
.subscribe(System.out::println)
>output: {Р=Раз, Д=Два, Т=Три}Можно и посложнее смаппить, если в лямбду передать через запятую правило для value.
...
.toMap(s-> s.charAt(0), String::length)
.subscribe(..)
>output: {Р=3, Д=3, Т=3}По дефолту toMap() использует HashMap, можно обеспечить ConcurrentHashMap: .toMap(s-> s.charAt(0), String::length, ConcurrentHashMap::new).
-
toMultiMap()- в случае, если хэши совпадают, образует список из значений, соответствующих данному ключу. -
collect()- нужен, чтобы собрать элементы в специфическую коллекцию:.collect(HashSet::new, HashSet::add)соберёт вHashSetи выдастSingle<HashSet>. Используйтеcollect()вместоreduce(), когда вы собираете эмишены в мутабельный объект, чтобы затем получить sealed объект. Хороший пример с guava:
Observable.just("Оп", "Оп", "Опана")
// собрали в билдер все элементы
.collect(ImmutableList::builder, ImmutableList::add)
// сбилдили неизменяемый гуавовский список
.map(ImmutableList.Builder::build)
.subscribe(..)В общем collect() хорош для сбора элементов в любую "нестандартную"
для rxjava коллекцию.
Так, ну мы уже знаем про метод onError(), который пронизывает всю цепочку от Observable до Observer. После его срабатываения, поток завершает выполнение и никаких эмишенов больше не происходит. Но иногда нам надо перехватить ошибку до того, как она провалится до Observer'a и поток завершится с ошибкой.
-
onErrorReturn()иonErrorReturnItem()- когда нужно вернуть дефолтный элемент в случае ошибки. Принимает лямбду сThrowable'ом. Важно, где стоит в цепочке. Чтобы ловить ошибку, должен стоять после обзёрвабла, который выдал эту ошибку (логично, потому как каждый следующий член цепочки являетсяObserver'ом предыдущего члена иObservabl'ом для следущего). -
onErrorResumeNext()- аналогичен предыдущим, только принимает ещё другойObservableв качестве параметра (который эмитится на случай ошибки). Можно по красоте бахнуть например.onErrorResumeNext(Observable.empty())и вызвать тем самымonComplete()в цепочке, где произошла ошибка. Искать только её потом устанем (прим.автора). -
retry()- переподписывается кObservabl'у в надежде, что ошибка рассосётся сама по себе. Есть несколько перегруженных методов. Можно, например, задать интом количество попыток. Можно пульнуть в негоPredicate<Throwable>илиBiPredicate<Integer,Throwable>, чтобы более точно описать случай, когда должен отработатьretry().retryUntil()будет пересабскрайбиться до тех пор, пока соотв.BooleanSupplierв лямбде выдаётfalse. Есть также крутойretryWhen(). С его помощью можно применить композицию из задач для ретрая (к примеру, выставитьdelayдляretry).
Помогают при дебаге, улучшают понимание того, что собственно проиходит в чейне. Вставлять между целевым обзёрвером и его обзёрваблом (прим.автора).
doOnNext()- позволяет брать эмишены предыдущегоObservable'а. Что-то типа мини-обзёрвабла в середине цепочки. Он никак не меняет эмишены. С помощьюdoOnNext()мы просто создаём side-effect для каждого ивента в цепочке. В этом примере мы просто выведем все стринги в консоль до того, как они упадут в операторmap():
Observable.just("One", "Four", "Twelve")
.doOnNext(s -> System.out::println)
.map(String::length)
.subscribe(System.out::println)
>output: One 3 Four 4 Twelve 6Есть также doAfterNext(), который предпринимет заданное действие ПОСЛЕ
того, как эмишн ушёл дальше по цепочке.
-
doOnComplete()- срабатывает, когда чейн вызывает свойonComplete(). Может быть полезен в случае, когда нужно понять, какая часть цепочки отработала нормально и завершиласьonComplete(). -
doOnError()- соответственно, срабатывает, когда по цепочке пробрасывается ошибка. Опять же, полезно вставлять между операторами. Тут лучше с примером:
Observable.just(5, 2, 4, 0, 3, 2, 8)
.doOnError(e -> System.out.println("Source failed")
.map(i -> 10/i)
.doOnError(e -> System.out.prinln("Division failed!")
.subscribe(i -> System.out::println,
e -> System.out.println("Observer gets error");
> output: 2 5 2 Division failed! Observer gets errorТо есть "Source failed" не отработал, так как не было выпущено никаких ошибок из .just(...).
Есть также doOnEach(), где мы можем определить действие для всех трёх предыдущих операторов. Это типа как влепить subscribe() посередине чейна.
-
doOnSubscribe()- выпускаетConsumer<Disposable>в момент, когда в чейне срабатывает подписка. Предоставляет доступ кDisposableна случай, если нам нужно вызватьdispose(). -
doOnDispose()- позволяет предпринять какую-то операцию в момент, когда в чейне отработалdispose().
Если нам нужно сделать что-то в любом случае, будь то
onComplete(),onError()илиdispose, нам понадобитсяdoFinally(), который отрабатывает во всех трёх вариантах.
doOnSuccess()- как мы помним,MaybeиSingleвызываютonSuccess(), у них нет никакихonNext(). Для них естьdoOnSuccess(), который работает также, какdoOnNext().
Втираем себе в виски понимание того, что action операторы ставятся между observer и observable и отрабатывают ДО observer'овских методов.
Распространённое действие по слиянию двух и более обзёрваблов <T> в один Observable<T>.
-
Observable.merge()- если у нас от двух до четырёх однотипных обзёрваблов, можно использовать этот оператор. Аналогичноsource1.mergeWith(source2).subscribe(..).Observable.merge()и операторmergeWith()подпишутся на все источники одновременно, но элементы будут эмититься в одном потоке. Если нам нужен поочерёдный(заданный) эмит элементов, то надо использоватьObservable.concat(). Не стоит полагаться на порядок эмитов в случае сmerge(), когда вам нужна какая-то конкретная последовательность элементов. Если такая необходимость всё-таки есть, юзатьconcat(). В случае, когда смёрджить надо больше четырёх обзёрваблов, использоватьObservable.mergeArray(), который принимает varargsObservable[]. Оказывается, можно подать такжеIterable<Observable<T>>в методmerge(), и он нормально отработает. Работает с бесконечными обзёрваблами. -
flatMap()- вот на этом операторе надо остановиться подробнее. Производит динамическийObservable.merge(): берёт каждый эмишн и мапит его вObservable. Затем он мёрджит эмишены получившихсяObservable'ов в один поток. Простейшее практическое применение дляflatMap()- смаппить один эмишн в несколько. К примеру, мы хотим заэмититьcharиз каждогоStringуObservable<String>. Для такой задачи можно использоватьflatMap(), которому передадимFunction<T, Observable<R>>в ламбду. Она смапит каждый стринг вObservable<String>, который будет эмитить буквы. Надо обратить внимание, что получившийсяObservable<R>может эмитить любой элементRтолько если он отличается от входного типаT.
Observable<String> source = Observable.just("по", "буквам");
source.flatMap(s -> Observable.fromArray(s.split("")))
.subscribe(char -> System.out.print(char + " ");
>output: п о б у к в а мМы разбили каждый стринг на char, обернули это в Observable и передали в flatMap(), который заэмитил все char'ы в один поток.
flatMap() также принимает лямбду в виде BiFunction<T,U,R>. Изначально заэмиченный тип T ассоциируется с flat-map'ленным значением U и оба эмитятся в значение R.
Observable<String> source = Observable.just("Один", "Два")
source.flatMap(s -> Observable.fromArray(s.split("")),
(s,r) -> s + "-" + r)
.subscribe(System.out::println);
>output:
Один-О
Один-д
Один-и
Один-н
Два-Д
Два-в
Два-аМожно также использовать flatMapIterable(), который смапит все T эмишены в Iterable<R> вместо Observable<R>. Потом выпустит все значения R для каждого Iterable<R>, позволяя пропустить оверхед с запаковыванием его в Observable. Есть варианты, которые мапят в Single, Maybe и Completable.
Похоже на слияние(merging), но с одним ньюансом: эмитит элементы каждого предоставленного обзёрвабла в определённом порядке. Выпуск не перейдёт к следующему обзёрваблу, пока текущий не стрельнёт onComplete(). Такой подход гарантирует, что объединяемые источники выпустят свои элементы в гарантированном порядке. Однако, это хреново, когда речь идёт о бесконечном обзёрвабле. Юзать объединение (concatenation), когда нужен чёткий порядок.
-
Observable.concat()- аналогObservable.merge(). Объеденит эмишены разныхObservableи будет пулять их поочерёдно, переключаясь к следующему источнику только после того, как текущий вызоветonComplete(). В простом примере с двумя обзёрваблами, отрабатывает в принципе так же, какObservable.merge(), но мы уже вкурили, что в данном случае порядок гарантирован. Автор пишет, что надо не тупить и не concat'ить бесконечные обзёрваблы, мол, RxJava будет ждатьonComplete(), которого по понятной причине не произойдёт. На крайняк, ставить такой infiniteObservableвторым в цепочке, чтобы он запускался после первого. (можно вывернуться, кинувtake(), и спровоцировать этим вызовonComplete()). -
Observable.concatArray()стригеррит выпуск каждогоObservableвObservable[]. ОбычныйObservable.concat()также спокойно справляется сIterable<Observable<T>>и эмитит их по-очереди. -
concatMapEager()подпишется на все полученныеObservable, скэширует эмишены до тех пор, пока не придёт их черёд в очереди. -
concatMap()- по аналогии с уже существующимflatMap()'ом, который мёрджит обзёрваблы, полученные из каждого эмишена, есть оператор для объединения таких вещей -concatMap(). Он предпочтительнее в случае, когда нам важен порядок объединения элементов. Каждый обзёрвабл мапится из эмишна до тех пор, пока тот не завершится. Только потом стартует следующий эмишн. Если источник выпускает больше обзёрваблов, чемconcatMap()может обработать за промежуток времени, они ставятся в очередь.
Предыдущие примеры flatMap()'а больше подходят для concatMap()'а, потому что в них важна очерёдность. И хотя результатом использования обоих операторов будет одно и то же, всё таки стоит правильно использовать каждый из них, чтобы потом не удивляться, почему порядок вдруг поплыл (прим.ред). Опять же, использовать concatMap() на бесконечных источниках тупо, потому что он работает через onComplete(). В таком случае надо брать flatMap().
После слияния(merge) и объединения(concatenation) надо бы пару слов сказать об ambiguous(хрен знает, яндекс.переводчик топит за "неоднозначный, двоякий").
Observable.amb()принимаетIterable<Observable<T>>и выпускает эмишены первого заэмитившегоObservable. Остальные в свою очередь высвобождаются. Первым считается такой обзёрвабл, чьи эмишены проходят через данное звено в чейне(толкование автора конспекта). Это полезно, когда у вас несколько источников одних и тех же данных, поэтому вам пофиг откуда их брать, лишь бы побыстрее.
Зиппинг позволяет взять несколько разных эмишенов и объеденить их в один. Типы входных эмишенов могут быть разные, но мы можем объеденить и их тоже.
-
Оператор
zip()работает по принципу молнии: эмишн одного обзёрвабла ждёт эмишн другого, чтобы зазиповаться с ним. Если один обзёрвабл выпалилonComplete(), а у другого есть элементы к выпуску, они просто напросто дропнутся. Можно с таким же успехом использоватьzipWith(). -
В фабрику
Observable.zip()вариант подать до девятиObsevable'ов. Если надо больше, то использоватьzip.Array().
Тут надо иметь в виду, что, если один из источников эмитит медленнее, чем остальные, другие источники данных будут тупить и ждать отстающего. Если пофиг на конкретный эмишн и надо взять последний, то можно использовать
combineLatest()(о нём чуть позже).
-
Observable.combineLatest()- что-то похожее наzip(), только отрабатывает с самым последним элементом, выпущенным из любого другого источника. Он не блочит и не ставит в очередь эмишены, которым не нашлось пары, но кэширует их и зипует с последним. Проще говоря, когда один источник эмитит, результат зипуется с последним заэмиченным элементом другого источника. Крайне актуально, когда речь идёт о комбинировании UI input'ов, так как там не важны предыдущие инпуты пользователя, а нужен только последний. -
withLatestFrom() - немного отличается отcombineLatest(). Он мапит каждыйTэлемент одного эмишена с последним элементом другого, но берёт из каждых эмишенов только ОДИН элемент. В итоге получится, что повторяющихся значений не будет.
Возможность группировать разные эмишены в отдельные Observable - мощнейший инструмент rxJava.
- Для таких манёвров у неё есть оператор
groupBy(), который принимает лямбду и маппит эмишн к соотв.ключу. Затем он возвращаетObservable<GroupedObservable<K,T>>, который эмитит специальный типGroupedObservable. Такой обзёрвабл почти ничем не отличается от обычного, только у него есть значение ключа в качестве свойства.
К примеру, у нас есть набор стрингов, которые мы хотим объединить по длине:
Observable<String> source = Observable.just("Кот", "Пёс", "Змея","Либерал");
Observable<GroupedObservable<Integer,String>> byLengths =
source.groupBy(s -> s.lenght());Скорее всего мы захотим использовать flatMap() к каждому элементу GroupedObservable, а внутри этого флэтмапа попробуем собрать результаты в кучу. В таком случае на выходе получится Single, поэтому и использовать надо flatMapSingle(). Давайте вызовем toList(), чтобы выпустить эмишены в виде листов с элементами, сгруппированными по длине:
byLengths.flatMapSingle(grp -> grp.toList())
.subscribe(System.out::prinln);
>output:
[Кот, Пёс]
[Змея]
[Либерал]На выходе у нас получились списки с элементами одинаковой длины. (вообще, либерал, конечено, должен был запоковаться вместе с змеёй, но RxJava далека от политики и срабатывает чётко, как автомат Калашникова).
У GroupedObservable есть метод getKey(), возвращающий ключ, с которым ассоциируется этот обзёрвабл.
В этой главе речь будет идти преимущественно о multicasting. Replaying и cashing - это на самом деле тоже про мультикаст. Будет и разбор Subjects, утилиты, предназначенной для decoupling при мультикасте. Она должна быть использована с умом и в конкретных случаях. Также рассмотрим несколько вариаций subjects.
Ранее мы уже имели дело с ConnectableObservable, который эмитит элементы всем подписанным Observer'aм при вызове метода connect(). Такая идея объединения стримов называется мультикастом.
Но есть некоторые ньюансы, когда речь заходит о применении операторов, которые могут опять создать отдельные стримы.
Тут соль в том, КОГДА консолидируются стримы.
Например, если вызвать мультикастинг(с помощью publish()) до оператора map(), то стримы будут объеденены в один прокси Observable до того, как будет применена мапа.
Если мы хотим, чтобы оператор map() не генерил нам разные стримы для каждого Observer'а, то publish() надо вызывать ПОСЛЕ map().
В целом эта фича полезна для того, чтобы избежать ненужного повторения одной и той же работы, а также в тех случаях, когда разные Observer'ы используют одни и те же данные. Плюс ко всему это позволяет разгрузить CPU.
Надо помнить, что мультикастинг создаёт горячий ConnectableObservable. Тут важно правильно вызвать connect(), чтобы ни один из подписчиков не проморгал данные.
Рекомендуется держать ваши обзёрваблы холодными и вызывать publish() только когда нужно, чтобы они стали погорячее.
Даже в случае с горячими обзерваблами (типа UI в андроиде), иногда не стоит лишний раз обвешивать их ненужными операторами. Это увеличивает стоимость операции, да и нафига, если у нас, к примеру, один только подписчик. Короче говоря: думать, прежде чем обмазывать всё publish()'ем. Вот если у нас несколько Observer'ов, то тут можно прикинуть, а не объединить ли их в один стрим.
Вам стопудово когда-нибудь захочется вручную вызывать connect(), чтобы типа контролировать эмишены и всё такое. Но есть также варианты из-под капота (с которыми надо быть аккуратнее).
-
autoConnect()- уConnectableObservableесть такой вот оператор. ВозвращаетObservable<T>, который автоматически пульнётconnect()после определённого числа подписчиков. Нужен пример? Наврядли, тут всё ясно. Ради прикола напишу, что автор уточняет: этот оператор не очень полезен, когда мы не знаем точно числоObserver'ов. Спасибо, чувак, мы это ценим. А вот это интересно: даже когда целевые обзёрверы отработали или высвободились,autoConnect()оставит свою подписку на источник. Если источник конечен и завершает своё выполнение,autoConnect()не подпишется на него снова, когда внизу чейна появится новыйObserver. То есть вновь прибывшие обзёрверы проморгают эмишены. По дефолту, аргумент - 1. -
refCount() и share()refCount()похож наautoconnect(1), который выстреливает сразу после того, как на него подписался первый обзёрвер. Но есть офигенно важное отличие: после того, как этот чейн отработает, он высвободится. И перезапустится, когда на него опять кто-нибудь подпишется. Он не сохраняет подписку на источник, когда у него больше нет обзёрверов.refCount()может быть полезен в случаях, когда надо объединить стримы для каких-то подписчиков, но высвободить эмишн, если вдруг их не осталось. Причём так, чтобы выпуск элементов стартанул вновь после того, как появятся новые подписчики.publish().refCount()может быть заменён операторомshare().
Мультикастинг также позволяет кэшировать значения, которые делятся между несколькими Observer'ами. Повторение и кэширование данных - это фича мультикастинга.
Оператор replay() - мощная штука, когда речь идёт о необходимости сохранить ранее заэмиченные элементы и перевыпустить их, когда коннектится новый Observer.
Он вернёт ConnectableObservable, который замультикастит эмишены, а также заэмитит ранее выпущенные данные в заданном скоупе (scope).
Закэшированные эмишены выстреливают сразу же после того, как появился новый подписчик, затем будут выпущены текущие элементы. То есть все "опоздавшие" подписчики ничего не пропустят и получат все элементы с самого начала.
Данная операция может быть несколько дороговата, так как replay() кэширует все эмишены.
Если задать bufferSize, то кэшироваться будет ограниченное количество элементов. bufferSize(2) - и вновь подписавшийся получит последние два элемента.
Обратите внимание: в случае, если мы хотим сохранять кэш для значений в
replay()даже если у нас нет подписчиков, надо использоватьreplay()в сочетании сautoConnect(), а неrefCount(). Дело в том, чтоrefCount()высвобождает и кэш и чейн в целом, поэтому новый подписчик инициирует срабатывание чейна с самого начала.
replay() также принимает временнОе значение в качестве аргумента. Повторять за последние T времени (в буфере хранится то, что было выпущено за последние две секунды, например).
Когда вы хотите кэшировать вообще гаддэмн всё, что было выпущено за продолжительный отрезок времени, и вам при этом не надо контролировать поведение подписчиков относительно источника ConnectableObservable, можно заюзать оператор cache().
Он подпишится к источнику вместе с первым обзёрвером и сохранит все значения. Это делает его не самым лучшим кандидатом к применению, когда речь идёт о бесконечных источниках.
Как вариант - cacheWithInitialCapacity() с количеством элементов, которые надо кэшировать. Важно: не используйте cache(), если вам реально не надо сохранять все элементы. Вместо этого лучше использовать replay(), так как он позволяет более тонко выбрать стратегию высвобождения чейна и правильно установить ограничения на сохранение элементов.
Прежде всего следует отметить, что у Subject'ов есть свои определённые use-case'ы, но новички часто усложняют и используют этот инструмент неправильно.
Они представляют из себя одновременно и Observer и Observable, выступая в роли proxy multicasting устройства (что-то типа event bus). Прежде, чем использовать Subject'ы, следует убедиться, что вы предприняли все остальные возможные меры. Erik Meijer, создатель RxJava, называет их "изменяемыми переменными в мире реактивного программирования". Как бы мы не стремились к immutable переменным, без изменяемых переменных всё-равно не обойтись. Аналогичным образом получается и с Subjects, которые порой необходимы, чтобы объединить императивное программирование с реактивным.
Прежде, чем объяснять, когда стоит использовать Subjects, а когда этого делать не следует, пройдёмся по тому, что они собственно делают.
Есть две абстрактные имплелемнтации Subject, которые реализуют одновременно и Observable и Observer. То есть мы можем вызвать onNext(), onComplete(), onError() у Subject'a, и он передаст эти ивенты вниз к Observer'ам.
Самый простой пример Subject - PublishSubject, который бродкастит (hotly выпускает элементы) своим обзёрверам. Другие Subject добавляют различные поведения к этой логике, в то время как PublishSubject можно считать своего рода ванилью.
Subject<String> subject = PublishSubject.create();
subject.map(String::length)
.subscribe(System.out::println);
subject.onNext("Раз");
subject.onNext("Четыре");
subject.onNext("Двадцать");
>output:
3
6
8Выглядит так, будто Subject - магический девайс, мост между императивщиной и реактивщиной. Так оно и есть. Теперь давайте посмотрим, когда его использовать, а когда лучше не стоит.
Допустим, мы используем DI или ещё какой инструмент для инвертирования зависимостей. При таком подходе есть вероятность, что разные модули предоставляют разные Observable'ы, которые могут понадобиться нам в рамках одного стрима. Без Subject'а это может быть довольно сложным для реализации, так как нам придётся особым образом варить эти обзёрваблы. В обычной ситуации мы бы просто взяли Observable.merge() и были бы правы, но это не всегда прокатывает. В целом, Subjects помогают избежать жёсткого связывания.
Subject'ы хорошо подходят для бесконечных event-driven(а это то же самое, что и action-driven) Observables.
В связи с тем, что Subject'ы - горячие обзёрваблы, важно вовремя вызывать subscribe(). Делать это следует до того, как мы передаём что-то в onNext(). Иначе Observer прохлопает эмишены.
Такое положение дел свидетельствует о том, что Subject'ы могут быть опасной приблудой, особенно, если вызов onNext() происходит где-нибудь хрен знает где. Реактивный подход подразумевает, что исходные Observable предоставляются хорошо определённым и предсказуемым источником. Более того, Subject'ы не имеют public метода dispose() и поэтому не высвободят переданные в них ресурсы, когда внизу по чейну отработает dispose().
Намного лучше держать такие data-driven источники холодными, а затем использовать мультикаст с помощью publish() или replay(), если вдруг понадобится, чтобы они стали горячими.
Если вам нужен
Subject, кастаните его кObservableили лучше вообще не используйте его. Можно также сделать обёртку надSubject'ом, где методы будут передавать в него соответствующие ивенты.
Важно понимать, что все операции над Subject'ом, типа onSubscribe(), onError() и onComplete() не являются потокобезопасными! Если несколько потоков вызывают эти методы как им вздумается, выпускаемые элементы могут сломать контракт Observable, который требует, чтобы ивенты происходили поочерёдно. Если такое происходит, то хорошей практикой считается вызвать toSerialized() к Subject'у для того, чтобы запаковать его в безопасный засериализованный вариант сабжекта. Это обезопасит его выполнение в разных потоках и ничего не поломается:
Subject<String> subject = PublishSubject.<String>create().toSerialized();У компилятора есть ограничения на такие манёвры и нужно указывать тип для метода
create(). Такая штука не прокатит, если в цепочке послеcreate()стоит более одного оператора. Произойдёт ошибка компиляции.
Ведёт себя примерно так же, как PublishSubject, только повторяет последний заэмиченный элемент для каждого вновь подписавшегося обзёрвера. Использование этого Subject равносильно использоанию replay(1).autoConnect() после PublishSubject'а.
Эквивалент PublishSubject'а, дополненного оператором cache(). Он незамедлительно отлавливает эмишены вне зависимости, подписан на него кто-нибудь или нет, и кэширует их. Каждый вновь подписанный обзёрвер получит ранее выпущенные и закэшированные элементы. Очевидно, использовать надо с умом, когда речь идёт о каких-то больших объёмах данных.
Пушит только последнее значение, которое получает перед тем, как отработает onComplete().
В принципе повторяет поведение
CompletableFutureиз восьмой джавы: произведёт необходимое нам вычисление над выбранным элментом, как только он отработает. Можно эмитировать с помощьюtakeLast(1).replay(1). Попробуйте сначала выполнить такой приём прежде, чем использоватьAsyncSubject.
Может оказаться полезным сабжектом, если нам надо поместить в буфер все эмишены в стриме до тех пор, пока на него кто-нибудь не подпишется, а затем выпалить все эмишены этому подписчику и освободить буфер.
Будет работать только с одним Observer, когда тот подпишется. Выкинет ошибку, если вдруг появятся ещё какие-то подписчики. Это логично, так как буфер очищается сразу после того, как первый подписчик получает все выпущенные элементы. Следующим обзёрверам просто напросто нечего получать. Если вы хотите, чтобы остальные тоже получили свои эмишены, надо использовать ReplaySubject. Плюс данного сабжекта в том, что он очищает память, которая была использована для буферизации.
Для глубокого понимания Java concurrency автор советует книгу Brian Goetz: Java Concurrency in Practise.
Создание потоков - дорогая операция. Лучше переиспользовать их и заставлять выполнять очередь из задач.
По дефолту, Observable выполняет работу в том же потоке, в котором был создан и подписан Observer. Почти во всех предыдущих примерах это был main thread, который запускался функцией public static void main().
Но были некоторые оговорки, к примеру, в случае с Observable.interval(). Этот обзёрвабл запустит поток вне main thread'а, который в свою очередь не будет ждать его выполнения. Такой обзёрвабл делает из нашей приложухи многопоточное приложение, так как речь уже идёт как минимум о двух thread'ах.
Зачастую, concurrency полезно, когда нужно выполнять какие-то жирные долгоиграющие операции.
subscribeOn()- позволяет задать конкретныйSchedulerдля обозначенного источника. ЗадачаScheduler'а - предоставить thread для выполнения той или иной операции. Он хранит пул thread'ов. После вызоваonComplete(), thread будет отдан обратно его шедулеру, таким образом его можно будет переиспользовать где-нибудь в другом месте.
Observable'ам безразлично на каком потоке они выполняются. RxJava неплохо справляется с многопоточностью даже в случаях, когда нам нужно скомбинировать результаты выполнения разных обзёрваблов из разных потоков.
Если нужно иметь дело с высоконагруженными
Observable'ами, которые эмитят 10К и более элементов, то стоит работать сFlowable.
Когда мы работаем с фрэймворками, которые поддерживают выполнение главного потока (Android, JavaFX, ещё там какие-нить), то мы не паримся насчёт того, живо наше приложение или нет. Эти фреймворки работают за нас. Ситауция обстоит несколько иначе, когда нам нужно работать из main()(или с тестами, прим.автора). Тут придётся заставлять главный поток спать, либо идти на ещё какие-нибудь ухищрения. Можно в принципе бесконечно усыпить его, подав Long.MAX_VALUE в Thread.sleep() (main поток уснёт на 292 года).
Есть способы поддержать жизнеспособность приложения ровно на столько, чтобы выполнились все операции в subscription. Brian Goetz писал про такие стандартные тулзы, как CountDownLatch, которые могут подождать выполнения двух подписок. Но намного проще заюзать блокирующие операторы(blocking operators) RxJava.
Обчычно, blocking operators используются в Unit-тестах. Могу породить анти-паттерны при бездумном использовании в продакшн-коде.
Thread-pools являются коллекциями потоков. Эти потоки сохраняются и переиспользуются в соответствии с политикой пула.
Некоторые пулы имеют фиксированное количество потоков (такие, как computation()), в то время, как другие динамически создают и уничтожают потоки по мере надобности. В стандартной джаве вы используете ExecutorService в качесестве thread pool'а. RxJava предоставляет свою абстракцию для таких нужд - Scheduler. Он определяет методы и правила, которые такой провайдер потоков, как ExecutorService, должен выполнять.
Большинство стандартных шедулеров могут быть получены из статического класса-фабрики Schedulers. Когда отработает onComplete(), операция высвободит ресурсы и поток вернётся обратно в свой пул, где он может быть сохранён и переиспользован другим Observer'ом.
Содержит фиксированное количество thread'ов, основанное на доступных процессорах для текущей сессии Java. Это делает данный шедулер более подходящим для выполнения вычислительных задач(таких, как математические, алгоритмические, сложные логические), которые могут занимать ядро чуть более чем полностью. К тому же, нет смысла выделять больше потоков, чем количество ядер, чтобы проделать такого рода работу.
Если не уверен, какой шедулер выбрать, пикай computation(), не прогадаешь.
Такие операции, как interval(), delay(), timer(), timeout(), buffer(), take(), skip(), takeWhile(), skipWhile() и некоторые другие использует этот шедулер по умолчанию.
Задачи по вводу-выводу, такие, как запись и чтение в БД, запросы в сеть - менее затратны для CPU и чаще всего занимают сравнительно немного времени, чтобы выполниться. Это значит, что можно более свободно создавать thread'ы и Schedulers.io() отлично для этого подходит. Он будет обеспечивать работу стольких потоков, сколько нужно. Этот шедулер поддерживает динамическое увеличение/уменьшение пула потоков и их кэширование.
Каждый новый subscription будет вызывать создание нового thread'а.
Schedulers.newThread() вернёт Scheduler, у которого нет пула потоков. Он будет создавать новый поток для каждого нового Observer и уничтожать этот поток, как только тот отработает. Эта логика отличается от Schedulers.io() тем, что данный шедулер не будет пытаться закэшировать и сохранить потоки для переиспользования.
Это может быть полезно, когды мы хотим создать поток и сразу уничтожить его после решения поставленной задачи. Использовать следует аккуратно, чтобы не крашнуть приложение.
Когда вы хотите обеспечить поочерёдное выполнение задачи на одном потоке, вам нужен этот шедулер. У него под капотом реализована однопоточная логика, что часто полезно для цикличных ивентов. Он может пригодиться, чтобы изолировать потоко-небезопасную работу.
Observable.just("We", "love", "Putin")
.subscribeOn(Schedulers.single());Это интересный шедулер. Он про незамедлительное выполнение, но предотвращает случаи рекурсивной работы, когда задача шедулит другую задачу на том же самом потоке. Вместо того, чтобы выдать stack overflow error, этот шедулер позволит сначала завершиться текущей задаче, и только потом начнёт выполнение следующей.
Можно запилить свой собственный шедулер поверх стандартного ExecutorService. Такой финт ушами понадобится в случае, если надо более тонко контролировать потоки, их выполнение и вот это вот всё. К примеру, нам нужен шедулер, который будет использовать 20 thread'ов. Можно создать ExecutorService с заданным количеством потоков и подать его в Schedulers.from():
int numberOfThreads = 20;
ExecutorService executor =
Executors.newFixedThreadPool(numberOfThreads);
Scheduler scheduler = Schedulers.from(executor);
Observable.just("Check", "this", "out")
.subscribeOn(scheduler)
.doFinally(executor::shutdown)
.subscribe(System.out::println);ExecutorService будет держать вашу приложуху живой до конца времён, поэтому надо вручную говорить ему о завершении. Вот почему в примере вызыван executor::shutdown.
Каждый дефолтный шедулер лениво инстанциируется, когда вы в первый раз его используете. Можно высвободить computation(), io(), newThread(), single(), io() в любое время, вызвав метод shutdown() у каждого, или можно освободить сразу все через Schedulers.shutdown(). Это остановит все выполняющиеся потоки и запретит выполнение новых задач. Чтобы возобновить работу стандартных шедулеров, надо будет вызывать Schedulers.start().
В десктопных и мобильных приложениях start и stop шедулеров не так актуален.
С помощью оператора subscribeOn() определяется исходный Observable и выбирается шедулер, который будет запускать операции на одном из своих потоков. Эмишены будут сделаны прямиком в этом потоке аж до самого Observer'а (если не указан observeOn). В случае, если для источника никакого шедулера ещё не указано, будет выбран тот, который мы задали в subscribeOn()(имеется в виду, что ранее нигде не был задан subscribeOn). Может быть поставлен в любом месте в чейне, RxJava сама найдёт исходный обзёрвабл(пробежится вверх по цепочке) и закрепит его работу за указанным потоком. Для ясности рекомендуется ставить как можно ближе к источнику.
Наличие нескольких Observer'ов приведёт к тому, что эмишены будут произведены в разных потоках, и каждый из них получит свои данные в отдельном потоке(ну или будет ждать, пока соотв.поток освободится и переключится на него).
В каком именно потоке
Observerполучает свои ништяки можно понять с помощью стандартногоThread.currentThread()и т.п.
Если вдруг нам понадобилось, чтобы всех обслуживал один поток, то следует применить мультикаст. Тут важно, чтобы оператор subscribeOn() стоял ДО мультикаста.
Надо знать, что некоторым обзёрваблам пофиг на шедулер, который вы задаёте в subscribeOn(). К примеру, Observable.interval() будет использовать Schedulers.computation(). Однако можно передать ему шедулер в качестве третьего аргумента.
Observable.interval(1, TimeUnit.SECONDS, Schedulers.newThread());Ещё один ньюанс: если вы задали(ну а вдруг) множество subscribeOn'ов в чейне, то выиграет тот, что ближе к источнику(обзёрваблу).
В заключении автор отмечает, что subscribeOn(), в целом, определяет, какой шедулер будет использован обзёрваблом для своих эмишенов.
Как уже было сказано, subscribeOn() определяет, в каком потоке будут выпущены элементы из Observable'а. А вот для того, чтобы перехватить эти элементы и перенаправить их на другой поток, вплоть до конечного Observer'а, нам понадобится observeOn().
В отличие от subscribeOn, которому пофиг, где его ставят в чейне, с observeOn "просто поставлю его здесь, вдруг прокатит" сработает, только если удача сегодня на вашей стороне. Выполнение всех процессов, стоящих выше по цепочке, он оставит на дефолтном шедулере (либо на том, который указан в subscribeOn()), а все последующие операции будут проведены уже на другом Scheduler'е.
Observable.just("blablabla")
.subscribeOn(Schedulers.io())
.flatMap(..)
.observeOn(Schedulers.computation())
.filter(..)
.subscribe(..);В примере выше flatMap() отработает на Schedulers.io(), а вот filter() будет запушен уже в Schedulers.computation().
Можно применять для того, чтобы производить разные операции на различных шедулерах, тем самым лучше задействуя CPU. Достигается путём многократного использования observeOn()в чейне.
Когда дело доходит до того, чтобы пилить приложения с UI выясняется, что для этого самого UI выделяют отдельный поток, в котором собственно и происходят изменения в интерфейсе. Пользователю совсем не в кайф смотреть, что приложение фризится по нажатию на кнопки (как этого добиться, мы с вами прекрасно знаем).
Спасибо RxJava, тут она нас буквально спасает. С её помощью можно любой UI-event переключить в другой поток, проделать все необходимые вычисления и выдать обратно. Для разных платформ были придуманы специальные шедулеры (RxAndroid, RxJavaFX, RxSwing_).
В примере про JavaFX автор говорит о том, что в случае с UI источниками
subscribeOn()не имеет никакого воздействия на чейн, надо проверить.
JavaFxObservable.actionEventsOf(refreshButton)
.observeOn(Schedulers.io())
.flatMapSingle(a ->
Observable.fromArray(/* запрос в сеть*/)
.observeOn(JavaFxScheduler.platform())
.subscribe(list -> /*обновить данные в View*/);В данном примере, после нажатия на кнопку, ивент переключится на IO шедулер, получит из сети свои данные и пульнёт их обратно в JavaFX'овский поток.
Существуют детали насчёт этого оператора, о которых лучше б знать, особенно, когда дело доходит до производительности, страдающей из-за отсутствия backpressure (разговор об этом будет в главе Flowables and Backpressure).
Допустим у нас есть две операции А и Б. Про операторы пока думать не будем, сейчас не о том. Если между ними нет никакого observeOn(), чейн будет передавать эмишены строго один-в-единицу-времени от источника до операции А,потом до операции Б, а затем до Observer'а. Даже если обозначен subscribeOn(), чейн не будет выпускать новый эмишн, пока текущий не долетит до Observer'а.
Такое положение дел меняется, когда мы добавляем observeOn(). Поставим его между операцией А и Б. В таком случае чейн не будет дожидаться, пока эмишн долетит до обзёрвера, а выпустит новый эмишн сразу после того, как текущий долетит до оператора observeOn(). Это фактически значит, что источник эмишнов и операция А могут производить элементы быстрее, чем операция Б способна обработать. Классическая ситуация - производитель выпускает больше, чем потребитель может съесть. В таком случае эмишены будут застревать в observeOn(), образуя тем самым очередь, что приведёт к проблемам с памятью.
Вот почему надо использовать Flowable, когда надо иметь дело с большим количеством элементов. Этот тип обзёрваблов поддерживает Backpressure, которая общается с ним и говорит притормозить в таких случаях.
Параллельным вычислением можно назвать любую многопоточную систему, но в рамках изучения RxJava под этим понятием мы будем иметь в виду одновременную обработку нескольких эмишенов для заданного Observable. Если обзёрвабл эмитит 1000 элементов, логично было бы обрабатывать за одну единицу времени 8 элементов вместо одного. Это существенно ускорит работу. Но мы ведь с вами помним, что для поддержки многопоточности, RxJava эмитит по одному элементу (если бы мы попытались запушить сразу восемь, то всё бы к чертям поломалось).
Тут вроде как вырисовываются некие ограничения, но RxJava на самом деле даёт самым умным, ловким и смелым парням тулзы и средства, чтобы строить реально крутые многопоточные приложухи. Действительно, мы не можем одновременно пушить много элементов в рамках одного Observable, но никто не мешает нам иметь много обзёрваблов, каждый из которых будет работать на своём потоке и эмитить элементы вместе с остальными! Секрет достижения parallelization таится в операторе flatMap().
Observable.range(1, 10)
.map(i -> /*что-то делается за секунду*/)
.subscribe(..);Выполнение этого примера займёт (сколько бы вы думали!?) 10 секунд, потому как каждый эмишн обрабатывается один-за-другим.
flatMap() смёрджит несколько обзёрваблов, полученных из эмишенов, даже если они concurrent. Давайте обернём каждый инт в Observable.just(), подпишемся на правильном шедулере и посмотрим, что будет:
Observable.range(1, 10)
.flatMap(i -> Observable.just(i)
.subscribeOn(Schedulers.computation())
.map(i2 -> /*что-то делается за секунду*/))
.subscribe(..)А будет то, что теперь этот чейн выполнится значительно быстрее, чем за 10 секунд(computation шедулер раскидает задачи по ядрам, так что скорость выполнения будет быстрее кратно количеству ядер). Мы реализовали мать его concurrency!
Что же мы сделали, давайте разберём. Мы обернули каждый выпущенный инт в Observable, сказали выполнятся каждому из них на computation шедулере, а затем предприняли какую-то операцию, которая выполняется за секунду.
flatMap()выпустит эмишены в один поток, что соответствует контрактуObservable, который требует, чтобы эмишены оставались сериализоваными. Ловкость флэтмапа в том, что он не проделывает никакой лишней работы с многопоточностью и ничего не блокирует для того, чтобы выполнять подобные операции. Если поток уже пушит элементы изflatMap()'а вниз по цепочке кObserver'у, любые другие потоки, которые ждут своей очереди, просто напросто заэмитят свои данные в этот исполняемый поток.
Не факт, что приведённый пример является оптимальным решением. Создание обзёрвабла для каждого выпущенного ранее элемента может привести к нежелательному оверхеду. Существуют более изящные способы работы с параллельными вычислениями. Если мы хотим избежать создание обзёрваблов для эмишенов, можно разбить исходный Observable на несколько, а потом объединить их с помощью flatMap(). У автора, например, восьмиядерный проц. Почему бы не разбить источник на восемь частей?
Это можно реализовать с помощью groupBy(). Если у нас 8 ядер, то можно использовать значения от 0 до 7 в качестве ключа. Это позволит нам создать 8 GroupedObservables, которые разделят эмишены на 8 потоков. GroupedObservable не обязательно связаны с subscribeOn(), поэтому автор реализовывает многопоточность с помощью observeOn(). Так как с количеством потоков мы определились, то можно заюзать newThread() шедулер, но мы впилим обычный io():
int coreCount = Runtime.getRuntime().availableProcessors();
AtomicInteger assigner = new AtomicInteger(0);
Observable.range(1,10)
.groupBy(i -> assigner.incrementAndGet() % coreCount)
.flatMap(grp -> grp.observeOn(Schedulers.io())
.map(i2 -> /*какая-то работа*/))
.subscribe(..)Каждый эмишн будет сгруппирован по номеру(ранее энкриментированному). Как только число достигнет семи, группировка опять начнёт с 0. Это гарантирует, что эмишены будут распределены самым оптимальным образом.
Все изменения переменных, находящихся за пределами чейна, должны быть проделаны потокобезопасно. Именно поэтому мы используем
AtomicInteger, который предоставляет соотв.методы.
Последний оператор про многопоточность. Иногда бывает так, что ликвидация Observable'а - достаточно дорогая операция (зависит от природы этого обзёрвабла).
Это может привести к тому, что поток, который вызывает dispose(), затупит и будет высвобождаться дольше, чем моментально. Такой затуп, в свою очередь, приведёт к нежелательным фризам(ну а вдруг у нас dispose() вызван из UI-потока) и мало ли ещё чему, за что мы потом получим по шее. На помощь приходит unsubscribeOn(), в котором вариант задать нужный нам шедулер.
Можно ставить в любом месте в чейне.
Что бы мы делали без заботливого Thomas'а Nield'а(автора книги), который не советует пихать этот оператор везде, где только можно! Спасибо, Томас, мы учтём, что это полезно только для толстых и затратных операций!
Можно ставить несколько в чейне. Всё, что обозначено вверх от оператора(вплоть до другого
unsubscribeOn()), будетdisposeиться с указанным в нём шедулером.
Вкратце, что было в главе:
subscribeOn()определяет шедулер для вышестоящегоObservable, в котором тот будет пушить свои эмишены.observeOn()поменяет thread эмишенов на поток из другого шедулера.unsubscribeOn()- для случаев, когда высвобождение потока - боль.
Нередко возникают ситуации, когда обзёрвабл эмитит быстрее, чем Observer может обработать. В основном это происходит, когда мы впиливаем concurrency, и различные операторы начинают работать в разных Scheduler'ах. Проблемы начинаются там, где эмишены собираются в очереди на выполнение в медленных операторах.
Лучшим решением таких ботлнеков (от англ. bottleneck - горлышко бутылки) будет использование Flowable, которые не сильно отличаются от Observable, только в части поддержки backpressure. Но, во-первых, не каждый источник подвержен backpressure(например, Observable.interval()), а во-вторых - об этом позже в отдельной главе.
Есть инструменты, позволяющие бороться с повышенной частотой эмитов. Об них и будет говориться в этой главе.
Оператор buffer() соберёт эмишены в пачку и выпустит каждую пачку в виде листа или другой коллекции. Скоуп (граница) может быть задан в виде фиксированного размера для буфера, временного окна или отрезков из эмишенов, определённых другим Observable.
Самая простая версия buffer()'а. Принимает в качестве аргумента count - размер конечной пачки эмишенов. Если надо собрать в пачки по восемь штук, к примеру, тупо пишем .buffer(8).
Вторым аргументом принимает коллекцию, в которую мы хотим собирать элементы. buffer(8, HashSet::new).
Чтобы сделать свой код чуточку повеселее, можно передать инт в качестве второго аргумента. Это скажет buffer()'у, какой элемент надо пропускать, прежде чем включать буферизацию: buffer(8, 3) будет пропускать каждый третий элемент. Если обозначить skip<buffer size, то получится что-то вроде аккумулирующего буффера, который будет отображать последние элементы типа: .buffer(2, 1) : [1,2], [2,3], [3,4] и т.д.
Аналогично обычному buffer(), только принимает TimeUnit'ы в качестве аргументов. Единственное, о чём стоит упомянуть: буффер, работающий на времени, оперирует на computation шедулере.
Самый мощный buffer(), который принимает другой Observable в качестве аргумента, описывающего границы буферизации. Когда этот обзёрвабл что-то эмитит, buffer() воспринимает это как сигнал к буферизации. Нагляднее показать это на примере:
Observable<Long> cutOffs = Observable.interval(1, TimeUnit.SECONDS);
Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i+1) * 300)
.buffer(cutOffs)
.subscribe(System.out::prinln);
>output:
[300, 600, 900]
[1200, 1500, 1800]
[2100, 2400, 2700]
[3000, 3300, 3600, 3900]Оператор window() чень похож на buffer(), за одним только исключением - он складывает элементы в другие обзёрваблы, а не коллекции. На выходе получается Observable<Obsevable<T>>, который эмитит обзёрваблы. Каждый такой обзёрвабл закэширует эмишены для каждого скоупа, а затем выдаст их, как только на него подпишутся. Это позволяет работать с эмишенами сразу же, как только они становятся доступны (что выгоднее, чем ждать, пока сформируются и заэмитятся коллекции).
Этот оператор удобно использовать, когда нужно проделать какие-то трансформации над пачками элементов.
В нижеприведённом примере мы возмём пачку стрингов, применим оператор window(), чтобы провести эмишены через буфер. Только теперь они будут обёрнуты в другие Observable'ы, с которыми мы можем что-нибудь проделать в реактивном стиле.
Observable.range(1, 50)
.window(8)
.flatMapSingle(obs -> obs.reduce("", (total, next) ->
total + (total.equals("") ? "" : "|") + next))
.subscribe(System.out::println)
>output:
1|2|3|4|5|6|7|8
9|10|11|12|13|14|15|16
17|18|19|20|21|22|23|24
25|26|27|28|29|30|31|32
33|34|35|36|37|38|39|40
41|42|43|44|45|46|47|48
49|50Принимает значение skip так же, как и предыдущий оператор buffer().
Самые догадливые поняли, что этот вариант основан на времени, идём дальше.
см. buffer(), который принимает другой обзёрвабл. Аналогично.
В отличие от операторов buffer() и window(), которые собирают эмишены в пачки и выпускают дальше, throttle() напротив, пропускает какие-то из элементов. Это полезно, когда частый выпуск эмишенов нежелателен или не несёт никакой пользы.
Чтобы лучше понять принцип действия, возьмём в качестве примера три обзёрвабла Observable.interval(). Первый будет эмитить каждые 100 миллисекунд, второй - каждые 300 и третий - 2000. Возьмём десять элементов из первого, три из второно и два из третьего. Для того, чтобы создать источник, который будет быстро эмитить элементы с чередующимися интервалам, используем Observable.concat():
Observable<String> source1 = Observable.interval(100, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 100) //спамили в затраченное время
.map(i -> "SOURCE 1: + i)
.take(10);
Observable<String> source2 = Observable.interval(300, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 300)
.map(i -> "SOURCE 2: + i)
.take(3);
Observable<String> source3 = Observable.interval(2000, TimeUnit.MILLISECONDS)
.map(i -> (i + 1) * 2000)
.map(i -> "SOURCE 3: + i)
.take(3);
Observable.concat(source1, source2, source3)
.subscribe(System.out::println);
/*не забыли усыпить главный поток*/
>output:
SOURCE 1: 100
SOURCE 1: 200
SOURCE 1: 300
...
SOURCE 1: 1000
SOURCE 2: 300
SOURCE 2: 600
SOURCE 2: 900
SOURCE 3: 2000
SOURCE 3: 4000Сначала первый обзёрвабл быстро пушит свои 10 эмишенов за секунду, потом отрабатывает второй и затем третий кое-как отдаёт свои элементы за 4 секунды. Давайте юзанём throttle(), чтобы взять лишь некоторые из этих элементов и заигнорить остальные.
throttleLast()(идентичен sample()) выпустит последний элемент, выпущенный за задданый период времени. Выпускать получившиеся элементы будет с той же частотой.
Observable.concat(source1, source2, source3)
.throttleLast(1, TimeUnit.SECONDS)
.subscribe(Sustem.out::println);
>output:
SOURCE 1: 900
SOURCE 2: 900
SOURCE 3: 2000Ну парни, не буду расжёвывать.
Чтобы описать прицип работы этого оператора, автор приводит в пример сцену из типичного голливудского боевика. Пока по главному герою выпускают рожок, тот стоит за урытием и ничего не предпринимает. Спустя некоторого затишья, он высовывается и начинает палить в ответ. То есть эти операторы ждут, пока источник перестанет выпускать эмишены, и начинают работать только спустя некоторое время "затишья".
Observable.concat(source1, source2, source3)
.throttleWithTimeout(1, TimeUnit.SECONDS)
.subscribe(System.out::prinln);
>output:
SOURCE 2: 900
SOURCE 3: 2000
SOURCE 3: 4000
900 эмишн из второго источника был последним элементом перед тем, как стартанул третий. Он попал в консольный принт потому, что пауза между эмишенами, последующая за этим элементом, была больше секунды. Аналогичным образом отработали элементы 2000 и 4000.
То есть мы тем самым говорим чейну: "Чувак, подожди, пока какое-то время ничего не будет происходить и возьми последний элемент, который был выпущен перед этим затишьем" (прим.автора)
debounce()(==throttleWithTimeout()) - эффективный способ обработать излишне производительные источники (такие, как тупые юзеры, клацающие по кнопкам со скоростью бешенной обезьяны). Единственный недостаток: эти операторы задержат нужный нам эмишн на определённое время. Особо нетерпеливые пользователи заметят некоторую задержку, нам же надо убедиться, что эти говнюки перестали что-то вводить, верно? Так вот, чтобы избежать такой задержки, можно использовать switchMap(), о котором речь пойдёт чуть позже.
Есть такая мощная приблуда, как switchMap(). Её использование очень похоже на работу с flatMap(), но с некоторыми оговорками. Она будет эмитить последний Observable из последнего эмишена и высвобождать выполнение всех остальных. Другими словами, switchMap() позволяет вам отменять выпуск Observable'ов и переключиться на новый, предотвращая тем самым ненужные вычисления.
Допустим, у нас есть обзёрвабл, который эмитит пять стрингов. Причём делает это так, что каждый стринг выстреливает с какой-то задержкой.
Observable<String> items = Observable.just("Раз", "Два", "Три", "Четыре", "Пять");
// задержим каждый стринг, чтобы показать какие-то вычисления
Observable<String> processedStrings = items.concatMap(s ->
Observable.just(s)
.delay(/*рандомное число*/,
TimeUnit.MILLISECONDS));
processedStrings.subscribe(System.out::println);
>output:
Раз
Два
Три
Четыре
ПятьТакой пример отработает по времени так, как ему вздумается.
А нам, допустим, надо повторять эту работу каждые 5 секунд. При чём так, чтобы все элементы, выпущенные позднее - дропались (и у них при этом отработал dispose()). Это довольно просто сделать с помощью switchMap(). Создадим обзёрвабл, который эмитит каждые пять секунд. Затем, используем на него switchMap() к обзёрваблу, который хотим обработать (в данном случае processedStrings). Каждые пять секунд эмишн, идущий в switchMap() высвободит выполняемый на данный момент Observable, к которому он мапится.
// продолжение предыдущего примера
Observable.interval(5, TimeUnit.SECONDS)
.switchMap(i -> processedStrings.doOnDispose(() -> /*напечатали в консоль*/)
.subscribe(/*напечатали в консоль*/);В итоге в консоли каждые пять секунд будет написано сообщение о том, что такой-то processedStrings отработал dispose(). Помимо этого туда будут печататься наши "Раз", "Два", "Три", "Четыре" в зависимости от того, кто из них успел заэмититься за эти пять секунд. После каждого сообщения о dispose() счёт будет начинаться сначала.
Ешё раз, switchMap() полезен, когда надо отсечь выполнение ненужной работы. Это чаще всего полезно при работе с UI, где быстрые действия юзера создают бессмысленные запросы в сеть/базу и т.п. Для того, чтобы отменить такие запросы и заменить из на выполнение новой задачи, используйте switchMap().
К примеру, ввод в EditText порождает запрос в сеть. В отличие от
debounce(), который привнесёт некоторый затуп по отправлению запроса в сеть,switchMap()просто отменит предыдущие запросы (рассуждения автора конспекта)
Для того, чтобы switchMap() работал эффективно, надо проследить, чтобы поток, который пушит в него элементы, не был занят в самом операторе. Это значит, что вам стоит использовать observeOn() или subscribeOn внутри оператора switchMap(). Если операции в свитчмапе дорогие, и dispose() занимает много времени, то неплохо бы вспомнить об unsubscribeOn().
Прикольный трюк: чтобы предотвратить выполнение какой-то бесконечной или очень затратной работы без старта новой - вернуть Observable.empty(). Автор книги приводит классный пример секундомера на JavaFX:
...
ToggleButton startStopButton = new ToggleButton();
// мультикаст для toggleButton'овского состояния true/false
Observable<Boolean> selectedStates = JavaFxObservable.valuesOf(
startStopButton.selectedProperty())
.publish()
.autoconnect(2);
// Если использовать `switchMap()` с состоянием кнопки, это будет говорить
// чейну, когда запускать `Observable.interval()`, а когда применять к нему
// `dispose()`, переключаясь при этом на эмишн пустого обзёрвабла.
selectedStates.switchMap(selected -> {
if (selected) {
return Observable.interval(1,
TimeUnit.MILLISECONDS);
} else {
return Observable.empty();
})
.observeOn(JavaFxScheduler.platform())
.map(Object::toString)
.subscribe(label::setText());
// Изменим текст кнопки в зависимости от её состояния
selectedStates.subscribe(selected -> startStopButton.setText(
selected ? "STOP" : "START"));
...Кусок кода описывает примерно такое приложение-секундомер:
Нажатие кнопки будет запускать и останавливать секундомер, который отображается в миллисекундах. Обратите внимение, что ToggleButton эмитит Boolean True/False через обзёрвабл selectedStates. Мы применяем к этому обзёрваблу мультикаст для того, чтобы предотвратить дублирование JavaFX'овских listener'ов, ну и потому, что у нас два Observer'а. Первый обзёрвер использует switchMap()'у, которая будет переключаться между Observable.interval() и Observable.empty() в зависимости от значения Boolean. Так как Observable.interval() работает на computation шедулере, мы должны вернуть результаты его работы обратно в мэйн поток JavaFX, для этого обозначили правильный observeOn(). Второй Observer просто меняет название кнопки на "START"/"STOP".
В завершении этой главы мы попробуем применить всё, что в ней было освещено на примере с нажатием клавиш. Достаточно распространённый кейс (к примеру, юзер вводит что-то в поиск и нам надо сразу же переключиться на элемент с наиболее совпадающим именем). Задача: сгруппировать эти нажатия в String'и без всякой задержки.
В качестве примера опять будет взят JavaFX с RxJavaFX.
...
Label typedTextLabel = new Label("");
// Мультикаст, применённый к нажимаемым клавишам
Observable<String> typedLetters = JavaFxObservable.eventsOf(
scene, KeyEvent.KEY_TYPED)
.map(KeyEvent::getCharacter)
.share();
// Сигнализирование того, что 300 мс ничего не происходило
Observable<String> restSignal = typedLetters.throttleWithTimeout(
300, TimeUnit.MILLISECONDS)
.startWith("");
// Теперь триггерим `switchMap()` на каждый период затишья для того, чтобы
// объединить ранее нажатые клавиши в один стринг с помощью `scan()`
restSignal.switchMap(s -> typedLetters.scan("", (rolling, next) ->
rolling + next))
.observeOn(JavaFxScheduler.platfomr())
.subscribe(s -> typedTextLabel.setText(s));
...Когда мы что-то печатаем, Label отображает саккумулированный String. Обратите внимание: если втечение 300 мс ничего не происходит, ввод сбрасывается и эмититься новый scan() с высвобождением предыдущего. Новый скан начинается с "". Такой приём может быть очень полезен в случае, когда нужно мгновенно искать что-то по запросу, выдавая предположения с автокомплитом.
Разберём чуть подробнее, как это работает. У нас есть один Observable, который эмитит нажатие клавиш. К нему применён мультикаст, так как этот обзёрвабл использован два раза. Сначала он нужен для создания другого Observable, сигнализируещого о том, что вот уже 300 мс ничего не вводится с клавиатуры. Затем мы используем к этому сигнализирующему обзёрваблу switchMap(), передавая в него всё тот же клавишный обзёрвабл - так мы бесконечно группируем результативные нажатия в стринги с помощью scan() до тех пор, пока он не заdispose()'ится спустя 300 мс безактивности. Затем выстрелит новый scan().
Короче говоря switching, throttling, windowing and buffering нужны в тех случаях, когда backpressure не работает. В частности, когда мы имеем дело с юзерским вводом.
В предыдущей главе мы разобрали каким образом можно перехаватить быстро выпускаемые элементы и объеденить/пропустить их. Но для некоторых случаев избыточного выпуска эмишенов лучшим решением будет притормозить сам выпуск до той степени, чтобы обзёрвер успевал их обрабатывать. Такая возможность более известна, как backpressure или flow control. Реализована в Flowable.
По ходу книги автор не раз подчёркивал, что природа Observable'а основана на пушах. Если убрать всякую многопоточность, то всё, что останется - это синхронные пуши элементов, один-за-другим, от Observable к Observer'у. Это происходит в сериализованном и последовательном flow(потоке).
Как только добавляются всякие observeOn()'ы, параллельные выполнения и прочие операторы типа delay(), наш чейн становится асинхронным. Это значит, что источники эмишенов и их потребители работают на разных потоках, а следовательно, их работа может быть не согласована в части количества произведённых/обработанных эмишенов. Проще говоря, обзёрваблы могут фигачить больше, чем надо. Происходит это потому, что как только эмишн упал в новый шедулер, изначальный Observer более не отвечает за то, что он дойдёт до обзёрвера. Более того, источник будет пушить элементы даже если предыдущие не дошли до пункта назначения.
Я не стал переносить приведённый автором пример, там всё просто. Обзёрвабл, который эмитит кучу интов. Всё это сыпется в обзёрвер, где печатается в консоль. Без
observeOn()всё вроде-бы норм, но с ним получается так, что в консоль печатается медленнее, чем эмитит обзёрвабл.
Вместо того, чтобы лепить костыли с ядерными джавовскими семафорами, просто возьмём Flowable (спасибо, RxJava). Это такой вариант Observable, в котором реализован backpressure.
На
Flowableподписывается неObserver, аSubscriber. Такая цепочкаFlowable-Subscriberдемонстрируетpullлогику, вместоpush. То есть это сабскрайбер говорит "Ок, чувак, давай свои элементы", а не источник, стреляющий их по принципу "Смотри, чё у меня есть, на на на".
Flowable - нормальный такой инструмент, который чётко сработает из-коробки. Нам не нужно определять какие-то особые политики для backpressure или задавать непонятные параметры. Но мы можем создать свой Flowable, если очень надо.
Теперь пробежимся по местам, в которых собственно надо его использовать.
Крайне важно вкуривать, когда использовать Flowable вместо Observable. Из-за того, что он может выкинуть OutOfMemoryError, MissingBackpressureException, использование Flowable может стать болью. Особенно, если источник, получающий эмишены, не имеет никакого backpressure протокола. Более того, Flowable добавляет некоторый оверхед к нашим операциям и срабатывает не так быстро, как Observable.
Используйте Observable, если:
- Вы ожидаете, что за свою жизнь обзёрвабл выпустит несколько эмишенов (менее 10К). Либо, если выпуски эмишенов преривистые и не жирные. В противном случае, когда ожидается овердохрена элементов, над которыми будут производиться пипец-какие-мать-их-операции, лучше взять
Flowable. - Ваши операции вполне себе синхронны и вы пользуетесь простеньким
subscribeOn()вначале чейна (при таком раскладе, как мы помним, процесс остаётся синхронным и его работа происходит в одном потоке от начала и до конца). Однако, ситуация меняется, когда мы зиппуем/комбинируем несколько стримов на разных потоках, или используем всякие тамobserveOn(),interval()сdelay()ями. В таких случаях наши чейны перестают быть синхронными и может так оказаться, что использованиеFlowable- предпочтительнее. - Вы хотите заэмитить действия пользователя, такие как: нажатия на кнопки, выбор элементов, много чего эти чудики ещё могут сделать с нашим приложением. К сожалению, мы не можем сказать юзерам, чтобы они притормозили, поэтому использование
Flowableявляется сомнительным мероприятием.
Используйте Flowable, если:
- Вы имеете дело с реально большим количеством элементов, и есть вероятность того, что эти элементы будут обрабатываться асинхронно.
- Вы хотите эмитить из IO операций, которые поддерживают blocking. Можно без труда контролировать источники данных, которые поочерёдно их перебирают (к примеру
ResultSetв JDBC или файлы) - мы можем приостановить или возобновить их работу тогда, когда нам это нужно. Запросы в сеть иStreamingAPI'шки обычно могут контролировать точное количество возвращаемых данных, поэтому легко сочетаются с backpressure.
В первой эрыксджаве Observable поддерживал backpressure. Причиной, по которой их разделили на два отдельных класса, явилась разница в использовании.
Можно с лёгкостью переключаться между этими двумя источниками, но надо держать ухо в остро - у такого применения есть свои узкие места.
Большинство фабрик из Observable применимы и к Flowable.
А что там с Flowable.interval(), который вроде как должен эмитить с заданной периодичностью? Может ли к нему быть применён backpressure? По идее, если тормознуть выпуск его элементов, то они перестанут быть привязанными к временным отрезкам, что потеряет всякий смысл. Именно поэтому, Flowable.interval() - один из немногих случаев, который приводит к MissingBackpressureException, когда вниз по чейну кому-то вдруг понадобилось притормозить. Для того, чтобы с этим бороться, есть свои операторы, но о них позже.
Как уже было отмечено, в качестве подписчика Flowable исползует Subscriber'а, а не Observer'a. Если передать в subscribe() лямбду (а не Subscriber), то он вернёт Subscription вместо Disposable. Для того, чтобы высвободить сабскрипшн, надо вызывать cancel(). В методе onSubscribe() есть возможность обратиться к текущему Subscription'у и вызвать у него request()(обозначает, что подписчик готов получать элементы).
Самый быстрый способ создать Subscriber - передать лямбду в subscribe(). Всё как с Observable'ом.
Можно, разумеется, запилить своего Subscriber'а с блэкджеком и шл*хами. Придётся только имплементить onNext(), onError() и onComplete()(onSubscribe у него тоже есть). Принцип будет немного отичаться от создания кастомного обзёрвера, так как нам надо будет request()'ить эмишены у Subscription'а.
Простейший способ заимплементить Subscriber'а - сунуть в request() значение Long.MAX_VALUE, что буквально скажет сабскрипшену: "Чувак, а ну ка давай сюда всё, что у тебя там есть!". Даже если операторы будут запрашивать свой собственный backpressure, никакого backpressure между последним оператором и Subscriber'ом не будет. В принципе это норм, так как вышестоящие операторы по-своему справятся с flow.
Если вы хотите, чтобы ваш кастомный Subscriber контролировал backpressure каким-то особенным образом, то придётся микроконтроллить вызовы request(). Завязать их на какой-нибудь AtomicInteger или тому подобное.
Вообще, по ходу конспекта куча примеров была опущена, потому что простыня большая, а смысл на поверхности (иногда и потому, что лень переписывать, как в этот раз).
Когда пилите кастомный Subscriber, помните, что request() не поднимается по всему чейну до самого Flowable, но только до первого оператора.
Ранее в книге был приведён пример создания своего собственного Observable.
Observable<Integer> customObservable = Observable.create(emitter -> {
for(int i=0; i<=100; i++) {
if (emitter.isDisposed()) {
return;
}
emitter.onNext(i);
}
emitter.onComplete();
});Обзёрвабл заэмитит 100 элементов и вызовет onComplete(). Он может быть остановлен с помощью dispose(), вызванного у полученного из subscribe() Disposable'а.
Создание Flowable с помощью фабрики Flowable.create() будет чуть сложнее, потому как надо продумать различные ситауции, в которых надо приостановить выпуск элементов. Обычного цикла for будет недостаточно. Есть утилиты, упрощающие создание кастомных Flowable.
При создании Flowable с помощью Flowable.create() нам необходимо указать BackpressureStrategy в качестве второго аргумента. Сам по себе этот enum ничего магического не сделает. Он всего лишь описывает способ, с которым должен отработать backpressure.
Тут мы используем BackpressureStrategy.BUFFER для того, чтобы поместить эмишены в буфер, как только стрельнёт backpressure:
Flowable<Integer> source = Flowable.create(emitter ->
for(int i=0; i<=100; i++) {
if (emitter.isCancelled()) {
return;
}
emitter.onNext(i);
}, BackpressureStrategy.BUFFER);Такое решение не является оптимальным, так как эмишены будут держаться в бесконечной очереди, и есть вероятность хапнуть OutOfMemoryError. Но так мы хотя бы избежим MissingBackpressureException.
Есть пять стратегий для работы с backpressure:
| BackPressureStratgy | Описание |
|---|---|
| MISSING | Стратегия отсутствует. Нижестоящий чейн должен сам справляться с backpressure overflow. Полезно в сочетании с операторами onBackpressureXXX() |
| ERROR | Сигнализирует о MissingBackpressireException в момент, когда чейн не может справляться с эмишенами. |
| BUFFER | Ставит эмишены в очередь до тех пор, пока Subscriber не будет опять в состоянии их принимать. Может вызвать OutOfMemoryError. |
| DROP | Если чейн не способен принимать элементы, они будут дропаться до тех пор, пока потребитель не очухается. |
| LATEST | Как только хвост чейна вновь сможет обрабатывать эмишены, он получит последний из них |
Превращаем Observable в Flowable и наоборот.
Есть способ применить стратегию backpressure к источнику, который изначально этого не поддерживает. Можно с лёгкостью конвертировать обзёрвабл в Flowable - достаточно ткнуть toFlowable(), который принимает аргумент BackpressureStrategy. Изначальный Observable не будет ничего знать о backpressure, поэтому продолжит пушить свои элементы настолько быстро, насколько он вообще может. А в это время toFlowable() отработает, как фильтрующий прокси для нежелательных эмишеннов.
Observable<Integer> source = Observable.range(1, 1000);
source.toFlowable(BackpressureStrategy.BUFFER)
.observeOn(Schedulers.io())
.subscribe(..);Ещё раз: такая стратегия может привести к
OutOfMemoryError. В реальном мире лучше использоватьFlowable.range()с самого начала. Но иногда может получится так, что придётся иметь дело сObservable.
У Flowable тоже есть метод, делающий из него Observable - toObservable(). Незаменимая штука, когда надо использовать в чейне с обзёрваблами.
Flowable<Integer> integers = Flowable.range(1, 1000)
.subscribeOn(Schedulers.computation());
Observable.just("Раз", "Два", "Три")
.flatMap(s -> integers.map(i -> i + "-" + s).toObservable())
.subscribe(..);Даже после того, как мы обернули фловабл в Observable, он продолжит поддерживать backpressure. Но с тех пор, как он стал обзёрваблом, чейн запросит у него Long.MAX_VALUE элементов. Такой приём подходит, если ниже по цепочке нет никакого concurrency, и не проделывается никакой большой работы.
В общем и целом - если решили использовать Flowable, то стоит его и придерживаться.
Если мы создали Flowable, у которого нет никакой стратегии по работе с backpressure, можно применить таковую с помощью операторов типа onBackPressureЧтоТоТам(). Например, нельзя замедлить эмишены у Flowable.interval(), так как этот источник основан на времени. Но можно впилить что-то типа прокси между ним и Subscriber'ом.
Иногда Flowable могут быть созданы с BackpressureStrategy.MISSING затем, чтобы задать эту стратегию позже.
Данный оператор возьмёт существующий Flowable, у которого нет стратегии, и применит к нему BackpressureStrategy.BUFFER начиная от собственно оператора и далее по цепочке.
У этого метода есть ряд перегруженных вариантов, смотрите JavaDoc. Одним из аргументов может быть вместимость(capacity). Этот аргумент задаёт максимальное количество элементов, которое может быть помещено в буфер. С помощью onOverflowAction описывается действие, выстреливающее в случае, когда достигнут максимум вместимости, и места в буфере больше нет. Вы также можете обозначить BackpressureOverflowStrategy, которая скажет чейну как себя вести в данном случае.
| BackpressureOverflowStrategy | Описание |
|---|---|
| ERROR | Просто выбрасывает ошибку в момент, когда достигнут capacity |
| DROP_OLDEST | Выкидывает самое старое значение в буфере, чтобы освободить место для более нового элемента |
| DROP_LATEST | Дропает более новые значения |
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackPressureBuffer(10,
() -> System.out.println("oberflow!),
BackpressureOverflowStrategy.DROP_LATEST)
.observeOn(Schedulers.io())
.subscribe(i -> {
sleep(5);
System.out.prinln(i);
});
>output:
...
overflow!
overflow!
135
overflow!
overflow!
overflow!
136
overflow!
...
492
overflow!
...Между 136 и 492 большой разрыв и там всё утыкано "overflow!". Это потому, что эмишены были дропнуты из очереди в соответствии со стратегией. Очередь была заполнена и ждала, пока потребитель освободится. Поэтому вновь приходившие эмишены выбпасывались.
Такой же, как и onBackPressureBuffer(), только оставляет себе последний выпущенный элемент. Все остальные элементы, заэмиченные в период, пока потребитель был занят, дропаются.
Данный оператор просто игнорирует эмишены, если Subscriber уже занят. Полезно, когда эмишены признаются лишними (ну, к примеру, какой-то процесс уже запущен, а запрос на его запуск продолжает сыпаться в чейн). Можно передать onDrop лямбду, которая скажет, что же бл*дь делать в такой нелёгкой ситуации. Таким образом, onBackPressureDrop() не выстраивает элементы в очередь, пока потребитель занят. Он их нафиг выкидывает. По-нашему.
Использование стандартной фабрики для создания Flowable избавит нас от головняков с придумыванием стратегий для обработки backpressure. Однако, в случае, если мы создаём свои кастомные источники эмишенов, Flowable.create() или onBackPressureXXX() - компромис между простотой создания и обработкой запросов на backpressure. Они являются быстрым и эффективным способом кэширования или игнора ненужных эмишенов, но такое поведение не всегда желательно. Иногда всё же следует запилить свой велосипед.
RxJava предоставляет достаточно удобный, хорошо абстрагированный инструмент для создания таких Flowable, которые будут реагировать на backpressure сообразно источникам - Flowable.generate(). Он принимает Consumer<Emitter<T>> и лямбду, котороая перегрузит onNext(), onComplete() и onError() для того, чтобы передать в них ивенты каждый раз, когда элемент запрошен из источника.
Перед тем, как использовать Flowable.generate(), попробуйте сначала сделать ваш источник Iterable<T> и передать его в Flowable.fromIterable(). Этот оператор уважает backpressure и с ним во многом будет проще. В противном случае, если вы всё-таки решили пойти во все тяжкие, дёргайте Flowable.generate().
Самый простой вариант Flowable.generate() принимает Consumer<Emitter<T>> и подразумевает, что нет никакого стэйта между выпуском элементов. Такая хрень может быть использована для создания генератора рандомных чисел, который с нежностью поддерживает backpressure:
/* честно, влом было переписывать эту дичь, см.книгу, стр.253 */В RxJava можно запилить собственные операторы с помощью методов compose() и lift(), оба есть и в Observable и Flowable. Чаще всего, желание собрать собственный велосипед будет посещать вас, когда вам вдруг захочется привнести какую-то новую логику в уже существующий оператор. В особенных случаях воспаления мозга, проявляется навязчивая идея сваять с нуля свой собственный оператор. В этой главе речь будет идти о том, как:
- Сочетать новые операторы с уже существующими, используя
compose()иTransformers. - Использовать
to() - Имплементить свои собственные костыли благодаря
lift() - Понять, что такое
RxJava2-ExtrasиRxJava2Extensions
Работая с эриксджавой, вы наверняка захотите переиспользовать какие-то куски Observable или Flowable. Хорошие разрабы находят способы переиспользования ранее написанного кода, и RxJava способствует такому подходу. Для этого в ней есть такие штуки, как ObservableTransformer и FlowableTransformer, которые можно подать в compose().
Мы ранее использовали пример с Google Guava ImmutableList'ом, когда собирали в него значения:
...
.collect(ImmutableList::builder, ImmutableList::add)
.map(ImmutableList.Builder::build)
...Логика, описанная в этих двух строчках, может понадобиться где-нибудь в другом месте. Чтобы не копипастить втупую, вспомним, что мы программисты и скомпонуем эти две операции.
Спецом для таких дел был релизован ObservableTransformer<T,R>, который принимает Observable<T> в apply() и пробрасывает вниз по чейну другой Observable<R>. В нашей имплементации мы можем вернуть чейн , который добавляет все необходимые операторы к изначальному OBservable'у.
Для примера с гуавовским листом, мы возьмём дженерик T для данного Obsevable<T>, а R'ом будет ImmutableList<T>, заэмиченный из Observable<ImmutableList<T>>. Поместим эту логику в ObservableTransformer<T,ImmutableList<T>>:
public static <T> ObservableTransormer<T, ImmutableList<T>> toImmutableList() {
return new ObservableTransformer<T, ImmutableList<T>>() {
@Override
public ObservableSource<ImmutableList<T>>
apply(Observable<T> upstream) {
return upstream.collect(ImmutableList::<T>builder,
ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
.toObservable(); // переводим Single в обзёрвабл
}
};
}Так как collect() возвращает сингл, надо перевести его в Observable - сигнатура метода обязывает.
С лямбдой выглядит немного симпатичнее:
public static <T> ObservableTransformer<T, ImmutableList<T>> toImmutableList() {
return upstream -> upstream.collect(ImmutableList::<T>builder,
ImmutableList.Builder::add)
.map(ImmutableList::build)
.toObservable();
}Для того, чтобы заюзать такой трансформер, надо передать его в compose(). Теперь его логику можно переиспользовать, шах и мат, грёбаные копипастеры!
Не стремайтесь хранить Transformers в фабричных классах, это норм. compose(GuavaTransformers.toImmutableList()) - и камон.
Для данного примера
toImmutableList()можно было реализовать в виде переиспользуемого синглтона, так как он не принимает никаких параматров.
Пример с стрингами:
public static void main(String[] args) {
Observable.just("Вариант1", "Вариант2", "Вариант3")
.compose(joinToString("/"))
.subscribe(System.out::println);
}
public static ObservableTransformer<String, String>
joinToString(String separator) {
return upstream -> upstream.collect(StringBuilder::new,
(b, s) -> {
if (b.length() == 0) {
b.append(s);
} else {
b.append(sepparator)
.append(s);
}
})
.map(Stringbuilder::toString)
.toObservable();
}
>output:
Вариант1/Вариант2/Вариант3Аналогично ObservableTransformer, только нужно организовать поддержку backpressure. В предыдущем примере просто меняем на Flowable и toObservable() -> toFlowable().
Избегайте передачи стэйта в трансформеры.
Хороший способ выстрелить себе в ногу - передавать какое-то состояние в трансформеры. Любите сайд-эффекты, нетестируемость кода и дебажные брэйкпоинты? Как можно чаще передавайте стэйт в Transformers!
Если нет, то избегайте передачи mutable переменных в разные subscriptions. Если всё-таки надо держать какой-то стэйт в чейне, то можно использовать .defer(), который создаст новую переменную для каждого подписчика:
static <T> ObservableTransformer<T, IndexedValue<T>> withIndex() {
return upstream -> Observable.defer( () -> {
AtomicInteger indexer = new AtomicInteger(-1);
return upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet(),
v));
});
}В целом избавьтесь от мыслей о том, чтобы добавлять в реактивщину изменяемые состояния и сайд-эффекты. Лучше не шарить mutable объекты между подписчиками, только если бизнес не требует писать что-то, что может в любой момент сломаться.
Оператор to() нужен для того, чтобы конвертить rx-ные штуки в что-то нереактивное. Принимает Function<Observable<T>, R>
Кусок из примера про JavaFX:
Binding<String> binding = Observable.interval(1, TimeUnit.SECONDS)
.map(i -> i.toString())
.observeOn(JavaFxScheduler.platform())
.to(JavaFxObserver::toBinding);
// И теперь можем пользоваться в рамках JavaFX
label.textProperty().bind(binding);На случай, если вдруг кто столкнулся с тем, что Transformer'ов не хватает, есть ObservableOperator и FlowableOperator, с помощью которых можно запилить свои собственные операторы.
Но автор всё-таки советует сначала изучить возможности из-под-капотной RxJava. По его мнению даже поход на Stackoverflow и изучение вопроса - лучше, чем костылить свой велосипед. Комьюнити у эрыксджавы достаточно себе большое и отзывчивое.
Обратите внимние, в
RxJava2Extensions(David Karnok) иRxJava2-Extras(Dave Moten) есть крутые трансформеры и операторы. Посмотрите эти либы, прежде чем обеспечивать себе rx-геморрой.
Автор столько раз советовал не делать этого, что я решил пропустить всё, что связано с созданием своих операторов. Если кто упарывается по таким вещам, смотрите источник, мужики, ну его в жопу.
/* пропущено */
Тут идея в том, что всё то же самое есть и для Single, Maybe и Completable. Если вдруг у них чего-то не хватает, переведите источник в toObservable() или toFlowable(). Создание трансформеров тоже не сильно отличается, просто возвращаемый тип меняется в соотв. с требованиями.
Если вы заинтересованы в том, чтобы добавить крутости стандартной эриксджаве, то рекомендуем ознакомиться с RxJava2-Extras и RxJava2Extensions.
Вот, например,toListWhile() и collectWhile() - пара полезных операторов. Будут складывать в коллекцию элементы, соответствующие определённым условиям.
Стоит потратить своё время на эти две либы, чтобы не изобретать то, что уже есть. Автору, вот например, нравится cache(), который может самоочищаться и переподписываться на эмишены.
Тестирование реактивщины на первый взгляд может показаться непростым. Это так потому, что RxJava - про поведение, а не про состояние. И всё же с этим можно что-то поделать. Будем использовать JUnit.
Также в этой главе будет выделено место для дебага. Он перестаёт быть таким однозначным при использовании эриксджавы: стэктрэйсы менее информативны и не совсем понятно, куда ставить брэйкпоинты. Но при правильном подходе всё будет норм. Появление проблем становится довольно линейным и их поиск не составляет большого труда.
Чё будет:
blockingSubscribe()- блокирующие операторы
TestObserverиTestSubdcriberTestScheduler- стратегии дебага
/* Подозреваю, что подключить JUnit - не проблема, пропускаю */
Почти во всех примерах в книге были использованы искусственные способы замораживания главного потока. Делалось это для того, чтобы rx-чейн успевал отработать до того, как приложуха завершит своё выполнение.
В случае с юнит-тестами часто выходит так, что один тест должен выполниться прежде, чем стартанёт другой. Чтобы избежать путаницы с разными потоками, надо блокировать выполнение теста перед тем, как отработает Observable или Flowable.
AtomicInteger hitCount = new AtomicInteger();
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
.take(5);
source.blockingSubscribe(i -> hitcount.incrementAndGet());
assertTrue(hitcount.get() == 5);blockingSubscribe() - самый простой оператор для тестирования вашей реактивной логики. Однако, есть более элегантные инструменты.
Убедитесь, что ваш чейн не является бесконечным, прежде чем впиливать
blockingSubscribe(). Также не рекомендуется использовать его в боевом коде - это может негативно сказаться на реактивности вашей приложухи.
Есть целый набор операторов, которые ещё не обсуждались. Так называемые blocking operators(блокирующие операторы) - прокси между реактивным миром и скучной императивщиной. Они блокируют выполнение thread'a до тех пор, пока не отработает rx-чейн.
Использование блокирующих операторов в коде порождает анти-паттерны, так что атата.
Для полноценного тестирования также понадобятся TestSubscriber и TestObserver, о которых будет сказано позже.
Используется вместо subscribe() или blockingSubscribe() и возвращает конкретный элемент T, а не Observable<T>. Блокирует поток до тех пор, пока чейн не вернёт первый элемент. Бросает ошибку, если до него не докатилось ни одного эмишена. Аналогично работает и blockingSingle().
У Maybe и Single нет blockingFirst(), потому как они подразумевают 0 <= количество элементов <= 1. Логично, что подобный оператор будет возвращать единственный элемент. Называется он blockingGet().
/* Д - дедукция */
Интересный блокирующий оператор, который возвращает эмишены в виде Iterable<T>. Итератор, предоставленный этим Iterable<T> будет блокировать итерирующий поток до тех пор, пока не будет доступен следующий элемент. Итерация завершится onComplete().
Observable<String> source = Observable.just("Vote", "for", "Grudinin");
Iterable<String> allWithLengthFour = source.filter(s -> s.length() == 4)
.blockingIterable();
for (String s: allWithLengthFour) {
assertTrue(s.length() == 4);
}blockingIterable() поставит в очередь все полученные элементы до тех пор, пока Iterator не сможет их обработать. Может привести к OutOfMemoryException.
Может быть полезным для работы с работы с джавовскими стримами(гляньте RxJava2Jdk8Interop) или котлиновскими sequence.
Блокирует поток, вызывающий чейн, для каждого элемента: он будет ждать, пока выстрелит каждый эмишн. Так, для предыдущего примера можно написать:
source.filter(s -> s.length() == 4)
.blockingForEach(s -> assertTrue(s.length() == 4));
Вернёт Iterable<T> и будет блочить метод next() у каждого запроса к итератору. Эмишены перед первым и после последнего вызовов next() будут проигнорированы.
Запрашивает последний выпущенный элемент.
Какая-то дичь ненужная.
Настоящие rx-ниндзи используют всю силу реактивщины, даже в тестировании. Им недостаточно просто заблочить исполняющий поток до тех пор, пока не отработает чейн. Они хотят прописать логику для onNext(), onError(), onComplete().
Они познали TestObserver и TestSubscriber - их тестовые rx-сюрикены. Первый для Observable источников, второй - для Flowable.
Методы TestObserver:
| Метод | Описание |
|---|---|
assertNotSubscribed() |
Проверка на отсутствие подписчиков |
subscribe() |
Обычный subscribe() |
assertSubscribed() |
Проверка на наличие подписчиков |
awaitTerminalEvent() |
Блокировка и ожидание, пока отработает Observable |
assertComplete() |
Отработал ли onComplete() |
assertNoErrors() |
Не было ли ошибок |
assertValueCount(N) |
Проверка на количество полученных значений == N |
assertValues(0L, 1L, 2L) |
Полученные значения: OL, 1L, 2L |
Это немногие методы из арсенала тестового обзёрвера. Большинство из них возвращает TestObserver, поэтому можно составлять целые тестовые чейны.
Использование
TestObserverиTestSubscriberпредпочтительнее использования блокирующих операторов - тестирование проходит с учётом жизненного цикла обзёрваблов и всего спектра ивентов, которые могут произойти в чейне.
Если у нас много чейнов, основанных на времени, то их тестирование с помощью блокирующих операторов может стать слишком долгим. TestScheduler придуман для того, чтобы симмулировать нужные временные отрезки. С его помощью можно как бы промотать вперёд выполнение чейна, чтобы узнать результат его работы.
В примере ниже у нас есть обзёрвабл, который эмитит 90 элементов за 90 минут. Вместо того, чтобы ждать его выполнение полтора часа, мы используем необходимый тестовый обзёрвер:
TestScheduler testScheduler = new TestScheduler();
TestObserver<Long> testObserver = new TestObserver<>();
// Скажем нашему обзёрваблу, чтобы он работал на правильном шедулере
Observable<Long> minuteTicker = Observable.interval(1, TimeUnit.MINUTES,
testScheduler);
minuteTicker.subscribe(testObserver);
// Мотанём шедулер вперёд
testScheduler.advanceTimeBy(90, TimeUnit.MINUTES);
// Тест пройдёт нормально
testObserver.assertValueCount(90);Есть ещё такой оператор, как advanceTimeTo(), который прыгает к точному времени с момента подписки.
Тестовый шедулер не является потокобезопасным, поэтому не стоит его юзать, когда вам нужна реальная многопоточность. При использовании каких-то сложных чейнов, которые работают там-сям, используйте RxJavaPlugins.setComputationScheduler() и аналогичные ему методы для того, чтобы инжектить нужный вам шедулер.
Для дебага достаточно придерживаться определённого подхода, чтобы достаточно быстро и безболезненно находить косяки в чейнах. Используйте doOnNext() сразу после источника эмишенов, чтобы понимать, какие элементы были выпущены дельше в чейн, а какие привели к ошибке. Постепенное продвижение doOnNext() вниз по цепочке рано или поздно даст свои плоды, и вы нащупаете проблемный код.
В IDEA брэйкпоинты отрабатывают и в лябмда-варажениях, поэтому никто не мешает дебажить ими.
Автор приводит достаточно простые примеры по использованию эрыксджавы в Android, поэтому решено пропустить эту главу, в ней ничего особо интересного нет. Разве что перечисление Rx-библиотек для андроида:
- SqlBrite - rx-враппер для SQL
- RxLocation - Reactive Location API
- rx-preferences - Reactive Shared Preferences
- RxFit - 0o
- RxWear
- ReactiveNetwork - реактивно слушает сеть
- ReactiveBeacons - про Bluetooth Low Energy.
Ну ещё автор предупреждает о том, что надо особенное внимение уделить контролю жизненного цикла обзёрваблов, высвобождать их и прочее. В onPause() - диспозить, подписываться заново в onResume(). Неплохо диспозить и в onDestroy() тоже.
Не стоит забывать о мультикасте, когда нужно создать несколько слушателей для одного и того же источника. Делаешь много Observer'ов для одного и того же ивента - юзай мультикастинг.
/* Конпект этой главы откладывается до момента, когда я буду пересаживаться на котёл */



