Поиск…


Вступление

Противодавление происходит, когда в конвейере обработки Observable некоторые асинхронные этапы не могут обрабатывать значения достаточно быстро и требуют способа замедлить работу восходящего производителя.

Классический случай необходимости противодавления заключается в том, что производитель является горячим источником:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

В этом примере основной поток будет производить 1 миллион элементов для конечного потребителя, который обрабатывает его на фоновом потоке. Вероятно, метод compute(int) занимает некоторое время, но накладные расходы цепной цепи Observable также могут увеличивать время, затрачиваемое на обработку элементов. Тем не менее, производящий поток с циклом for не может этого знать и продолжает onNext .

Внутри асинхронные операторы имеют буферы для хранения таких элементов до тех пор, пока они не будут обработаны. В классических Rx.NET и ранних RxJava эти буферы были неограниченными, что означает, что они, вероятно, будут содержать почти все 1 миллион элементов из примера. Проблема начинается, когда есть, например, 1 миллиард элементов или одна и та же 1 миллионная последовательность, которая появляется 1000 раз в программе, что приводит к OutOfMemoryError и, как правило, замедлению из-за чрезмерной нагрузки на OutOfMemoryError .

Подобно тому, как обработка ошибок стала первоклассным гражданином и получила операторов для работы с ней (через операторы onErrorXXX ), противодавление - это еще одно свойство потоков данных, о которых программист должен думать и обрабатывать (через операторы onBackpressureXXX ).

Помимо вышеперечисленного PublishSubject существуют другие операторы, которые не поддерживают противодавление, в основном из-за функциональных причин. Например, interval оператора interval испускает значения, что приводит к сдвигу в периоде относительно настенных часов.

В современной RxJava большинство асинхронных операторов теперь имеют ограниченный внутренний буфер, например, как observeOn выше, и любая попытка переполнения этого буфера завершает всю последовательность с помощью MissingBackpressureException . В документации каждого оператора есть описание противодавления.

Однако противодавление присутствует более тонко в обычных холодных последовательностях (которые не дают и не должны давать MissingBackpressureException ). Если первый пример переписан:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

Нет ошибки, и все работает плавно при использовании небольшой памяти. Причиной этого является то, что многие операторы источников могут «генерировать» значения по требованию, и, таким образом, observeOn за оператором. Можно сказать, что range генерирует не более чем столько значений, observeOn буфер observeOn может удерживать сразу без переполнения.

Это согласование основано на концепции компьютерной науки о совместных подпрограммах (я называю вас, вы называете меня). Оператор range посылает обратный вызов, в форме осуществления Producer интерфейса, к observeOn путем вызова (внутренний Subscriber «ов) setProducer . В свою очередь, observeOn вызывает Producer.request(n) со значением, чтобы сообщить range которому разрешено создавать (т. onNext it), что многие дополнительные элементы. Именно тогда ответственность observeOn заключается в observeOn , чтобы вызвать метод request в нужное время и с правильным значением, чтобы сохранить поток данных, но не переполняться.

Выражение противодавления в конечных потребителях редко необходимо (поскольку они синхронны по отношению к их непосредственному восходящему и обратному давлению, естественно, происходит из-за блокировки стека вызовов), но может быть легче понять его работу:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

Здесь реализация onStart указывает range для создания своего первого значения, которое затем принимается в onNext . После завершения compute(int) другое значение запрашивается из range . В наивной реализации range такой вызов рекурсивно вызывает onNext , что приводит к StackOverflowError что, конечно, нежелательно.

Чтобы предотвратить это, операторы используют так называемую логику батутинга, которая предотвращает такие повторные вызовы. В терминах range он будет помнить, что был request(1) когда он вызывал onNext() и как только onNext() возвращает, он сделает другой раунд и вызовет onNext() со следующим целочисленным значением. Поэтому, если они меняются, пример по-прежнему работает одинаково:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

