Поиск…


замечания

Apache Spark - это платформа для обработки данных с открытым исходным кодом, основанная на скорости, простоте использования и сложной аналитике. Разработчик должен использовать его, когда он обрабатывает большой объем данных, что обычно подразумевает ограничения памяти и / или чрезмерное время обработки.


Следует также упомянуть о любых крупных предметах в искры apache и ссылки на связанные темы. Поскольку Документация для apache-spark является новой, вам может потребоваться создать начальные версии этих связанных тем.

Версии

Версия Дата выхода
2.2.0 2017-07-11
2.1.1 2017-05-02
2.1.0 2016-12-28
2.0.1 2016-10-03
2.0.0 2016-07-26
1.6.0 2016-01-04
1.5.0 2015-09-09
1.4.0 2015-06-11
1.3.0 2015-03-13
1.2.0 2014-12-18
1.1.0 2014-09-11
1.0.0 2014-05-30
0.9.0 2014-02-02
0.8.0 2013-09-25
0.7.0 2013-02-27
0.6.0 2012-10-15

Вступление

Прототип :

aggregate (zeroValue, seqOp, combOp)

Описание :

aggregate() позволяет вам использовать RDD и генерировать одно значение, которое отличается от того, что было сохранено в исходном RDD.

Параметры :

  1. zeroValue : значение инициализации для вашего результата в нужном формате.
  2. seqOp : операция, которую вы хотите применить к записям RDD. Выполняется один раз для каждой записи в разделе.
  3. combOp : Определяет, как объединенные объекты (по одному для каждого раздела) объединяются.

Пример :

Вычислить сумму списка и длину этого списка. Вернуть результат в пару (sum, length) .

В оболочке Spark создайте список из 4 элементов с двумя разделами :

listRDD = sc.parallelize([1,2,3,4], 2)

Затем определите seqOp :

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

Затем определите combOp :

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

Затем агрегируются:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

Первый раздел имеет подписок [1, 2]. Это применит seqOp к каждому элементу этого списка, который создает локальный результат - пару (sum, length) которая будет отражать результат локально, только в этом первом разделе.

local_result инициализируется параметром zeroValue aggregate() . Например, (0, 0) и list_element - это первый элемент списка:

0 + 1 = 1
0 + 1 = 1

Локальный результат равен (1, 1), что означает, что сумма равна 1 и длина 1 для 1-го раздела после обработки только первого элемента. local_result обновляется с (0, 0), до (1, 1).

1 + 2 = 3
1 + 1 = 2

Локальный результат теперь (3, 2), который будет конечным результатом 1-го раздела, так как они не являются никакими другими элементами в подсписке 1-го раздела. Выполнение этого же для возврата 2-го раздела (7, 2).

Примените combOp к каждому локальному результату, чтобы сформировать окончательный глобальный результат:

(3,2) + (7,2) = (10, 4)

Пример, описанный в «рисунке»:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Трансформация против действия

Spark использует ленивую оценку ; это означает, что он не будет делать никакой работы, если это действительно не так. Такой подход позволяет избежать ненужного использования памяти, что позволяет нам работать с большими данными.

Преобразование выполняется с ленивым вычислением и происходит фактическая работа, когда происходит действие .

Пример:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

Итак, в [1] мы сказали Спарку прочитать файл в RDD, называемый lines . Искра услышал нас и сказал нам: «Да , я сделаю это», но на самом деле он еще не читал файл.

В [2] мы фильтруем строки файла, предполагая, что его содержимое содержит строки с ошибками, отмеченными error в начале. Поэтому мы говорим Spark о создании нового RDD, называемого errors , в котором будут элементы RDD- lines , которые с самого начала имели error слова.

Теперь в [3] мы попросим Spark подсчитать ошибки , т. Е. Подсчитать количество элементов, которые RDD называется errors . count() - это действие , которое не оставляет выбора Spark, но фактически выполняет операцию, так что он может найти результат count() , который будет целочисленным.

В результате, когда [3] будет достигнуто, [1] и [2] будут выполняться фактически, т. Е. Когда мы достигнем [3] , тогда и только тогда:

  1. файл будет считаться в textFile() (из-за [1] )

  2. lines будут filter() 'ed (из-за [2] )

  3. count() выполнит, из-за [3]


Совет отладки: поскольку Spark не будет выполнять какую-либо реальную работу до достижения [3] , важно понять, что если ошибка существует в [1] и / или [2] , она не появится, пока действие в [3] запускает Spark для выполнения реальной работы. Например, если ваши данные в файле не поддерживают startsWith() я использовал, тогда [2] будет правильно принят Spark, и это не вызовет никакой ошибки, но когда [3] будет отправлено, а Spark фактически оценивает как [1] и [2] , тогда и только тогда он поймет, что что-то не так с [2] и создает описательную ошибку.

В результате при запуске [3] может быть запущена ошибка, но это не означает, что ошибка должна заключаться в утверждении [3] !

Обратите внимание: ни lines ни errors не будут сохранены в памяти после [3] . Они будут продолжать существовать только как набор инструкций обработки. Если на любом из этих RDD будет выполняться несколько действий, искра будет считывать и фильтровать данные несколько раз. Чтобы избежать дублирования операций при выполнении нескольких действий на одном RDD, часто полезно хранить данные в памяти с помощью cache .


Вы можете увидеть больше преобразований / действий в документах Spark .

Проверьте версию Spark

В spark-shell :

sc.version

Как правило, в программе:

SparkContext.version

Использование spark-submit :

 spark-submit --version


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow