Ricerca…


Osservazioni

Apache Spark è un framework di elaborazione di big data open source basato su velocità, facilità d'uso e analisi sofisticate. Uno sviluppatore dovrebbe usarlo quando (s) gestisce una grande quantità di dati, che di solito implicano limitazioni di memoria e / o tempo di elaborazione proibitivo.


Dovrebbe anche menzionare tutti i soggetti di grandi dimensioni all'interno della scintilla di apache e collegarsi agli argomenti correlati. Poiché la documentazione di apache-spark è nuova, potrebbe essere necessario creare versioni iniziali di tali argomenti correlati.

Versioni

Versione Data di rilascio
2.2.0 2017/07/11
2.1.1 2017/05/02
2.1.0 2016/12/28
2.0.1 2016/10/03
2.0.0 2016/07/26
1.6.0 2016/01/04
1.5.0 2015/09/09
1.4.0 2015/06/11
1.3.0 2015/03/13
1.2.0 2014/12/18
1.1.0 2014/09/11
1.0.0 2014/05/30
0.9.0 2014/02/02
0.8.0 2013/09/25
0.7.0 2013/02/27
0.6.0 2012/10/15

introduzione

Prototipo :

aggregato (zeroValue, seqOp, combOp)

Descrizione :

aggregate() consente di acquisire un RDD e generare un singolo valore di un tipo diverso rispetto a quello memorizzato nell'RDD originale.

Parametri :

  1. zeroValue : il valore di inizializzazione, per il risultato, nel formato desiderato.
  2. seqOp : l'operazione che si desidera applicare ai record RDD. Esegue una volta per ogni record in una partizione.
  3. combOp : definisce come gli oggetti risultanti (uno per ogni partizione), vengono combinati.

Esempio :

Calcola la somma di un elenco e la lunghezza di tale elenco. Restituisce il risultato in una coppia di (sum, length) .

In una shell Spark, crea una lista con 4 elementi, con 2 partizioni :

listRDD = sc.parallelize([1,2,3,4], 2)

Quindi definire seqOp :

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

Quindi definire combOp :

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

Quindi aggregato:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

La prima partizione ha la sottolista [1, 2]. Questo applica seqOp a ciascun elemento di quell'elenco, che produce un risultato locale - Una coppia di (sum, length) che rifletterà il risultato localmente, solo in quella prima partizione.

local_result viene inizializzato sul parametro zeroValue con cui è stato fornito aggregate() . Ad esempio, (0, 0) e list_element è il primo elemento dell'elenco:

0 + 1 = 1
0 + 1 = 1

Il risultato locale è (1, 1), il che significa che la somma è 1 e la lunghezza 1 per la prima partizione dopo aver elaborato solo il primo elemento. local_result viene aggiornato da (0, 0), a (1, 1).

1 + 2 = 3
1 + 1 = 2

Il risultato locale è ora (3, 2), che sarà il risultato finale della prima partizione, poiché non ci sono altri elementi nella sottolista della prima partizione. Fare lo stesso per i ritorni della 2a partizione (7, 2).

Applica combOp a ogni risultato locale per formare il risultato finale globale:

(3,2) + (7,2) = (10, 4)

Esempio descritto in 'figura':

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Trasformazione vs Azione

Spark usa una valutazione pigra ; ciò significa che non farà alcun lavoro, a meno che non sia necessario. Questo approccio ci consente di evitare l'utilizzo di memoria non necessario, rendendoci così in grado di lavorare con i big data.

Una trasformazione viene valutata pigra e il lavoro effettivo si verifica quando si verifica un'azione .

Esempio:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

Quindi, in [1] abbiamo detto a Spark di leggere un file in un RDD, chiamato lines . Spark ci ha sentito e ci ha detto: "Sì, lo farò", ma in effetti non ha ancora letto il file.

In [2], stiamo filtrando le righe del file, assumendo che il suo contenuto contenga linee con errori contrassegnati da un error all'inizio. Quindi diciamo a Spark di creare un nuovo RDD, chiamato errors , che avrà gli elementi delle lines RDD, con la parola error all'inizio.

Ora in [3] , chiediamo a Spark di conteggiare gli errori , cioè di contare il numero di elementi che l'RDD ha chiamato errors . count() è un'azione , che non lascia alcuna scelta a Spark, ma di eseguire effettivamente l'operazione, in modo che possa trovare il risultato di count() , che sarà un numero intero.

Di conseguenza, quando viene raggiunto [3] , [1] e [2] verranno effettivamente eseguiti, cioè che quando raggiungiamo [3] , allora e solo allora:

  1. il file verrà letto in textFile() (causa di [1] )

  2. lines saranno filter() 'ed (a causa di [2] )

  3. count() verrà eseguito, a causa di [3]


Suggerimento per il debug: poiché Spark non svolgerà alcun lavoro reale fino al raggiungimento di [3] , è importante capire che se esiste un errore in [1] e / o [2] , non verrà visualizzato, fino a quando l'azione in [3] innesca Spark per svolgere il proprio lavoro. Ad esempio, se i tuoi dati nel file non supportano startsWith() ho usato, allora [2] sarà correttamente accettato da Spark e non genererà alcun errore, ma quando [3] viene inviato e Spark effettivamente valuta entrambi [1] e [2] , quindi e solo allora capirà che qualcosa non è corretto con [2] e produce un errore descrittivo.

Di conseguenza, un errore può essere attivato quando [3] viene eseguito, ma ciò non significa che l'errore deve trovarsi nell'istruzione di [3] !

Nota, né le lines né gli errors saranno memorizzati in memoria dopo [3] . Continueranno ad esistere solo come un insieme di istruzioni di elaborazione. Se ci saranno più azioni eseguite su uno di questi RDD, spark leggerà e filtrerà i dati più volte. Per evitare la duplicazione delle operazioni durante l'esecuzione di più azioni su un singolo RDD, è spesso utile memorizzare i dati nella memoria utilizzando la cache .


Puoi vedere più trasformazioni / azioni in Spark docs .

Controlla la versione di Spark

In spark-shell :

sc.version

Generalmente in un programma:

SparkContext.version

Utilizzando spark-submit :

 spark-submit --version


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow