수색…
통사론
- Subject <T, R> subject = AsyncSubject.create (); // 기본 AsyncSubject
- Subject <T, R> subject = BehaviorSubject.create (); // 기본 BehaviorSubject
- Subject <T, R> subject = PublishSubject.create (); // 기본 PublishSubject
- Subject <T, R> subject = ReplaySubject.create (); // 기본 ReplaySubject
- mySafeSubject = 새로운 SerializedSubject (unSafeSubject); // unsafeSubject를 safeSubject로 변환합니다 - 일반적으로 멀티 스레드 주제
매개 변수
| 매개 변수 | 세부 |
|---|---|
| 티 | 입력 유형 |
| 아르 자형 | 출력 유형 |
비고
이 문서는 Subject 에 대한 세부 사항과 설명을 제공합니다. 자세한 정보 및 추가 정보는 공식 문서를 참조하십시오.
기본 과목
RxJava의 Subject 는 Observable 과 Observer 클래스입니다. 이것은 기본적으로 Observable 역할을 할 수 있고 입력을 가입자에게 전달할 수 있고 Observer 는 다른 Observable로부터 입력을받을 수 있음을 의미합니다.
Subject<String, String> subject = PublishSubject.create();
subject.subscribe(System.out::print);
subject.onNext("Hello, World!");
위의 그림은 "Hello, World!"입니다. Subjects 사용하여 콘솔에.
설명
코드의 첫 번째 행은
PublishSubject유형의 새Subject를 정의합니다.Subject<String, String> subject = PublishSubject.create(); | | | | | subject<input, output> name = default publish subject두 번째 줄은 주제를 구독하여
Observer행동을 보여줍니다.subject.subscribe(System.out::print);Subject는 일반 가입자와 같은 입력을받을 수 있습니다.세 번째 줄은
onNext메서드를 호출하여Observable비헤이비어를 보여줍니다.subject.onNext("Hello, World!");이것에 의해,
Subject는,Subject를 구독하고있는 모든 사람들에게 입력을 실시 할 수가 있습니다.
유형
Subject (RxJava에서)는 다음 네 가지 유형 중 하나 일 수 있습니다.
- AsyncSubject
- 행동 과제
- PublishSubject
- ReplaySubject
또한 Subject 는 SerializedSubject 유형일 수 있습니다. 이 유형은 Subject 가 Observable Contract (모든 호출이 직렬화되어야 함을 지정 함)에 위배되지 않도록합니다.
추가 읽기 :
- Dave Sexton의 블로그에서 주제를 사용하거나 사용하지 않으려면
PublishSubject
PublishSubject 는 구독 시간 이후에 Observable 소스에 의해 생성 된 항목 만 Observer 에 방출합니다.
간단한 PublishSubject 예제 :
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
산출:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
위의 예제에서 PublishSubject 는 시계처럼 작동하는 Observable 을 구독하고 500 밀리 초마다 항목 (Long)을 방출합니다. 출력에서 볼 수 있듯이 PublishSubject 는 소스 ( clock )에서 구독자 ( sub1 및 sub2 )로 전달되는 값을 전달합니다.
PublishSubject 는 관찰자없이 항목이 생성되는 즉시 관찰을 시작할 수 있습니다. 그러면 관찰자가 햇볕을받을 때까지 하나 이상의 항목이 손실 될 위험이 있습니다.
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
산출:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
sub1 은 10 부터 시작하는 값을 방출합니다. 소개 된 5 초 지연으로 인해 항목이 손실 되었습니다. 이것들은 재생산 될 수 없습니다. 이것은 본질적으로 PublishSubject 를 Hot Observable 만듭니다.
또한 관찰자가 n 개의 항목을 방출 한 후에 PublishSubject 등록하면이 n 개의 항목을이 관찰자에게 재현 할 수 없습니다 .
아래는 PublishSubject 의 대리석 다이어그램입니다.
PublishSubject 는 Observable 소스의 onCompleted 가 호출되기 전에 구독 한 모든 항목에 항목을 방출합니다.
Observable 원본이 오류로 종료되면 PublishSubject 는 후속 관찰자에게 항목을 방출하지 않지만 Observable 원본의 오류 알림을 전달합니다.
사용 사례
특정 회사의 주가를 모니터링하고이를 요청한 모든 고객에게 전달할 응용 프로그램을 만들고 싶다고 가정 해보십시오.
/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
stockWatcher = getWatcherInstance(); // gets subject
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
위의 예제 사용 사례에서 PublishSubject 는 서버의 값을 watcher 를 구독하는 모든 클라이언트로 전달하는 브리지 역할을합니다.
추가 읽기 :

