サーチ…


構文

  • サブジェクト<T、R> subject = AsyncSubject.create(); //デフォルトのAsyncSubject
  • Subject <T、R> subject = BehaviorSubject.create(); //デフォルトのBehaviorSubject
  • サブジェクト<T、R> subject = PublishSubject.create(); //デフォルトのPublishSubject
  • Subject <T、R> subject = ReplaySubject.create(); //デフォルトのReplaySubject
  • mySafeSubject =新しいSerializedSubject(unSafeSubject); // unsafeSubjectをsafeSubjectに変換する - 一般的にはマルチスレッド対象

パラメーター

パラメーター詳細
T 入力方式
R 出力タイプ

備考

このドキュメントは、 Subjectに関する詳細と説明を提供します。詳細および詳細は、 公式文書をご覧ください。

基本科目

RxJavaのSubjectObservableObserver両方のクラスです。これは基本的にObservableとして機能し、入力を加入者に渡し、 Observerとして別のObservableから入力を得ることを意味します。

Subject<String, String> subject = PublishSubject.create(); 
subject.subscribe(System.out::print);
subject.onNext("Hello, World!"); 

上記の "Hello、World!" Subjectsを使ってコンソールに

説明

  1. コードの最初の行は、 PublishSubject型の新しいSubjectを定義します

    Subject<String, String> subject = PublishSubject.create();
        |     |       |       |                 |
     subject<input, output>  name   = default publish subject
    
  2. 2行目はサブジェクトにサブスクライブし、 Observer動作を示します。

    subject.subscribe(System.out::print);
    

    これにより、 Subjectは通常のサブスクライバのように入力を受け取ります

  3. 3行目では、 onNextメソッドを呼び出し、 Observableビヘイビアを示します。

    subject.onNext("Hello, World!"); 
    

    これにより、 SubjectSubjectにサブスクライブするすべての人に入力を与えることができます。

タイプ

Subject (RxJava内)は、次の4つのタイプのいずれかになります。

  • AsyncSubject
  • 行動課題
  • PublishSubject
  • ReplaySubject

また、 SubjectSerializedSubject型でもSerializedSubjectません。このタイプは、 SubjectObservable Contract (すべての呼び出しをシリアライズする必要があることを指定する)に違反しないことを保証します。

参考文献:

PublishSubject

PublishSubjectは、サブスクリプションの時間に続いてObservableソースによってPublishSubjectされたアイテムのみをObserver送信します。

単純なPublishSubject例:

Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();

clock.subscribe(subjectLong);

System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);

出力:

sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3

上記の例では、 PublishSubjectは時計のように動作するObservableをサブスクライブし、500ミリ秒ごとにアイテム(Long)をPublishSubjectます。出力に見られるように、 PublishSubject 、ソース(から取得バレスに通過clockその加入者(へ) sub1およびsub2 )。

PublishSubjectは、オブザーバーがなくても、オブザーバーが日没するまで1つまたは複数のアイテムが失われる危険性がある、作成されるとすぐにアイテムの発光を開始することができます。

createClock(); // 3 lines moved for brevity. same as above example

Thread.sleep(5000); // introduces a delay before first subscribe

sub1andsub2(); // 6 lines moved for brevity. same as above example

出力:

sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13

sub110から始まる値を出力するsub1注意してください。導入された5秒の遅れは、アイテムの損失を引き起こした。これらは再現することはできません。これは本質的にPublishSubjectHot Observableます。

また、オブザーバーがn個のアイテムをPublishSubject後にオブザーバーがPublishSubjectにサブスクライブすると、これらのn個のアイテムこのオブザーバーのために再現できません

以下は、 PublishSubjectの大理石図です

ここに画像の説明を入力

onCompletedは、 ObservableソースのonCompletedが呼び出される前の任意の時点で、購読したすべてのPublishSubjectアイテムを送信します。

Observableソースがエラーで終了した場合、 PublishSubjectは後続のオブザーバにアイテムをPublishSubjectませんが、単にObservableソースからのエラー通知を渡します。

ここに画像の説明を入力

使用事例
特定の会社の株価を監視し、それを要求したすべてのクライアントに転送するアプリケーションを作成するとします。

/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);

/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();

上記の使用例では、 PublishSubjectは、サーバーからwatcher登録しているすべてのクライアントに値を渡すためのブリッジとして機能します。

参考文献:



Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow