수색…


비고

파티션 수는 응용 프로그램의 성능 및 / 또는 성공적인 종료에 중요합니다.

Resilient Distributed Dataset (RDD)은 Spark의 주요 추상화입니다. RDD는 파티션으로 분할됩니다. 즉, 파티션이 데이터 세트의 일부이거나 슬라이스이거나 다른 말로하면 그 파티션입니다.

파티션 수가 많을수록 각 파티션의 크기가 작아집니다.

그러나 많은 수의 파티션이 Hadoop 분산 파일 시스템 (HDFS)에 많은 압력을 가하고 있으며 상당한 양의 메타 데이터를 유지해야합니다.

파티션의 수는 메모리 사용과 관련이 있으며 memoryOverhead 문제는이 숫자와 관련 될 수 있습니다 ( 개인 경험 ).


새로운 사용자에게 공통적 인 함정 은 RDD를 하나의 파티션만으로 RDD로 변환하는 것입니다. 일반적으로 다음과 같습니다.

data = sc.textFile(file)
data = data.coalesce(1) 

Spark이 모든 데이터 를 단 하나의 파티션이라고 말하기 때문에 대개 매우 나쁜 생각입니다! 기억:

Spark의 스테이지는 한 번에 하나의 파티션에서 작동하며 (해당 파티션의 데이터를 메모리로로드합니다).

결과적으로 Spark에 모든 데이터를 한 번에 처리하도록 지시하면 메모리 관련 오류 (예 : 메모리 부족) 또는 널 포인터 예외가 발생합니다.

따라서, 무엇을하고 있는지 알지 못하는 한, 하나의 파티션에서 RDD를 다시 파티션하지 마십시오!

파티션 소개

RDD는 어떻게 분할됩니까?

기본적으로 파티션은 각 HDFS 파티션에 대해 만들어지며 기본적으로 64MB입니다. 자세한 내용은 여기를 참조 하십시오 .

파티션간에 데이터의 균형을 유지하는 방법은 무엇입니까?

먼저 데이터를 다시 분할 할 수있는 세 가지 방법을 살펴보십시오.

  1. RDD에 필요한 최소 파티션 수인 두 번째 매개 변수를 textFile () 에 전달하지만주의해야합니다.

    [14]에서 : lines = sc.textFile ( "data")

    In [15] : lines.getNumPartitions () Out [15] : 1000

    [16]에서 : lines = sc.textFile ( "data", 500)

    [17]에서 : lines.getNumPartitions () Out [17] : 1434

    [18]에서 : lines = sc.textFile ( "data", 5000)

    [19]에서 : lines.getNumPartitions () Out [19] : 5926

보시다시피, [16] 은 RDD가 가진 파티션의 수는 이미 우리가 요청한 파티션의 최소 수보다 더 크기 때문에 예상 할 수있는 것을하지 않습니다.

  1. 다음과 같이 repartition ()을 사용하십시오.

    에서 [22] : lines = lines.repartition (10)

    [23]에서 : lines.getNumPartitions () Out [23] : 10

경고 : 셔플을 호출하므로 RDD가있는 파티션의 수를 늘리려 는 경우 사용해야합니다.

문서에서 :

셔플은 파티션간에 그룹화되도록 데이터를 다시 배포하는 Spark의 메커니즘입니다. 이것은 전형적으로 집행자와 기계간에 데이터를 복사하여 셔플을 복잡하고 값 비싼 작업으로 만듭니다.

  1. 다음과 같이 coalesce ()를 사용하십시오.

    [25]에서 : lines = lines.coalesce (2)

    [26]에서 : lines.getNumPartitions () Out [26] : 2

여기에서 Spark는 RDD를 축소하고이를 이용한다는 것을 알고 있습니다. 재분할 () vs coalesce () 에 대해 자세히 알아보십시오.


하지만이 모든 것이 데이터가 파티션 전체에서 완벽하게 균형을 유지하도록 보장 할 것입니까? 실제로 파티션에서 데이터의 균형을 유지하는 방법에 대해 경험했던 것처럼 ?

RDD의 파티션

"설명"에서 언급했듯이 파티션은 RDD의 파트 / 슬라이스 / 청크입니다. 다음은 RDD에 필요한 최소 파티션 수를 요청하는 방법에 대한 최소 예입니다.

In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

In [2]: mylistRDD.getNumPartitions()
Out[2]: 2

[1]에서 parallelize() 의 두 번째 매개 변수로 2를 어떻게 전달했는지 주목하십시오. 이 매개 변수는 RDD에 최소 2 개의 파티션이 있어야한다고 말합니다.

RDD 재배치

때로는 우리가 만든 파일이 아니기 때문에 예를 들어 RDD를 다시 파티션하려는 경우가 있으며 제작자가 정의한 파티션 수는 원하는 것이 아닙니다.

이를 위해 가장 잘 알려진 두 가지 기능은 다음과 같습니다.

repartition(numPartitions)

과:

coalesce(numPartitions, shuffle=False)

일반적으로 더 많은 수의 파티션에서 RDD를 다시 파티션하려는 경우 첫 번째 파티션을 사용하고 적은 수의 파티션에서 RDD를 줄이려면 두 번째 파티션을 사용하십시오. spark - repartition () vs coalesce () .

예 :

data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions

이 RDD가 textFile() 의해 textFile() 졌을 때 100 개가 넘는 파티션을 가지고 있다면 '데이터'라는 RDD의 파티션 수를 100으로 textFile() 입니다.

비슷한 방식으로, RDD에 현재 파티션 수 이상을 갖고 싶다면 다음과 같이 할 수 있습니다 (예 : RDD가 200 개의 파티션에 분산되어있는 경우).

data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions

파티션 수에 대한 Thumb 규칙

엄지 손가락의 규칙으로, 그의 RDD가 실행자의 수와 사용 된 코어의 수를 3 (또는 아마 4) 곱한만큼의 파티션을 가지기를 원할 것입니다. 물론 이것은 경험적이며 응용 프로그램, 데이터 집합 및 클러스터 구성에 따라 달라집니다.

예:

In [1]: data  = sc.textFile(file)

In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))

In [3]: data = data.coalesce(total_cores * 3)      

RDD 컨텐츠 표시

RDD의 내용을 보려면 인쇄해야합니다.

myRDD.foreach(println)

인쇄 할 행 수를 제한하려면 다음과 같이하십시오.

myRDD.take(num_of_rows).foreach(println)


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow