サーチ…


備考

注目すべきことの1つは、参加するデータのサイズに対するリソースです。これはSpark Joinコードが失敗してメモリエラーが発生する場所です。このため、データサイズに応じてSparkのジョブを実際に設定するようにしてください。以下は、150万〜200百万人の参加のための構成例です。

スパークシェルの使用

spark-shell   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10   

Spark Submitを使用する

spark-submit   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10 code.jar 

Sparkのブロードキャストハッシュ結合

ブロードキャスト結合は、小さなデータをワーカー・ノードにコピーし、非常に効率的で超高速な結合につながります。 2つのデータセットを結合し、データセットの1つが他のデータセットよりもはるかに小さい場合(小さなデータセットがメモリに収まる場合など)、ブロードキャストハッシュ結合を使用する必要があります。

以下の画像は、Large DataSetの各パーティションに小さなデータセットがブロードキャストされるBroadcast Hash Joinを視覚化します。

ここに画像の説明を入力

以下は、大規模データセットと小規模データセットの同様のシナリオがある場合に簡単に実装できるコードサンプルです。

case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
 
val small = sc.textFile("/datasource")
 
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
 
val lg_data = sc.textFile("/datasource")
 
case class LargeData(col1: Int, col2: String, col3: Int)
 
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
 
 
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")


Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow