rx-java
RxJava2 Flowable och abonnent
Sök…
Introduktion
Detta ämne visar exempel och dokumentation med avseende på de reaktiva koncepten för Flowable och Subscribe som introducerades i rxjava version2
Anmärkningar
exemplet behöver rxjava2 som ett beroende, de maven koordinaterna för den använda versionen är:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
producentkonsumentexempel med stöd för mottryck hos producenten
TestProducer från detta exempel producerar Integer inom ett visst intervall och skjuter dem till dess Subscriber . Det utökar Flowable<Integer> -klassen. För en ny prenumerant skapar det ett Subscription vars request(long) metod används för att skapa och publicera heltal.
Det är viktigt för Subscription som skickas till subscriber att request() -metoden som ringer onNext() på abonnenten kan rekursivt anropas från detta onNext() . För att förhindra ett outStandingRequests använder den visade implementeringen outStandingRequests räknaren och isProducing flaggan.
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
Konsumenten i detta exempel utökar DefaultSubscriber<Integer> och vid start och efter att ha konsumerat ett heltal begär det nästa. När du konsumerar heltalvärdena finns det en liten fördröjning, så mottrycket kommer att byggas upp för tillverkaren.
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
i följande huvudmetod för en testklass skapas och kopplas upp producenten och konsumenten:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
När exemplet körs visar loggfilen att konsumenten körs kontinuerligt medan producenten bara blir aktiv när den interna flödbara bufferten från rxjava2 måste fyllas på.