apache-spark учебник
Начало работы с apache-spark
Поиск…
замечания
Apache Spark - это платформа для обработки данных с открытым исходным кодом, основанная на скорости, простоте использования и сложной аналитике. Разработчик должен использовать его, когда он обрабатывает большой объем данных, что обычно подразумевает ограничения памяти и / или чрезмерное время обработки.
Следует также упомянуть о любых крупных предметах в искры apache и ссылки на связанные темы. Поскольку Документация для apache-spark является новой, вам может потребоваться создать начальные версии этих связанных тем.
Версии
| Версия | Дата выхода |
|---|---|
| 2.2.0 | 2017-07-11 |
| 2.1.1 | 2017-05-02 |
| 2.1.0 | 2016-12-28 |
| 2.0.1 | 2016-10-03 |
| 2.0.0 | 2016-07-26 |
| 1.6.0 | 2016-01-04 |
| 1.5.0 | 2015-09-09 |
| 1.4.0 | 2015-06-11 |
| 1.3.0 | 2015-03-13 |
| 1.2.0 | 2014-12-18 |
| 1.1.0 | 2014-09-11 |
| 1.0.0 | 2014-05-30 |
| 0.9.0 | 2014-02-02 |
| 0.8.0 | 2013-09-25 |
| 0.7.0 | 2013-02-27 |
| 0.6.0 | 2012-10-15 |
Вступление
Прототип :
aggregate (zeroValue, seqOp, combOp)
Описание :
aggregate() позволяет вам использовать RDD и генерировать одно значение, которое отличается от того, что было сохранено в исходном RDD.
Параметры :
-
zeroValue: значение инициализации для вашего результата в нужном формате. -
seqOp: операция, которую вы хотите применить к записям RDD. Выполняется один раз для каждой записи в разделе. -
combOp: Определяет, как объединенные объекты (по одному для каждого раздела) объединяются.
Пример :
Вычислить сумму списка и длину этого списка. Вернуть результат в пару
(sum, length).
В оболочке Spark создайте список из 4 элементов с двумя разделами :
listRDD = sc.parallelize([1,2,3,4], 2)
Затем определите seqOp :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Затем определите combOp :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Затем агрегируются:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
Первый раздел имеет подписок [1, 2]. Это применит seqOp к каждому элементу этого списка, который создает локальный результат - пару (sum, length) которая будет отражать результат локально, только в этом первом разделе.
local_result инициализируется параметром zeroValue aggregate() . Например, (0, 0) и list_element - это первый элемент списка:
0 + 1 = 1
0 + 1 = 1
Локальный результат равен (1, 1), что означает, что сумма равна 1 и длина 1 для 1-го раздела после обработки только первого элемента. local_result обновляется с (0, 0), до (1, 1).
1 + 2 = 3
1 + 1 = 2
Локальный результат теперь (3, 2), который будет конечным результатом 1-го раздела, так как они не являются никакими другими элементами в подсписке 1-го раздела. Выполнение этого же для возврата 2-го раздела (7, 2).
Примените combOp к каждому локальному результату, чтобы сформировать окончательный глобальный результат:
(3,2) + (7,2) = (10, 4)
Пример, описанный в «рисунке»:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Трансформация против действия
Spark использует ленивую оценку ; это означает, что он не будет делать никакой работы, если это действительно не так. Такой подход позволяет избежать ненужного использования памяти, что позволяет нам работать с большими данными.
Преобразование выполняется с ленивым вычислением и происходит фактическая работа, когда происходит действие .
Пример:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
Итак, в [1] мы сказали Спарку прочитать файл в RDD, называемый lines . Искра услышал нас и сказал нам: «Да , я сделаю это», но на самом деле он еще не читал файл.
В [2] мы фильтруем строки файла, предполагая, что его содержимое содержит строки с ошибками, отмеченными error в начале. Поэтому мы говорим Spark о создании нового RDD, называемого errors , в котором будут элементы RDD- lines , которые с самого начала имели error слова.
Теперь в [3] мы попросим Spark подсчитать ошибки , т. Е. Подсчитать количество элементов, которые RDD называется errors . count() - это действие , которое не оставляет выбора Spark, но фактически выполняет операцию, так что он может найти результат count() , который будет целочисленным.
В результате, когда [3] будет достигнуто, [1] и [2] будут выполняться фактически, т. Е. Когда мы достигнем [3] , тогда и только тогда:
файл будет считаться в
textFile()(из-за[1])linesбудутfilter()'ed (из-за[2])count()выполнит, из-за[3]
Совет отладки: поскольку Spark не будет выполнять какую-либо реальную работу до достижения [3] , важно понять, что если ошибка существует в [1] и / или [2] , она не появится, пока действие в [3] запускает Spark для выполнения реальной работы. Например, если ваши данные в файле не поддерживают startsWith() я использовал, тогда [2] будет правильно принят Spark, и это не вызовет никакой ошибки, но когда [3] будет отправлено, а Spark фактически оценивает как [1] и [2] , тогда и только тогда он поймет, что что-то не так с [2] и создает описательную ошибку.
В результате при запуске [3] может быть запущена ошибка, но это не означает, что ошибка должна заключаться в утверждении [3] !
Обратите внимание: ни lines ни errors не будут сохранены в памяти после [3] . Они будут продолжать существовать только как набор инструкций обработки. Если на любом из этих RDD будет выполняться несколько действий, искра будет считывать и фильтровать данные несколько раз. Чтобы избежать дублирования операций при выполнении нескольких действий на одном RDD, часто полезно хранить данные в памяти с помощью cache .
Вы можете увидеть больше преобразований / действий в документах Spark .
Проверьте версию Spark
В spark-shell :
sc.version
Как правило, в программе:
SparkContext.version
Использование spark-submit :
spark-submit --version