apache-spark Tutorial
Iniziare con la scintilla di apache
Ricerca…
Osservazioni
Apache Spark è un framework di elaborazione di big data open source basato su velocità, facilità d'uso e analisi sofisticate. Uno sviluppatore dovrebbe usarlo quando (s) gestisce una grande quantità di dati, che di solito implicano limitazioni di memoria e / o tempo di elaborazione proibitivo.
Dovrebbe anche menzionare tutti i soggetti di grandi dimensioni all'interno della scintilla di apache e collegarsi agli argomenti correlati. Poiché la documentazione di apache-spark è nuova, potrebbe essere necessario creare versioni iniziali di tali argomenti correlati.
Versioni
Versione | Data di rilascio |
---|---|
2.2.0 | 2017/07/11 |
2.1.1 | 2017/05/02 |
2.1.0 | 2016/12/28 |
2.0.1 | 2016/10/03 |
2.0.0 | 2016/07/26 |
1.6.0 | 2016/01/04 |
1.5.0 | 2015/09/09 |
1.4.0 | 2015/06/11 |
1.3.0 | 2015/03/13 |
1.2.0 | 2014/12/18 |
1.1.0 | 2014/09/11 |
1.0.0 | 2014/05/30 |
0.9.0 | 2014/02/02 |
0.8.0 | 2013/09/25 |
0.7.0 | 2013/02/27 |
0.6.0 | 2012/10/15 |
introduzione
Prototipo :
aggregato (zeroValue, seqOp, combOp)
Descrizione :
aggregate()
consente di acquisire un RDD e generare un singolo valore di un tipo diverso rispetto a quello memorizzato nell'RDD originale.
Parametri :
-
zeroValue
: il valore di inizializzazione, per il risultato, nel formato desiderato. -
seqOp
: l'operazione che si desidera applicare ai record RDD. Esegue una volta per ogni record in una partizione. -
combOp
: definisce come gli oggetti risultanti (uno per ogni partizione), vengono combinati.
Esempio :
Calcola la somma di un elenco e la lunghezza di tale elenco. Restituisce il risultato in una coppia di
(sum, length)
.
In una shell Spark, crea una lista con 4 elementi, con 2 partizioni :
listRDD = sc.parallelize([1,2,3,4], 2)
Quindi definire seqOp :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Quindi definire combOp :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Quindi aggregato:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
La prima partizione ha la sottolista [1, 2]. Questo applica seqOp a ciascun elemento di quell'elenco, che produce un risultato locale - Una coppia di (sum, length)
che rifletterà il risultato localmente, solo in quella prima partizione.
local_result
viene inizializzato sul parametro zeroValue
con cui è stato fornito aggregate()
. Ad esempio, (0, 0) e list_element
è il primo elemento dell'elenco:
0 + 1 = 1
0 + 1 = 1
Il risultato locale è (1, 1), il che significa che la somma è 1 e la lunghezza 1 per la prima partizione dopo aver elaborato solo il primo elemento. local_result
viene aggiornato da (0, 0), a (1, 1).
1 + 2 = 3
1 + 1 = 2
Il risultato locale è ora (3, 2), che sarà il risultato finale della prima partizione, poiché non ci sono altri elementi nella sottolista della prima partizione. Fare lo stesso per i ritorni della 2a partizione (7, 2).
Applica combOp a ogni risultato locale per formare il risultato finale globale:
(3,2) + (7,2) = (10, 4)
Esempio descritto in 'figura':
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Trasformazione vs Azione
Spark usa una valutazione pigra ; ciò significa che non farà alcun lavoro, a meno che non sia necessario. Questo approccio ci consente di evitare l'utilizzo di memoria non necessario, rendendoci così in grado di lavorare con i big data.
Una trasformazione viene valutata pigra e il lavoro effettivo si verifica quando si verifica un'azione .
Esempio:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
Quindi, in [1]
abbiamo detto a Spark di leggere un file in un RDD, chiamato lines
. Spark ci ha sentito e ci ha detto: "Sì, lo farò", ma in effetti non ha ancora letto il file.
In [2], stiamo filtrando le righe del file, assumendo che il suo contenuto contenga linee con errori contrassegnati da un error
all'inizio. Quindi diciamo a Spark di creare un nuovo RDD, chiamato errors
, che avrà gli elementi delle lines
RDD, con la parola error
all'inizio.
Ora in [3]
, chiediamo a Spark di conteggiare gli errori , cioè di contare il numero di elementi che l'RDD ha chiamato errors
. count()
è un'azione , che non lascia alcuna scelta a Spark, ma di eseguire effettivamente l'operazione, in modo che possa trovare il risultato di count()
, che sarà un numero intero.
Di conseguenza, quando viene raggiunto [3]
, [1]
e [2]
verranno effettivamente eseguiti, cioè che quando raggiungiamo [3]
, allora e solo allora:
il file verrà letto in
textFile()
(causa di[1]
)lines
sarannofilter()
'ed (a causa di[2]
)count()
verrà eseguito, a causa di[3]
Suggerimento per il debug: poiché Spark non svolgerà alcun lavoro reale fino al raggiungimento di [3]
, è importante capire che se esiste un errore in [1]
e / o [2]
, non verrà visualizzato, fino a quando l'azione in [3]
innesca Spark per svolgere il proprio lavoro. Ad esempio, se i tuoi dati nel file non supportano startsWith()
ho usato, allora [2]
sarà correttamente accettato da Spark e non genererà alcun errore, ma quando [3]
viene inviato e Spark effettivamente valuta entrambi [1]
e [2]
, quindi e solo allora capirà che qualcosa non è corretto con [2]
e produce un errore descrittivo.
Di conseguenza, un errore può essere attivato quando [3]
viene eseguito, ma ciò non significa che l'errore deve trovarsi nell'istruzione di [3]
!
Nota, né le lines
né gli errors
saranno memorizzati in memoria dopo [3]
. Continueranno ad esistere solo come un insieme di istruzioni di elaborazione. Se ci saranno più azioni eseguite su uno di questi RDD, spark leggerà e filtrerà i dati più volte. Per evitare la duplicazione delle operazioni durante l'esecuzione di più azioni su un singolo RDD, è spesso utile memorizzare i dati nella memoria utilizzando la cache
.
Puoi vedere più trasformazioni / azioni in Spark docs .
Controlla la versione di Spark
In spark-shell
:
sc.version
Generalmente in un programma:
SparkContext.version
Utilizzando spark-submit
:
spark-submit --version