rx-java
наблюдаемый
Поиск…
Создать наблюдаемый
Существует несколько способов создания Observable в RxJava. Самый мощный способ - использовать метод Observable.create . Но это также самый сложный способ . Поэтому вы должны избегать его использования , насколько это возможно.
Исходящее значение
Если у вас уже есть значение, вы можете использовать Observable.just чтобы исправить ваше значение.
Observable.just("Hello World").subscribe(System.out::println);
Вычисление значения, которое должно быть вычислено
Если вы хотите испустить значение, которое еще не было вычислено, или что может потребоваться много времени для вычисления, вы можете использовать Observable.fromCallable для испускания следующего значения.
Observable.fromCallable(() -> longComputation()).subscribe(System.out::println);
longComputation() будет вызываться только при подписке на ваш Observable . Таким образом, вычисление будет ленивым .
Альтернативный способ испускать значение, которое должно быть вычислено
Observable.defer создает Observable же, как Observable.fromCallable но используется, когда вам нужно вернуть Observable вместо значения. Это полезно, когда вы хотите управлять ошибками в своем вызове.
Observable.defer(() -> {
try {
return Observable.just(longComputation());
} catch(SpecificException e) {
return Observable.error(e);
}).subscribe(System.out::println);
Горячие и холодные наблюдения
Наблюдения широко классифицируются как Hot или Cold , в зависимости от их поведения выбросов.
« Cold Observable - это тот, который начинает излучать по запросу (подписке), тогда как « Hot Observable - это тот, который испускает независимо от подписки.
Холодный наблюдаемый
/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2
Вывод вышеуказанного кода выглядит (может варьироваться):
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0 -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2
Обратите внимание: несмотря на то, что sub2 запускается с опозданием, он получает значения с самого начала. В заключение, Cold Observable только испускает предметы по запросу. Несколько запросов запускают несколько конвейеров.
Горячие наблюдаемые
Примечание. Горячие наблюдаемые значения излучают независимо от отдельных подписей. У них есть своя временная шкала, и события происходят независимо от того, слушает кто-то или нет. Cold Observale может быть преобразована в Hot Observable с простой publish .
Observable.interval(500, TimeUnit.MILLISECONDS)
.publish(); // publish converts cold to hot
publish возвращает ConnectableObservable который добавляет функции для подключения и отключения от наблюдаемого.
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Вышеизложенное дает:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
Обратите внимание, что даже несмотря на то, что sub2 начинает наблюдение в последнее время, он синхронизируется с sub1 .
Отключение немного сложнее! Отключение происходит в Subscription а не в Observable .
ConnectableObservable<Long> hot = Observable
.interval(500, TimeUnit.MILLISECONDS)
.publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription
System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();
Вышеизложенное дает:
sub1, 0 -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2 -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting -> reconnect after unsubscribe
sub1, 0
...
При отключении Observable существу «завершается» и перезапускается при добавлении новой подписки.
EventBus Hot Observable может использоваться для создания EventBus . Такие EventBuses, как правило, легкие и супер быстрые. Единственным недостатком RxBus является то, что все события должны быть вручную реализованы и переданы в шину.