サーチ…
構文
- サブジェクト<T、R> subject = AsyncSubject.create(); //デフォルトのAsyncSubject
- Subject <T、R> subject = BehaviorSubject.create(); //デフォルトのBehaviorSubject
- サブジェクト<T、R> subject = PublishSubject.create(); //デフォルトのPublishSubject
- Subject <T、R> subject = ReplaySubject.create(); //デフォルトのReplaySubject
- mySafeSubject =新しいSerializedSubject(unSafeSubject); // unsafeSubjectをsafeSubjectに変換する - 一般的にはマルチスレッド対象
パラメーター
| パラメーター | 詳細 |
|---|---|
| T | 入力方式 |
| R | 出力タイプ |
備考
このドキュメントは、 Subjectに関する詳細と説明を提供します。詳細および詳細は、 公式文書をご覧ください。
基本科目
RxJavaのSubjectはObservableとObserver両方のクラスです。これは基本的にObservableとして機能し、入力を加入者に渡し、 Observerとして別のObservableから入力を得ることを意味します。
Subject<String, String> subject = PublishSubject.create();
subject.subscribe(System.out::print);
subject.onNext("Hello, World!");
上記の "Hello、World!" Subjectsを使ってコンソールに
説明
コードの最初の行は、
PublishSubject型の新しいSubjectを定義しますSubject<String, String> subject = PublishSubject.create(); | | | | | subject<input, output> name = default publish subject2行目はサブジェクトにサブスクライブし、
Observer動作を示します。subject.subscribe(System.out::print);これにより、
Subjectは通常のサブスクライバのように入力を受け取ります3行目では、
onNextメソッドを呼び出し、Observableビヘイビアを示します。subject.onNext("Hello, World!");これにより、
SubjectはSubjectにサブスクライブするすべての人に入力を与えることができます。
タイプ
Subject (RxJava内)は、次の4つのタイプのいずれかになります。
- AsyncSubject
- 行動課題
- PublishSubject
- ReplaySubject
また、 SubjectはSerializedSubject型でもSerializedSubjectません。このタイプは、 SubjectがObservable Contract (すべての呼び出しをシリアライズする必要があることを指定する)に違反しないことを保証します。
参考文献:
- Dave SextonのブログからSubjectを使用するか使用しないか
PublishSubject
PublishSubjectは、サブスクリプションの時間に続いてObservableソースによってPublishSubjectされたアイテムのみをObserver送信します。
単純なPublishSubject例:
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
出力:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
上記の例では、 PublishSubjectは時計のように動作するObservableをサブスクライブし、500ミリ秒ごとにアイテム(Long)をPublishSubjectます。出力に見られるように、 PublishSubject 、ソース(から取得バレスに通過clockその加入者(へ) sub1およびsub2 )。
PublishSubjectは、オブザーバーがなくても、オブザーバーが日没するまで1つまたは複数のアイテムが失われる危険性がある、作成されるとすぐにアイテムの発光を開始することができます。
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
出力:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
sub1は10から始まる値を出力するsub1注意してください。導入された5秒の遅れは、アイテムの損失を引き起こした。これらは再現することはできません。これは本質的にPublishSubjectをHot Observableます。
また、オブザーバーがn個のアイテムをPublishSubject後にオブザーバーがPublishSubjectにサブスクライブすると、これらのn個のアイテムはこのオブザーバーのために再現できません 。
以下は、 PublishSubjectの大理石図です
onCompletedは、 ObservableソースのonCompletedが呼び出される前の任意の時点で、購読したすべてのPublishSubjectアイテムを送信します。
Observableソースがエラーで終了した場合、 PublishSubjectは後続のオブザーバにアイテムをPublishSubjectませんが、単にObservableソースからのエラー通知を渡します。
使用事例
特定の会社の株価を監視し、それを要求したすべてのクライアントに転送するアプリケーションを作成するとします。
/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
上記の使用例では、 PublishSubjectは、サーバーからwatcher登録しているすべてのクライアントに値を渡すためのブリッジとして機能します。
参考文献:

