수색…
소개
Backpressure 는 Observable 프로세싱 파이프 라인에서 일부 비동기 스테이지가 값을 충분히 빠르게 처리 할 수 없으며 업스트림 프로듀서에게 속도 저하를 지시 할 방법이 필요합니다.
배압이 필요한 전형적인 경우는 생산자가 뜨거운 출처 인 경우입니다.
PublishSubject<Integer> source = PublishSubject.create();
source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
Thread.sleep(10_000);
이 예에서 주 스레드는 백그라운드 스레드에서이를 처리하는 최종 사용자에게 1 백만 항목을 생성합니다. compute(int) 메서드는 시간이 좀 걸릴 가능성이 있지만 Observable 연산자 체인의 오버 헤드가 항목 처리에 걸릴 시간을 더할 수 있습니다. 그러나 for 루프가있는 생성 스레드는이를 알 수 없으며 onNext ing을 유지합니다.
내부적으로, 비동기 연산자는 처리 될 때까지 이러한 요소를 보유하는 버퍼를 가지고 있습니다. 고전적인 Rx.NET과 초기 RxJava에서 이러한 버퍼는 제한이 없었습니다. 즉, 예제에서 거의 백만 개의 요소를 모두 보유 할 가능성이 높습니다. 문제는 예를 들어 10 억 개의 요소가 있거나 프로그램에서 1000 만 개의 시퀀스가 1000 번 나타나면 OutOfMemoryError 이어지고 일반적으로 과도한 GC 오버 헤드로 인해 속도가 느려질 때 시작됩니다.
오류 처리가 일류 시민이 된 것과 onErrorXXX 연산자를 통해 onErrorXXX 연산자를 onErrorXXX 배압은 프로그래머가 onBackpressureXXX 연산자를 통해 생각하고 처리 onBackpressureXXX 데이터 흐름의 또 다른 속성입니다.
위의 PublishSubject 이외에도 기능상의 이유로 주로 배압을 지원하지 않는 다른 연산자가 있습니다. 예를 들어, 운전자 interval 은 주기적으로 값을 내고, 다시 압력을 가하면 벽 시계에 비례하여 시간이 이동합니다.
현대의 RxJava에서 대부분의 비동기 연산자는 위의 observeOn 과 같은 바운드 된 내부 버퍼를 observeOn 버퍼를 오버플로하려는 시도는 MissingBackpressureException 전체 시퀀스를 종료합니다. 각 작동기의 문서에는 배압 작동에 대한 설명이 있습니다.
그러나 배압은 정기적 인 콜드 시퀀스 ( MissingBackpressureException 이 발생하지 MissingBackpressureException )에서 더 세밀하게 MissingBackpressureException . 첫 번째 예제가 다시 작성된 경우 :
Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
Thread.sleep(10_000);
오류가없고 작은 메모리 사용으로 모든 것이 원활하게 실행됩니다. 그 이유는 많은 소스 연산자가 필요에 따라 값을 "생성"할 수 있기 때문에 연산자 observeOn 은 observeOn 버퍼가 오버플로없이 한 번에 유지할 수있는 많은 값을 생성 range 알릴 수 있습니다.
이 협상은 컴퓨터 루틴 공동 개념의 컴퓨터 과학 개념에 기반합니다 (저는 당신을 부릅니다, 당신은 저를 부름이라고합니다). 연산자 range 는 Producer 인터페이스 구현 형태의 콜백을 해당 (내부 Subscriber 의) setProducer 를 호출하여 observeOn 으로 observeOn . 대신에, observeOn 은 Producer.request(n) 에 값을 지정하여 range 에서 많은 추가 요소를 생성 (즉, onNext )하도록 onNext 합니다. 적절한 시간에 request 메소드를 호출하여 데이터가 흐르지 만 넘치지 않도록 올바른 값으로 호출하는 것은 observeOn 의 책임입니다.
최종 사용자의 배압 표현은 거의 필요하지 않습니다 (호출 스택 차단으로 인해 자연스럽게 발생하는 상류 및 배압에 대해 동기식이기 때문에). 그러나이를 이해하는 것이 더 쉬울 수 있습니다.
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
public void onNext(Integer v) {
compute(v);
request(1);
}
@Override
public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("Done!");
}
});
여기서 onStart 구현은 첫 번째 값을 생성하는 range 를 나타내며 그 값은 onNext 에서 수신됩니다. compute(int) 가 끝나면 range 에서 다른 값을 요청 range . 순진한 range 구현에서 그러한 호출은 재귀 적으로 onNext 호출하여 StackOverflowError 시키는 것은 당연히 바람직하지 않습니다.
이를 방지하기 위해 운영자는 이러한 재진입 호출을 방지하는 소위 트램폴린 (trampolining) 논리를 사용합니다. range 의 용어로는 onNext() 호출하는 동안 request(1) 호출이 있었고 onNext() 반환하면 다른 라운드를 만들고 다음 정수 값으로 onNext() 를 호출 onNext() 입니다. 따라서 두 개가 바뀌면 예제는 여전히 동일하게 작동합니다.
@Override
public void onNext(Integer v) {
request(1);
compute(v);
}
그러나 onStart 경우에는 그렇지 않습니다. Observable 인프라가 각 Subscriber 에서 최대 한 번 호출된다는 것을 보장하지만 request(1) 호출은 즉시 요소 방출을 트리거 할 수 있습니다. onNext 가 필요로하는 request(1) 호출 이후에 초기화 논리가 있으면 예외가 발생할 수 있습니다.
Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
String name;
@Override
public void onStart() {
request(1);
name = "RangeExample";
}
@Override
public void onNext(Integer v) {
compute(name.length + v);
request(1);
}
// ... rest is the same
});
이 동기식의 경우, onStart 실행 중에 NullPointerException 가 즉시 슬로우됩니다. request(1) 대한 호출이 다른 스레드에서 onNext 에 대한 비동기 호출을 트리거하고 onNext 레이스에서 name 을 onNext onStart post request 에 쓰는 경우보다 미묘한 버그가 발생합니다.
따라서 onStart 에서 모든 필드 초기화를 수행해야하며 그 전에도 request() 호출해야합니다. 연산자의 request() 구현은 필요시 적절한 발생 전 관계 (또는 다른면에서는 메모리 해제 또는 전체 펜스)를 보장합니다.
onBackpressureXXX 연산자
대부분의 개발자는 MissingBackpressureException 인해 응용 프로그램이 실패 할 때 MissingBackpressureException 압력이 발생하며 일반적으로 예외는 observeOn 연산자를 가리 킵니다. 실제 원인은 일반적으로 PublishSubject , timer() 또는 interval() 또는 create() 를 통해 create() 사용자 정의 연산자의 PublishSubject 사용입니다.
이러한 상황을 다루는 여러 가지 방법이 있습니다.
버퍼 크기 늘리기
때로는 오버런이 폭발하는 원인으로 인해 발생합니다. 갑자기 사용자가 화면을 너무 빠르게 observeOn 안드로이드의 기본 16 개 요소 내부 버퍼가 오버플로를 observeOn 합니다.
최신 버전의 RxJava에서 대부분의 배압에 민감한 운영자는 이제 프로그래머가 내부 버퍼의 크기를 지정할 수 있습니다. 관련 매개 변수는 일반적으로 bufferSize , prefetch 또는 capacityHint 라고 capacityHint . 소개에서 넘치는 예제를 감안할 때 observeOn 의 버퍼 크기를 observeOn 모든 값에 대해 충분한 공간을 확보 할 수 있습니다.
PublishSubject<Integer> source = PublishSubject.create();
source.observeOn(Schedulers.computation(), 1024 * 1024)
.subscribe(e -> { }, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
그러나 소스가 예측 된 버퍼 크기를 과다하게 생성하는 경우에도 오버플로가 여전히 발생할 수 있으므로 일반적으로 임시 수정 일 수 있습니다. 이 경우 다음 연산자 중 하나를 사용할 수 있습니다.
표준 연산자로 값 일괄 처리 / 건너 뛰기
원본 데이터를 일괄 적으로보다 효율적으로 처리 할 수있는 경우 표준 일괄 처리 연산자 (크기 및 / 또는 시간 기준) 중 하나를 사용하여 MissingBackpressureException 의 가능성을 줄일 수 있습니다.
PublishSubject<Integer> source = PublishSubject.create();
source
.buffer(1024)
.observeOn(Schedulers.computation(), 1024)
.subscribe(list -> {
list.parallelStream().map(e -> e * e).first();
}, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
일부 값을 안전하게 무시할 수있는 경우 샘플링 (시간 또는 다른 Observable 포함)과 throttleFirst ( throttleFirst , throttleLast , throttleWithTimeout )를 사용할 수 있습니다.
PublishSubject<Integer> source = PublishSubject.create();
source
.sample(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation(), 1024)
.subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
source.onNext(i);
}
이 연산자는 다운 스트림에 의한 값 수신 속도 만 줄이므로 MissingBackpressureException 이어질 수 있습니다.
onBackpressureBuffer ()
매개 변수없는 형식의이 연산자는 업스트림 소스와 다운 스트림 연산자 사이에 버퍼되지 않은 버퍼를 다시 도입합니다. 제한이 없다는 것은 JVM의 메모리가 부족하지 않으면 버스트 한 소스에서 나오는 거의 모든 양을 처리 할 수 있음을 의미합니다.
Observable.range(1, 1_000_000)
.onBackpressureBuffer()
.observeOn(Schedulers.computation(), 8)
.subscribe(e -> { }, Throwable::printStackTrace);
이 예에서, observeOn 매우 낮은 버퍼 크기와 함께 간다 아직 없다 MissingBackpressureException 같은 onBackpressureBuffer 에 그것의 소량을 통해 모두 100 만 개 가치와 손을 젖은 observeOn .
그러나 onBackpressureBuffer 는 소스를 무한 방식으로 사용합니다. 즉, backpressure를 적용하지 않습니다. 이것은 range 와 같은 배압 지원 소스조차도 완전히 실현 될 것이라는 결과를 낳습니다.
onBackpressureBuffer 에는 4 가지 추가 오버로드가 onBackpressureBuffer
onBackpressureBuffer (int capacity)
버퍼가 지정된 용량에 도달 할 경우를 대비하여 BufferOverflowError 를 신호하는 제한된 버전입니다.
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
점점 더 많은 운영자가 버퍼 크기를 설정할 수있게됨에 따라이 연산자의 관련성은 감소하고 있습니다. 나머지는 onBackpressureBuffer 를 사용하여 더 큰 숫자를 onBackpressureBuffer 보다 "확장하여 내부 버퍼를 확장"할 수 있습니다.
onBackpressureBuffer (int capacity, Action0 onOverflow)
이 오버로드는 오버플로가 발생하면 (공유) 액션을 호출합니다. 현재 호출 스택보다 오버플로에 대해 제공된 다른 정보가 없기 때문에 유용성이 다소 제한적입니다.
onBackpressureBuffer (int capacity, Action0 onOverflow, BackpressureOverflow.Strategy 전략)
이 과부하는 실제로 용량에 도달 한 경우 수행 할 작업을 정의하는대로 더 유용합니다. BackpressureOverflow.Strategy 는 실제로 인터페이스이지만 BackpressureOverflow 클래스는 일반적인 동작을 나타내는 4 개의 정적 필드와 구현 클래스를 제공합니다.
-
ON_OVERFLOW_ERROR: 이전의 두 가지 오버로드의 기본 동작으로,BufferOverflowExceptionON_OVERFLOW_ERROR. -
ON_OVERFLOW_DEFAULT: 현재는ON_OVERFLOW_ERROR와ON_OVERFLOW_ERROR -
ON_OVERFLOW_DROP_LATEST: 오버플로가 발생하면 현재 값이 무시되고 다운 스트림 요청이있을 때 이전 값만 전달됩니다. -
ON_OVERFLOW_DROP_OLDEST: 버퍼의 가장 오래된 요소를 삭제하고 현재 값을 버퍼에 추가합니다.
Observable.range(1, 1_000_000)
.onBackpressureBuffer(16, () -> { },
BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
마지막 두 가지 전략은 요소를 제거 할 때 스트림에서 불연속성을 유발한다는 점에 유의하십시오. 또한, 그들은 BufferOverflowException 신호하지 않습니다.
onBackpressureDrop ()
다운 스트림이 값을 수신 할 준비가되지 않을 때마다이 연산자는 시퀀스에서 해당 엘레멘트를 제거합니다. 하나의 전략을 ON_OVERFLOW_DROP_LATEST 전략으로 0 용량 onBackpressureBuffer 로 생각할 수 있습니다.
이 연산자는 나중에 최신 값이 있으므로 소스 (예 : 마우스 이동 또는 현재 GPS 위치 신호)의 값을 안전하게 무시할 수있을 때 유용합니다.
component.mouseMoves()
.onBackpressureDrop()
.observeOn(Schedulers.computation(), 1)
.subscribe(event -> compute(event.x, event.y));
소스 연산자 interval() 과 함께 유용 할 수 있습니다. 예를 들어 주기적 백그라운드 작업을 수행하려고하지만 각 반복이 해당 기간보다 오래 지속될 수있는 경우 나중에 간격이 초과 될 때 초과 간격 알림을 삭제하는 것이 안전합니다.
Observable.interval(1, TimeUnit.MINUTES)
.onBackpressureDrop()
.observeOn(Schedulers.io())
.doOnNext(e -> networkCall.doStuff())
.subscribe(v -> { }, Throwable::printStackTrace);
onBackpressureDrop(Action1<? super T> onDrop) 여기서 값은 삭제되고 (shared) 액션이 호출됩니다. 이 변형은 값 자체를 정리할 수 있습니다 (예 : 연관된 자원 해제).
onBackpressureLatest ()
최종 연산자는 최신 값만 유지하고 실제로 전달되지 않은 이전 값을 덮어 씁니다. 이것을 1의 용량과 ON_OVERFLOW_DROP_OLDEST 전략을 가진 onBackpressureBuffer 의 변형으로 생각할 수 있습니다.
onBackpressureDrop 와는 달리, 다운 스트림이 뒤쳐지면 항상 소비 할 수있는 값이 있습니다. 이것은 데이터가 일부 파열 패턴으로 나타날 수있는 원격 측정과 같은 상황에서 유용 할 수 있지만 매우 최신 데이터 만 처리에 흥미가 있습니다.
예를 들어, 사용자가 화면에서 많이 클릭하면, 우리는 여전히 최신 입력에 반응하고자합니다.
component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);
이 경우에 onBackpressureDrop 을 사용하면 마지막 클릭이 끊어지고 사용자가 비즈니스 논리가 실행되지 않은 이유가 궁금해집니다.
역압 데이터 소스 만들기
라이브러리가 이미 Observable 에서 개발자의 배압을 처리하는 정적 메서드를 제공하기 때문에 일반적으로 배압을 처리 할 때 배압 데이터 소스를 만드는 것이 상대적으로 쉬운 작업입니다. 우리는 두 가지 종류의 공장 방식을 구별 할 수 있습니다. 다운 스트림 수요에 따라 요소를 반환하고 생성하는 차가운 "발전기"및 일반적으로 비 반응 및 / 또는 비방 향성 데이터 소스를 연결하는 뜨거운 "푸셔"및 그들.
다만
가장 기본적인 배압 인식 소스를 통해 생성되는 just :
Observable.just(1).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(0);
}
@Override
public void onNext(Integer v) {
System.out.println(v);
}
// the rest is omitted for brevity
}
우리는 onStart 에서 명시 적으로 요청하지 않기 때문에 아무 것도 인쇄하지 않습니다. just 우리가 순서를 점프 시작하고 싶습니다 상수 값이있을 때 유용합니다.
불행하게도, just 종종 동적으로 소비 할 수있는 뭔가를 계산하는 방법을 오해 Subscriber 들 :
int counter;
int computeValue() {
return ++counter;
}
Observable<Integer> o = Observable.just(computeValue());
o.subscribe(System.out:println);
o.subscribe(System.out:println);
놀랍게도, 1과 2를 각각 인쇄하는 대신에 1 번을 두 번 인쇄합니다. 호출이 다시 작성되면 작동하는 이유가 명확 해집니다.
int temp = computeValue();
Observable<Integer> o = Observable.just(temp);
computeValue 는 메인 루틴의 일부로 호출되며 구독하는 구독자에 대한 응답이 아닙니다.
fromCallable
사람들이 실제로 필요로하는 것은 메소드 fromCallable .
Observable<Integer> o = Observable.fromCallable(() -> computeValue());
여기서 computeValue 는 구독자가 구독하고 각각에 대해 예상 1과 2를 인쇄 할 때만 실행됩니다. 당연히 fromCallable 는 백 프레셔를 올바르게 지원하고 요청하지 않는 한 계산 된 값을 방출하지 않습니다. 그러나 계산은 어쨌든 발생합니다. 경우에 계산 자체는 다운 스트림이 실제로 우리가 사용할 수 요청할 때까지 연기해야 just 와 map :
Observable.just("This doesn't matter").map(ignored -> computeValue())...
just 그것이 결과에 매핑 될 때 요청 때까지 일정한 값을 방출하지 않습니다 computeValue 여전히 개별적으로 각 가입자 요구했다.
...에서
데이터 객체의 배열, 객체의리스트 또는 이미 사용 가능한 경우 Iterable 소스 각각 from 배압 및 발광 원을 처리 과부하 :
Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);
편의 (그리고 일반적인 배열 생성에 대한 경고를 방지)하기 위해 2~10 인수 과부하있다 just 내부적으로 위임 from .
from(Iterable) 은 또한 흥미로운 기회를 제공합니다. 많은 가치 창출은 상태 기계의 한 형태로 표현 될 수 있습니다. 요청 된 각 요소는 상태 전이와 반환 값 계산을 트리거합니다.
Iterable 같은 상태 머신을 작성하는 것은 Observable 을 쓰는 것보다 다소 복잡하지만 C #과 달리 컴파일러가 고전적인 코드를 작성하여 컴파일러에서 지원하지 않습니다 ( yield return 및 yield break ). 일부 라이브러리는 Google Guava의 AbstractIterable 및 IxJava의 Ix.generate() 및 Ix.forloop() 과 같은 일부 도움말을 제공합니다. 이들은 그래서 몇 가지 아주 기본적인 보자 스스로 전체 시리즈의 가치가 Iterable 몇 가지 상수 값을 무한정 반복 소스 :
Iterable<Integer> iterable = () -> new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public Integer next() {
return 1;
}
};
Observable.from(iterable).take(5).subscribe(System.out::println);
classic for-loop를 통해 iterator 를 소비한다면 무한 루프가 발생합니다. Observable 을 구축하기 때문에 처음 5 개만 소비하고 나머지는 요청하지 않을 의지를 표현할 수 있습니다. 이것은 Observable 내부에서 느리게 평가하고 계산하는 진정한 힘입니다.
create (SyncOnSubscribe)
때로는 반응 적 세계로 변환 될 데이터 소스가 동기식 (차단)이고 당김 식입니다. 즉, 다음 데이터를 얻기 위해 일부 get 또는 read 메소드를 호출해야합니다. 물론 그것을 Iterable 으로 변환 할 수 있지만 이러한 소스가 자원과 관련되어있을 때 다운 스트림이 종료되기 전에 시퀀스를 등록 취소하면 해당 자원이 누출 될 수 있습니다.
이러한 경우를 처리하기 위해 RxJava에는 SyncOnSubscribe 클래스가 있습니다. 이를 확장하고 메소드를 구현하거나 람다 기반 팩토리 메소드 중 하나를 사용하여 인스턴스를 빌드 할 수 있습니다.
SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
() -> new FileInputStream("data.bin"),
(inputstream, output) -> {
try {
int byte = inputstream.read();
if (byte < 0) {
output.onCompleted();
} else {
output.onNext(byte);
}
} catch (IOException ex) {
output.onError(ex);
}
return inputstream;
},
inputstream -> {
try {
inputstream.close();
} catch (IOException ex) {
RxJavaHooks.onError(ex);
}
}
);
Observable<Integer> o = Observable.create(binaryReader);
일반적으로 SyncOnSubscribe 는 3 콜백을 사용합니다.
첫 번째 콜백을 사용하면 예제에서 FileInputStream 과 같이 구독자 별 상태를 만들 수 있습니다. 파일은 각 개별 가입자에게 독립적으로 열립니다.
두 번째 콜백은이 상태 객체를 사용하고 onXXX 메서드가 호출되어 값을 방출 할 수있는 출력 Observer 를 제공합니다. 이 콜백은 다운 스트림이 요청한 횟수만큼 실행됩니다. 호출 할 때마다 onNext 를 호출 onNext 하며, 선택적으로 onError 또는 onCompleted 가 하나만 따라야합니다. 이 예에서는, read 바이트가 부의 경우, 파일의 종료를 나타내는 onCompleted() 호출 해, read가 IOException 했을 경우에 onError 를 호출합니다.
최종 콜백은 다운 스트림 구독 취소 (입력 스트림 닫기) 또는 이전 콜백이 터미널 메서드를 호출 할 때 호출됩니다. 리소스를 확보 할 수 있습니다. 모든 소스가 이러한 모든 기능을 필요로하지는 않으므로 SyncOnSubscribe 의 정적 메서드를 사용하면 인스턴스없이 인스턴스를 만들 수 있습니다.
유감 스럽지만 JVM과 다른 라이브러리에서 많은 메서드 호출이 확인 된 예외를 throw하므로이 클래스에서 사용하는 기능 인터페이스는 확인 된 예외를 throw 할 수 없으므로 try-catch es에 래핑해야합니다.
물론 우리는 무한 범위와 같은 다른 일반적인 소스를 모방 할 수 있습니다.
SyncOnSubscribe.createStateful(
() -> 0,
(current, output) -> {
output.onNext(current);
return current + 1;
},
e -> { }
);
이 설정에서 current 는 0 시작하고 다음에 람다가 호출되면 매개 변수 current 가 1 유지됩니다.
의 변형이 SyncOnSubscribe 라는 AsyncOnSubscribe 중간 콜백 하류에서 요청 금액을 나타내는 long 값을 받아 콜백이 발생한다는 것을 제외하고 매우 비슷 Observable 동일한 길이는. 이 소스는 Observable 을 모두 하나의 시퀀스로 연결합니다.
AsyncOnSubscribe.createStateful(
() -> 0,
(state, requested, output) -> {
output.onNext(Observable.range(state, (int)requested));
return state + 1;
},
e -> { }
);
이 클래스의 유용성에 대한 지속적인 논의가 진행 중입니다. 일반적으로 권장되지는 않습니다. 왜냐하면 실제로 생성 된 값과 어떻게 반응할지, 심지어 어떤 종류의 요청 값을 받을지에 대한 기대치를 주기적으로 깨뜨리기 때문입니다. 보다 복잡한 소비자 시나리오.
생성 (이미 터)
Observable 래핑 될 소스가 이미 마우스 동작과 같이 뜨겁지 만 API에서는 역 압박이 불가능합니다 (비동기 네트워크 콜백과 같은 경우).
이러한 경우를 처리하기 위해 최신 버전의 RxJava에서 create(emitter) 팩토리 메소드를 도입했습니다. 두 개의 매개 변수가 필요합니다.
- 수신자마다
Emitter<T>인터페이스의 인스턴스로 호출 될 콜백, - 개발자가 적용될 배압 동작을 지정하도록 요구하는
Emitter.BackpressureMode열거 형입니다.onBackpressureXXX와 비슷한 일반적인 모드가 있습니다.MissingBackpressureException신호를MissingBackpressureException거나 단순히 내부의 오버플로를 무시하면됩니다.
현재는 이러한 배압 모드에 대한 추가 매개 변수를 지원하지 않습니다. 이러한 사용자 정의가 필요한 경우 배압 모드로 NONE 을 사용하고 결과 Observable 에 onBackpressureXXX 를 적용하면됩니다.
GUI 이벤트와 같은 푸시 기반 소스와 상호 작용할 때 사용되는 첫 번째 일반적인 경우입니다. 이러한 API에는 활용할 수있는 몇 가지 형태의 addListener / removeListener 호출이 있습니다.
Observable.create(emitter -> {
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
emitter.setCancellation(() ->
button.removeListener(al));
}, BackpressureMode.BUFFER);
Emitter 비교적 직관적입니다. 하나는 onNext , onError 및 onCompleted 를 호출 할 수 있으며 연산자는 배압 및 탈퇴 관리를 자체적으로 처리합니다. 또한 래핑 된 API가 취소 (예 : 리스너 제거)를 지원하는 경우 setCancellation (또는 Subscription 과 같은 자원에 대해 setSubscription )을 사용하여 다운 스트림 수신 거부 또는 onError / setCancellation 리소스가 호출 될 때 호출되는 취소 콜백을 등록 할 수 있습니다. onCompleted 는 제공된 Emitter 인스턴스에서 호출됩니다.
이 방법을 사용하면 한 번에 하나의 리소스 만 에미 터와 연결할 수 있으며 새 리소스를 설정하면 이전 리소스를 자동으로 구독 취소합니다. 복합 리소스를 처리해야하는 경우 CompositeSubscription 만들고이 이미 터와 연결 한 다음 추가 리소스를 CompositeSubscription 자체에 추가합니다.
Observable.create(emitter -> {
CompositeSubscription cs = new CompositeSubscription();
Worker worker = Schedulers.computation().createWorker();
ActionListener al = e -> {
emitter.onNext(e);
};
button.addActionListener(al);
cs.add(worker);
cs.add(Subscriptions.create(() ->
button.removeActionListener(al));
emitter.setSubscription(cs);
}, BackpressureMode.BUFFER);
두 번째 시나리오는 일반적으로 Observable 로 변환되어야하는 비동기 콜백 기반 API를 필요로합니다.
Observable.create(emitter -> {
someAPI.remoteCall(new Callback<Data>() {
@Override
public void onSuccess(Data data) {
emitter.onNext(data);
emitter.onCompleted();
}
@Override
public void onFailure(Exception error) {
emitter.onError(error);
}
});
}, BackpressureMode.LATEST);
이 경우 위임은 동일한 방식으로 작동합니다. 불행히도, 보통, 이러한 고전적인 콜백 스타일의 API는 취소를 지원하지 않지만, 그렇다면 previoius 예제에서와 같이 취소를 설정할 수 있습니다. LATEST 백 프레셔 모드를 사용하십시오. 하나의 값만 존재할 것이라는 것을 알게된다면 절대로 완전히 활용되지 않을 기본 128 개의 요소 버퍼 (필요에 따라 증가)를 할당하기 때문에 BUFFER 전략은 필요하지 않습니다.