rx-java
RxJava2 Flowable en Subscriber
Zoeken…
Invoering
Dit onderwerp toont voorbeelden en documentatie met betrekking tot de reactieve concepten van Flowable en Subscriber die werden geïntroduceerd in rxjava-versie2
Opmerkingen
het voorbeeld heeft rxjava2 nodig als afhankelijkheid, de maven-coördinaten voor de gebruikte versie zijn:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
producent consument voorbeeld met tegendruk ondersteuning in de producent
De TestProducer uit dit voorbeeld produceert Integer objecten in een bepaald bereik en stuurt deze naar de Subscriber . Het breidt de klasse Flowable<Integer> . Voor een nieuwe abonnee maakt het een object Subscription waarvan de request(long) methode wordt gebruikt om de Integer-waarden te maken en te publiceren.
Het is belangrijk voor de Subscription die aan de doorgegeven subscriber dat de request() methode waarin wordt opgeroepen onNext() op de abonnee recursief kunnen worden opgeroepen vanuit deze onNext() gesprek. Om een outStandingRequests te voorkomen, gebruikt de getoonde implementatie de teller outStandingRequests en de vlag isProducing .
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
De consument breidt in dit voorbeeld DefaultSubscriber<Integer> en vraagt bij het opstarten en na het consumeren van een geheel getal de volgende aan. Bij het consumeren van de Integer-waarden is er een kleine vertraging, zodat de tegendruk wordt opgebouwd voor de producent.
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
in de volgende hoofdmethode van een testklasse worden de producent en de consument gecreëerd en aangesloten:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
Bij het uitvoeren van het voorbeeld laat het logbestand zien dat de consument continu draait, terwijl de producent alleen actief wordt wanneer de interne Flowable buffer van rxjava2 opnieuw moet worden gevuld.