Sök…


Anmärkningar

En sak att notera är dina resurser jämfört med storleken på data du går med. Det är här din Spark Join-kod kan misslyckas, vilket ger dig minnesfel. Se därför till att du konfigurerar dina Spark-jobb riktigt bra beroende på datastorlek. Följande är ett exempel på en konfiguration för en koppling mellan 1,5 och 200 miljoner.

Använda gnistskal

spark-shell   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10   

Använda Spark Submit

spark-submit   --executor-memory 32G   --num-executors 80  --driver-memory 10g --executor-cores 10 code.jar 

Broadcast Hash Gå med i gnista

En sändningskoppling kopierar de små uppgifterna till arbetarnas noder vilket leder till en mycket effektiv och supersnabb anslutning. När vi går med i två datasätt och en av datasätten är mycket mindre än den andra (t.ex. när det lilla datasettet kan passa in i minnet), bör vi använda en Broadcast Hash Join.

Följande bild visualiserar en Broadcast Hash-sammankoppling där det lilla datasettet sänds till varje partition i Large Dataset.

ange bildbeskrivning här

Följande är kodprov som du enkelt kan implementera om du har ett liknande scenario med ett stort och litet datasätt.

case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
 
val small = sc.textFile("/datasource")
 
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
 
val lg_data = sc.textFile("/datasource")
 
case class LargeData(col1: Int, col2: String, col3: Int)
 
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
 
 
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow