Поиск…


Вступление

В этом разделе Spark Users могут найти различные конфигурации Spark SQL, который является наиболее используемым компонентом инфраструктуры Apache Spark.

Управление Spark SQL Shuffle Partitions

В Apache Spark при выполнении операций shuffle, таких как join и cogroup многие данные передаются по сети. Теперь, чтобы контролировать количество разделов, по которым происходит перетасовка, можно управлять конфигурациями, указанными в Spark SQL. Эта конфигурация выглядит следующим образом:

spark.sql.shuffle.partitions

Используя эту конфигурацию, мы можем контролировать количество разделов операций перетасовки. По умолчанию его значение равно 200 . Но 200 разделов не имеют никакого смысла, если у нас есть файлы из нескольких ГБ (ов). Таким образом, мы должны изменить их в зависимости от объема данных, которые нам нужно обрабатывать через Spark SQL. Например:

В этом случае у нас есть две таблицы, которые будут объединены employee и department . Обе таблицы содержат только несколько записей, но нам нужно присоединиться к ним, чтобы узнать отдел каждого сотрудника. Таким образом, мы присоединяемся к ним, используя Spark DataFrames следующим образом:

val conf = new SparkConf().setAppName("sample").setMaster("local")
val sc = new SparkContext(conf)

val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")

employeeDF.join(departmentDF, "employeeName").show()

Теперь количество разделов, созданных при соединении, по умолчанию 200, что, конечно же, слишком много для этого большого количества данных.

Итак, давайте изменим это значение, чтобы мы могли уменьшить количество операций тасования.

val conf = new SparkConf().setAppName("sample").setMaster("local").set("spark.sql.shuffle.partitions", 2)
val sc = new SparkContext(conf)

val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")

employeeDF.join(departmentDF, "employeeName").show()

Теперь количество разделов в случайном порядке сокращается до 2, что не только уменьшит количество операций перетасовки, но и сократит время, необходимое для присоединения к DataFrames с 0.878505 s до 0.077847 s .

Поэтому всегда настраивайте количество разделов для операций тасования в соответствии с обрабатываемыми данными.



Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow