apache-spark
Si unisce
Ricerca…
Osservazioni
Una cosa da notare sono le tue risorse rispetto alla dimensione dei dati che stai aggiungendo. È qui che il codice Spark Join potrebbe non riuscire a darti errori di memoria. Per questo motivo, assicurati di configurare correttamente i tuoi lavori Spark in base alla dimensione dei dati. Di seguito è riportato un esempio di configurazione per un join da 1,5 a 200 milioni.
Usando Spark-Shell
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Utilizzando Spark Invia
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Broadcast Hash Join in Spark
Un broadcast join copia i piccoli dati sui nodi worker che porta a un join altamente efficiente e superveloce. Quando si uniscono due serie di dati e uno dei set di dati è molto più piccolo dell'altro (ad esempio quando il set di dati di piccole dimensioni può essere contenuto nella memoria), allora dovremmo usare un Broadcast Hash Join.
L'immagine seguente visualizza un broadcast hash join quando il piccolo set di dati viene trasmesso a ciascuna partizione del set di dati di grandi dimensioni.
Di seguito è riportato un esempio di codice che è possibile implementare facilmente se si dispone di uno scenario simile di join di set di dati grandi e piccoli.
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")