apache-spark
조인
수색…
비고
참고해야 할 것은 리소스에 대한 데이터 크기와 가입하는 데이터의 크기입니다. 이것은 Spark Join 코드가 메모리 오류를 일으키는 데 실패 할 수있는 곳입니다. 이러한 이유로 데이터 크기에 따라 Spark 작업을 실제로 구성해야합니다. 다음은 1.5 백만에서 2 억까지의 조인 구성의 예입니다.
스파크 셸 사용
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Spark Submit 사용
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Spark의 Broadcast Hash Join
브로드 캐스트 조인은 작은 데이터를 작업자 노드로 복사하므로 매우 효율적이며 초고속 조인이 가능합니다. 우리가 두 개의 데이터 세트에 합류하고 하나의 데이터 세트가 다른 데이터 세트보다 훨씬 작 으면 (예 : 작은 데이터 세트가 메모리에 맞을 수있는 경우), 우리는 브로드 캐스트 해시 결합을 사용해야합니다.
다음 이미지는 Large DataSet의 각 파티션에 작은 데이터 세트가 브로드 캐스트 해시 조인을 시각화합니다.
다음은 크고 작은 데이터 집합 조인과 비슷한 시나리오가있는 경우 쉽게 구현할 수있는 코드 샘플입니다.
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")
Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow