Поиск…


Создать наблюдаемый

Существует несколько способов создания Observable в RxJava. Самый мощный способ - использовать метод Observable.create . Но это также самый сложный способ . Поэтому вы должны избегать его использования , насколько это возможно.

Исходящее значение

Если у вас уже есть значение, вы можете использовать Observable.just чтобы исправить ваше значение.

 Observable.just("Hello World").subscribe(System.out::println);

Вычисление значения, которое должно быть вычислено

Если вы хотите испустить значение, которое еще не было вычислено, или что может потребоваться много времени для вычисления, вы можете использовать Observable.fromCallable для испускания следующего значения.

Observable.fromCallable(() -> longComputation()).subscribe(System.out::println);

longComputation() будет вызываться только при подписке на ваш Observable . Таким образом, вычисление будет ленивым .

Альтернативный способ испускать значение, которое должно быть вычислено

Observable.defer создает Observable же, как Observable.fromCallable но используется, когда вам нужно вернуть Observable вместо значения. Это полезно, когда вы хотите управлять ошибками в своем вызове.

Observable.defer(() -> {
          try {
                return Observable.just(longComputation());
          } catch(SpecificException e) {
                return Observable.error(e);
          }).subscribe(System.out::println);

Горячие и холодные наблюдения

Наблюдения широко классифицируются как Hot или Cold , в зависимости от их поведения выбросов.
« Cold Observable - это тот, который начинает излучать по запросу (подписке), тогда как « Hot Observable - это тот, который испускает независимо от подписки.

Холодный наблюдаемый

/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milli seconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2

Вывод вышеуказанного кода выглядит (может варьироваться):

sub1, 0    -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0    -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2

Обратите внимание: несмотря на то, что sub2 запускается с опозданием, он получает значения с самого начала. В заключение, Cold Observable только испускает предметы по запросу. Несколько запросов запускают несколько конвейеров.

Горячие наблюдаемые

Примечание. Горячие наблюдаемые значения излучают независимо от отдельных подписей. У них есть своя временная шкала, и события происходят независимо от того, слушает кто-то или нет.

Cold Observale может быть преобразована в Hot Observable с простой publish .

Observable.interval(500, TimeUnit.MILLISECONDS)
    .publish(); // publish converts cold to hot

publish возвращает ConnectableObservable который добавляет функции для подключения и отключения от наблюдаемого.

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // returns ConnectableObservable
hot.connect(); // connect to subscribe

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));

Вышеизложенное дает:

sub1, 0  -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2  -> subscriber2 starts
sub1, 3
sub2, 3

Обратите внимание, что даже несмотря на то, что sub2 начинает наблюдение в последнее время, он синхронизируется с sub1 .
Отключение немного сложнее! Отключение происходит в Subscription а не в Observable .

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription

System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();

Вышеизложенное дает:

sub1, 0   -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2   -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting  -> reconnect after unsubscribe
sub1, 0
...

При отключении Observable существу «завершается» и перезапускается при добавлении новой подписки.

EventBus Hot Observable может использоваться для создания EventBus . Такие EventBuses, как правило, легкие и супер быстрые. Единственным недостатком RxBus является то, что все события должны быть вручную реализованы и переданы в шину.



Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow