apache-spark
Fogar
Sök…
Anmärkningar
En sak att notera är dina resurser jämfört med storleken på data du går med. Det är här din Spark Join-kod kan misslyckas, vilket ger dig minnesfel. Se därför till att du konfigurerar dina Spark-jobb riktigt bra beroende på datastorlek. Följande är ett exempel på en konfiguration för en koppling mellan 1,5 och 200 miljoner.
Använda gnistskal
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Använda Spark Submit
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Broadcast Hash Gå med i gnista
En sändningskoppling kopierar de små uppgifterna till arbetarnas noder vilket leder till en mycket effektiv och supersnabb anslutning. När vi går med i två datasätt och en av datasätten är mycket mindre än den andra (t.ex. när det lilla datasettet kan passa in i minnet), bör vi använda en Broadcast Hash Join.
Följande bild visualiserar en Broadcast Hash-sammankoppling där det lilla datasettet sänds till varje partition i Large Dataset.
Följande är kodprov som du enkelt kan implementera om du har ett liknande scenario med ett stort och litet datasätt.
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")