Поиск…


Синтаксис

  • Тема <T, R> subject = AsyncSubject.create (); // По умолчанию AsyncSubject
  • Тема <T, R> subject = BehaviorSubject.create (); // Default BehaviorSubject
  • Тема <T, R> subject = PublishSubject.create (); // Default PublishSubject
  • Тема <T, R> subject = ReplaySubject.create (); // По умолчанию ReplaySubject
  • mySafeSubject = новый SerializedSubject (unSafeSubject); // Преобразование unsafeSubject в safeSubject - как правило, для многопоточных объектов

параметры

параметры подробности
T Тип ввода
р Тип выхода

замечания

В этой документации приводятся подробные сведения и пояснения по Subject . Для получения дополнительной информации и дальнейшего ознакомления, пожалуйста, посетите официальную документацию .

Основные темы

Subject в RxJava - это класс, который является Observable и Observer . Это в основном означает, что он может выступать в качестве Observable и передавать входные данные подписчикам и в качестве Observer получать данные от другого наблюдаемого.

Subject<String, String> subject = PublishSubject.create(); 
subject.subscribe(System.out::print);
subject.onNext("Hello, World!"); 

Вышеприведенные отпечатки «Привет, мир!» для консольного использования Subjects .

объяснение

  1. Первая строка кода определяет новый Subject типа PublishSubject

    Subject<String, String> subject = PublishSubject.create();
        |     |       |       |                 |
     subject<input, output>  name   = default publish subject
    
  2. Вторая строка подписывается на объект, отображая поведение Observer .

    subject.subscribe(System.out::print);
    

    Это позволяет Subject принимать входные данные, такие как обычный абонент

  3. Третья строка вызывает метод onNext объекта, отображающий поведение Observable .

    subject.onNext("Hello, World!"); 
    

    Это позволяет Subject давать входные данные всем подписчикам на него.

Типы

Subject (в RxJava) может быть любого из этих четырех типов:

  • AsyncSubject
  • BehaviorSubject
  • PublishSubject
  • ReplaySubject

Кроме того, объект Subject может иметь тип SerializedSubject . Этот тип гарантирует, что Subject не нарушит Договор о наблюдении (который указывает, что все вызовы должны быть сериализованы)

Дальнейшее чтение:

PublishSubject

PublishSubject Observer только те элементы, которые испускаются источником. Observable после времени подписки.

Простой пример PublishSubject :

Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();

clock.subscribe(subjectLong);

System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);

Выход:

sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3

В приведенном выше примере PublishSubject подписывается на Observable который действует как часы, и испускает элементы (Long) каждые 500 миллисекунд. Как видно на выходе, PublishSubject передает значения, которые он получает от источника ( clock ) до его подписчиков ( sub1 и sub2 ).

PublishSubject может запускать испускающие элементы, как только он будет создан, без какого-либо наблюдателя, который рискует потерять один или несколько предметов до тех пор, пока наблюдатель не сможет записаться.

createClock(); // 3 lines moved for brevity. same as above example

Thread.sleep(5000); // introduces a delay before first subscribe

sub1andsub2(); // 6 lines moved for brevity. same as above example

Выход:

sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13

Обратите внимание, что sub1 испускает значения, начиная с 10 . Внесенная 5-секундная задержка вызвала потерю предметов. Они не могут воспроизводиться. Это по существу делает PublishSubject Hot Observable .

Также обратите внимание, что если наблюдатель подписывается на PublishSubject после того, как он выпустил n элементов, эти n элементов не могут быть воспроизведены для этого наблюдателя.

Ниже представлена ​​мраморная диаграмма PublishSubject

введите описание изображения здесь

PublishSubject элементы всем, кто подписался, в любой момент времени до onCompleted источника Observable .

Если источник Observable завершается с ошибкой, PublishSubject не будет PublishSubject какие-либо элементы последующим наблюдателям, а просто будет передавать уведомление об ошибке из источника Observable.

введите описание изображения здесь

Случай использования
Предположим, вы хотите создать приложение, которое будет контролировать цены акций определенной компании и перенаправлять их всем клиентам, которые запрашивают ее.

/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);

/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();

В приведенном выше примере использования, PublishSubject действует как мост для передачи значений с вашего сервера всем клиентам, которые подписываются на ваш watcher .

Дальнейшее чтение:

  • PublishSubject javadocs
  • Блог Томаса Нильда (расширенное чтение)


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow