rx-java
RxJava2 Fließfähig und Abonnent
Suche…
Einführung
Dieses Thema zeigt Beispiele und Dokumentation zu den reaktiven Konzepten von Flowable und Subscriber, die in rxjava Version2 eingeführt wurden
Bemerkungen
Das Beispiel benötigt rxjava2 als Abhängigkeit, die Maven-Koordinaten für die verwendete Version sind:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
Beispiel eines Produzenten-Verbrauchers mit Unterstützung des Gegendrucks beim Hersteller
Der TestProducer aus diesem Beispiel erzeugt Integer Objekte in einem bestimmten Bereich und überträgt sie an seinen Subscriber . Es erweitert die Klasse Flowable<Integer> . Bei einem neuen Abonnenten wird ein Subscription , dessen Methode request(long) zum Erstellen und Veröffentlichen der Integer-Werte verwendet wird.
Für das Subscription , das an den subscriber , ist es wichtig, dass die request() Methode, die onNext() des Abonnenten onNext() aus diesem onNext() Aufruf rekursiv aufgerufen werden kann. Um einen Stapelüberlauf zu verhindern, verwendet die gezeigte Implementierung den Zähler outStandingRequests und das Flag isProducing .
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
Der Consumer in diesem Beispiel erweitert DefaultSubscriber<Integer> und fordert beim Start und nach Verbrauch einer Integer-Anforderung die nächste an. Beim Verbrauch der Integer-Werte tritt eine kleine Verzögerung auf, sodass der Gegendruck für den Hersteller aufgebaut wird.
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
In der folgenden Hauptmethode einer Testklasse werden Hersteller und Verbraucher erstellt und verkabelt:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
Beim Ausführen des Beispiels zeigt die Protokolldatei, dass der Consumer kontinuierlich ausgeführt wird, während der Produzent nur dann aktiv wird, wenn der interne fließfähige Puffer von rxjava2 nachgefüllt werden muss.