수색…


비고

참고해야 할 것은 리소스에 대한 데이터 크기와 가입하는 데이터의 크기입니다. 이것은 Spark Join 코드가 메모리 오류를 일으키는 데 실패 할 수있는 곳입니다. 이러한 이유로 데이터 크기에 따라 Spark 작업을 실제로 구성해야합니다. 다음은 1.5 백만에서 2 억까지의 조인 구성의 예입니다.

스파크 셸 사용

spark-shell   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10   

Spark Submit 사용

spark-submit   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10 code.jar 

Spark의 Broadcast Hash Join

브로드 캐스트 조인은 작은 데이터를 작업자 노드로 복사하므로 매우 효율적이며 초고속 조인이 가능합니다. 우리가 두 개의 데이터 세트에 합류하고 하나의 데이터 세트가 다른 데이터 세트보다 훨씬 작 으면 (예 : 작은 데이터 세트가 메모리에 맞을 수있는 경우), 우리는 브로드 캐스트 해시 결합을 사용해야합니다.

다음 이미지는 Large DataSet의 각 파티션에 작은 데이터 세트가 브로드 캐스트 해시 조인을 시각화합니다.

여기에 이미지 설명을 입력하십시오.

다음은 크고 작은 데이터 집합 조인과 비슷한 시나리오가있는 경우 쉽게 구현할 수있는 코드 샘플입니다.

case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
 
val small = sc.textFile("/datasource")
 
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
 
val lg_data = sc.textFile("/datasource")
 
case class LargeData(col1: Int, col2: String, col3: Int)
 
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
 
 
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow