rx-java
RxJava2 Flowable и подписчик
Поиск…
Вступление
В этом разделе приведены примеры и документация в отношении реактивных концепций Flowable и Subscriber, которые были введены в rxjava version2
замечания
для примера нужен rxjava2 как зависимость, координаты maven для используемой версии:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
пример потребителя-производителя с поддержкой противодавления у производителя
TestProducer из этого примера создает объекты Integer в заданном диапазоне и подталкивает их к своему Subscriber . Он расширяет класс Flowable<Integer> . Для нового абонента он создает объект Subscription , метод request(long) используется для создания и публикации значений Integer.
Важно, чтобы Subscription передана subscriber что метод request() который вызывает onNext() на подписчике, может быть рекурсивным образом вызван из этого onNext() . Для того, чтобы предотвратить переполнение стека, показанная реализация использует outStandingRequests счетчик и isProducing флаг.
class TestProducer extends Flowable<Integer> {
static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
final int from, to;
public TestProducer(int from, int to) {
this.from = from;
this.to = to;
}
@Override
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
/** the next value. */
public int next = from;
/** cancellation flag. */
private volatile boolean cancelled = false;
private volatile boolean isProducing = false;
private AtomicLong outStandingRequests = new AtomicLong(0);
@Override
public void request(long n) {
if (!cancelled) {
outStandingRequests.addAndGet(n);
// check if already fulfilling request to prevent call between request() an subscriber .onNext()
if (isProducing) {
return;
}
// start producing
isProducing = true;
while (outStandingRequests.get() > 0) {
if (next > to) {
logger.info("producer finished");
subscriber.onComplete();
break;
}
subscriber.onNext(next++);
outStandingRequests.decrementAndGet();
}
isProducing = false;
}
}
@Override
public void cancel() {
cancelled = true;
}
});
}
}
Потребитель в этом примере расширяет DefaultSubscriber<Integer> и в начале и после использования Integer запрашивает следующий. При потреблении значений Integer существует небольшая задержка, поэтому противодавление будет создано для производителя.
class TestConsumer extends DefaultSubscriber<Integer> {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
protected void onStart() {
request(1);
}
@Override
public void onNext(Integer i) {
logger.info("consuming {}", i);
if (0 == (i % 5)) {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
// can be ignored, just used for pausing
}
}
request(1);
}
@Override
public void onError(Throwable throwable) {
logger.error("error received", throwable);
}
@Override
public void onComplete() {
logger.info("consumer finished");
}
}
в следующем основном методе тестового класса создаются и подключаются производитель и потребитель:
public static void main(String[] args) {
try {
final TestProducer testProducer = new TestProducer(1, 1_000);
final TestConsumer testConsumer = new TestConsumer();
testProducer
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.blockingSubscribe(testConsumer);
} catch (Throwable t) {
t.printStackTrace();
}
}
При запуске примера файл журнала показывает, что потребитель работает непрерывно, в то время как производитель только активируется, когда внутренний буфер Flowable rxjava2 необходимо пополнить.