apache-spark
Schließt sich an
Suche…
Bemerkungen
Beachten Sie Ihre Ressourcen im Vergleich zur Größe der Daten, denen Sie beitreten. An diesem Punkt schlägt der Spark-Join-Code möglicherweise fehl, was zu Speicherfehlern führt. Stellen Sie daher sicher, dass Sie Ihre Spark-Jobs abhängig von der Datengröße wirklich gut konfigurieren. Das folgende Beispiel zeigt eine Konfiguration für einen Join von 1,5 bis 200 Millionen.
Spark-Shell verwenden
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Spark Submit verwenden
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Broadcast Hash Join in Spark
Ein Broadcast-Join kopiert die kleinen Daten auf die Arbeiterknoten, was zu einem hocheffizienten und superschnellen Join führt. Wenn wir zwei Datasets verbinden und eines der Datasets viel kleiner ist als das andere (z. B. wenn das kleine Dataset in den Speicher passen kann), sollten wir einen Broadcast Hash Join verwenden.
Das folgende Bild visualisiert einen Broadcast-Hash-Join, bei dem das kleine Dataset an jede Partition des Large Dataset gesendet wird.
Im Folgenden finden Sie ein Codebeispiel, das Sie leicht implementieren können, wenn Sie ein ähnliches Szenario für einen großen und einen kleinen Datensatzsatz haben.
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")