apache-spark 튜토리얼
apache-spark 시작하기
수색…
비고
Apache Spark 는 속도, 사용 편의성 및 정교한 분석을 기반으로 구축 된 오픈 소스 대형 데이터 처리 프레임 워크입니다. 개발자는 대용량 데이터를 처리 할 때 메모리 사용 제한 및 / 또는 처리 시간이 너무 길 때이를 사용해야합니다.
또한 apache-spark 내의 큰 주제를 언급하고 관련 주제와 연결됩니다. apache-spark에 대한 문서가 새로운 것이므로 관련 주제의 초기 버전을 만들어야 할 수도 있습니다.
버전
번역 | 출시일 |
---|---|
2.2.0 | 2017-07-11 |
2.1.1 | 2017-05-02 |
2.1.0 | 2016-12-28 |
2.0.1 | 2016-10-03 |
2.0.0 | 2016-07-26 |
1.6.0 | 2016-01-04 |
1.5.0 | 2015-09-09 |
1.4.0 | 2015-06-11 |
1.3.0 | 2015-03-13 |
1.2.0 | 2014-12-18 |
1.1.0 | 2014-09-11 |
1.0.0 | 2014-05-30 |
0.9.0 | 2014-02-02 |
0.8.0 | 2013-09-25 |
0.7.0 | 2013-02-27 |
0.6.0 | 2012-10-15 |
소개
프로토 타입 :
집합 (zeroValue, seqOp, combOp)
설명 :
aggregate()
사용하면 RDD를 사용하여 원본 RDD에 저장된 유형과 다른 유형의 단일 값을 생성 할 수 있습니다.
매개 변수 :
-
zeroValue
: 결과의 초기화 값을 원하는 형식으로zeroValue
합니다. -
seqOp
: RDD 레코드에 적용하려는 작업입니다. 파티션의 모든 레코드에 대해 한 번 실행됩니다. -
combOp
: 결과 객체 (모든 파티션에 하나씩)가 결합되는 방법을 정의합니다.
예 :
목록의 합과 그 목록의 길이를 계산하십시오. 결과를
(sum, length)
쌍으로 리턴하십시오.
Spark 셸에서 2 개의 파티션을 가진 4 개의 요소로 목록을 만듭니다.
listRDD = sc.parallelize([1,2,3,4], 2)
그런 다음 seqOp를 정의 하십시오 .
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
그런 다음 combOp를 정의 합니다 .
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
그런 다음 집계 됨 :
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
첫 번째 파티션에는 하위 목록 [1, 2]이 있습니다. 그러면 seqOp가 해당 목록의 각 요소에 적용되어 로컬 결과를 생성합니다. 결과를 로컬로 반영하는 첫 번째 파티션에만 (sum, length)
쌍을 만듭니다.
local_result
받는 초기화됩니다 zeroValue
매개 변수 aggregate()
와 함께 제공되었다. 예를 들어 (0, 0) 및 list_element
는 목록의 첫 번째 요소입니다.
0 + 1 = 1
0 + 1 = 1
국지 결과는 (1, 1)입니다. 이는 첫 x 째 요소 만 처리 한 후 첫 x 째 파티션에 대해 길이가 1이고 합이 1임을의 L합니다. local_result
는 (0, 0)에서 (1, 1)로 업데이트됩니다.
1 + 2 = 3
1 + 1 = 2
지역 결과는 이제 (1, 2)이며 첫 번째 파티션의 하위 목록에 다른 요소가 없으므로 첫 번째 파티션의 최종 결과가됩니다. 두 번째 파티션에 대해 동일한 작업을 수행하면 (7, 2)가 반환됩니다.
마지막 로컬 결과를 만들기 위해 각 로컬 결과에 combOp를 적용합니다.
(3,2) + (7,2) = (10, 4)
'그림'에 설명 된 예 :
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
변환 대 액션
스파크는 게으른 평가를 사용합니다. 그것은 실제로해야하지 않는 한 어떤 일도하지 않을 것임을 의미합니다. 이러한 접근 방식은 불필요한 메모리 사용을 피할 수있게 해 주므로 큰 데이터로 작업 할 수 있습니다.
변형 은 느리게 평가되고 실제 작업은 동작 이 발생할 때 발생합니다.
예:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
그래서,에 [1]
우리는 RDD라는 이름으로 파일을 읽을 스파크 말했다 lines
. 불꽃은 우리를 듣고 우리에게 말했다 : "예 나는 그것을 할 것"있지만, 사실 아직 파일을 읽을 수 없습니다.
[2]에서 우리는 파일의 내용에 시작시 error
가있는 줄이 있다고 가정하여 파일의 줄을 필터링하고 있습니다. 그래서 우리는 스파크에게 errors
라 불리는 새로운 RDD를 작성하라고 지시합니다. RDD lines
의 요소는 처음부터 error
라는 단어 error
있습니다.
이제 [3]
에서 우리는 Spark에 오류 를 계산 하도록 요청 합니다 . 즉 RDD가 errors
라고 부르는 요소의 수를 계산합니다. count()
은 결과 찾을 수 있도록, 스파크하는 선택의 여지를 남겨 두지 작업은,하지만, 실제로 작업을 할 count()
정수가됩니다.
결과적으로 [3]
에 도달하면 [1]
과 [2]
가 실제로 수행 될 것입니다. 즉 [3]
도달했을 때, 그 다음에 만 :
파일은
textFile()
에서textFile()
[1]
때문에).lines
은filter()
때문에 ([2]
때문에)[3]
때문에count()
가 실행됩니다[3]
디버그 팁 : Spark은 [3]
에 도달 할 때까지 아무런 실제 작업을 수행하지 않으므로 [1]
및 / 또는 [2]
오류가있는 경우 표시되지 않습니다. [3]
스파크가 실제 작업을 시작합니다. 예를 들어 파일의 데이터가 내가 사용한 startsWith()
지원하지 않는다면, [2]
는 Spark에 의해 적절히 받아 들여질 것이고 아무런 에러도 발생시키지 않을 것입니다.하지만 [3]
이 제출되고 실제로 Spark가 실행되면 평가 모두 [1]
과 [2]
, 다음 만 그때는 뭔가에 정확하지 않은 것을 이해할 수있을 것이다 [2]
및 설명 오류가 발생합니다.
그 결과, 오류가 때 트리거 할 수있다 [3]
실행,하지만 오류가 계산서에 거짓말을해야한다는 것을 의미하지 않는다 [3]
!
lines
이나 errors
는 [3]
이후에 메모리에 저장되지 않습니다. 그들은 일련의 처리 명령으로서 만 존재할 것입니다. 이러한 RDD 중 하나에서 여러 작업이 수행되는 경우 spark는 데이터를 여러 번 읽고 필터링합니다. 단일 RDD에서 여러 작업을 수행 할 때 작업이 중복되지 않도록하려면 cache
사용하여 메모리에 데이터를 저장하는 것이 좋습니다.
Spark 문서 에서 더 많은 변형 / 동작을 볼 수 있습니다.
스파크 버전 확인
spark-shell
:
sc.version
일반적으로 프로그램에서 :
SparkContext.version
spark-submit
:
spark-submit --version