Szukaj…


Uwagi

Należy zwrócić uwagę na zasoby i rozmiar danych, do których dołączasz. W tym przypadku kod Spark Join może zawieść, powodując błędy pamięci. Z tego powodu upewnij się, że dobrze skonfigurowałeś swoje zadania Spark w zależności od wielkości danych. Poniżej znajduje się przykład konfiguracji połączenia od 1,5 miliona do 200 milionów.

Korzystanie z Spark-Shell

spark-shell   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10   

Korzystanie z Spark Prześlij

spark-submit   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10 code.jar 

Transmisja Hash Dołącz w Spark

Sprzężenie rozgłoszeniowe kopiuje małe dane do węzłów roboczych, co prowadzi do bardzo wydajnego i superszybkiego łączenia. Kiedy łączymy dwa zestawy danych, a jeden z nich jest znacznie mniejszy od drugiego (np. Gdy mały zestaw danych może zmieścić się w pamięci), powinniśmy użyć funkcji mieszania transmisji rozgłoszeniowej.

Poniższy obraz przedstawia połączenie Hash emisji, gdzie mały zestaw danych jest emitowany do każdej partycji dużego zestawu danych.

wprowadź opis zdjęcia tutaj

Poniżej znajduje się przykładowy kod, który możesz łatwo wdrożyć, jeśli masz podobny scenariusz łączenia dużego i małego zestawu danych.

case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
 
val small = sc.textFile("/datasource")
 
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
 
val lg_data = sc.textFile("/datasource")
 
case class LargeData(col1: Int, col2: String, col3: Int)
 
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
 
 
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow