apache-spark Samouczek
Rozpoczęcie pracy z Apache-Spark
Szukaj…
Uwagi
Apache Spark to platforma do przetwarzania dużych zbiorów danych o otwartym kodzie źródłowym, zbudowana wokół szybkości, łatwości użytkowania i wyrafinowanej analizy. Deweloper powinien z niego korzystać, gdy obsługuje dużą ilość danych, co zwykle implikuje ograniczenia pamięci i / lub zbyt długi czas przetwarzania.
Powinien również wymieniać wszelkie duże tematy w ramach iskry apache i link do powiązanych tematów. Ponieważ Dokumentacja dla Apache-Spark jest nowa, może być konieczne utworzenie początkowych wersji tych pokrewnych tematów.
Wersje
Wersja | Data wydania |
---|---|
2.2.0 | 2017-07-11 |
2.1.1 | 02.05.2017 |
2.1.0 | 28.12.2016 |
2.0.1 | 2016-10-03 |
2.0.0 | 26.07.2016 |
1.6.0 | 04.01.2016 |
1.5.0 | 2015-09-09 |
1.4.0 | 2015-06-11 |
1.3.0 | 13.03.2015 |
1.2.0 | 2014-12-18 |
1.1.0 | 2014-09-11 |
1.0.0 | 2014-05-30 |
0.9.0 | 02.02.2014 |
0.8.0 | 2013-09-25 |
0.7.0 | 27.02.2013 |
0.6.0 | 15.10.2012 |
Wprowadzenie
Prototyp :
agregat (zeroValue, seqOp, combOp)
Opis :
aggregate()
pozwala pobrać RDD i wygenerować pojedynczą wartość, która jest innego typu niż ta, która była zapisana w pierwotnym RDD.
Parametry :
-
zeroValue
: Wartość inicjalizacji dla twojego wyniku, w pożądanym formacie. -
seqOp
: Operacja, którą chcesz zastosować do rekordów RDD. Działa raz dla każdego rekordu na partycji. -
combOp
: Definiuje sposóbcombOp
obiektówcombOp
(jednego dla każdej partycji).
Przykład :
Oblicz sumę listy i długość tej listy. Zwróć wynik w parze
(sum, length)
.
W powłoce Spark utwórz listę z 4 elementami z 2 partycjami :
listRDD = sc.parallelize([1,2,3,4], 2)
Następnie zdefiniuj seqOp :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
Następnie zdefiniuj combOp :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
Następnie agregowane:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
Pierwsza partycja ma podlistę [1, 2]. Dotyczy to seqOp do każdego elementu tej listy, co daje wynik lokalny - parę (sum, length)
która będzie odzwierciedlać wynik lokalnie, tylko w tej pierwszej partycji.
local_result
zostaje zainicjowany na parametr zeroValue
został wyposażony w aggregate()
. Na przykład (0, 0) i list_element
to pierwszy element listy:
0 + 1 = 1
0 + 1 = 1
Lokalny wynik to (1, 1), co oznacza, że suma wynosi 1, a długość 1 dla 1. partycji po przetworzeniu tylko pierwszego elementu. local_result
zostanie zaktualizowany z (0, 0) na (1, 1).
1 + 2 = 3
1 + 1 = 2
Lokalny wynik to teraz (3, 2), który będzie końcowym wynikiem z pierwszej partycji, ponieważ nie są one innymi elementami na liście podrzędnej pierwszej partycji. To samo robi dla zwrotów drugiej partycji (7, 2).
Zastosuj combOp do każdego lokalnego wyniku, aby uzyskać końcowy, globalny wynik:
(3,2) + (7,2) = (10, 4)
Przykład opisany na „rysunku”:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Transformacja kontra działanie
Spark używa leniwej oceny ; oznacza to, że nie wykona żadnej pracy, chyba że naprawdę musi. Takie podejście pozwala nam uniknąć niepotrzebnego zużycia pamięci, dzięki czemu możemy pracować z dużymi danymi.
Transformacja jest leniwie oceniana, a rzeczywista praca ma miejsce, gdy nastąpi akcja .
Przykład:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
Tak więc w [1]
powiedzieliśmy Sparkowi, aby wczytał plik do RDD o nazwie lines
. Spark usłyszał nas i powiedział nam: „Tak, to zrobi”, ale w rzeczywistości to jeszcze nie odczytać pliku.
W [2] filtrujemy linie pliku, zakładając, że jego zawartość zawiera linie z błędami oznaczonymi error
na początku. Mówimy więc Sparkowi, aby utworzył nowy RDD, zwany errors
, który będzie zawierał elementy lines
RDD, które miały słowo error
na początku.
Teraz w [3]
prosimy Spark o policzenie błędów , tj. O policzenie liczby elementów, które RDD nazywa errors
. count()
to akcja , która nie pozostawia wyboru Sparkowi, ale wykonanie tej operacji, aby mogła znaleźć wynik count()
, który będzie liczbą całkowitą.
W rezultacie, gdy [3]
został osiągnięty, [1]
i [2]
będzie faktycznie wykonywana, to znaczy, że gdy dotrzemy [3]
, wtedy i tylko wtedy:
plik zostanie odczytany w
textFile()
(z powodu[1]
)lines
będąfilter()
'ed (z powodu[2]
)count()
wykona się z powodu[3]
Wskazówka dotycząca debugowania: Ponieważ Spark nie wykona żadnej prawdziwej pracy, dopóki nie zostanie osiągnięty [3]
, ważne jest, aby zrozumieć, że jeśli błąd występuje w [1]
i / lub [2]
, nie pojawi się, dopóki akcja w [3]
powoduje, że Spark wykonuje rzeczywistą pracę. Na przykład, jeśli twoje dane w pliku nie obsługują funkcji startsWith()
której użyłem, wówczas [2]
zostanie poprawnie zaakceptowany przez Spark i nie wygeneruje żadnego błędu, ale po przesłaniu [3]
, a Spark faktycznie ocenia zarówno [1]
i [2]
, wtedy i tylko wtedy zrozumie, że coś jest nie tak z [2]
i wygeneruje błąd opisowy.
W rezultacie błąd może zostać wyzwolony, gdy wykonywane jest [3]
, ale to nie znaczy, że błąd musi znajdować się w instrukcji [3]
!
Uwaga: ani lines
ani errors
nie zostaną zapisane w pamięci po [3]
. Będą nadal istnieć tylko jako zestaw instrukcji przetwarzania. Jeśli na jednym z tych RDD zostanie wykonanych wiele akcji, Spark będzie czytał i filtrował dane wiele razy. Aby uniknąć powielania operacji podczas wykonywania wielu działań na jednym RDD, często przydatne jest przechowywanie danych w pamięci za pomocą pamięci cache
.
Więcej transformacji / działań można znaleźć w dokumentacji Spark .
Sprawdź wersję Spark
W spark-shell
:
sc.version
Ogólnie w programie:
SparkContext.version
Za pomocą spark-submit
:
spark-submit --version