apache-spark
Wanden
Zoeken…
Opmerkingen
Het aantal partities is cruciaal voor de prestaties en / of succesvolle beëindiging van een toepassing.
Een Resilient Distributed Dataset (RDD) is Spark's belangrijkste abstractie. Een RDD is opgesplitst in partities, wat betekent dat een partitie deel uitmaakt van de gegevensset, een deel ervan, of met andere woorden, een deel ervan.
Hoe groter het aantal partities, hoe kleiner de grootte van elke partitie.
Merk echter op dat een groot aantal partities veel druk uitoefent op Hadoop Distributed File System (HDFS), dat een aanzienlijke hoeveelheid metadata moet behouden.
Het aantal partities is gerelateerd aan het geheugengebruik en een memoryOverhead-probleem kan gerelateerd zijn aan dit nummer ( persoonlijke ervaring ).
Een veel voorkomende valkuil voor nieuwe gebruikers is om hun RDD te transformeren in een RDD met slechts één partitie, die er meestal zo uitziet:
data = sc.textFile(file)
data = data.coalesce(1)
Dat is meestal een heel slecht idee, omdat je Spark vertelt dat alle gegevens maar één partitie moeten zijn! Onthoud dat:
Een fase in Spark werkt op één partitie tegelijk (en laadt de gegevens in die partitie in het geheugen).
Als gevolg hiervan vertelt u Spark dat het alle gegevens in één keer moet verwerken, wat meestal resulteert in geheugengerelateerde fouten (bijvoorbeeld onvoldoende geheugen) of zelfs een uitzondering voor een lege aanwijzer.
Dus, tenzij u weet wat u doet, moet u uw RDD niet in slechts één partitie opnieuw partitioneren!
Partities Intro
Hoe wordt een RDD gepartitioneerd?
Standaard wordt voor elke HDFS-partitie een partitie gemaakt, die standaard 64 MB is. Lees hier meer.
Hoe breng ik mijn gegevens over verschillende partities in evenwicht?
Bekijk eerst de drie manieren waarop je zijn gegevens opnieuw kunt verdelen :
Geef een tweede parameter, het gewenste minimum aantal partities voor uw RDD, door in textFile () , maar wees voorzichtig:
In [14]: lines = sc.textFile ("data")
In [15]: lines.getNumPartitions () Uit [15]: 1000
In [16]: lines = sc.textFile ("data", 500)
In [17]: lines.getNumPartitions () Out [17]: 1434
In [18]: lines = sc.textFile ("data", 5000)
In [19]: lines.getNumPartitions () Out [19]: 5926
Zoals je kunt zien, doet [16]
niet wat men zou verwachten, omdat het aantal partities dat de RDD heeft al groter is dan het minimum aantal partities dat we vragen.
Gebruik repartition () , zoals hier:
In [22]: lines = lines.repartition (10)
In [23]: lines.getNumPartitions () Out [23]: 10
Waarschuwing: dit roept een shuffle op en moet worden gebruikt wanneer u het aantal partities dat uw RDD heeft wilt vergroten .
Uit de documenten :
De shuffle is het mechanisme van Spark voor het opnieuw distribueren van gegevens, zodat deze anders gegroepeerd zijn over partities. Dit omvat meestal het kopiëren van gegevens naar uitvoerders en machines, waardoor de shuffle een complexe en kostbare operatie wordt.
Gebruik coalesce () , zoals hier:
In [25]: lines = lines.coalesce (2)
In [26]: lines.getNumPartitions () Out [26]: 2
Hier weet Spark dat u de RDD zult verkleinen en profiteert u ervan. Lees meer over repartition () vs coalesce () .
Maar zal dit alles garanderen dat uw gegevens perfect in evenwicht zijn over uw partities? Niet echt, zoals ik heb ervaren in Hoe mijn gegevens over de partities te verdelen?
Partities van een RDD
Zoals vermeld in "Opmerkingen", is een partitie een onderdeel / segment / stuk van een RDD. Hieronder is een minimaal voorbeeld van hoe u een minimum aantal partities voor uw RDD kunt aanvragen:
In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
In [2]: mylistRDD.getNumPartitions()
Out[2]: 2
Merk in [1] op hoe we 2 zijn gepasseerd als een tweede parameter van parallelize()
. Die parameter zegt dat we willen dat onze RDD minimaal 2 partities heeft.
Verdeling van een RDD
Soms willen we een RDD opnieuw partitioneren, bijvoorbeeld omdat het afkomstig is van een bestand dat niet door ons is gemaakt en omdat het aantal partities dat door de maker is gedefinieerd niet het gewenste is.
De twee meest bekende functies om dit te bereiken zijn:
repartition(numPartitions)
en:
coalesce(numPartitions, shuffle=False)
Gebruik als vuistregel de eerste wanneer u uw RDD in een groter aantal partities opnieuw wilt partitioneren en de tweede om uw RDD in een kleiner aantal partities te verminderen. Spark - repartition () vs coalesce () .
Bijvoorbeeld:
data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions
zal het aantal partities van de RDD genaamd 'data' verminderen tot 100, aangezien deze RDD meer dan 100 partities heeft toen het werd gelezen door textFile()
.
En op een vergelijkbare manier, als u meer dan het huidige aantal partities voor uw RDD wilt hebben, zou u het kunnen doen (gezien het feit dat uw RDD bijvoorbeeld in 200 partities wordt verdeeld):
data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions
Vuistregel over het aantal partities
Als vuistregel zou men willen dat zijn RDD net zoveel partities heeft als het product van het aantal executors door het aantal gebruikte cores met 3 (of misschien 4). Dat is natuurlijk een heuristiek en het hangt echt af van uw applicatie, dataset en clusterconfiguratie.
Voorbeeld:
In [1]: data = sc.textFile(file)
In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
In [3]: data = data.coalesce(total_cores * 3)
Toon RDD-inhoud
Om de inhoud van een RDD te tonen, moet deze worden afgedrukt:
myRDD.foreach(println)
Om het aantal afgedrukte rijen te beperken:
myRDD.take(num_of_rows).foreach(println)