수색…


소개

BackpressureObservable 프로세싱 파이프 라인에서 일부 비동기 스테이지가 값을 충분히 빠르게 처리 할 수 ​​없으며 업스트림 프로듀서에게 속도 저하를 지시 할 방법이 필요합니다.

배압이 필요한 전형적인 경우는 생산자가 뜨거운 출처 인 경우입니다.

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

이 예에서 주 스레드는 백그라운드 스레드에서이를 처리하는 최종 사용자에게 1 백만 항목을 생성합니다. compute(int) 메서드는 시간이 좀 걸릴 가능성이 있지만 Observable 연산자 체인의 오버 헤드가 항목 처리에 걸릴 시간을 더할 수 있습니다. 그러나 for 루프가있는 생성 스레드는이를 알 수 없으며 onNext ing을 유지합니다.

내부적으로, 비동기 연산자는 처리 될 때까지 이러한 요소를 보유하는 버퍼를 가지고 있습니다. 고전적인 Rx.NET과 초기 RxJava에서 이러한 버퍼는 제한이 없었습니다. 즉, 예제에서 거의 백만 개의 요소를 모두 보유 할 가능성이 높습니다. 문제는 예를 들어 10 억 개의 요소가 있거나 프로그램에서 1000 만 개의 시퀀스가 ​​1000 번 나타나면 OutOfMemoryError 이어지고 일반적으로 과도한 GC 오버 헤드로 인해 속도가 느려질 때 시작됩니다.

오류 처리가 일류 시민이 된 것과 onErrorXXX 연산자를 통해 onErrorXXX 연산자를 onErrorXXX 배압은 프로그래머가 onBackpressureXXX 연산자를 통해 생각하고 처리 onBackpressureXXX 데이터 흐름의 또 다른 속성입니다.

위의 PublishSubject 이외에도 기능상의 이유로 주로 배압을 지원하지 않는 다른 연산자가 있습니다. 예를 들어, 운전자 interval 은 주기적으로 값을 내고, 다시 압력을 가하면 벽 시계에 비례하여 시간이 이동합니다.

현대의 RxJava에서 대부분의 비동기 연산자는 위의 observeOn 과 같은 바운드 된 내부 버퍼를 observeOn 버퍼를 오버플로하려는 시도는 MissingBackpressureException 전체 시퀀스를 종료합니다. 각 작동기의 문서에는 배압 작동에 대한 설명이 있습니다.

그러나 배압은 정기적 인 콜드 시퀀스 ( MissingBackpressureException 이 발생하지 MissingBackpressureException )에서 더 세밀하게 MissingBackpressureException . 첫 번째 예제가 다시 작성된 경우 :

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

오류가없고 작은 메모리 사용으로 모든 것이 원활하게 실행됩니다. 그 이유는 많은 소스 연산자가 필요에 따라 값을 "생성"할 수 있기 때문에 연산자 observeOnobserveOn 버퍼가 오버플로없이 한 번에 유지할 수있는 많은 값을 생성 range 알릴 수 있습니다.

이 협상은 컴퓨터 루틴 공동 개념의 컴퓨터 과학 개념에 기반합니다 (저는 당신을 부릅니다, 당신은 저를 부름이라고합니다). 연산자 rangeProducer 인터페이스 구현 형태의 콜백을 해당 (내부 Subscriber 의) setProducer 를 호출하여 observeOn 으로 observeOn . 대신에, observeOnProducer.request(n) 에 값을 지정하여 range 에서 많은 추가 요소를 생성 (즉, onNext )하도록 onNext 합니다. 적절한 시간에 request 메소드를 호출하여 데이터가 흐르지 만 넘치지 않도록 올바른 값으로 호출하는 것은 observeOn 의 책임입니다.

최종 사용자의 배압 표현은 거의 필요하지 않습니다 (호출 스택 차단으로 인해 자연스럽게 발생하는 상류 및 배압에 대해 동기식이기 때문에). 그러나이를 이해하는 것이 더 쉬울 수 있습니다.

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

여기서 onStart 구현은 첫 번째 값을 생성하는 range 를 나타내며 그 값은 onNext 에서 수신됩니다. compute(int) 가 끝나면 range 에서 다른 값을 요청 range . 순진한 range 구현에서 그러한 호출은 재귀 적으로 onNext 호출하여 StackOverflowError 시키는 것은 당연히 바람직하지 않습니다.

이를 방지하기 위해 운영자는 이러한 재진입 호출을 방지하는 소위 트램폴린 (trampolining) 논리를 사용합니다. range 의 용어로는 onNext() 호출하는 동안 request(1) 호출이 있었고 onNext() 반환하면 다른 라운드를 만들고 다음 정수 값으로 onNext() 를 호출 onNext() 입니다. 따라서 두 개가 바뀌면 예제는 여전히 동일하게 작동합니다.

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

그러나 onStart 경우에는 그렇지 않습니다. Observable 인프라가 각 Subscriber 에서 최대 한 번 호출된다는 것을 보장하지만 request(1) 호출은 즉시 요소 방출을 트리거 할 수 있습니다. onNext 가 필요로하는 request(1) 호출 이후에 초기화 논리가 있으면 예외가 발생할 수 있습니다.

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ... rest is the same
});

이 동기식의 경우, onStart 실행 중에 NullPointerException 가 즉시 슬로우됩니다. request(1) 대한 호출이 다른 스레드에서 onNext 에 대한 비동기 호출을 트리거하고 onNext 레이스에서 nameonNext onStart post request 에 쓰는 경우보다 미묘한 버그가 발생합니다.

따라서 onStart 에서 모든 필드 초기화를 수행해야하며 그 전에도 request() 호출해야합니다. 연산자의 request() 구현은 필요시 적절한 발생 전 관계 (또는 다른면에서는 메모리 해제 또는 전체 펜스)를 보장합니다.

onBackpressureXXX 연산자

대부분의 개발자는 MissingBackpressureException 인해 응용 프로그램이 실패 할 때 MissingBackpressureException 압력이 발생하며 일반적으로 예외는 observeOn 연산자를 가리 킵니다. 실제 원인은 일반적으로 PublishSubject , timer() 또는 interval() 또는 create() 를 통해 create() 사용자 정의 연산자의 PublishSubject 사용입니다.

이러한 상황을 다루는 여러 가지 방법이 있습니다.

버퍼 크기 늘리기

때로는 오버런이 폭발하는 원인으로 인해 발생합니다. 갑자기 사용자가 화면을 너무 빠르게 observeOn 안드로이드의 기본 16 개 요소 내부 버퍼가 오버플로를 observeOn 합니다.

최신 버전의 RxJava에서 대부분의 배압에 민감한 운영자는 이제 프로그래머가 내부 버퍼의 크기를 지정할 수 있습니다. 관련 매개 변수는 일반적으로 bufferSize , prefetch 또는 capacityHint 라고 capacityHint . 소개에서 넘치는 예제를 감안할 때 observeOn 의 버퍼 크기를 observeOn 모든 값에 대해 충분한 공간을 확보 할 수 있습니다.

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

그러나 소스가 예측 된 버퍼 크기를 과다하게 생성하는 경우에도 오버플로가 여전히 발생할 수 있으므로 일반적으로 임시 수정 일 수 있습니다. 이 경우 다음 연산자 중 하나를 사용할 수 있습니다.

표준 연산자로 값 일괄 처리 / 건너 뛰기

원본 데이터를 일괄 적으로보다 효율적으로 처리 할 수있는 경우 표준 일괄 처리 연산자 (크기 및 / 또는 시간 기준) 중 하나를 사용하여 MissingBackpressureException 의 가능성을 줄일 수 있습니다.

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

일부 값을 안전하게 무시할 수있는 경우 샘플링 (시간 또는 다른 Observable 포함)과 throttleFirst ( throttleFirst , throttleLast , throttleWithTimeout )를 사용할 수 있습니다.

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

이 연산자는 다운 스트림에 의한 값 수신 속도 만 줄이므로 MissingBackpressureException 이어질 수 있습니다.

onBackpressureBuffer ()

매개 변수없는 형식의이 연산자는 업스트림 소스와 다운 스트림 연산자 사이에 버퍼되지 않은 버퍼를 다시 도입합니다. 제한이 없다는 것은 JVM의 메모리가 부족하지 않으면 버스트 한 소스에서 나오는 거의 모든 양을 처리 할 수 ​​있음을 의미합니다.

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

이 예에서, observeOn 매우 낮은 버퍼 크기와 함께 간다 아직 없다 MissingBackpressureException 같은 onBackpressureBuffer 에 그것의 소량을 통해 모두 100 만 개 가치와 손을 젖은 observeOn .

그러나 onBackpressureBuffer 는 소스를 무한 방식으로 사용합니다. 즉, backpressure를 적용하지 않습니다. 이것은 range 와 같은 배압 지원 소스조차도 완전히 실현 될 것이라는 결과를 낳습니다.

onBackpressureBuffer 에는 4 가지 추가 오버로드가 onBackpressureBuffer

onBackpressureBuffer (int capacity)

버퍼가 지정된 용량에 도달 할 경우를 대비하여 BufferOverflowError 를 신호하는 제한된 버전입니다.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

점점 더 많은 운영자가 버퍼 크기를 설정할 수있게됨에 따라이 연산자의 관련성은 감소하고 있습니다. 나머지는 onBackpressureBuffer 를 사용하여 더 큰 숫자를 onBackpressureBuffer 보다 "확장하여 내부 버퍼를 확장"할 수 있습니다.

onBackpressureBuffer (int capacity, Action0 onOverflow)

이 오버로드는 오버플로가 발생하면 (공유) 액션을 호출합니다. 현재 호출 스택보다 오버플로에 대해 제공된 다른 정보가 없기 때문에 유용성이 다소 제한적입니다.

onBackpressureBuffer (int capacity, Action0 onOverflow, BackpressureOverflow.Strategy 전략)

이 과부하는 실제로 용량에 도달 한 경우 수행 할 작업을 정의하는대로 더 유용합니다. BackpressureOverflow.Strategy 는 실제로 인터페이스이지만 BackpressureOverflow 클래스는 일반적인 동작을 나타내는 4 개의 정적 필드와 구현 클래스를 제공합니다.

  • ON_OVERFLOW_ERROR : 이전의 두 가지 오버로드의 기본 동작으로, BufferOverflowException ON_OVERFLOW_ERROR .
  • ON_OVERFLOW_DEFAULT : 현재는 ON_OVERFLOW_ERRORON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST : 오버플로가 발생하면 현재 값이 무시되고 다운 스트림 요청이있을 때 이전 값만 전달됩니다.
  • ON_OVERFLOW_DROP_OLDEST : 버퍼의 가장 오래된 요소를 삭제하고 현재 값을 버퍼에 추가합니다.
Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

마지막 두 가지 전략은 요소를 제거 할 때 스트림에서 불연속성을 유발한다는 점에 유의하십시오. 또한, 그들은 BufferOverflowException 신호하지 않습니다.

onBackpressureDrop ()

다운 스트림이 값을 수신 할 준비가되지 않을 때마다이 연산자는 시퀀스에서 해당 엘레멘트를 제거합니다. 하나의 전략을 ON_OVERFLOW_DROP_LATEST 전략으로 0 용량 onBackpressureBuffer 로 생각할 수 있습니다.

이 연산자는 나중에 최신 값이 있으므로 소스 (예 : 마우스 이동 또는 현재 GPS 위치 신호)의 값을 안전하게 무시할 수있을 때 유용합니다.

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

소스 연산자 interval() 과 함께 유용 할 수 있습니다. 예를 들어 주기적 백그라운드 작업을 수행하려고하지만 각 반복이 해당 기간보다 오래 지속될 수있는 경우 나중에 간격이 초과 될 때 초과 간격 알림을 삭제하는 것이 안전합니다.

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

onBackpressureDrop(Action1<? super T> onDrop) 여기서 값은 삭제되고 (shared) 액션이 호출됩니다. 이 변형은 값 자체를 정리할 수 있습니다 (예 : 연관된 자원 해제).

onBackpressureLatest ()

최종 연산자는 최신 값만 유지하고 실제로 전달되지 않은 이전 값을 덮어 씁니다. 이것을 1의 용량과 ON_OVERFLOW_DROP_OLDEST 전략을 가진 onBackpressureBuffer 의 변형으로 생각할 수 있습니다.

onBackpressureDrop 와는 달리, 다운 스트림이 뒤쳐지면 항상 소비 할 수있는 값이 있습니다. 이것은 데이터가 일부 파열 패턴으로 나타날 수있는 원격 측정과 같은 상황에서 유용 할 수 있지만 매우 최신 데이터 만 처리에 흥미가 있습니다.

예를 들어, 사용자가 화면에서 많이 클릭하면, 우리는 여전히 최신 입력에 반응하고자합니다.

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

이 경우에 onBackpressureDrop 을 사용하면 마지막 클릭이 끊어지고 사용자가 비즈니스 논리가 실행되지 않은 이유가 궁금해집니다.

역압 데이터 소스 만들기

라이브러리가 이미 Observable 에서 개발자의 배압을 처리하는 정적 메서드를 제공하기 때문에 일반적으로 배압을 처리 할 때 배압 데이터 소스를 만드는 것이 상대적으로 쉬운 작업입니다. 우리는 두 가지 종류의 공장 방식을 구별 할 수 있습니다. 다운 스트림 수요에 따라 요소를 반환하고 생성하는 차가운 "발전기"및 일반적으로 비 반응 및 / 또는 비방 향성 데이터 소스를 연결하는 뜨거운 "푸셔"및 그들.

다만

가장 기본적인 배압 인식 소스를 통해 생성되는 just :

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // the rest is omitted for brevity
}

우리는 onStart 에서 명시 적으로 요청하지 않기 때문에 아무 것도 인쇄하지 않습니다. just 우리가 순서를 점프 시작하고 싶습니다 상수 값이있을 때 유용합니다.

불행하게도, just 종종 동적으로 소비 할 수있는 뭔가를 계산하는 방법을 오해 Subscriber 들 :

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

놀랍게도, 1과 2를 각각 인쇄하는 대신에 1 번을 두 번 인쇄합니다. 호출이 다시 작성되면 작동하는 이유가 명확 해집니다.

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

computeValue 는 메인 루틴의 일부로 호출되며 구독하는 구독자에 대한 응답이 아닙니다.

fromCallable

사람들이 실제로 필요로하는 것은 메소드 fromCallable .

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

여기서 computeValue 는 구독자가 구독하고 각각에 대해 예상 1과 2를 인쇄 할 때만 실행됩니다. 당연히 fromCallable 는 백 프레셔를 올바르게 지원하고 요청하지 않는 한 계산 된 값을 방출하지 않습니다. 그러나 계산은 어쨌든 발생합니다. 경우에 계산 자체는 다운 스트림이 실제로 우리가 사용할 수 요청할 때까지 연기해야 justmap :

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just 그것이 결과에 매핑 될 때 요청 때까지 일정한 값을 방출하지 않습니다 computeValue 여전히 개별적으로 각 가입자 요구했다.

...에서

데이터 객체의 배열, 객체의리스트 또는 이미 사용 가능한 경우 Iterable 소스 각각 from 배압 및 발광 원을 처리 과부하 :

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

편의 (그리고 일반적인 배열 생성에 대한 경고를 방지)하기 위해 2~10 인수 과부하있다 just 내부적으로 위임 from .

from(Iterable) 은 또한 흥미로운 기회를 제공합니다. 많은 가치 창출은 상태 기계의 한 형태로 표현 될 수 있습니다. 요청 된 각 요소는 상태 전이와 반환 값 계산을 트리거합니다.

Iterable 같은 상태 머신을 작성하는 것은 Observable 을 쓰는 것보다 다소 복잡하지만 C #과 달리 컴파일러가 고전적인 코드를 작성하여 컴파일러에서 지원하지 않습니다 ( yield returnyield break ). 일부 라이브러리는 Google Guava의 AbstractIterable 및 IxJava의 Ix.generate()Ix.forloop() 과 같은 일부 도움말을 제공합니다. 이들은 그래서 몇 가지 아주 기본적인 보자 스스로 전체 시리즈의 가치가 Iterable 몇 가지 상수 값을 무한정 반복 소스 :

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

classic for-loop를 통해 iterator 를 소비한다면 무한 루프가 발생합니다. Observable 을 구축하기 때문에 처음 5 개만 소비하고 나머지는 요청하지 않을 의지를 표현할 수 있습니다. 이것은 Observable 내부에서 느리게 평가하고 계산하는 진정한 힘입니다.

create (SyncOnSubscribe)

때로는 반응 적 세계로 변환 될 데이터 소스가 동기식 (차단)이고 당김 식입니다. 즉, 다음 데이터를 얻기 위해 일부 get 또는 read 메소드를 호출해야합니다. 물론 그것을 Iterable 으로 변환 할 수 있지만 이러한 소스가 자원과 관련되어있을 때 다운 스트림이 종료되기 전에 시퀀스를 등록 취소하면 해당 자원이 누출 될 수 있습니다.

이러한 경우를 처리하기 위해 RxJava에는 SyncOnSubscribe 클래스가 있습니다. 이를 확장하고 메소드를 구현하거나 람다 기반 팩토리 메소드 중 하나를 사용하여 인스턴스를 빌드 할 수 있습니다.

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

일반적으로 SyncOnSubscribe 는 3 콜백을 사용합니다.

첫 번째 콜백을 사용하면 예제에서 FileInputStream 과 같이 구독자 별 상태를 만들 수 있습니다. 파일은 각 개별 가입자에게 독립적으로 열립니다.

두 번째 콜백은이 상태 객체를 사용하고 onXXX 메서드가 호출되어 값을 방출 할 수있는 출력 Observer 를 제공합니다. 이 콜백은 다운 스트림이 요청한 횟수만큼 실행됩니다. 호출 할 때마다 onNext 를 호출 onNext 하며, 선택적으로 onError 또는 onCompleted 가 하나만 따라야합니다. 이 예에서는, read 바이트가 부의 경우, 파일의 종료를 나타내는 onCompleted() 호출 해, read가 IOException 했을 경우에 onError 를 호출합니다.

최종 콜백은 다운 스트림 구독 취소 (입력 스트림 닫기) 또는 이전 콜백이 터미널 메서드를 호출 할 때 호출됩니다. 리소스를 확보 할 수 있습니다. 모든 소스가 이러한 모든 기능을 필요로하지는 않으므로 SyncOnSubscribe 의 정적 메서드를 사용하면 인스턴스없이 인스턴스를 만들 수 있습니다.

유감 스럽지만 JVM과 다른 라이브러리에서 많은 메서드 호출이 확인 된 예외를 throw하므로이 클래스에서 사용하는 기능 인터페이스는 확인 된 예외를 throw 할 수 없으므로 try-catch es에 래핑해야합니다.

물론 우리는 무한 범위와 같은 다른 일반적인 소스를 모방 할 수 있습니다.

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

이 설정에서 current0 시작하고 다음에 람다가 호출되면 매개 변수 current1 유지됩니다.

의 변형이 SyncOnSubscribe 라는 AsyncOnSubscribe 중간 콜백 하류에서 요청 금액을 나타내는 long 값을 받아 콜백이 발생한다는 것을 제외하고 매우 비슷 Observable 동일한 길이는. 이 소스는 Observable 을 모두 하나의 시퀀스로 연결합니다.

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

이 클래스의 유용성에 대한 지속적인 논의가 진행 중입니다. 일반적으로 권장되지는 않습니다. 왜냐하면 실제로 생성 된 값과 어떻게 반응할지, 심지어 어떤 종류의 요청 값을 받을지에 대한 기대치를 주기적으로 깨뜨리기 때문입니다. 보다 복잡한 소비자 시나리오.

생성 (이미 터)

Observable 래핑 될 소스가 이미 마우스 동작과 같이 뜨겁지 만 API에서는 역 압박이 불가능합니다 (비동기 네트워크 콜백과 같은 경우).

이러한 경우를 처리하기 위해 최신 버전의 RxJava에서 create(emitter) 팩토리 메소드를 도입했습니다. 두 개의 매개 변수가 필요합니다.

  • 수신자마다 Emitter<T> 인터페이스의 인스턴스로 호출 될 콜백,
  • 개발자가 적용될 배압 동작을 지정하도록 요구하는 Emitter.BackpressureMode 열거 형입니다. onBackpressureXXX 와 비슷한 일반적인 모드가 있습니다. MissingBackpressureException 신호를 MissingBackpressureException 거나 단순히 내부의 오버플로를 무시하면됩니다.

현재는 이러한 배압 모드에 대한 추가 매개 변수를 지원하지 않습니다. 이러한 사용자 정의가 필요한 경우 배압 모드로 NONE 을 사용하고 결과 ObservableonBackpressureXXX 를 적용하면됩니다.

GUI 이벤트와 같은 푸시 기반 소스와 상호 작용할 때 사용되는 첫 번째 일반적인 경우입니다. 이러한 API에는 활용할 수있는 몇 가지 형태의 addListener / removeListener 호출이 있습니다.

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

Emitter 비교적 직관적입니다. 하나는 onNext , onErroronCompleted 를 호출 할 수 있으며 연산자는 배압 및 탈퇴 관리를 자체적으로 처리합니다. 또한 래핑 된 API가 취소 (예 : 리스너 제거)를 지원하는 경우 setCancellation (또는 Subscription 과 같은 자원에 대해 setSubscription )을 사용하여 다운 스트림 수신 거부 또는 onError / setCancellation 리소스가 호출 될 때 호출되는 취소 콜백을 등록 할 수 있습니다. onCompleted 는 제공된 Emitter 인스턴스에서 호출됩니다.

이 방법을 사용하면 한 번에 하나의 리소스 만 에미 터와 연결할 수 있으며 새 리소스를 설정하면 이전 리소스를 자동으로 구독 취소합니다. 복합 리소스를 처리해야하는 경우 CompositeSubscription 만들고이 이미 터와 연결 한 다음 추가 리소스를 CompositeSubscription 자체에 추가합니다.

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

두 번째 시나리오는 일반적으로 Observable 로 변환되어야하는 비동기 콜백 기반 API를 필요로합니다.

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

이 경우 위임은 동일한 방식으로 작동합니다. 불행히도, 보통, 이러한 고전적인 콜백 스타일의 API는 취소를 지원하지 않지만, 그렇다면 previoius 예제에서와 같이 취소를 설정할 수 있습니다. LATEST 백 프레셔 모드를 사용하십시오. 하나의 값만 존재할 것이라는 것을 알게된다면 절대로 완전히 활용되지 않을 기본 128 개의 요소 버퍼 (필요에 따라 증가)를 할당하기 때문에 BUFFER 전략은 필요하지 않습니다.



Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow