apache-spark
partitioner
Sök…
Anmärkningar
Antalet partitioner är avgörande för en applikations prestanda och / eller framgångsrika avslutning.
En Resilient Distribuerad Dataset (RDD) är Sparks huvudsakliga abstraktion. En RDD delas upp i partitioner, det betyder att en partition är en del av datasatsen, en bit av det, eller med andra ord, en bit av den.
Ju större antalet partitioner är, desto mindre är storleken på varje partition.
Observera dock att ett stort antal partitioner sätter ett stort tryck på Hadoop Distribuerat filsystem (HDFS), som måste hålla en betydande mängd metadata.
Antalet partitioner är relaterat till minnesanvändningen, och ett minne över huvudproblem kan relateras till detta nummer ( personlig upplevelse ).
En vanlig fallgrop för nya användare är att förvandla sin RDD till en RDD med bara en partition, som vanligtvis ser ut så:
data = sc.textFile(file)
data = data.coalesce(1)
Det är vanligtvis en väldigt dålig idé, eftersom du berättar för Spark att lägga all information är bara en partition! Kom ihåg det:
Ett steg i Spark fungerar på en partition åt gången (och laddar data i den partitionen i minnet).
Som ett resultat ber du Spark att hantera alla data på en gång, vilket vanligtvis resulterar i minnesrelaterade fel (till exempel utan minne), eller till och med ett undantag från nollpekaren.
Så om du inte vet vad du gör ska du undvika att återuppdela din RDD i bara en partition!
Partitioner Intro
Hur blir en RDD partitionerad?
Som standard skapas en partition för varje HDFS-partition, som som standard är 64MB. Läs mer här .
Hur balanserar jag mina data mellan partitioner?
Först, titta på de tre sätten man kan dela upp sina data på:
Skicka en andra parameter, det önskade minsta antalet partitioner för din RDD, till textFile () , men var försiktig:
I [14]: linjer = sc.textFile ("data")
I [15]: lines.getNumPartitions () Out [15]: 1000
I [16]: rader = sc.textFile ("data", 500)
I [17]: lines.getNumPartitions () Out [17]: 1434
I [18]: rader = sc.textFile ("data", 5000)
I [19]: lines.getNumPartitions () Out [19]: 5926
Som ni kan se, [16]
gör inte vad man kan förvänta sig, eftersom antalet partitioner som RDD har, redan är större än det minsta antalet partitioner vi begär.
Använd repartition () , så här:
I [22]: rader = linjer.delning (10)
I [23]: lines.getNumPartitions () Out [23]: 10
Varning: Detta kommer att åberopa en blandning och bör användas när du vill öka antalet partitioner som din RDD har.
Från dokumenten :
Blandningen är Sparks mekanism för att distribuera data så att de grupperas annorlunda mellan partitioner. Detta innebär vanligtvis att kopiera data över körare och maskiner, vilket gör blandningen till en komplex och kostsam operation.
Använd coalesce () , så här:
I [25]: rader = linjer.coalesce (2)
I [26]: lines.getNumPartitions () Out [26]: 2
Här vet Spark att du kommer att krympa RDD och få fördel av det. Läs mer om repartition () vs coalesce () .
Men kommer allt detta garantera att dina data kommer att vara perfekt balanserade mellan dina partitioner? Inte riktigt, som jag upplevde i Hur balanserar jag mina data mellan partitionerna?
Partitioner av en RDD
Som nämnts i "Anmärkningar" är en partition en del / skiva / bit av en RDD. Nedan är ett minimalt exempel på hur du begär ett minimumantal partitioner för din RDD:
In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
In [2]: mylistRDD.getNumPartitions()
Out[2]: 2
Lägg märke till [1] hur vi passerade 2 som en andra parameter för parallelize()
. Den parametern säger att vi vill att vår RDD ska ha minst två partitioner.
Repartition en RDD
Ibland vill vi dela upp en RDD, till exempel för att den kommer från en fil som inte skapades av oss, och antalet partitioner som definierats från skaparen är inte det vi vill ha.
De två mest kända funktionerna för att uppnå detta är:
repartition(numPartitions)
och:
coalesce(numPartitions, shuffle=False)
Som tumregel använder du den första när du vill dela upp din RDD i ett större antal partitioner och den andra för att minska din RDD, i ett mindre antal partitioner. Spark - repartition () vs coalesce () .
Till exempel:
data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions
kommer att minska antalet partitioner i RDD som kallas 'data' till 100, med tanke på att denna RDD har mer än 100 partitioner när den lästes av textFile()
.
Och på liknande sätt, om du vill ha mer än det aktuella antalet partitioner för din RDD, kan du göra (med tanke på att din RDD är distribuerad till exempelvis 200 partitioner):
data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions
Tommelfingerregel om antal partitioner
Som tumregel skulle man vilja att hans RDD ska ha lika många partitioner som produkten av antalet körare med antalet använda kärnor med 3 (eller kanske 4). Naturligtvis är det en heuristisk och det beror verkligen på din applikation, dataset och klusterkonfiguration.
Exempel:
In [1]: data = sc.textFile(file)
In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
In [3]: data = data.coalesce(total_cores * 3)
Visa RDD-innehåll
För att visa innehållet i en RDD måste den skrivas ut:
myRDD.foreach(println)
För att begränsa antalet utskrivna rader:
myRDD.take(num_of_rows).foreach(println)