apache-spark
присоединяется
Поиск…
замечания
Следует отметить, что ваши ресурсы сравниваются с размером данных, с которыми вы соединяетесь. Здесь ваш код Spark Join может завершиться неудачно, что приведет к ошибкам памяти. По этой причине убедитесь, что вы правильно настроили свои задачи Spark в зависимости от размера данных. Ниже приведен пример конфигурации для объединения от 1,5 миллиона до 200 миллионов.
Использование Spark-Shell
spark-shell --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10
Использование Spark Submit
spark-submit --executor-memory 32G --num-executors 80 --driver-memory 10g --executor-cores 10 code.jar
Широковещательный хэш Присоединяйтесь к Spark
Совместное вещание копирует небольшие данные в рабочие узлы, что приводит к высокоэффективному и сверхбыстрому соединению. Когда мы соединяем два набора данных, и один из наборов данных намного меньше, чем другой (например, когда небольшой набор данных может вписываться в память), тогда мы должны использовать Broadcast Hash Join.
Следующее изображение визуализирует широковещательный хэш, связанный с тем, что небольшой набор данных передается на каждый раздел большого набора данных.
Ниже приведен пример кода, который вы можете легко реализовать, если у вас есть аналогичный сценарий объединения больших и малых наборов данных.
case class SmallData(col1: String, col2:String, col3:String, col4:Int, col5:Int)
val small = sc.textFile("/datasource")
val df1 = sm_data.map(_.split("\\|")).map(attr => SmallData(attr(0).toString, attr(1).toString, attr(2).toString, attr(3).toInt, attr(4).toInt)).toDF()
val lg_data = sc.textFile("/datasource")
case class LargeData(col1: Int, col2: String, col3: Int)
val LargeDataFrame = lg_data.map(_.split("\\|")).map(attr => LargeData(attr(0).toInt, attr(2).toString, attr(3).toInt)).toDF()
val joinDF = LargeDataFrame.join(broadcast(smallDataFrame), "key")