apache-spark
Partycje
Szukaj…
Uwagi
Liczba partycji ma kluczowe znaczenie dla wydajności aplikacji i / lub pomyślnego zakończenia.
Odporny rozproszony zestaw danych (RDD) jest główną abstrakcją Sparka. RDD jest podzielony na partycje, co oznacza, że partycja jest częścią zestawu danych, jego fragmentem, lub innymi słowy, jego częścią.
Im większa liczba partycji, tym mniejszy jest rozmiar każdej partycji.
Zauważ jednak, że duża liczba partycji wywiera dużą presję na rozproszony system plików Hadoop (HDFS), który musi przechowywać znaczną ilość metadanych.
Liczba partycji jest związana z użyciem pamięci, a problem z pamięcią zewnętrzną może być związany z tym numerem ( osobiste doświadczenie ).
Częstą pułapką dla nowych użytkowników jest przekształcanie ich RDD w RDD z tylko jedną partycją, która zwykle wygląda tak:
data = sc.textFile(file)
data = data.coalesce(1)
To zwykle bardzo zły pomysł, ponieważ mówisz Sparkowi, aby wszystkie dane były tylko jedną partycją! Zapamietaj to:
Stopień w Spark będzie działał na jednej partycji naraz (i ładował dane z tej partycji do pamięci).
W rezultacie każesz Sparkowi obsługiwać wszystkie dane naraz, co zwykle powoduje błędy związane z pamięcią (na przykład brak pamięci) lub nawet wyjątek wskaźnika zerowego.
Tak więc, chyba że wiesz, co robisz, unikaj dzielenia RDD na jedną partycję!
Wprowadzenie do partycji
Jak partycjonuje się RDD?
Domyślnie partycja tworzona jest dla każdej partycji HDFS, która domyślnie ma 64 MB. Przeczytaj więcej tutaj .
Jak zrównoważyć moje dane między partycjami?
Najpierw spójrz na trzy sposoby podziału danych na partycje :
Przekaż drugi parametr, pożądaną minimalną liczbę partycji dla twojego RDD, do textFile () , ale bądź ostrożny:
W [14]: lines = sc.textFile („data”)
W [15]: lines.getNumPartitions () Out [15]: 1000
W [16]: lines = sc.textFile („data”, 500)
W [17]: lines.getNumPartitions () Out [17]: 1434
W [18]: lines = sc.textFile („data”, 5000)
W [19]: lines.getNumPartitions () Out [19]: 5926
Jak widać, [16]
nie robi tego, czego można by oczekiwać, ponieważ liczba partycji RDD jest już większa niż minimalna liczba partycji, o które prosimy.
Użyj repartition () , jak poniżej:
W [22]: lines = lines.repartition (10)
W [23]: lines.getNumPartitions () Out [23]: 10
Ostrzeżenie: spowoduje to losowe odtwarzanie i powinno być używane, gdy chcesz zwiększyć liczbę partycji posiadanych przez RDD.
Z dokumentów :
Losowanie to mechanizm Spark'a służący do ponownej dystrybucji danych, dzięki czemu są one pogrupowane w różny sposób na partycje. Zazwyczaj wiąże się to z kopiowaniem danych między programami wykonawczymi i komputerami, przez co tasowanie jest złożoną i kosztowną operacją.
Użyj coalesce () , jak poniżej:
W [25]: lines = lines.coalesce (2)
W [26]: lines.getNumPartitions () Out [26]: 2
Tutaj Spark wie, że zmniejszysz RDD i skorzystasz z niego. Przeczytaj więcej o repartition () vs coalesce () .
Ale czy to wszystko zagwarantuje, że twoje dane będą idealnie zrównoważone na twoich partycjach? Nie bardzo, jak doświadczyłem w Jak zrównoważyć moje dane na partycjach?
Partycje RDD
Jak wspomniano w „Uwagach”, partycja jest częścią / plasterkiem / fragmentem RDD. Poniżej znajduje się minimalny przykład, w jaki sposób zażądać minimalnej liczby partycji dla twojego RDD:
In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
In [2]: mylistRDD.getNumPartitions()
Out[2]: 2
Zauważ w [1], w jaki sposób przekazaliśmy 2 jako drugi parametr parallelize()
. Ten parametr mówi, że chcemy, aby nasz RDD miał co najmniej 2 partycje.
Ponowny podział RDD
Czasami chcemy podzielić partycję RDD, na przykład ponieważ pochodzi ona z pliku, który nie został przez nas utworzony, a liczba partycji zdefiniowanych przez twórcę nie jest taka, jakiej chcemy.
Dwie najbardziej znane funkcje do osiągnięcia tego celu to:
repartition(numPartitions)
i:
coalesce(numPartitions, shuffle=False)
Zasadniczo używaj pierwszej, jeśli chcesz podzielić RDD na większą liczbę partycji, a drugą, aby zmniejszyć RDD, na mniejszej liczbie partycji. Spark - repartition () vs coalesce () .
Na przykład:
data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions
zmniejszy liczbę partycji RDD o nazwie „data” do 100, biorąc pod uwagę, że ta RDD ma ponad 100 partycji, gdy zostanie odczytana przez textFile()
.
I podobnie, jeśli chcesz mieć więcej niż bieżącą liczbę partycji dla twojego RDD, możesz to zrobić (biorąc pod uwagę, że twój RDD jest podzielony na przykład na 200 partycji):
data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions
Ogólna zasada o liczbie partycji
Zgodnie z ogólną zasadą, jego RDD powinien mieć tyle partycji, ile iloczynu liczby wykonawców przez liczbę używanych rdzeni o 3 (a może 4). Oczywiście jest to heurystyka i naprawdę zależy od aplikacji, zestawu danych i konfiguracji klastra.
Przykład:
In [1]: data = sc.textFile(file)
In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
In [3]: data = data.coalesce(total_cores * 3)
Pokaż zawartość RDD
Aby wyświetlić zawartość RDD, należy wydrukować:
myRDD.foreach(println)
Aby ograniczyć liczbę wydrukowanych wierszy:
myRDD.take(num_of_rows).foreach(println)