サーチ…
前書き
バックプレッシャーは、 Observable処理パイプラインでは、一部の非同期ステージでは値を十分速く処理できず、上流のプロデューサに遅くするよう指示する方法が必要です。
バックプレッシャーの必要性の古典的なケースは、生産者が熱源であるときです:
PublishSubject<Integer> source = PublishSubject.create();
source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Thread.sleep(10_000);
この例では、メインスレッドは、バックエンドスレッド上でそれを処理しているエンドコンシューマに100万アイテムを生成します。 compute(int)メソッドには時間がかかる可能性がありますが、 Observable演算子チェーンのオーバーヘッドがアイテムの処理にかかる時間を増やすこともあります。しかし、forループの生成スレッドはこれを知ることができず、 onNextを保持します。
内部的には、非同期演算子には、処理できるようになるまでそのような要素を保持するバッファがあります。古典的なRx.NETと初期のRxJavaでは、これらのバッファには制限がありませんでした。つまり、この例からほぼ100万の要素すべてを保持する可能性があります。この問題は、例えば10億個の要素が存在する場合、または100万個のシーケンスがプログラムに1000回出現し、 OutOfMemoryErrorにつながり、過剰なGCオーバーヘッドのために一般的に減速するOutOfMemoryError発生します。
エラー処理は、第一級オブジェクトとなり、(を経由して、それに対処するための演算子を受信する方法と同様にonErrorXXXオペレーター)、背圧は(経由プログラマが考えると処理するために持っているデータフローの別の特性であるonBackpressureXXXオペレーターが)。
上記のPublishSubject以外にも、背圧をサポートしていない他の演算子があります。これは主に関数的な理由によるものです。たとえば、オペレータのintervalは定期的に値を出力し、バックプレッシャは壁時計との相対的な周期のシフトにつながります。
現代のRxJavaでは、ほとんどの非同期演算子が上記のobserveOnような有界内部バッファをobserveOnようになりました。このバッファをオーバーフローさせると、シーケンス全体がMissingBackpressureException終了します。各オペレータのドキュメントには、背圧の動作についての説明があります。
しかし、背圧は通常のコールドシーケンス( MissingBackpressureExceptionていないし、そうでなければならない)にもっと微妙に存在します。最初の例が書き直された場合:
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
Thread.sleep(10_000);
エラーはなく、小さなメモリ使用量ですべてがスムーズに実行されます。この理由は、多くのソースオペレーターはオンデマンド値を「生成」することができますので、オペレータということですobserveOn伝えることができるrange 、せいぜい非常に多くの値が生成observeOnバッファがオーバーフローすることなく、一度に保持することができます。
この交渉はコンピュータ・サイエンスのコルーチンの概念に基づいています(私はあなたを私に呼びます)。オペレータrange実施の形態では、コールバックを送信するProducerに、インターフェースobserveOnその(内側呼び出すことによって、 Subscriberの) setProducer 。見返りに、 observeOn呼び出しProducer.request(n)指示する値とrange (すなわち、生成するように許可されているがonNext多くの追加要素ことを)。適切な時間にrequestメソッドを呼び出し、データが流れてもオーバーフローしないように正しい値を設定するのはobserveOnの責任です。
エンド・コンシューマのバックプレッシャの表現はほとんど必要ありません(コールスタック・ブロッキングのために直上流とバックプレッシャに関して同期しているので自然に発生します)が、その動作を理解することは簡単です。
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
public void onNext(Integer v) {
compute(v);
request(1);
}
@Override
public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Done!");
}
});
ここで、 onStart実装は、最初の値を生成するrangeを示し、その後、 onNext受け取られonNext 。 compute(int)終了すると、 rangeから別の値が要求されrange 。 range純粋な実装でrange 、そのような呼び出しは再帰的にonNext呼び出すonNext 、もちろん望ましくないStackOverflowError発生します。
これを防ぐために、オペレータは、このようなリエントラントコールを防止する、いわゆるトランポリングロジックを使用します。 rangeの条件では、 onNext()を呼び出していてonNext()が戻ったときにrequest(1)コールがあったことを覚えていて、次の整数値でonNext()を呼び出して呼び出します。したがって、2つがスワップされても、例は同じように動作します。
@Override
public void onNext(Integer v) {
request(1);
compute(v);
}
しかし、これはonStartは当てはまりません。 Observableインフラストラクチャでは、各Subscriberで最大でも1回呼び出されることが保証されていますが、 request(1)呼び出しはすぐに要素の排出をトリガーする可能性があります。 onNextが必要とするrequest(1)呼び出しの後に初期化ロジックがある場合、例外が発生する可能性があります。
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
String name;
@Override
public void onStart() {
request(1);
name = "RangeExample";
}
@Override
public void onNext(Integer v) {
compute(name.length + v);
request(1);
}
// ... rest is the same
});
この同期の場合、 onStart実行中にすぐにNullPointerExceptionがスローされます。 request(1)呼び出しによって、他のスレッドでonNextへの非同期呼び出しがonNextれ、 onNextレースのnameがonStartポストrequest書き込まれる場合、より微妙なバグが発生します。
したがって、 onStartフィールド初期化を行うか、それより前であってもrequest()最後に呼び出す必要があります。オペレータのrequest()実装は、必要にrequest()適切な先行関係(またはメモリ解放または完全フェンス)を保証します。
onBackpressureXXX演算子
ほとんどの開発者は、アプリケーションがMissingBackpressureExceptionで失敗したときに背圧に遭遇し、例外は通常observeOn演算子を指しています。実際の原因は、通常、 PublishSubject 、 timer()またはinterval()やcreate()作成されたカスタム演算子の非PublishSubject使用です。
このような状況に対処するにはいくつかの方法があります。
バッファサイズの増加
時にはそのようなオーバーフローはバースト的なソースのために起こります。突然、ユーザーは画面をあまりにも速くタップし、Androidのデフォルトの16要素の内部バッファーがオーバーフローすることをobserveOnます。
最近のバージョンのRxJavaの背圧に敏感な演算子のほとんどは、プログラマが内部バッファのサイズを指定できるようになりました。関連するパラメータは通常、 bufferSize 、 prefetchまたはcapacityHintと呼ばれます。導入の例があふれているので、 observeOnのバッファサイズをobserveOnて、すべての値に十分な余裕を持たせることができます。
PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.computation(), 1024 * 1024)
.subscribe(e -> { }, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
ただし、ソースが予測されたバッファサイズを過剰生成すると、オーバーフローが発生する可能性があるため、これは一般的に一時的な修正に過ぎません。この場合、次の演算子のいずれかを使用できます。
標準演算子で値をバッチ/スキップする
ソースデータをより効率的にバッチ処理できる場合は、標準のバッチ演算子の1つ(サイズおよび/または時間による)を使用してMissingBackpressureExceptionの可能性を減らすことができます。
PublishSubject<Integer> source = PublishSubject.create();
source
.buffer(1024)
.observeOn(Schedulers.computation(), 1024)
.subscribe(list -> {
list.parallelStream().map(e -> e * e).first();
}, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
値の一部を安全に無視できる場合は、サンプリング(時間別または別のObservable)と絞り込み演算子( throttleFirst 、 throttleLast 、 throttleWithTimeout )を使用できます。
PublishSubject<Integer> source = PublishSubject.create();
source
.sample(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation(), 1024)
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
これらの演算子は、下流側の値の受信率を低下させるだけなので、 MissingBackpressureExceptionが発生する可能性があります。
onBackpressureBuffer()
パラメータなしの形式のこの演算子は、上流の演算子と下流の演算子の間に無制限のバッファを再導入します。無制限とは、JVMのメモリが不足している限り、バースト的なソースからのほとんどすべての量を処理できます。
Observable.range(1, 1_000_000)
.onBackpressureBuffer()
.observeOn(Schedulers.computation(), 8)
.subscribe(e -> { }, Throwable::printStackTrace);
この例では、 observeOn非常に低いバッファサイズと行くまだありませんMissingBackpressureExceptionとonBackpressureBufferへの小さなバッチを超えるすべての100万点の値と手を浸すobserveOn 。
ただし、 onBackpressureBufferはソースを無制限に消費します。つまり、 onBackpressureBufferを適用しません。これは、 rangeなどの背圧支持源が完全に実現されるという結果をもたらす。
onBackpressureBufferさらに4つのオーバーロードがありonBackpressureBuffer
onBackpressureBuffer(int capacity)
これは、そのバッファが所定の容量に達した場合に、 BufferOverflowErrorする有界バージョンです。
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
ますます多くの演算子がバッファサイズを設定できるようになれば、この演算子の関連性は低下しています。残りの部分では、これはonBackpressureBufferをonBackpressureBufferより大きな数をデフォルト値より大きくすることによって、 "内部バッファを拡張する"機会を与えます。
onBackpressureBuffer(int capacity、Action0 onOverflow)
このオーバーロードは、オーバーフローが発生した場合に備えて(共有)アクションを呼び出します。その有用性は、現在の呼び出しスタックよりもオーバーフローに関して提供される他の情報がないので、かなり限られています。
onBackpressureBuffer(int capacity、Action0 onOverflow、BackpressureOverflow.Strategy戦略)
この過負荷は、実際には容量に達した場合に何をすべきかを定義するので、実際にはより有用です。 BackpressureOverflow.Strategyは実際にはインターフェイスですが、 BackpressureOverflowクラスには典型的なアクションを表す4つの静的フィールドが実装されています。
-
ON_OVERFLOW_ERROR:これは前の2つのオーバーロードのデフォルトの動作で、BufferOverflowException通知します -
ON_OVERFLOW_DEFAULT:現在はON_OVERFLOW_ERRORと同じON_OVERFLOW_ERROR -
ON_OVERFLOW_DROP_LATEST:オーバーフローが発生した場合、現在の値は単純に無視され、古い値だけがダウンストリーム要求後に配信されます。 -
ON_OVERFLOW_DROP_OLDEST:バッファ内の最も古い要素を削除し、現在の値を追加します。
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> { },
BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
最後の2つのストラテジは、要素が削除されるときにストリームに不連続性を引き起こすことに注意してください。さらに、彼らはBufferOverflowException通知しません。
onBackpressureDrop()
ダウンストリームが値を受け取る準備ができていないときはいつでも、この演算子はシーケンスからそのエレメンツを削除します。 1つのストラテジをON_OVERFLOW_DROP_LATEST onBackpressureBufferして、0の容量のonBackpressureBufferと考えることができます。
この演算子は、後でさらに最新の値があるため、ソースからの値(マウス移動や現在のGPS位置信号など)を安全に無視できる場合に便利です。
component.mouseMoves()
.onBackpressureDrop()
.observeOn(Schedulers.computation(), 1)
.subscribe(event -> compute(event.x, event.y));
ソース演算子のinterval()と組み合わせて使用すると便利です。たとえば、定期的なバックグラウンド・タスクを実行したいが、各反復が期間よりも長く続く場合は、あとで行うように余分な間隔通知を削除することが安全です。
Observable.interval(1, TimeUnit.MINUTES)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.doOnNext(e -> networkCall.doStuff())
.subscribe(v -> { }, Throwable::printStackTrace);
onBackpressureDrop(Action1<? super T> onDrop)ここで、(shared)アクションが呼び出され、値が削除されます。この変形では、値そのものをクリーンアップすることができます(例えば、関連リソースの解放)。
onBackpressureLatest()
最後の演算子は最新の値のみを保持し、実際には古い値、未送出の値を上書きします。これは、1の容量とON_OVERFLOW_DROP_OLDEST戦略を持つonBackpressureBuffer変種と考えることができます。
onBackpressureDropとは異なり、ダウンストリームが遅れてしまった場合、常に消費可能な値があります。これは、データがバースト的なパターンで現れることがありますが、非常に最新のものだけが処理に興味があるテレメトリのような状況では役に立ちます。
たとえば、ユーザーが画面上でたくさんのボタンをクリックした場合でも、最新の入力に反応したいと考えています。
component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);
この場合、 onBackpressureDropを使用すると、最後のクリックがonBackpressureDrop 、ビジネスロジックが実行されなかった理由がユーザーにonBackpressureDropます。
背圧データソースの作成
ライブラリが既に開発者のバックObservableを扱う静的メソッドをObservable提供しているので、背圧のデータソースを作成することは、バックプレッシャを扱う際の比較的簡単な作業です。下流の需要に基づいて要素を返却して生成する冷たい「ジェネレータ」と、通常、非反応性のデータソースおよび/または非バックラブルのデータソースを橋渡しするホット「プッシャ」と、それら。
ちょうど
最も基本的な背圧を意識ソースが経由で作成されたjust 。
Observable.just(1).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(0);
}
@Override
public void onNext(Integer v) {
System.out.println(v);
}
// the rest is omitted for brevity
}
私たちは明示的にonStartでリクエストしないので、何も印刷されません。私たちがシーケンスをジャンプ・スタートしたいという一定の価値があるときには、 justです。
残念ながら、 just多くの場合、動的で消費されるために何かを計算する方法を間違えているSubscriber秒:
int counter;
int computeValue() {
return ++counter;
}
Observable<Integer> o = Observable.just(computeValue());
o.subscribe(System.out:println);
o.subscribe(System.out:println);
驚くべきことに、これは印刷1と2の代わりに1を2回印刷します。コールが書き換えられた場合、なぜそれが機能するのかが明白になります。
int temp = computeValue();
Observable<Integer> o = Observable.just(temp);
computeValueは、メインルーチンの一部として呼び出され、サブスクライバのサブスクリプcomputeValueへの応答ではありません。
fromCallable
人々が実際に必要とするのは、メソッドfromCallableです。
Observable<Integer> o = Observable.fromCallable(() -> computeValue());
ここでは、 computeValueはサブスクライバがサブスクライブするときにのみ実行され、それぞれに対して期待される1と2をfromCallableます。もちろん、 fromCallableもfromCallableを適切にサポートし、要求されない限り計算値を出力しません。しかし、計算はとにかく起こることに注意してください。実際に下流が実際に要求するまで計算自体を遅らせる必要がある場合は、 map justを使うことができます:
Observable.just("This doesn't matter").map(ignored -> computeValue())...
just 、それが結果にマッピングされるときに要求されるまでその一定値を放出しないであろうcomputeValue 、依然として個々の加入者を求めました。
から
データがオブジェクトの配列、オブジェクトのリスト、または任意のIterableソースとして既に利用可能な場合、それぞれの過負荷from Iterableおよびそのようなソースの放出が処理されます。
Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);
便宜上、ジェネリック配列の作成に関する警告を避けるために、そこfrom内部的に委譲されるjustに2〜10引数のオーバーロードがあります。
from(Iterable)も面白い機会を与えます。多くの価値生成は、状態機械の形で表現することができます。要求された各要素は、状態遷移と戻り値の計算をトリガーします。
IterableのようなステートマシンをIterableはやや複雑ですが( Observableを消費するためにObservableを書くよりも簡単です)、C#とは異なり、Javaはコンパイラからクラシックなコードを書くだけでコンパイラからのサポートはありません( yield returnとyield break )。一部のライブラリは、Google GuavaのAbstractIterableやIxJavaのIx.generate()やIx.forloop()などのヘルプを提供しています。これらはそれ自体で完全なシリーズにふさわしいので、ある一定の値を無期限に繰り返す非常に基本的なIterableソースを見てみましょう:
Iterable<Integer> iterable = () -> new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public Integer next() {
return 1;
}
};
Observable.from(iterable).take(5).subscribe(System.out::println);
古典的なfor-loopを使用してiteratorを使用すると、無限ループになります。 Observableを構築するので、最初の5つだけを消費し、何も要求しない意思を表明することができます。これはObservableの内部で怠惰な評価と計算を行う本当の力です。
作成(SyncOnSubscribe)
時には、反応的な世界自体に変換されるデータソースは、同期(ブロッキング)とプル型です。つまり、次のデータを取得するために、いくつかのgetメソッドまたはreadメソッドを呼び出す必要getあります。もちろん、それをIterable変えることはできますが、そのようなソースがリソースに関連付けられている場合、ダウンストリームが終了する前にシーケンスをアンIterableすると、それらのリソースがリークする可能性があります。
このようなケースを処理するために、RxJavaにはSyncOnSubscribeクラスがあります。それを拡張してメソッドを実装するか、ラムダベースのファクトリメソッドの1つを使用してインスタンスを構築することができます。
SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
() -> new FileInputStream("data.bin"),
(inputstream, output) -> {
try {
int byte = inputstream.read();
if (byte < 0) {
output.onCompleted();
} else {
output.onNext(byte);
}
} catch (IOException ex) {
output.onError(ex);
}
return inputstream;
},
inputstream -> {
try {
inputstream.close();
} catch (IOException ex) {
RxJavaHooks.onError(ex);
}
}
);
Observable<Integer> o = Observable.create(binaryReader);
一般に、 SyncOnSubscribeは3つのコールバックを使用します。
最初のコールバックでは、この例のFileInputStreamなど、サブスクライバごとの状態を作成できます。ファイルは個々の加入者ごとに独立して開かれます。
2番目のコールバックはこの状態オブジェクトをとり、 onXXXメソッドが呼び出されて値を出力できる出力Observerを提供します。このコールバックは、ダウンストリームで要求された回数だけ実行されます。呼び出しごとにonErrorまたはonCompletedいずれかが後に続く場合は、多くてもonNextを呼び出す必要があります。この例では、読み込みバイトが負の場合はonCompleted()を呼び出し、ファイルの終了と終了を示し、読み込みがIOExceptionスローする場合はonErrorを呼び出します。
ダウンストリームのサブスクライブ解除(入力ストリームを閉じる)または前のコールバックがターミナルメソッドを呼び出したときに、最後のコールバックが呼び出されます。リソースを解放することができます。すべてのソースがこれらの機能をすべて必要とするわけではないので、 SyncOnSubscribeの静的メソッドを使用すると、インスタンスなしでインスタンスが作成されます。
残念なことに、JVMや他のライブラリのメソッド呼び出しの多くは、チェック例外をスローし、このクラスで使用される機能インタフェースではチェック例外を投げられないため、 try-catchラップする必要があります。
もちろん、無限の範囲など、他の典型的なソースを模倣することもできます。
SyncOnSubscribe.createStateful(
() -> 0,
(current, output) -> {
output.onNext(current);
return current + 1;
},
e -> { }
);
この設定では、 currentは0始まり、ラムダが呼び出された次回はパラメータcurrentが1ます。
ミドルコールバックにはダウンストリームからのリクエスト量を表す長い値が必要で、コールバックはまったく同じ長さのObservableを生成する必要があるという例外と非常によく似たSyncOnSubscribeというAsyncOnSubscribeがあります。このソースは、これらのObservableをすべて1つのシーケンスに連結します。
AsyncOnSubscribe.createStateful(
() -> 0,
(state, requested, output) -> {
output.onNext(Observable.range(state, (int)requested));
return state + 1;
},
e -> { }
);
このクラスの有用性についての継続的な議論があり、一般的に推奨されません。なぜなら、それらの生成された値がどのように実際にどのように出力され、どのように応答するか、より複雑な消費者シナリオ
作成(エミッタ)
Observableラップされるソースは、マウスの動きなどで既に暑いですが、非同期のネットワークコールバックなど、APIではバックプレッシャーではありません。
このようなケースを処理するために、RxJavaの最近のバージョンではcreate(emitter)ファクトリメソッドが導入されました。 2つのパラメータが必要です。
- 着信加入者ごとに
Emitter<T>インタフェースのインスタンスで呼び出されるコールバック、 - 開発者が背圧の動作を適用するように指定する
Emitter.BackpressureMode列挙体。onBackpressureXXXに似た通常のモードがあり、MissingBackpressureExceptionを通知するだけでなく、内部でそのようなオーバーフローを単純に無視することもできます。
現在のところ、これらのバックプレッシャモードへの追加パラメータはサポートしていないことに注意してください。これらのカスタマイズが必要な場合は、 onBackpressureXXXプレッシャーモードとしてNONEを使用し、関連するonBackpressureXXXを結果としてのObservableに適用する方法があります。
GUIイベントなど、プッシュベースのソースと対話したい場合の最初の典型的なケースです。これらのAPIには、利用できるaddListener / removeListener呼び出しのいくつかの形式があります。
Observable.create(emitter -> {
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
emitter.setCancellation(() ->
button.removeListener(al));
}, BackpressureMode.BUFFER);
Emitterは比較的簡単に使用できます。 onNext 、 onErrorおよびonCompletedを呼び出すことができ、オペレータはonCompletedおよびonCompleted管理を単独で処理します。さらに、ラップされたAPIが取り消し(この例ではリスナーの削除など)をサポートしている場合、ダウンストリームのサブスクsetSubscriptionやonError / setSubscriptionが呼び出されたときに呼び出される取り消しコールバックを登録するためにsetCancellation (またはSubscriptionようなリソースに対してsetSubscription ) onCompletedは、提供されたEmitterインスタンスで呼び出されます。
これらのメソッドでは、一度に1つのリソースだけをエミッタに関連付けることができ、新しいリソースを設定すると、古いものを自動的に取り消します。複数のリソースを処理する必要がある場合は、 CompositeSubscription作成し、それをエミッタに関連付けてから、 CompositeSubscription自体にさらにリソースを追加します。
Observable.create(emitter -> {
CompositeSubscription cs = new CompositeSubscription();
Worker worker = Schedulers.computation().createWorker();
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
cs.add(worker);
cs.add(Subscriptions.create(() ->
button.removeActionListener(al));
emitter.setSubscription(cs);
}, BackpressureMode.BUFFER);
2番目のシナリオでは通常、 Observable変換する必要のある非同期のコールバックベースのAPIが含まれています。
Observable.create(emitter -> {
someAPI.remoteCall(new Callback<Data>() {
@Override
public void onSuccess(Data data) {
emitter.onNext(data);
emitter.onCompleted();
}
@Override
public void onFailure(Exception error) {
emitter.onError(error);
}
});
}, BackpressureMode.LATEST);
この場合、委任は同じように機能します。残念なことに、通常、これらの古典的なコールバックスタイルのAPIはキャンセルをサポートしていませんが、そうした場合、previoiusの例のように(おそらくより複雑な方法で)キャンセルをセットアップできます。 LATESTプレッシャモードの使用に注意してください。単一の値しか存在しないことが分かっている場合は、決して完全に利用されないデフォルト128要素のロングバッファ(必要に応じて増加する)を割り当てるため、 BUFFER戦略は必要ありません。