apache-spark Tutorial
Empezando con la chispa de apache
Buscar..
Observaciones
Apache Spark es un marco de procesamiento de datos grandes de código abierto creado en torno a la velocidad, la facilidad de uso y el análisis sofisticado. Un desarrollador debe usarlo cuando maneja una gran cantidad de datos, lo que generalmente implica limitaciones de memoria y / o tiempo de procesamiento prohibitivo.
También debe mencionar cualquier tema grande dentro de apache-spark y vincular a los temas relacionados. Dado que la Documentación para apache-spark es nueva, es posible que deba crear versiones iniciales de esos temas relacionados.
Versiones
Versión | Fecha de lanzamiento |
---|---|
2.2.0 | 2017-07-11 |
2.1.1 | 2017-05-02 |
2.1.0 | 2016-12-28 |
2.0.1 | 2016-10-03 |
2.0.0 | 2016-07-26 |
1.6.0 | 2016-01-04 |
1.5.0 | 2015-09-09 |
1.4.0 | 2015-06-11 |
1.3.0 | 2015-03-13 |
1.2.0 | 2014-12-18 |
1.1.0 | 2014-09-11 |
1.0.0 | 2014-05-30 |
0.9.0 | 2014-02-02 |
0.8.0 | 2013-09-25 |
0.7.0 | 2013-02-27 |
0.6.0 | 2012-10-15 |
Introducción
Prototipo :
agregado (valor cero, seqOp, combOp)
Descripción :
aggregate()
permite tomar un RDD y generar un valor único que es de un tipo diferente al que estaba almacenado en el RDD original.
Parámetros :
-
zeroValue
: el valor de inicialización, para su resultado, en el formato deseado. -
seqOp
: la operación que desea aplicar a los registros RDD. Se ejecuta una vez por cada registro en una partición. -
combOp
: define cómo secombOp
los objetos resultantes (uno para cada partición).
Ejemplo :
Calcule la suma de una lista y la longitud de esa lista. Devuelve el resultado en un par de
(sum, length)
.
En un shell de Spark, crea una lista con 4 elementos, con 2 particiones :
listRDD = sc.parallelize([1,2,3,4], 2)
Luego define seqOp :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Luego define combOp :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Luego se agrega:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
La primera partición tiene la sublista [1, 2]. Esto aplica el seqOp a cada elemento de esa lista, que produce un resultado local: un par de (sum, length)
que reflejará el resultado localmente, solo en esa primera partición.
local_result
se inicializa con el parámetro zeroValue
aggregate()
se proporcionó. Por ejemplo, (0, 0) y list_element
es el primer elemento de la lista:
0 + 1 = 1
0 + 1 = 1
El resultado local es (1, 1), lo que significa que la suma es 1 y la longitud 1 para la primera partición después de procesar solo el primer elemento. local_result
se actualiza de (0, 0), a (1, 1).
1 + 2 = 3
1 + 1 = 2
El resultado local es ahora (3, 2), que será el resultado final de la primera partición, ya que no hay otros elementos en la lista secundaria de la primera partición. Haciendo lo mismo para la 2ª partición devuelve (7, 2).
Aplique combOp a cada resultado local para formar el resultado global final:
(3,2) + (7,2) = (10, 4)
Ejemplo descrito en 'figura':
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Transformación vs acción
Spark utiliza la evaluación perezosa ; eso significa que no hará ningún trabajo, a menos que realmente tenga que hacerlo. Ese enfoque nos permite evitar el uso innecesario de la memoria, lo que nos permite trabajar con big data.
Una transformación se evalúa de forma perezosa y el trabajo real ocurre cuando se produce una acción .
Ejemplo:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
Entonces, en [1]
le dijimos a Spark que leyera un archivo en un RDD, llamado lines
. Spark nos escuchó y nos dijo: "Sí, lo haré", pero en realidad aún no había leído el archivo.
En [2], estamos filtrando las líneas del archivo, asumiendo que su contenido contiene líneas con errores que están marcados con un error
en su inicio. Entonces le pedimos a Spark que cree un nuevo RDD, llamado errors
, que tendrá los elementos de las lines
RDD, que tenían la palabra error
al comienzo.
Ahora en [3]
, le pedimos a Spark que cuente los errores , es decir, que cuente el número de elementos que tiene el RDD llamado errors
. count()
es una acción , que no deja ninguna opción a Spark, sino a realizar la operación, para que pueda encontrar el resultado de count()
, que será un número entero.
Como resultado, cuando [3]
se alcanza, [1]
y [2]
en realidad se está realizando, es decir, que cuando se llega a [3]
, entonces y sólo entonces:
el archivo se leerá en
textFile()
(debido a[1]
)lines
seránfilter()
'ed (debido a[2]
)count()
se ejecutará, debido a[3]
Consejo de depuración: como Spark no realizará ningún trabajo real hasta que se alcance [3]
, es importante entender que si existe un error en [1]
y / o [2]
, no aparecerá hasta que aparezca la acción en [3]
activa a Spark para hacer un trabajo real. Por ejemplo, si sus datos en el archivo no son compatibles con startsWith()
que usé, entonces [2]
será aceptado por Spark y no generará ningún error, pero cuando [3]
se envía, y Spark en realidad evalúa tanto [1]
como [2]
, entonces y solo entonces entenderá que algo no es correcto con [2]
y producirá un error descriptivo.
Como resultado, se puede desencadenar un error cuando se ejecuta [3]
, ¡pero eso no significa que el error deba estar en la declaración de [3]
!
Tenga en cuenta que ni las lines
ni los errors
se almacenarán en la memoria después de [3]
. Continuarán existiendo solo como un conjunto de instrucciones de procesamiento. Si se realizarán varias acciones en cualquiera de estos RDD, spark leerá y filtrará los datos varias veces. Para evitar la duplicación de operaciones cuando se realizan varias acciones en un solo RDD, a menudo es útil almacenar datos en la memoria usando la memoria cache
.
Puedes ver más transformaciones / acciones en documentos de Spark .
Revisar la versión Spark
En spark-shell
:
sc.version
Generalmente en un programa:
SparkContext.version
Utilizando spark-submit
:
spark-submit --version