Однако это не так для onStart . Хотя инфраструктура Observable гарантирует, что она будет вызываться не более одного раза на каждого Subscriber , запрос на request(1) может сразу вызвать эмиссию элемента. Если у вас есть логика инициализации после запроса на request(1) который необходим для onNext , у вас могут быть исключения:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

В этом синхронном случае onStart NullPointerException будет onStart немедленно, пока выполняется onStart . Более тонкая ошибка возникает, если запрос на request(1) вызывает асинхронный вызов onNext в другом потоке и чтение name в onNext гонках, записывающих его в onStart post request .

Таким образом, нужно выполнить всю инициализацию поля в onStart или даже до этого и request() последним. Реализации request() в операторах обеспечивают надлежащее выполнение - до отношения (или, в других отношениях, освобождения памяти или полного забора), когда это необходимо.

Операторы onBackpressureXXX

Большинство разработчиков сталкиваются с противодавлением, когда их приложение терпит неудачу с MissingBackpressureException и исключение обычно указывает на оператор observeOn . Фактической причиной обычно является не- PublishSubject использование PublishSubject , timer() или interval() или пользовательских операторов, созданных с помощью create() .

Существует несколько способов решения таких ситуаций.

Увеличение размеров буфера

Иногда такие переполнения происходят из-за взрывоопасных источников. Внезапно пользователь слишком быстро observeOn экран и observeOn за внутренним буфером 16-элементного стандарта по умолчанию на переполнениях Android.

Большинство чувствительных к давлению операторов в последних версиях RxJava теперь позволяют программистам указывать размер своих внутренних буферов. Соответствующие параметры обычно называются bufferSize , prefetch или capacityHint . Учитывая переполненный пример во введении, мы можем просто увеличить размер буфера для observeOn чтобы иметь достаточно места для всех значений.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Обратите внимание, однако, что в общем случае это может быть только временное исправление, поскольку переполнение может все же произойти, если источник перепроизводит размер предсказанного буфера. В этом случае можно использовать один из следующих операторов.

Группировка / пропускание значений со стандартными операторами

В случае, если исходные данные могут быть обработаны более эффективно в пакетном режиме, можно уменьшить вероятность MissingBackpressureException с использованием одного из стандартных операторов MissingBackpressureException обработки (по размеру и / или по времени).

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Если некоторые из значений можно безопасно игнорировать, можно использовать выборку (со временем или другим наблюдаемым) и операторы дросселирования ( throttleFirst , throttleLast , throttleWithTimeout ).

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Обратите внимание на то, что эти операторы только снижают скорость приема значений по нисходящему потоку и, следовательно, могут по-прежнему приводить к MissingBackpressureException .

onBackpressureBuffer ()

Этот оператор в своей безпараметрической форме повторно вводит неограниченный буфер между исходным источником и оператором нисходящего потока. Быть неограниченным означает, что пока JVM не исчерпывает память, он может обрабатывать практически любую сумму, поступающую из разрывающегося источника.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

В этом примере observeOn идет с очень низким размером буфера, но отсутствует MissingBackpressureException как onBackpressureBuffer впитывает все 1 миллион значений и передает небольшим партиям его для observeOn .

Обратите внимание, однако, что onBackpressureBuffer потребляет свой источник неограниченным образом, то есть без применения к нему противодавления. Это приводит к тому , что даже противодавлению поддерживающего источник , таким как range будет полностью реализован.

Есть 4 дополнительных перегрузки onBackpressureBuffer

onBackpressureBuffer (int capacity)

Это ограниченная версия, которая сигнализирует BufferOverflowError в том случае, если ее буфер достигает заданной емкости.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Уместность этого оператора уменьшается, поскольку все больше и больше операторов теперь позволяют устанавливать размеры своих буферов. В остальном это дает возможность «расширить свой внутренний буфер», имея большее число с onBackpressureBuffer чем их значение по умолчанию.

onBackpressureBuffer (int capacity, Action0 onOverflow)

Эта перегрузка вызывает (совместное) действие в случае переполнения. Его полезность довольно ограничена, поскольку нет другой информации о переполнении, чем текущий стек вызовов.

onBackpressureBuffer (int capacity, Action0 onOverflow, BackpressureOverflow.Strategy strategy)

Эта перегрузка на самом деле более полезна, поскольку позволяет определить, что делать, если емкость была достигнута. BackpressureOverflow.Strategy - это интерфейс на самом деле, но класс BackpressureOverflow предлагает 4 статических поля, реализация которых представляет собой типичные действия:

  • ON_OVERFLOW_ERROR : это поведение по умолчанию предыдущих двух перегрузок, сигнализирующее об BufferOverflowException
  • ON_OVERFLOW_DEFAULT : в настоящее время это то же самое, что и ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : если произойдет переполнение, текущее значение будет просто проигнорировано, и только старые значения будут доставлены после запросов нисходящего потока.
  • ON_OVERFLOW_DROP_OLDEST : удаляет самый старый элемент в буфере и добавляет к нему текущее значение.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Обратите внимание, что последние две стратегии вызывают разрыв в потоке по мере удаления элементов. Кроме того, они не будут сигнализировать об BufferOverflowException .

onBackpressureDrop ()

Всякий раз, когда нисходящий поток не готов принимать значения, этот оператор отбрасывает этот элемент из последовательности. Можно подумать об этом как о вместимости 0 onBackpressureBuffer со стратегией ON_OVERFLOW_DROP_LATEST .

Этот оператор полезен, когда можно смело игнорировать значения из источника (например, перемещения мыши или текущие сигналы GPS-местоположения), так как в дальнейшем они будут иметь более современные значения.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

Это может быть полезно в сочетании с interval() оператора источника interval() . Например, если вы хотите выполнить некоторую периодическую фоновую задачу, но каждая итерация может длиться дольше, чем период, можно с уверенностью отказаться от уведомления об избыточном интервале, поскольку будет более поздно:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

Существует одна перегрузка этого оператора: onBackpressureDrop(Action1<? super T> onDrop) где onBackpressureDrop(Action1<? super T> onDrop) (совместно используемое) действие с отбрасываемым значением. Этот вариант позволяет самостоятельно очищать значения (например, освобождать связанные ресурсы).

onBackpressureLatest ()

Конечный оператор сохраняет только последнее значение и практически перезаписывает старые, недопустимые значения. Можно подумать об этом как о варианте onBackpressureBuffer с емкостью 1 и стратегии ON_OVERFLOW_DROP_OLDEST .

В отличие от onBackpressureDrop всегда есть ценность, доступная для потребления, если нисходящий onBackpressureDrop оказался отстающим. Это может быть полезно в некоторых ситуациях, связанных с телеметрией, где данные могут возникать в виде нескольких всплесков, но только самые последние интересны для обработки.

Например, если пользователь нажимает много на экране, мы все равно хотим отреагировать на его последний ввод.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

Использование onBackpressureDrop в этом случае приведет к ситуации, когда последний клик будет сброшен и не позволит пользователю задаться вопросом, почему бизнес-логика не была выполнена.

Создание резервных источников данных

Создание резервных источников данных является относительно простой задачей при обратном противодавлении вообще, потому что библиотека уже предлагает статические методы Observable которые обрабатывают противодавление для разработчика. Мы можем различать два типа заводских методов: холодные «генераторы», которые либо возвращают, либо генерируют элементы, основанные на потреблении вниз по течению и горячих «толкателях», которые обычно соединяют нереактивные и / или невосстановимые источники данных и накладывают некоторое обратное давление на верх их.

просто

Самый основное противодавление известен источник создается с помощью just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

Поскольку мы явно не запрашиваем в onStart , это ничего не печатает. just отлично, когда есть постоянная ценность, которую мы хотели бы начать с начала.

К сожалению, just ошибочно принимают за способ вычислить что-то динамически, чтобы потреблять Subscriber s:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

Удивительно для некоторых, это печатает 1 дважды вместо печати 1 и 2 соответственно. Если вызов переписан, становится очевидным, почему он работает так:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

computeValue называется частью основной процедуры, а не в ответ на подписку подписчиков.

fromCallable

То, что действительно нужно людям, это метод fromCallable :

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

Здесь computeValue выполняется только тогда, когда абонент подписывается и для каждого из них печатает ожидаемые 1 и 2. Естественно, fromCallable также правильно поддерживает противодавление и не будет fromCallable вычисленное значение, если не запрошено. Обратите внимание, однако, что вычисления все равно происходят. В случае, если само вычисление должно быть отложено до тех пор, пока на самом деле не потребуются нисходящие потоки, мы можем использовать just с map :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just не будет выдавать свое постоянное значение до тех пор, пока не будет запрошено, когда оно будет сопоставлено с результатом computeValue , все равно будет computeValue для каждого абонента индивидуально.

от

Если данные уже доступны в виде массива объектов, список объектов или любого Iterable источника, соответствующий from перегрузок будет обрабатывать противодавления и излучение таких источников:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

Для удобства (и избежать предупреждений о создании общего массива) есть 2 до 10 аргументов перегруженных к just что внутренне делегируют к from .

The from(Iterable) также дает интересную возможность. Многие генерирующие ценности могут быть выражены в форме государственной машины. Каждый запрошенный элемент запускает переход состояния и вычисляет возвращаемое значение.

Написание таких состояний машин, как Iterable s, несколько сложнее (но все же проще, чем писать Observable для его потребления), и в отличие от C #, Java не имеет никакой поддержки от компилятора для создания таких состояний машин, просто написав классически выглядящий код (с yield return и yield break ). Некоторые библиотеки предлагают некоторую помощь, такие как Google гуавы в AbstractIterable и IxJava в Ix.generate() и Ix.forloop() . Они сами по себе достойны полной серии, поэтому давайте посмотрим на очень простой исходный источник Iterable который бесконечно повторяет некоторое постоянное значение:

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

Если бы мы использовали iterator через классический цикл for, это привело бы к бесконечному циклу. Поскольку мы создаем Observable из этого, мы можем выразить нашу волю потреблять только первые 5 из них, а затем прекратить запрашивать что-либо. Это истинная сила ленивой оценки и вычисления внутри Observable s.

создание (SyncOnSubscribe)

Иногда источник данных, который должен быть преобразован в самый реактивный мир, является синхронным (блокирующим) и pull-like, то есть мы должны вызвать некоторый метод get или read чтобы получить следующий фрагмент данных. Разумеется, можно было бы превратить это в Iterable но когда такие источники связаны с ресурсами, мы можем утечка этих ресурсов, если нисходящий поток не подписывает последовательность до ее окончания.

Для обработки таких случаев RxJava имеет класс SyncOnSubscribe . Его можно расширить, реализовать его методы или использовать один из его основанных на лямбдах методов для создания экземпляра.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

Как правило, SyncOnSubscribe использует 3 обратных вызова.

Первые обратные вызовы позволяют создать состояние для каждого абонента, такое как FileInputStream в примере; файл будет открыт независимо каждому отдельному абоненту.

Второй обратный вызов принимает этот объект состояния и предоставляет выходной Observer чьи методы onXXX могут быть вызваны для испускания значений. Этот обратный вызов выполняется столько раз, сколько запрашивается нисходящий поток. При каждом вызове он должен вызывать onNext не более одного раза, по выбору, либо с помощью onError либо onCompleted . В примере мы вызываем onCompleted() если прочитанный байт отрицателен, указывает и заканчивает файл, и вызывает onError в случае, если read вызывает onError IOException .

Последний обратный вызов активируется, когда нисходящий поток отменяет подписку (закрытие входного потока) или когда предыдущий обратный вызов называется терминальными методами; он позволяет освободить ресурсы. Поскольку не всем источникам нужны все эти функции, статические методы SyncOnSubscribe позволяют создавать экземпляры без них.

К сожалению, многие вызовы методов в JVM и других библиотеках выдают проверенные исключения и должны быть завернуты в try-catch es, поскольку функциональные интерфейсы, используемые этим классом, не позволяют бросать проверенные исключения.

Конечно, мы можем имитировать другие типичные источники, такие как неограниченный диапазон:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

В этой настройке current начинается с 0 а в следующий раз, когда вызывается лямбда, current равен 1 .

Существует вариант SyncOnSubscribe под названием AsyncOnSubscribe который выглядит очень похожим, за исключением того, что средний обратный вызов также принимает длинное значение, которое представляет собой сумму запроса из нисходящего потока, а обратный вызов должен генерировать Observable с такой же длиной. Этот источник затем объединяет все эти Observable в одну последовательность.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

Существует продолжительная (горячая) дискуссия о пользе этого класса и вообще не рекомендуется, поскольку она регулярно нарушает ожидания относительно того, как она действительно будет генерировать эти сгенерированные ценности и как она будет реагировать, или даже какие значения запроса, которые она получит в более сложные потребительские сценарии.

создание (эмиттер)

Иногда источник, который должен быть обернут в Observable , уже горячий (например, перемещение мыши) или холодный, но не протирающийся в его API (такой как асинхронный обратный вызов сети).

Чтобы обработать такие случаи, в последней версии RxJava был введен заводский метод create(emitter) . Он принимает два параметра:

  • обратный вызов, который вызывается с экземпляром интерфейса Emitter<T> для каждого входящего абонента,
  • перечисление Emitter.BackpressureMode которое обязывает разработчика указывать поведение противодавления, которое должно применяться. Он имеет обычные режимы, похожие на onBackpressureXXX в дополнение к MissingBackpressureException или просто игнорируя такое переполнение внутри него.

Обратите внимание, что в настоящее время он не поддерживает дополнительные параметры для этих режимов противодавления. Если вам нужна эта настройка, использование NONE в качестве режима противодавления и применение соответствующего onBackpressureXXX на полученном Observable - это путь.

Первый типичный случай для его использования, когда вы хотите взаимодействовать с источником на основе push, таким как события GUI. Эти API имеют некоторую форму addListener / removeListener которые можно использовать:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

Emitter относительно прост в использовании; можно вызвать onNext , onError и onCompleted а оператор самостоятельно обрабатывает управление противодавлением и отменой подписки. Кроме того, если обернута API - поддерживает отмену (например, удаление слушателя в данном примере), можно использовать setCancellation (или setSubscription для Subscription -подобных ресурсов) для регистрации отмены обратного вызова , которая вызывается , когда вниз по течению отписывается или onError / onCompleted вызывается на предоставленный экземпляр Emitter .

Эти методы позволяют одновременно связывать только один ресурс с эмиттером и устанавливать новый, который автоматически не подписывается на старый. Если нужно обрабатывать несколько ресурсов, создайте CompositeSubscription , свяжите ее с эмиттером, а затем добавьте дополнительные ресурсы в саму CompositeSubscription подписку:

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

Второй сценарий обычно включает в себя некоторый асинхронный API с обратным вызовом, который должен быть преобразован в Observable .

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

В этом случае делегация работает одинаково. К сожалению, как правило, эти классические API обратного вызова не поддерживают отмену, но если это так, можно настроить их отмену, как в примерах previoius (хотя, возможно, более увлекательный способ). Обратите внимание на использование режима LATEST противодавления; если мы знаем, что будет только одно значение, нам не нужна стратегия BUFFER , поскольку он выделяет стандартный буфер длиной 128 элементов (который растет по мере необходимости), который никогда не будет полностью использован.



Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow