apache-spark Tutorial
Erste Schritte mit Apache-Spark
Suche…
Bemerkungen
Apache Spark ist ein Open Source-Framework für die Verarbeitung von Big Data, das auf Geschwindigkeit, Benutzerfreundlichkeit und ausgefeilten Analysen basiert. Ein Entwickler sollte es verwenden, wenn er große Datenmengen handhabt, was normalerweise Speicherbeschränkungen und / oder unzulässige Verarbeitungszeiten impliziert.
Es sollte auch alle großen Themen innerhalb von Apache-Spark erwähnen und auf die verwandten Themen verweisen. Da die Dokumentation für apache-spark neu ist, müssen Sie möglicherweise erste Versionen dieser verwandten Themen erstellen.
Versionen
Ausführung | Veröffentlichungsdatum |
---|---|
2.2.0 | 2017-07-11 |
2.1.1 | 2017-05-02 |
2.1.0 | 2016-12-28 |
2.0.1 | 2016-10-03 |
2.0.0 | 2016-07-26 |
1.6.0 | 2016-01-04 |
1.5.0 | 2015-09-09 |
1.4.0 | 2015-06-11 |
1.3.0 | 2015-03-13 |
1.2.0 | 2014-12-18 |
1.1.0 | 2014-09-11 |
1.0.0 | 2014-05-30 |
0,9,0 | 2014-02-02 |
0,8,0 | 2013-09-25 |
0,7,0 | 2013-02-27 |
0,6,0 | 2012-10-15 |
Einführung
Vorbild:
Aggregat (zeroValue, seqOp, combOp)
Beschreibung:
aggregate()
können Sie eine RDD verwenden und einen einzelnen Wert generieren, der sich von dem in der ursprünglichen RDD gespeicherten Typ unterscheidet.
Parameter :
-
zeroValue
: Der Initialisierungswert für Ihr Ergebnis im gewünschten Format. -
seqOp
: Der Vorgang , den Sie RDD Datensätze anwenden möchten. Läuft einmal für jeden Datensatz in einer Partition. -
combOp
: Definiert, wie die resultierenden Objekte (eines für jede Partition) kombiniert werden.
Beispiel :
Berechnen Sie die Summe aus einer Liste und die Länge dieser Liste. Bringen Sie das Ergebnis in einem Paar
(sum, length)
.
In einem Spark - Shell, eine Liste mit vier Elementen, mit 2 Partitionen erstellen:
listRDD = sc.parallelize([1,2,3,4], 2)
Dann definieren seqOp:
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Dann combOp definieren:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Dann aggregiert:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
Die erste Partition hat den sublist [1, 2]. Dies gilt für jedes Element dieser Liste, das ein lokales Ergebnis erzeugt - ein Paar aus (sum, length)
, das das Ergebnis lokal nur in dieser ersten Partition widerspiegelt.
local_result
wird auf den zeroValue
Parameter initialisiert, der zeroValue
aggregate()
versehen wurde. Zum Beispiel ist (0, 0) und list_element
das erste Element der Liste:
0 + 1 = 1
0 + 1 = 1
Das lokale Ergebnis ist (1, 1), dh die Summe ist 1 und die Länge 1 für die 1. Partition, nachdem nur das erste Element verarbeitet wurde. local_result
wird von (0, 0) bis (1, 1) aktualisiert.
1 + 2 = 3
1 + 1 = 2
Das lokale Ergebnis ist jetzt (3, 2). Dies ist das Endergebnis der 1. Partition, da es sich nicht um andere Elemente in der Unterliste der 1. Partition handelt. Dadurch könnte die gleiche für die 2. Partition zurückkehrt (7, 2).
Bewerben combOp zu jedem lokalen Ergebnis die endgültige, weltweite Ergebnis zu bilden:
(3,2) + (7,2) = (10, 4)
Beispiel in ‚Zahl‘ beschrieben:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Transformation vs. Aktion
Spark verwendet eine faule Bewertung ; Das bedeutet, dass es keine Arbeit leistet, es sei denn, es muss wirklich. Auf diese Weise können wir unnötigen Speicherbedarf vermeiden und so mit Big Data arbeiten.
Eine Transformation wird faul bewertet und die eigentliche Arbeit findet statt, wenn eine Aktion ausgeführt wird .
Beispiel:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
Daher haben wir Spark in [1]
befohlen, eine Datei in eine RDD ( lines
einzulesen. Spark hörte uns und sagte: "Ja, ich werde es tun", aber tatsächlich hat er die Datei noch nicht gelesen.
In [2] filtern wir die Zeilen der Datei und nehmen an, dass der Inhalt Zeilen enthält, deren error
am Anfang mit einem error
gekennzeichnet sind. Also sagen wir Funken ein neues RDD zu schaffen, die so genannten errors
, die die Elemente der RDD haben lines
, in denen das Wort hatten error
bei ihrem Anfang.
In [3]
bitten wir Spark nun, die Fehler zu zählen , dh die Anzahl der Elemente, die der RDD- errors
hat. count()
ist eine Aktion , die Spark keine Wahl lässt, sondern die Operation tatsächlich ausführt, um das Ergebnis von count()
, das eine Ganzzahl ist.
Wenn also [3]
erreicht wird, werden tatsächlich [1]
und [2]
ausgeführt, dh wenn [3]
, dann und nur dann:
Die Datei wird in
textFile()
gelesen (wegen[1]
)lines
werdenfilter()
(wegen[2]
)count()
wird ausgeführt, weil[3]
Debug-Tipp: Da Spark erst dann echte Arbeit leistet, wenn [3]
erreicht ist, ist es wichtig zu verstehen, dass ein Fehler in [1]
und / oder [2]
erst dann angezeigt wird, wenn die Aktion ausgeführt wird [3]
löst Spark für die eigentliche Arbeit aus. Wenn zum Beispiel Ihre Daten in der Datei das von mir verwendete startsWith()
nicht unterstützen, wird [2]
von Spark ordnungsgemäß akzeptiert, und es werden keine Fehler startsWith()
Wenn jedoch [3]
wird, wird Spark tatsächlich angezeigt wertet sowohl [1]
als auch [2]
und erst dann wird es verstehen, dass etwas mit [2]
nicht stimmt und einen beschreibenden Fehler erzeugt.
Daher kann ein Fehler ausgelöst werden, wenn [3]
ausgeführt wird. Dies bedeutet jedoch nicht, dass der Fehler in der Anweisung von [3]
liegen muss!
Beachten Sie, dass nach [3]
weder lines
noch errors
gespeichert werden. Sie existieren nur als Satz von Verarbeitungsanweisungen. Wenn für eine dieser RDDs mehrere Aktionen ausgeführt werden, liest und filtert spark die Daten mehrmals. Um Doppeloperationen zu vermeiden, wenn mehrere Aktionen für eine einzelne RDD ausgeführt werden, ist es häufig hilfreich, Daten im cache
Verwendung des cache
zu speichern.
Weitere Transformationen / Aktionen finden Sie in Spark-Dokumenten .
Überprüfen Sie die Spark-Version
In spark-shell
:
sc.version
Generell in einem Programm:
SparkContext.version
Verwenden von spark-submit
:
spark-submit --version