Поиск…


замечания

Количество разделов имеет решающее значение для производительности приложения и / или успешного завершения.

Резистивный распределенный набор данных (RDD) - основная абстракция Spark. RDD разделяется на разделы, это означает, что раздел является частью набора данных, его фрагмента или, другими словами, его фрагментом.

Чем больше количество разделов, тем меньше размер каждого раздела.

Однако обратите внимание, что большое количество разделов оказывает сильное давление на распределенную файловую систему Hadoop (HDFS), которая должна содержать значительное количество метаданных.

Количество разделов связано с использованием памяти, и проблема с памятью может быть связана с этим числом ( личный опыт ).


Общей ошибкой для новых пользователей является преобразование их RDD в RDD только с одним разделом, который обычно выглядит так:

data = sc.textFile(file)
data = data.coalesce(1) 

Это, как правило, очень плохая идея, поскольку вы говорите Spark, что все данные - это всего лишь один раздел! Помните, что:

Сцена в Spark будет работать с одним разделом за раз (и загружать данные в этот раздел в память).

В результате вы сообщаете Spark обрабатывать сразу все данные, что обычно приводит к ошибкам, связанным с памятью (например, вне памяти), или даже исключению с нулевым указателем.

Итак, если вы не знаете, что делаете, избегайте переделки вашего RDD только в одном разделе!

Разделы

Как RDD становится секционированным?

По умолчанию раздел создается для каждого раздела HDFS, который по умолчанию составляет 64 МБ. Подробнее читайте здесь .

Как сбалансировать мои данные по разделам?

Во-первых, взгляните на три способа переделать свои данные:

  1. Передайте второй параметр, требуемое минимальное количество разделов для вашего RDD, в textFile () , но будьте осторожны:

    В [14]: lines = sc.textFile («данные»)

    В [15]: lines.getNumPartitions () Out [15]: 1000

    В [16]: lines = sc.textFile («данные», 500)

    В [17]: lines.getNumPartitions () Out [17]: 1434

    В [18]: lines = sc.textFile («data», 5000)

    В [19]: lines.getNumPartitions () Out [19]: 5926

Как вы можете видеть, [16] не делает того, чего можно было бы ожидать, так как количество разделов, имеющих RDD, уже больше минимального количества запросов, которые мы запрашиваем.

  1. Используйте repartition () , например:

    В [22]: lines = lines.repartition (10)

    В [23]: lines.getNumPartitions () Out [23]: 10

Предупреждение. Это вызовет перетасовку и будет использоваться, если вы хотите увеличить количество разделов вашего RDD.

Из документов :

Shuffle - механизм Spark для перераспределения данных, так что он группируется по-разному между разделами. Обычно это связано с копированием данных между исполнителями и машинами, что делает перетасовку сложной и дорогостоящей.

  1. Используйте coalesce () , например:

    В [25]: lines = lines.coalesce (2)

    В [26]: lines.getNumPartitions () Out [26]: 2

Здесь Spark знает, что вы сжимаете RDD и получаете преимущество. Подробнее о repartition () vs coalesce () .


Но все ли это гарантирует, что ваши данные будут идеально сбалансированы по вашим разделам? Не совсем так, как я пережил в Как сбалансировать мои данные по разделам?

Разделы RDD

Как упоминалось в «Замечаниях», раздел является частью / срезом / фрагментом RDD. Ниже приведен минимальный пример того, как запрашивать минимальное количество разделов для вашего RDD:

In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

In [2]: mylistRDD.getNumPartitions()
Out[2]: 2

Обратите внимание, что в [1] мы передали 2 как второй параметр parallelize() . Этот параметр говорит, что мы хотим, чтобы наш RDD имел как минимум 2 раздела.

Переделка RDD

Иногда мы хотим переделать RDD, например, потому что он исходит из файла, который не был создан нами, а количество разделов, определенных от создателя, не такое, которое мы хотим.

Двумя наиболее известными функциями для этого являются:

repartition(numPartitions)

а также:

coalesce(numPartitions, shuffle=False)

Как правило, используйте первое, когда вы хотите переделать свой RDD в большее количество разделов, а второй - уменьшить свой RDD, в меньшем количестве разделов. Spark - repartition () vs coalesce () .

Например:

data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions

уменьшит количество разделов RDD, называемых «данными», до 100, учитывая, что это RDD имеет более 100 разделов, когда оно считывается с помощью textFile() .

И аналогичным образом, если вы хотите иметь больше, чем текущее количество разделов для вашего RDD, вы могли бы сделать это (учитывая, что ваш RDD распространяется, например, на 200 разделов):

data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions

Правило большого пальца о количестве разделов

Как правило, нужно, чтобы его RDD имел столько разделов, сколько произведение числа исполнителей на количество используемых ядер на 3 (или, возможно, 4). Конечно, это эвристика, и это действительно зависит от вашего приложения, набора данных и конфигурации кластера.

Пример:

In [1]: data  = sc.textFile(file)

In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))

In [3]: data = data.coalesce(total_cores * 3)      

Показать содержимое RDD

Чтобы показать содержимое RDD, оно должно быть напечатано:

myRDD.foreach(println)

Чтобы ограничить количество напечатанных строк:

myRDD.take(num_of_rows).foreach(println)


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow