Sök…


Anmärkningar

Apache Spark är en öppen källkod för stor databehandling byggd kring hastighet, användarvänlighet och sofistikerad analys. En utvecklare bör använda den när han / hon hanterar stora datamängder, vilket vanligtvis innebär minnesbegränsningar och / eller oöverkomlig behandlingstid.


Det bör också nämna alla stora ämnen inom apache-gnista och länka till relaterade ämnen. Eftersom dokumentationen för apache-spark är ny kan du behöva skapa initialversioner av relaterade ämnen.

versioner

Version Utgivningsdatum
2.2.0 2017/07/11
2.1.1 2017/05/02
2.1.0 2016/12/28
2.0.1 2016/10/03
2.0.0 2016/07/26
1.6.0 2016/01/04
1.5.0 2015/09/09
1.4.0 2015/06/11
1.3.0 2015/03/13
1.2.0 2014/12/18
1.1.0 2014/09/11
1.0.0 2014/05/30
0.9.0 2014/02/02
0.8.0 2013/09/25
0.7.0 2013/02/27
0.6.0 2012/10/15

Introduktion

Prototyp :

aggregat (zeroValue, seqOp, combOp)

Beskrivning :

aggregate() låter dig ta en RDD och generera ett enda värde som är av en annan typ än vad som lagrades i det ursprungliga RDD.

Parametrar :

  1. zeroValue : zeroValue för ditt resultat i önskat format.
  2. seqOp : Den operation du vill använda på RDD-poster. Kör en gång för varje skiva i en partition.
  3. combOp : Definierar hur de resulterade objekten (ett för varje partition) kombineras.

Exempel :

Beräkna summan av en lista och längden på listan. Returnera resultatet i ett par (sum, length) .

Skapa en lista med fyra element i ett gnisterskal med två partitioner :

listRDD = sc.parallelize([1,2,3,4], 2)

Definiera sedan seqOp :

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

Definiera sedan combOp :

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

Sedan aggregerade:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

Den första partitionen har sublist [1, 2]. Detta tillämpar sekvensen för varje element i listan, som ger ett lokalt resultat - Ett par (sum, length) som kommer att återspegla resultatet lokalt, bara i den första partitionen.

local_result initialiseras till parametern zeroValue aggregate() tillhandahölls med. Till exempel är (0, 0) och list_element det första elementet i listan:

0 + 1 = 1
0 + 1 = 1

Det lokala resultatet är (1, 1), vilket innebär att summan är 1 och längden 1 för den första partitionen efter bearbetning endast det första elementet. local_result uppdateras från (0, 0), till (1, 1).

1 + 2 = 3
1 + 1 = 2

Det lokala resultatet är nu (3, 2), vilket blir det slutliga resultatet från den första partitionen, eftersom de inte är några andra element i underlistan för den första partitionen. Att göra samma sak för andra partition returnerar (7, 2).

Applicera combOp på varje lokalt resultat för att bilda det slutliga, globala resultatet:

(3,2) + (7,2) = (10, 4)

Exempel som beskrivs i "figur":

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

Transformation vs action

Spark använder lat utvärdering ; det betyder att det inte kommer att göra något arbete, såvida det inte verkligen måste. Den metoden gör att vi kan undvika onödig minnesanvändning, vilket gör att vi kan arbeta med big data.

En omvandling utvärderas lat och det faktiska arbetet händer när en handling inträffar.

Exempel:

In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example

[1] berättade vi för Spark att läsa en fil i en RDD med namngivna lines . Spark hörde oss och sa till oss: "Ja, jag kommer att göra det", men faktiskt läste filen inte ännu .

I [2] filtrerar vi filens linjer under antagande att dess innehåll innehåller rader med fel som är markerade med ett error i början. Så vi berätta för Spark att skapa en ny RDD, kallad errors , som kommer att ha elementen i RDD- lines , som hade error i början.

Nu [3] ber vi Spark att räkna felen , dvs räkna antalet element som RDD kallat errors har. count() är en åtgärd som inte lämnar något val för Spark, men att faktiskt göra operationen, så att den kan hitta resultatet av count() , som kommer att vara ett heltal.

Som ett resultat, när [3] nås, kommer [1] och [2] faktiskt att utföras, dvs. att när vi når [3] , då och bara då:

  1. filen kommer att läsas i textFile() (på grund av [1] )

  2. lines kommer att filter() 'redigeras (på grund av [2] )

  3. count() kommer att köras på grund av [3]


Felsökningstips: Eftersom Spark inte kommer att göra något riktigt arbete förrän [3] har nåtts, är det viktigt att förstå att om det finns ett fel i [1] och / eller [2] så kommer det inte att visas förrän handlingen i [3] utlöser Spark till faktiskt arbete. Till exempel om dina data i filen inte stöder startsWith() jag använde, kommer [2] att accepteras korrekt av Spark och det kommer inte att ge några fel, men när [3] skickas in, och Spark faktiskt utvärderar både [1] och [2] , då och först då kommer det att förstå att något inte är korrekt med [2] och ger ett beskrivande fel.

Som ett resultat kan ett fel utlösas när [3] körs, men det betyder inte att felet måste ligga i uttalandet om [3] !

Observera att varken lines eller errors lagras i minnet efter [3] . De kommer att fortsätta att existera endast som en uppsättning behandlingsinstruktioner. Om det kommer att utföras flera åtgärder på någon av dessa RDD: er kommer gnisten att läsa och filtrera data flera gånger. För att undvika dubblering av operationer vid utförande av flera åtgärder på en enda RDD är det ofta användbart att lagra data i minnet med hjälp av cache .


Du kan se fler transformationer / åtgärder i gnistdokument .

Kontrollera gnistversionen

I spark-shell :

sc.version

Generellt i ett program:

SparkContext.version

Använda spark-submit :

 spark-submit --version


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow