apache-spark
Partitionen
Suche…
Bemerkungen
Die Anzahl der Partitionen ist entscheidend für die Leistung einer Anwendung und / oder für eine erfolgreiche Beendigung.
Ein Resilient Distributed Dataset (RDD) ist die Hauptabstraktion von Spark. Eine RDD ist in Partitionen aufgeteilt, dh, eine Partition ist Teil des Datasets, ein Teil davon oder mit anderen Worten ein Teil davon.
Je größer die Anzahl der Partitionen ist, desto kleiner ist jede Partition.
Beachten Sie jedoch, dass eine große Anzahl von Partitionen Hadoop Distributed File System (HDFS) sehr unter Druck setzt, da hier eine beträchtliche Menge an Metadaten vorhanden sein muss.
Die Anzahl der Partitionen hängt von der Speicherauslastung ab, und ein Problem mit memoryOverhead kann sich auf diese Anzahl beziehen ( persönliche Erfahrung ).
Eine häufige Gefahr für neue Benutzer ist die Umwandlung ihrer RDD in eine RDD mit nur einer Partition, die normalerweise so aussieht:
data = sc.textFile(file)
data = data.coalesce(1)
Das ist normalerweise eine sehr schlechte Idee, da Sie Spark sagen, dass alle Daten nur eine Partition sind. Erinnere dich daran:
Eine Phase in Spark arbeitet jeweils nur auf einer Partition (und lädt die Daten in dieser Partition in den Speicher).
Aus diesem Grund weisen Sie Spark an, alle Daten auf einmal zu behandeln, was normalerweise zu Fehlern in Bezug auf den Speicher (beispielsweise zu wenig Speicher) oder sogar zu einer Ausnahme mit einem Nullzeiger führt.
Wenn Sie also nicht wissen, was Sie tun, vermeiden Sie es, Ihre RDD in nur einer Partition neu zu partitionieren!
Partitionen Intro
Wie wird eine RDD partitioniert?
Standardmäßig wird für jede HDFS-Partition eine Partition erstellt, die standardmäßig 64 MB beträgt. Lesen Sie hier mehr.
Wie kann ich meine Daten auf mehrere Partitionen verteilen?
Sehen Sie sich zunächst die drei Möglichkeiten an, wie Sie seine Daten neu partitionieren können:
Übergeben Sie einen zweiten Parameter, die gewünschte Mindestanzahl von Partitionen für Ihre RDD, an textFile () , aber seien Sie vorsichtig:
In [14]: lines = sc.textFile ("data")
In [15]: lines.getNumPartitions () Out [15]: 1000
In [16]: lines = sc.textFile ("data", 500)
In [17]: lines.getNumPartitions () Out [17]: 1434
In [18]: lines = sc.textFile ("data", 5000)
In [19]: lines.getNumPartitions () Out [19]: 5926
Wie Sie sehen, tut [16]
nicht das, was man erwarten würde, da die Anzahl der Partitionen, die die RDD hat, bereits größer ist als die Mindestanzahl der von uns angeforderten Partitionen.
Verwenden Sie die Aufteilung () wie folgt :
In [22]: lines = lines.repartition (10)
In [23]: lines.getNumPartitions () Out [23]: 10
Warnung: Dadurch wird eine Zufallswiedergabe ausgelöst und sollte verwendet werden, wenn Sie die Anzahl der Partitionen Ihrer RDD erhöhen möchten.
Aus den Dokumenten :
Der Shuffle-Mechanismus ist der Mechanismus von Spark, um Daten neu zu verteilen, sodass sie zwischen den Partitionen unterschiedlich gruppiert werden. Dazu gehört in der Regel das Kopieren von Daten zwischen Executoren und Maschinen, wodurch der Shuffle zu einem komplexen und kostspieligen Vorgang wird.
Verwenden Sie coalesce () wie folgt:
In [25]: lines = lines.coalesce (2)
In [26]: lines.getNumPartitions () Out [26]: 2
Hier weiß Spark, dass Sie die RDD verkleinern werden und davon profitieren können. Lesen Sie mehr über Aufteilung () und Coalesce () .
Aber wird all dies garantieren, dass Ihre Daten in Ihren Partitionen perfekt ausgewogen sind? Nicht wirklich, wie ich erfahren habe, wie man meine Daten über die Partitionen verteilt.
Partitionen einer RDD
Wie in "Bemerkungen" erwähnt, ist eine Partition ein Teil / Slice / Chunk einer RDD. Im Folgenden finden Sie ein minimales Beispiel für das Anfordern einer minimalen Anzahl von Partitionen für Ihre RDD:
In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
In [2]: mylistRDD.getNumPartitions()
Out[2]: 2
Beachten Sie in [1], wie wir 2 als zweiten Parameter von parallelize()
. Dieser Parameter besagt, dass wir möchten, dass unsere RDD mindestens zwei Partitionen hat.
Eine RDD neu partitionieren
Manchmal möchten wir eine RDD neu partitionieren, zum Beispiel, weil sie von einer Datei stammt, die nicht von uns erstellt wurde, und die Anzahl der vom Ersteller definierten Partitionen nicht die ist, die wir wollen.
Die zwei bekanntesten Funktionen, um dies zu erreichen, sind:
repartition(numPartitions)
und:
coalesce(numPartitions, shuffle=False)
Als Faustregel gilt: Verwenden Sie die erste Methode, wenn Sie Ihre RDD in einer größeren Anzahl von Partitionen neu partitionieren möchten, und die zweite, um Ihre RDD in einer kleineren Anzahl von Partitionen zu reduzieren. Spark-Aufteilung () vs. Koaleszierung () .
Zum Beispiel:
data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions
verringert die Anzahl der Partitionen der RDD, die als "Daten" bezeichnet werden, auf 100, da diese RDD mehr als 100 Partitionen hat, wenn sie von textFile()
gelesen wurde.
Und in ähnlicher Weise, wenn Sie mehr als die aktuelle Anzahl von Partitionen für Ihre RDD haben möchten, können Sie dies tun (da Ihre RDD beispielsweise in 200 Partitionen verteilt ist):
data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions
Faustregel zur Anzahl der Partitionen
Als Faustregel würde man wollen, dass seine RDD so viele Partitionen hat wie das Produkt der Anzahl der Executoren, um die Anzahl der verwendeten Kerne um 3 (oder vielleicht 4). Das ist natürlich eine Heuristik, und es hängt wirklich von Ihrer Anwendung, Ihrer Datenmenge und der Cluster-Konfiguration ab.
Beispiel:
In [1]: data = sc.textFile(file)
In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
In [3]: data = data.coalesce(total_cores * 3)
RDD-Inhalte anzeigen
Um den Inhalt einer RDD anzuzeigen, muss diese gedruckt werden:
myRDD.foreach(println)
So beschränken Sie die Anzahl der gedruckten Zeilen:
myRDD.take(num_of_rows).foreach(println)