apache-spark учебник
Начало работы с apache-spark
Поиск…
замечания
Apache Spark - это платформа для обработки данных с открытым исходным кодом, основанная на скорости, простоте использования и сложной аналитике. Разработчик должен использовать его, когда он обрабатывает большой объем данных, что обычно подразумевает ограничения памяти и / или чрезмерное время обработки.
Следует также упомянуть о любых крупных предметах в искры apache и ссылки на связанные темы. Поскольку Документация для apache-spark является новой, вам может потребоваться создать начальные версии этих связанных тем.
Версии
Версия | Дата выхода |
---|---|
2.2.0 | 2017-07-11 |
2.1.1 | 2017-05-02 |
2.1.0 | 2016-12-28 |
2.0.1 | 2016-10-03 |
2.0.0 | 2016-07-26 |
1.6.0 | 2016-01-04 |
1.5.0 | 2015-09-09 |
1.4.0 | 2015-06-11 |
1.3.0 | 2015-03-13 |
1.2.0 | 2014-12-18 |
1.1.0 | 2014-09-11 |
1.0.0 | 2014-05-30 |
0.9.0 | 2014-02-02 |
0.8.0 | 2013-09-25 |
0.7.0 | 2013-02-27 |
0.6.0 | 2012-10-15 |
Вступление
Прототип :
aggregate (zeroValue, seqOp, combOp)
Описание :
aggregate()
позволяет вам использовать RDD и генерировать одно значение, которое отличается от того, что было сохранено в исходном RDD.
Параметры :
-
zeroValue
: значение инициализации для вашего результата в нужном формате. -
seqOp
: операция, которую вы хотите применить к записям RDD. Выполняется один раз для каждой записи в разделе. -
combOp
: Определяет, как объединенные объекты (по одному для каждого раздела) объединяются.
Пример :
Вычислить сумму списка и длину этого списка. Вернуть результат в пару
(sum, length)
.
В оболочке Spark создайте список из 4 элементов с двумя разделами :
listRDD = sc.parallelize([1,2,3,4], 2)
Затем определите seqOp :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Затем определите combOp :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Затем агрегируются:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
Первый раздел имеет подписок [1, 2]. Это применит seqOp к каждому элементу этого списка, который создает локальный результат - пару (sum, length)
которая будет отражать результат локально, только в этом первом разделе.
local_result
инициализируется параметром zeroValue
aggregate()
. Например, (0, 0) и list_element
- это первый элемент списка:
0 + 1 = 1
0 + 1 = 1
Локальный результат равен (1, 1), что означает, что сумма равна 1 и длина 1 для 1-го раздела после обработки только первого элемента. local_result
обновляется с (0, 0), до (1, 1).
1 + 2 = 3
1 + 1 = 2
Локальный результат теперь (3, 2), который будет конечным результатом 1-го раздела, так как они не являются никакими другими элементами в подсписке 1-го раздела. Выполнение этого же для возврата 2-го раздела (7, 2).
Примените combOp к каждому локальному результату, чтобы сформировать окончательный глобальный результат:
(3,2) + (7,2) = (10, 4)
Пример, описанный в «рисунке»:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Трансформация против действия
Spark использует ленивую оценку ; это означает, что он не будет делать никакой работы, если это действительно не так. Такой подход позволяет избежать ненужного использования памяти, что позволяет нам работать с большими данными.
Преобразование выполняется с ленивым вычислением и происходит фактическая работа, когда происходит действие .
Пример:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
Итак, в [1]
мы сказали Спарку прочитать файл в RDD, называемый lines
. Искра услышал нас и сказал нам: «Да , я сделаю это», но на самом деле он еще не читал файл.
В [2] мы фильтруем строки файла, предполагая, что его содержимое содержит строки с ошибками, отмеченными error
в начале. Поэтому мы говорим Spark о создании нового RDD, называемого errors
, в котором будут элементы RDD- lines
, которые с самого начала имели error
слова.
Теперь в [3]
мы попросим Spark подсчитать ошибки , т. Е. Подсчитать количество элементов, которые RDD называется errors
. count()
- это действие , которое не оставляет выбора Spark, но фактически выполняет операцию, так что он может найти результат count()
, который будет целочисленным.
В результате, когда [3]
будет достигнуто, [1]
и [2]
будут выполняться фактически, т. Е. Когда мы достигнем [3]
, тогда и только тогда:
файл будет считаться в
textFile()
(из-за[1]
)lines
будутfilter()
'ed (из-за[2]
)count()
выполнит, из-за[3]
Совет отладки: поскольку Spark не будет выполнять какую-либо реальную работу до достижения [3]
, важно понять, что если ошибка существует в [1]
и / или [2]
, она не появится, пока действие в [3]
запускает Spark для выполнения реальной работы. Например, если ваши данные в файле не поддерживают startsWith()
я использовал, тогда [2]
будет правильно принят Spark, и это не вызовет никакой ошибки, но когда [3]
будет отправлено, а Spark фактически оценивает как [1]
и [2]
, тогда и только тогда он поймет, что что-то не так с [2]
и создает описательную ошибку.
В результате при запуске [3]
может быть запущена ошибка, но это не означает, что ошибка должна заключаться в утверждении [3]
!
Обратите внимание: ни lines
ни errors
не будут сохранены в памяти после [3]
. Они будут продолжать существовать только как набор инструкций обработки. Если на любом из этих RDD будет выполняться несколько действий, искра будет считывать и фильтровать данные несколько раз. Чтобы избежать дублирования операций при выполнении нескольких действий на одном RDD, часто полезно хранить данные в памяти с помощью cache
.
Вы можете увидеть больше преобразований / действий в документах Spark .
Проверьте версию Spark
В spark-shell
:
sc.version
Как правило, в программе:
SparkContext.version
Использование spark-submit
:
spark-submit --version