apache-spark
Łączy się
Szukaj…
Uwagi
Należy zwrócić uwagę na zasoby i rozmiar danych, do których dołączasz. W tym przypadku kod Spark Join może zawieść, powodując błędy pamięci. Z tego powodu upewnij się, że dobrze skonfigurowałeś swoje zadania Spark w zależności od wielkości danych. Poniżej znajduje się przykład konfiguracji połączenia od 1,5 miliona do 200 milionów.
Korzystanie z Spark-Shell
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Korzystanie z Spark Prześlij
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Transmisja Hash Dołącz w Spark
Sprzężenie rozgłoszeniowe kopiuje małe dane do węzłów roboczych, co prowadzi do bardzo wydajnego i superszybkiego łączenia. Kiedy łączymy dwa zestawy danych, a jeden z nich jest znacznie mniejszy od drugiego (np. Gdy mały zestaw danych może zmieścić się w pamięci), powinniśmy użyć funkcji mieszania transmisji rozgłoszeniowej.
Poniższy obraz przedstawia połączenie Hash emisji, gdzie mały zestaw danych jest emitowany do każdej partycji dużego zestawu danych.
Poniżej znajduje się przykładowy kod, który możesz łatwo wdrożyć, jeśli masz podobny scenariusz łączenia dużego i małego zestawu danych.
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")