apache-spark
Doet mee
Zoeken…
Opmerkingen
Een ding om op te merken is uw bronnen versus de grootte van de gegevens waaraan u deelneemt. Dit is waar uw Spark Join-code kan mislukken waardoor u geheugenfouten krijgt. Zorg daarom dat u uw Spark-taken echt goed configureert, afhankelijk van de grootte van de gegevens. Hierna volgt een voorbeeld van een configuratie voor een join van 1,5 miljoen tot 200 miljoen.
Spark-Shell gebruiken
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Spark Submit gebruiken
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Hash uitzenden Doe mee aan Spark
Een broadcast join kopieert de kleine gegevens naar de werkknooppunten, wat leidt tot een zeer efficiënte en supersnelle join. Wanneer we twee datasets samenvoegen en een van de datasets veel kleiner is dan de andere (bijvoorbeeld wanneer de kleine dataset in het geheugen past), moeten we een Broadcast Hash Join gebruiken.
De volgende afbeelding toont een Broadcast Hash Join waar de kleine gegevensset wordt uitgezonden naar elke partitie van de grote gegevensset.
Hierna volgt een codevoorbeeld dat u eenvoudig kunt implementeren als u een vergelijkbaar scenario hebt voor een grote en kleine gegevensset.
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")