rx-java
RxJava2 유동 및 가입자
수색…
소개
이 항목에서는 rxjava version2에서 소개 된 Flowable 및 Subscriber의 사후 대응 개념과 관련된 예제 및 문서를 보여줍니다.
비고
이 예제에서는 rxjava2가 종속성으로 필요하며 사용 된 버전의 관리 좌표는 다음과 같습니다.
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
생산자에서 배압을 지원하는 생산자 소비자 사례
이 예제의 TestProducer 는 주어진 범위에서 Integer 개체를 생성하고 Subscriber 개체로 푸시합니다. Flowable<Integer> 클래스를 확장합니다. 새 구독자의 request(long) 메서드를 사용하여 Integer 값을 만들고 게시하는 Subscription 개체를 만듭니다.
그것은 중요하다 Subscription 에 전달되는 subscriber request() 메소드 호출 onNext() 가입자에 재귀이 내에서 호출 할 수 있습니다 onNext() 호출. 스택 오버 플로우를 방지하기 위해, 도시 된 구현은 사용 outStandingRequests 카운터와 isProducing 플래그.
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
이 예제에서 Consumer는 DefaultSubscriber<Integer> 를 확장하고 DefaultSubscriber<Integer> 를 사용하면 시작 및 다음에 다음 요청을 요청합니다. Integer 값을 소비 할 때 약간의 지연이 있기 때문에 배압은 생산자를 위해 구성됩니다.
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
테스트 클래스의 다음 주요 메소드에서 생산자와 소비자가 생성되고 유선화됩니다.
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
예제를 실행할 때 로그 파일은 소비자가 계속 실행되는 것을 보여 주지만 rxjava2의 내부 Flowable 버퍼가 다시 채워질 필요가있을 때만 생성자가 활성화됩니다.