rx-java
Предметы
Поиск…
Синтаксис
- Тема <T, R> subject = AsyncSubject.create (); // По умолчанию AsyncSubject
- Тема <T, R> subject = BehaviorSubject.create (); // Default BehaviorSubject
- Тема <T, R> subject = PublishSubject.create (); // Default PublishSubject
- Тема <T, R> subject = ReplaySubject.create (); // По умолчанию ReplaySubject
- mySafeSubject = новый SerializedSubject (unSafeSubject); // Преобразование unsafeSubject в safeSubject - как правило, для многопоточных объектов
параметры
| параметры | подробности |
|---|---|
| T | Тип ввода |
| р | Тип выхода |
замечания
В этой документации приводятся подробные сведения и пояснения по Subject . Для получения дополнительной информации и дальнейшего ознакомления, пожалуйста, посетите официальную документацию .
Основные темы
Subject в RxJava - это класс, который является Observable и Observer . Это в основном означает, что он может выступать в качестве Observable и передавать входные данные подписчикам и в качестве Observer получать данные от другого наблюдаемого.
Subject<String, String> subject = PublishSubject.create();
subject.subscribe(System.out::print);
subject.onNext("Hello, World!");
Вышеприведенные отпечатки «Привет, мир!» для консольного использования Subjects .
объяснение
Первая строка кода определяет новый
SubjectтипаPublishSubjectSubject<String, String> subject = PublishSubject.create(); | | | | | subject<input, output> name = default publish subjectВторая строка подписывается на объект, отображая поведение
Observer.subject.subscribe(System.out::print);Это позволяет
Subjectпринимать входные данные, такие как обычный абонентТретья строка вызывает метод
onNextобъекта, отображающий поведениеObservable.subject.onNext("Hello, World!");Это позволяет
Subjectдавать входные данные всем подписчикам на него.
Типы
Subject (в RxJava) может быть любого из этих четырех типов:
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
Кроме того, объект Subject может иметь тип SerializedSubject . Этот тип гарантирует, что Subject не нарушит Договор о наблюдении (который указывает, что все вызовы должны быть сериализованы)
Дальнейшее чтение:
- Использовать или не использовать тему из блога Дейва Секстона
PublishSubject
PublishSubject Observer только те элементы, которые испускаются источником. Observable после времени подписки.
Простой пример PublishSubject :
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
Выход:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
В приведенном выше примере PublishSubject подписывается на Observable который действует как часы, и испускает элементы (Long) каждые 500 миллисекунд. Как видно на выходе, PublishSubject передает значения, которые он получает от источника ( clock ) до его подписчиков ( sub1 и sub2 ).
PublishSubject может запускать испускающие элементы, как только он будет создан, без какого-либо наблюдателя, который рискует потерять один или несколько предметов до тех пор, пока наблюдатель не сможет записаться.
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
Выход:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
Обратите внимание, что sub1 испускает значения, начиная с 10 . Внесенная 5-секундная задержка вызвала потерю предметов. Они не могут воспроизводиться. Это по существу делает PublishSubject Hot Observable .
Также обратите внимание, что если наблюдатель подписывается на PublishSubject после того, как он выпустил n элементов, эти n элементов не могут быть воспроизведены для этого наблюдателя.
Ниже представлена мраморная диаграмма PublishSubject
PublishSubject элементы всем, кто подписался, в любой момент времени до onCompleted источника Observable .
Если источник Observable завершается с ошибкой, PublishSubject не будет PublishSubject какие-либо элементы последующим наблюдателям, а просто будет передавать уведомление об ошибке из источника Observable.
Случай использования
Предположим, вы хотите создать приложение, которое будет контролировать цены акций определенной компании и перенаправлять их всем клиентам, которые запрашивают ее.
/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
В приведенном выше примере использования, PublishSubject действует как мост для передачи значений с вашего сервера всем клиентам, которые подписываются на ваш watcher .
Дальнейшее чтение:

