apache-spark
結合
サーチ…
備考
注目すべきことの1つは、参加するデータのサイズに対するリソースです。これはSpark Joinコードが失敗してメモリエラーが発生する場所です。このため、データサイズに応じてSparkのジョブを実際に設定するようにしてください。以下は、150万〜200百万人の参加のための構成例です。
スパークシェルの使用
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Spark Submitを使用する
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Sparkのブロードキャストハッシュ結合
ブロードキャスト結合は、小さなデータをワーカー・ノードにコピーし、非常に効率的で超高速な結合につながります。 2つのデータセットを結合し、データセットの1つが他のデータセットよりもはるかに小さい場合(小さなデータセットがメモリに収まる場合など)、ブロードキャストハッシュ結合を使用する必要があります。
以下の画像は、Large DataSetの各パーティションに小さなデータセットがブロードキャストされるBroadcast Hash Joinを視覚化します。
以下は、大規模データセットと小規模データセットの同様のシナリオがある場合に簡単に実装できるコードサンプルです。
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")
Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow