수색…


비고

Apache Spark 는 속도, 사용 편의성 및 정교한 분석을 기반으로 구축 된 오픈 소스 대형 데이터 처리 프레임 워크입니다. 개발자는 대용량 데이터를 처리 할 때 메모리 사용 제한 및 / 또는 처리 시간이 너무 길 때이를 사용해야합니다.


또한 apache-spark 내의 큰 주제를 언급하고 관련 주제와 연결됩니다. apache-spark에 대한 문서가 새로운 것이므로 관련 주제의 초기 버전을 만들어야 할 수도 있습니다.

버전

번역 출시일
2.2.0 2017-07-11
2.1.1 2017-05-02
2.1.0 2016-12-28
2.0.1 2016-10-03
2.0.0 2016-07-26
1.6.0 2016-01-04
1.5.0 2015-09-09
1.4.0 2015-06-11
1.3.0 2015-03-13
1.2.0 2014-12-18
1.1.0 2014-09-11
1.0.0 2014-05-30
0.9.0 2014-02-02
0.8.0 2013-09-25
0.7.0 2013-02-27
0.6.0 2012-10-15

소개

프로토 타입 :

집합 (zeroValue, seqOp, combOp)

설명 :

aggregate() 사용하면 RDD를 사용하여 원본 RDD에 저장된 유형과 다른 유형의 단일 값을 생성 할 수 있습니다.

매개 변수 :

  1. zeroValue : 결과의 초기화 값을 원하는 형식으로 zeroValue 합니다.
  2. seqOp : RDD 레코드에 적용하려는 작업입니다. 파티션의 모든 레코드에 대해 한 번 실행됩니다.
  3. combOp : 결과 객체 (모든 파티션에 하나씩)가 결합되는 방법을 정의합니다.

:

목록의 합과 그 목록의 길이를 계산하십시오. 결과를 (sum, length) 쌍으로 리턴하십시오.

Spark 셸에서 2 개의 파티션을 가진 4 개의 요소로 목록을 만듭니다.

listRDD = sc.parallelize([1,2,3,4], 2)

그런 다음 seqOp를 정의 하십시오 .

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

그런 다음 combOp를 정의 합니다 .

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

그런 다음 집계 됨 :

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

첫 번째 파티션에는 하위 목록 [1, 2]이 있습니다. 그러면 seqOp가 해당 목록의 각 요소에 적용되어 로컬 결과를 생성합니다. 결과를 로컬로 반영하는 첫 번째 파티션에만 (sum, length) 쌍을 만듭니다.

local_result 받는 초기화됩니다 zeroValue 매개 변수 aggregate() 와 함께 제공되었다. 예를 들어 (0, 0) 및 list_element 는 목록의 첫 번째 요소입니다.

0 + 1 = 1
0 + 1 = 1

국지 결과는 (1, 1)입니다. 이는 첫 x 째 요소 처리 한 후 첫 x 째 파티션에 대해 길이가 1이고 합이 1임을의 L합니다. local_result 는 (0, 0)에서 (1, 1)로 업데이트됩니다.

1 + 2 = 3
1 + 1 = 2

지역 결과는 이제 (1, 2)이며 첫 번째 파티션의 하위 목록에 다른 요소가 없으므로 첫 번째 파티션의 최종 결과가됩니다. 두 번째 파티션에 대해 동일한 작업을 수행하면 (7, 2)가 반환됩니다.

마지막 로컬 결과를 만들기 위해 각 로컬 결과에 combOp를 적용합니다.

(3,2) + (7,2) = (10, 4)

'그림'에 설명 된 예 :

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

변환 대 액션

스파크는 게으른 평가를 사용합니다. 그것은 실제로해야하지 않는 한 어떤 일도하지 않을 것임을 의미합니다. 이러한 접근 방식은 불필요한 메모리 사용을 피할 수있게 해 주므로 큰 데이터로 작업 할 수 있습니다.

변형 은 느리게 평가되고 실제 작업은 동작 이 발생할 때 발생합니다.

예:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

그래서,에 [1] 우리는 RDD라는 이름으로 파일을 읽을 스파크 말했다 lines . 불꽃은 우리를 듣고 우리에게 말했다 : "예 나는 그것을 할 것"있지만, 사실 아직 파일을 읽을 수 없습니다.

[2]에서 우리는 파일의 내용에 시작시 error 가있는 줄이 있다고 가정하여 파일의 줄을 필터링하고 있습니다. 그래서 우리는 스파크에게 errors 라 불리는 새로운 RDD를 작성하라고 지시합니다. RDD lines 의 요소는 처음부터 error 라는 단어 error 있습니다.

이제 [3] 에서 우리는 Spark에 오류계산 하도록 요청 합니다 . 즉 RDD가 errors 라고 부르는 요소의 수를 계산합니다. count() 은 결과 찾을 수 있도록, 스파크하는 선택의 여지를 남겨 두지 작업은,하지만, 실제로 작업을 할 count() 정수가됩니다.

결과적으로 [3] 에 도달하면 [1][2] 가 실제로 수행 될 것입니다. 즉 [3] 도달했을 때, 그 다음에 만 :

  1. 파일은 textFile() 에서 textFile() [1] 때문에).

  2. linesfilter() 때문에 ( [2] 때문에)

  3. [3] 때문에 count() 가 실행됩니다 [3]


디버그 팁 : Spark은 [3] 에 도달 할 때까지 아무런 실제 작업을 수행하지 않으므로 [1] 및 / 또는 [2] 오류가있는 경우 표시되지 않습니다. [3] 스파크가 실제 작업을 시작합니다. 예를 들어 파일의 데이터가 내가 사용한 startsWith() 지원하지 않는다면, [2] 는 Spark에 의해 적절히 받아 들여질 것이고 아무런 에러도 발생시키지 않을 것입니다.하지만 [3] 이 제출되고 실제로 Spark가 실행되면 평가 모두 [1][2] , 다음 만 그때는 뭔가에 정확하지 않은 것을 이해할 수있을 것이다 [2] 및 설명 오류가 발생합니다.

그 결과, 오류가 때 트리거 할 수있다 [3] 실행,하지만 오류가 계산서에 거짓말을해야한다는 것을 의미하지 않는다 [3] !

lines 이나 errors[3] 이후에 메모리에 저장되지 않습니다. 그들은 일련의 처리 명령으로서 만 존재할 것입니다. 이러한 RDD 중 하나에서 여러 작업이 수행되는 경우 spark는 데이터를 여러 번 읽고 필터링합니다. 단일 RDD에서 여러 작업을 수행 할 때 작업이 중복되지 않도록하려면 cache 사용하여 메모리에 데이터를 저장하는 것이 좋습니다.


Spark 문서 에서 더 많은 변형 / 동작을 볼 수 있습니다.

스파크 버전 확인

spark-shell :

sc.version

일반적으로 프로그램에서 :

SparkContext.version

spark-submit :

 spark-submit --version


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow