rx-java
Contrapresion
Buscar..
Introducción
La contrapresión es cuando en un proceso de procesamiento Observable , algunas etapas asíncronas no pueden procesar los valores con la rapidez suficiente y necesitan una forma de decirle al productor ascendente que disminuya la velocidad.
El caso clásico de la necesidad de contrapresión es cuando el productor es una fuente candente:
PublishSubject<Integer> source = PublishSubject.create();
source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Thread.sleep(10_000);
En este ejemplo, el hilo principal producirá 1 millón de artículos para un consumidor final que lo procesa en un hilo de fondo. Es probable que el método de compute(int) tome algún tiempo, pero la sobrecarga de la cadena de operadores Observable también puede aumentar el tiempo que se tarda en procesar los elementos. Sin embargo, el subproceso que produce con el bucle for no puede saber esto y continúa en onNext .
Internamente, los operadores asíncronos tienen buffers para mantener dichos elementos hasta que puedan procesarse. En el Rx.NET clásico y el primer RxJava, estos buffers no tenían límites, lo que significa que probablemente obtendrían casi todos los 1 millón de elementos del ejemplo. El problema comienza cuando hay, por ejemplo, 1 billón de elementos o la misma secuencia de 1 millón aparece 1000 veces en un programa, lo que lleva a OutOfMemoryError y generalmente se ralentiza debido a una sobrecarga de GC excesiva.
De manera similar a cómo el manejo de errores se convirtió en un ciudadano de primera clase y en operadores recibidos para lidiar con él (a través de los operadores onErrorXXX ), la contrapresión es otra propiedad de los flujos de datos que el programador debe considerar y manejar (a través de los operadores onBackpressureXXX ).
Más allá del PublishSubject anterior, hay otros operadores que no admiten la contrapresión, principalmente debido a razones funcionales. Por ejemplo, el interval del operador emite valores periódicamente, lo que hace que la contrapresión lleve a cambios en el período en relación con un reloj de pared.
En RxJava moderno, la mayoría de los operadores asíncronos ahora tienen un búfer interno acotado, como observeOn arriba y cualquier intento de desbordar este búfer terminará toda la secuencia con MissingBackpressureException . La documentación de cada operador tiene una descripción sobre su comportamiento de contrapresión.
Sin embargo, la contrapresión está presente más sutilmente en las secuencias frías regulares (que no lo hacen y no deberían producir la MissingBackpressureException ). Si el primer ejemplo es reescrito:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
Thread.sleep(10_000);
No hay error y todo funciona sin problemas con el uso de memoria pequeña. La razón de esto es que muchos operadores de origen pueden "generar" valores a pedido y, por lo tanto, observeOn del operador puede decir que el range genera a lo sumo tantos valores que el búfer de observeOn puede contener de una vez sin desbordamiento.
Esta negociación se basa en el concepto de informática de co-rutinas (yo te llamo, tú me llamas). El operador range envía una devolución de llamada, en la forma de una implementación del Producer interfaz, a la observeOn llamando a su (interior Subscriber 's) setProducer . A cambio, observeOn llama a Producer.request(n) con un valor para indicar el range que tiene permitido producir (es decir, onNext ) muchos elementos adicionales . Entonces, es responsabilidad de observeOn llamar al método de request en el momento adecuado y con el valor correcto para mantener los datos fluyendo pero sin desbordarse.
Rara vez es necesario expresar la contrapresión en los consumidores finales (porque son síncronos respecto a su flujo ascendente inmediato y la contrapresión ocurre naturalmente debido al bloqueo de la pila de llamadas), pero puede ser más fácil comprender su funcionamiento:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
public void onNext(Integer v) {
compute(v);
request(1);
}
@Override
public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Done!");
}
});
Aquí la implementación de onStart indica el range para producir su primer valor, que luego se recibe en onNext . Una vez que finaliza la compute(int) , el otro valor se solicita desde el range . En una implementación ingenua de range , dicha llamada recurriría recursivamente onNext , lo que llevaría a StackOverflowError que por supuesto no es deseable.
Para evitar esto, los operadores utilizan la llamada lógica de trampolín que evita tales llamadas reentrantes. En términos de range , recordará que hubo una request(1) mientras que llamó onNext() y una vez que onNext() devuelve, realizará otra ronda y llamará a onNext() con el siguiente valor entero. Por lo tanto, si se intercambian los dos, el ejemplo sigue funcionando igual:
@Override
public void onNext(Integer v) {
request(1);
compute(v);
}
Sin embargo, esto no es cierto para onStart . Aunque la infraestructura Observable garantiza que se llamará como máximo una vez en cada Subscriber , la llamada a request(1) puede desencadenar la emisión de un elemento de inmediato. Si uno tiene lógica de inicialización después de la llamada a request(1) que necesita onNext , puede terminar con excepciones:
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
String name;
@Override
public void onStart() {
request(1);
name = "RangeExample";
}
@Override
public void onNext(Integer v) {
compute(name.length + v);
request(1);
}
// ... rest is the same
});
En este caso síncrono, se lanzará una NullPointerException inmediatamente mientras se sigue ejecutando onStart . Un error más sutil ocurre si la llamada a request(1) desencadena una llamada asíncrona a onNext en algún otro hilo y lee el name en onNext carreras onNext escriben en la request posterior al onStart .
Por lo tanto, uno debe hacer toda la inicialización de campo en onStart o incluso antes de eso y llamar a request() último lugar. Las implementaciones de request() en los operadores aseguran una relación correcta antes del suceso (o en otros términos, liberación de memoria o valla completa) cuando sea necesario.
Los operadores onBackpressureXXX
La mayoría de los desarrolladores encuentran una contrapresión cuando su aplicación falla con MissingBackpressureException y la excepción generalmente apunta al operador de observeOn . La causa real suele ser el uso no PublishSubject de PublishSubject , timer() o interval() u operadores personalizados creados a través de create() .
Hay varias maneras de lidiar con tales situaciones.
Aumentar los tamaños de búfer.
A veces, tales desbordamientos ocurren debido a fuentes ráfagas. De repente, el usuario toca la pantalla con demasiada rapidez y observeOn el búfer interno de 16 elementos predeterminado de observeOn en los desbordamientos de Android.
La mayoría de los operadores sensibles a la contrapresión en las versiones recientes de RxJava ahora permiten a los programadores especificar el tamaño de sus búferes internos. Los parámetros relevantes generalmente se llaman bufferSize , prefetch o capacityHint . Dado el ejemplo desbordante en la introducción, solo podemos aumentar el tamaño del búfer de observeOn para tener suficiente espacio para todos los valores.
PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.computation(), 1024 * 1024)
.subscribe(e -> { }, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Sin embargo, tenga en cuenta que, en general, esto puede ser solo una solución temporal, ya que el desbordamiento todavía puede ocurrir si la fuente produce en exceso el tamaño del búfer previsto. En este caso, uno puede usar uno de los siguientes operadores.
Valores por lotes / saltos con operadores estándar
En caso de que los datos de origen puedan procesarse más eficientemente en lotes, se puede reducir la probabilidad de MissingBackpressureException mediante el uso de uno de los operadores de procesamiento por lotes estándar (por tamaño y / o por tiempo).
PublishSubject<Integer> source = PublishSubject.create();
source
.buffer(1024)
.observeOn(Schedulers.computation(), 1024)
.subscribe(list -> {
list.parallelStream().map(e -> e * e).first();
}, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Si algunos de los valores se pueden ignorar de manera segura, uno puede usar el muestreo (con tiempo u otro Observable) y los operadores de regulación ( throttleFirst , throttleLast , throttleWithTimeout ).
PublishSubject<Integer> source = PublishSubject.create();
source
.sample(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation(), 1024)
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Tenga en cuenta, sin embargo, que estos operadores solo reducen la tasa de recepción de valor en sentido descendente y, por lo tanto, aún pueden llevar a la MissingBackpressureException .
onBackpressureBuffer ()
Este operador en su forma sin parámetros reintroduce un búfer ilimitado entre la fuente ascendente y el operador descendente. Ser ilimitado significa que mientras la JVM no se quede sin memoria, puede manejar casi cualquier cantidad que provenga de una fuente ráfaga.
Observable.range(1, 1_000_000)
.onBackpressureBuffer()
.observeOn(Schedulers.computation(), 8)
.subscribe(e -> { }, Throwable::printStackTrace);
En este ejemplo, observeOn va con un tamaño de búfer muy bajo, sin embargo, no hay una MissingBackpressureException ya que onBackpressureBuffer absorbe todos los valores de 1 millón y entrega lotes pequeños de este para observeOn .
Tenga en cuenta, sin embargo, que onBackpressureBuffer consume su fuente de manera ilimitada, es decir, sin aplicarle ninguna contrapresión. Esto tiene la consecuencia de que incluso una fuente de soporte de contrapresión como el range se realizará completamente.
Hay 4 sobrecargas adicionales de onBackpressureBuffer
onBackpressureBuffer (capacidad int)
Esta es una versión limitada que señala a BufferOverflowError en caso de que su buffer alcance la capacidad dada.
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
La relevancia de este operador está disminuyendo a medida que más y más operadores ahora permiten configurar sus tamaños de búfer. Para el resto, esto brinda la oportunidad de "ampliar su búfer interno" al tener un número mayor con onBackpressureBuffer que su valor predeterminado.
onBackpressureBuffer (capacidad int, Action0 onOverflow)
Esta sobrecarga llama a una acción (compartida) en caso de que ocurra un desbordamiento. Su utilidad es bastante limitada, ya que no se proporciona otra información sobre el desbordamiento que la pila de llamadas actual.
onBackpressureBuffer (capacidad int, Action0 onOverflow, BackpressureOverflow.Strategy estrategia)
Esta sobrecarga es en realidad más útil, ya que permite definir qué hacer en caso de que se haya alcanzado la capacidad. BackpressureOverflow.Strategy es una interfaz en realidad, pero la clase BackpressureOverflow ofrece 4 campos estáticos con implementaciones que representan acciones típicas:
-
ON_OVERFLOW_ERROR: este es el comportamiento predeterminado de las dos sobrecargas anteriores, que señala unaBufferOverflowException -
ON_OVERFLOW_DEFAULT: actualmente es lo mismo queON_OVERFLOW_ERROR -
ON_OVERFLOW_DROP_LATEST: si ocurriera un desbordamiento, el valor actual simplemente se ignorará y solo se entregarán los valores antiguos una vez que las solicitudes posteriores. -
ON_OVERFLOW_DROP_OLDEST: elimina el elemento más antiguo del búfer y le agrega el valor actual.
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> { },
BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
Tenga en cuenta que las dos últimas estrategias causan discontinuidad en el flujo a medida que abandonan los elementos. Además, no señalarán la BufferOverflowException .
onBackpressureDrop ()
Cuando el flujo descendente no esté listo para recibir valores, este operador eliminará ese elemento de la secuencia. Uno puede pensar en ello como una capacidad de 0 en el onBackpressureBuffer con la estrategia ON_OVERFLOW_DROP_LATEST .
Este operador es útil cuando uno puede ignorar de forma segura los valores de una fuente (como los movimientos del mouse o las señales de ubicación de GPS actuales) ya que habrá valores más actualizados más adelante.
component.mouseMoves()
.onBackpressureDrop()
.observeOn(Schedulers.computation(), 1)
.subscribe(event -> compute(event.x, event.y));
Puede ser útil en conjunto con el interval() operador de origen interval() . Por ejemplo, si uno desea realizar alguna tarea periódica en segundo plano, pero cada iteración puede durar más que el período, es seguro eliminar la notificación de intervalo de exceso, ya que habrá más adelante:
Observable.interval(1, TimeUnit.MINUTES)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.doOnNext(e -> networkCall.doStuff())
.subscribe(v -> { }, Throwable::printStackTrace);
Existe una sobrecarga de este operador: onBackpressureDrop(Action1<? super T> onDrop) donde se llama a la acción (compartida) y se descarta el valor. Esta variante permite limpiar los propios valores (por ejemplo, liberar recursos asociados).
onBackpressureLatest ()
El operador final conserva solo el último valor y prácticamente sobrescribe los valores antiguos y no entregados. Uno puede pensar en esto como una variante del onBackpressureBuffer con una capacidad de 1 y la estrategia de ON_OVERFLOW_DROP_OLDEST .
A diferencia de onBackpressureDrop siempre hay un valor disponible para el consumo si el flujo descendente se está quedando atrás. Esto puede ser útil en algunas situaciones similares a la telemetría en las que los datos pueden venir en un patrón de ráfagas, pero solo el último es interesante para el procesamiento.
Por ejemplo, si el usuario hace mucho clic en la pantalla, todavía queremos reaccionar a su última entrada.
component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);
El uso de onBackpressureDrop en este caso llevaría a una situación en la que el último clic se cae y deja al usuario preguntándose por qué no se ejecutó la lógica empresarial.
Creando fuentes de datos con presión
Crear fuentes de datos con presión es una tarea relativamente más fácil cuando se trata de la presión en general, ya que la biblioteca ya ofrece métodos estáticos en Observable que manejan la presión para el desarrollador. Podemos distinguir dos tipos de métodos de fábrica: los "generadores" fríos que devuelven y generan elementos basados en la demanda descendente y los "impulsores" en caliente que generalmente conectan las fuentes de datos no reactivos y / o no recuperables y superponen el manejo de la contrapresión. ellos.
sólo
La fuente de conciencia de contrapresión más básica se crea a través de just :
Observable.just(1).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(0);
}
@Override
public void onNext(Integer v) {
System.out.println(v);
}
// the rest is omitted for brevity
}
Como no solicitamos explícitamente en onStart , esto no imprimirá nada. just es genial cuando hay un valor constante que nos gustaría iniciar una secuencia.
Desafortunadamente, just se confunde a menudo con una forma de calcular algo de forma dinámica para que lo consuman los Subscriber :
int counter;
int computeValue() {
return ++counter;
}
Observable<Integer> o = Observable.just(computeValue());
o.subscribe(System.out:println);
o.subscribe(System.out:println);
Sorprendente para algunos, esto imprime 1 dos veces en lugar de imprimir 1 y 2 respectivamente. Si la llamada se reescribe, se vuelve obvio por qué funciona así:
int temp = computeValue();
Observable<Integer> o = Observable.just(temp);
El computeValue se llama como parte de la rutina principal y no en respuesta a la suscripción de los suscriptores.
de Callable
Lo que la gente realmente necesita es el método de fromCallable :
Observable<Integer> o = Observable.fromCallable(() -> computeValue());
Aquí, computeValue se ejecuta solo cuando un suscriptor se suscribe y para cada uno de ellos, imprime el 1 y el 2. fromCallable . Desde fromCallable también es compatible con la contrapresión y no emitirá el valor calculado a menos que se solicite. Tenga en cuenta, sin embargo, que el cálculo sucede de todos modos. En caso de que el cálculo en sí se retrase hasta que el flujo descendente solicite, podemos usar just con map :
Observable.just("This doesn't matter").map(ignored -> computeValue())...
just no emitirá su valor constante hasta que se solicite cuando se asigna al resultado de computeValue , que aún se llama para cada suscriptor individualmente.
desde
Si los datos ya está disponible como un conjunto de objetos, una lista de objetos o cualquier Iterable fuente, el respectivo from sobrecargas se encargará de la contrapresión y la emisión de dichas fuentes:
Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);
Para mayor comodidad (y evitar las advertencias sobre creación de la matriz genérica) existen 2 a 10 sobrecargas argumento para just que el delegado internamente a from .
El from(Iterable) también da una oportunidad interesante. La generación de muchos valores puede expresarse en forma de una máquina de estado. Cada elemento solicitado desencadena una transición de estado y el cálculo del valor devuelto.
Escribir máquinas de estado como Iterable s es un tanto complicado (pero aún más fácil que escribir un Observable para consumirlo) y, a diferencia de C #, Java no tiene ningún soporte del compilador para construir dichas máquinas de estado simplemente escribiendo código de aspecto clásico (con yield return y yield break ). Algunas bibliotecas ofrecen ayuda, como AbstractIterable Google Guava e Ix.generate() Ix.forloop() e Ix.forloop() . Estos son por sí mismos dignos de una serie completa, así que veamos una fuente de Iterable muy básica que repite un valor constante de forma indefinida:
Iterable<Integer> iterable = () -> new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public Integer next() {
return 1;
}
};
Observable.from(iterable).take(5).subscribe(System.out::println);
Si consumiéramos el iterator través del bucle for clásico, daríamos como resultado un bucle infinito. Ya que construimos un Observable partir de él, podemos expresar nuestra voluntad de consumir solo los primeros 5 y dejar de solicitar cualquier cosa. Este es el verdadero poder de evaluar y computar perezosamente dentro de Observable s.
crear (SyncOnSubscribe)
A veces, la fuente de datos que se convertirá en el mundo reactivo en sí es sincrónica (de bloqueo) y de tipo pull, es decir, tenemos que llamar a algún método de get o read para obtener el siguiente dato. Uno podría, por supuesto, convertir eso en un Iterable pero cuando tales fuentes están asociadas con recursos, podemos filtrar esos recursos si el flujo descendente anula la suscripción de la secuencia antes de que finalice.
Para manejar tales casos, RxJava tiene la clase SyncOnSubscribe . Uno puede ampliarlo e implementar sus métodos o usar uno de sus métodos de fábrica basados en lambda para crear una instancia.
SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
() -> new FileInputStream("data.bin"),
(inputstream, output) -> {
try {
int byte = inputstream.read();
if (byte < 0) {
output.onCompleted();
} else {
output.onNext(byte);
}
} catch (IOException ex) {
output.onError(ex);
}
return inputstream;
},
inputstream -> {
try {
inputstream.close();
} catch (IOException ex) {
RxJavaHooks.onError(ex);
}
}
);
Observable<Integer> o = Observable.create(binaryReader);
En general, SyncOnSubscribe utiliza 3 devoluciones de llamada.
Las primeras devoluciones de llamada permiten crear un estado por suscriptor, como FileInputStream en el ejemplo; El archivo se abrirá de forma independiente para cada suscriptor individual.
La segunda devolución de llamada toma este objeto de estado y proporciona un Observer salida cuyos métodos onXXX pueden llamarse para emitir valores. Esta devolución de llamada se ejecuta tantas veces como se solicite en sentido descendente. En cada invocación, debe llamar onNext como máximo una vez, opcionalmente seguido de onError o onCompleted . En el ejemplo, llamamos onCompleted() si el byte de lectura es negativo, lo que indica el final del archivo, y llamamos onError en caso de que la lectura lance una IOException .
La devolución de llamada final se invoca cuando las subscripciones subsiguientes (cerrando la entrada) o cuando la devolución de llamada anterior llamó a los métodos del terminal; Permite liberar recursos. Dado que no todas las fuentes necesitan todas estas características, los métodos estáticos de SyncOnSubscribe permiten crear instancias sin ellos.
Desafortunadamente, muchas llamadas de métodos a través de la JVM y otras bibliotecas arrojan excepciones comprobadas y deben ajustarse en try-catch ya que las interfaces funcionales utilizadas por esta clase no permiten lanzar excepciones controladas.
Por supuesto, podemos imitar otras fuentes típicas, como un rango ilimitado con él:
SyncOnSubscribe.createStateful(
() -> 0,
(current, output) -> {
output.onNext(current);
return current + 1;
},
e -> { }
);
En esta configuración, la current comienza con 0 y la próxima vez que se invoca la lambda, el parámetro current ahora tiene 1 .
Hay una variante de SyncOnSubscribe llamada AsyncOnSubscribe que parece bastante similar, con la excepción de que la devolución de llamada central también toma un valor largo que representa la cantidad solicitada y la devolución de llamada debe generar un Observable con la misma longitud. Esta fuente luego concatena todos estos Observable en una sola secuencia.
AsyncOnSubscribe.createStateful(
() -> 0,
(state, requested, output) -> {
output.onNext(Observable.range(state, (int)requested));
return state + 1;
},
e -> { }
);
Existe una discusión continua (acalorada) sobre la utilidad de esta clase y, en general, no se recomienda porque rompe las expectativas sobre cómo emitirá esos valores generados y cómo responderá, o incluso qué tipo de valores de solicitud recibirá. Escenarios de consumo más complejos.
crear (emisor)
A veces, la fuente que se envuelve en un Observable ya está caliente (como los movimientos del mouse) o fría, pero no se puede volver a comprimir en su API (como una devolución de llamada de red asíncrona).
Para manejar estos casos, una versión reciente de RxJava introdujo el método de fábrica create(emitter) . Toma dos parámetros:
- una devolución de llamada que se llamará con una instancia de la interfaz del
Emitter<T>para cada suscriptor entrante, - una enumeración
Emitter.BackpressureModeque obliga al desarrollador a especificar el comportamiento de contrapresión que se aplicará. Tiene los modos habituales, similares aonBackpressureXXXademás de señalar unaMissingBackpressureExceptiono simplemente ignorar dicho desbordamiento dentro de él.
Tenga en cuenta que actualmente no admite parámetros adicionales para esos modos de contrapresión. Si se necesita esa personalización, usar NONE como modo de contrapresión y aplicar onBackpressureXXX relevante en el Observable resultante es el camino a seguir.
El primer caso típico para su uso cuando uno quiere interactuar con una fuente basada en push, como los eventos GUI. Esas API presentan alguna forma de llamadas addListener / removeListener que se pueden utilizar:
Observable.create(emitter -> {
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
emitter.setCancellation(() ->
button.removeListener(al));
}, BackpressureMode.BUFFER);
El Emitter es relativamente sencillo de usar; se puede llamar a onNext , onError y onCompleted en él y el operador se encarga de la gestión de la contrapresión y la baja de suscripción por su cuenta. Además, si la API ajustada admite la cancelación (como la eliminación del oyente en el ejemplo), se puede usar setCancellation (o setSubscription para los recursos de Subscription ) para registrar una devolución de llamada de cancelación que se invoca cuando el onError la onError la suscripción o onError / onCompleted se llama en el proporcionado Emitter ejemplo.
Estos métodos permiten que solo se asocie un solo recurso con el emisor a la vez, y al configurar uno nuevo se cancela la suscripción del antiguo de forma automática. Si uno tiene que manejar múltiples recursos, cree una CompositeSubscription a CompositeSubscription , asóciela con el emisor y luego agregue más recursos a la CompositeSubscription a CompositeSubscription sí:
Observable.create(emitter -> {
CompositeSubscription cs = new CompositeSubscription();
Worker worker = Schedulers.computation().createWorker();
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
cs.add(worker);
cs.add(Subscriptions.create(() ->
button.removeActionListener(al));
emitter.setSubscription(cs);
}, BackpressureMode.BUFFER);
El segundo escenario usualmente involucra alguna API asíncrona basada en la devolución de llamada que se debe convertir en un Observable .
Observable.create(emitter -> {
someAPI.remoteCall(new Callback<Data>() {
@Override
public void onSuccess(Data data) {
emitter.onNext(data);
emitter.onCompleted();
}
@Override
public void onFailure(Exception error) {
emitter.onError(error);
}
});
}, BackpressureMode.LATEST);
En este caso, la delegación trabaja de la misma manera. Desafortunadamente, por lo general, estas API de estilo de devolución de llamada clásicas no admiten la cancelación, pero si lo hacen, se puede configurar su cancelación como en los ejemplos anteriores (aunque quizás sea una forma más complicada). Observe el uso de la LATEST modo de contrapresión; Si sabemos que solo habrá un único valor, no necesitamos la estrategia BUFFER , ya que asigna un búfer de 128 elementos predeterminado (que crece según sea necesario) que nunca se utilizará por completo.