apache-spark
Partitions
Recherche…
Remarques
Le nombre de partitions est critique pour les performances d'une application et / ou la résiliation réussie.
Un ensemble de données distribué résilient (RDD) est l'abstraction principale de Spark. Un RDD est divisé en partitions, ce qui signifie qu'une partition fait partie du jeu de données, une tranche ou, en d'autres termes, un morceau.
Plus le nombre de partitions est élevé, plus la taille de chaque partition est petite.
Cependant, notez qu'un grand nombre de partitions met beaucoup de pression sur le système de fichiers distribué Hadoop (HDFS), qui doit conserver une quantité importante de métadonnées.
Le nombre de partitions est lié à l'utilisation de la mémoire et un problème de memoryOverhead peut être lié à ce numéro ( expérience personnelle ).
Un piège courant pour les nouveaux utilisateurs est de transformer leur RDD en RDD avec une seule partition, qui ressemble généralement à ceci:
data = sc.textFile(file)
data = data.coalesce(1)
C'est généralement une très mauvaise idée, puisque vous dites à Spark que toutes les données ne sont qu'une partition! Rappelez-vous que:
Une étape dans Spark fonctionnera sur une partition à la fois (et chargera les données dans cette partition en mémoire).
Par conséquent, vous indiquez à Spark de gérer toutes les données à la fois, ce qui entraîne généralement des erreurs liées à la mémoire (mémoire insuffisante par exemple) ou même une exception de pointeur nul.
Donc, à moins de savoir ce que vous faites, évitez de partitionner votre RDD en une seule partition!
Partitions Intro
Comment un RDD est-il partitionné?
Par défaut, une partition est créée pour chaque partition HDFS, qui est par défaut de 64 Mo. Lire plus ici .
Comment équilibrer mes données entre partitions?
Tout d'abord, jetez un coup d'œil sur les trois façons de répartir ses données:
Transmettez un second paramètre, le nombre minimum de partitions souhaité pour votre RDD, dans textFile () , mais faites attention:
In [14]: lines = sc.textFile ("data")
Dans [15]: lines.getNumPartitions () Out [15]: 1000
In [16]: lines = sc.textFile ("data", 500)
Dans [17]: lines.getNumPartitions () Out [17]: 1434
In [18]: lines = sc.textFile ("data", 5000)
Dans [19]: lines.getNumPartitions () Out [19]: 5926
Comme vous pouvez le voir, [16]
ne fait pas ce à quoi on s’attendrait, car le nombre de partitions que possède le RDD est déjà supérieur au nombre minimum de partitions que nous demandons.
Utilisez repartition () , comme ceci:
Dans [22]: lignes = lignes.partition (10)
Dans [23]: lines.getNumPartitions () Out [23]: 10
Attention: ceci invoquera un shuffle et devrait être utilisé quand vous voulez augmenter le nombre de partitions de votre RDD.
De la documentation :
Le shuffle est le mécanisme utilisé par Spark pour redistribuer les données afin qu'elles soient regroupées différemment selon les partitions. Cela implique généralement la copie des données sur les exécuteurs et les machines, ce qui en fait une opération complexe et coûteuse.
Utilisez coalesce () , comme ceci:
In [25]: lines = lines.coalesce (2)
Dans [26]: lines.getNumPartitions () Out [26]: 2
Spark sait que vous allez réduire le RDD et en tirer parti. En savoir plus sur repartition () vs coalesce () .
Mais tout cela garantira-t-il que vos données seront parfaitement équilibrées sur vos partitions? Pas vraiment, comme je l'ai expérimenté dans Comment équilibrer mes données entre les partitions?
Partitions d'un RDD
Comme mentionné dans "Remarques", une partition est une partie / une tranche / un bloc d’un RDD. Voici un exemple minimal sur la façon de demander un nombre minimum de partitions pour votre RDD:
In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
In [2]: mylistRDD.getNumPartitions()
Out[2]: 2
Remarquez dans [1] comment nous avons passé 2 comme second paramètre de parallelize()
. Ce paramètre dit que nous voulons que notre RDD ait au moins 2 partitions.
Repartitionner un RDD
Nous souhaitons parfois repartitionner un RDD, par exemple parce qu'il provient d'un fichier que nous n'avons pas créé, et que le nombre de partitions défini à partir du créateur n'est pas celui que nous souhaitons.
Les deux fonctions les plus connues pour y parvenir sont:
repartition(numPartitions)
et:
coalesce(numPartitions, shuffle=False)
En règle générale, utilisez le premier lorsque vous souhaitez repartitionner votre RDD dans un plus grand nombre de partitions et le second pour réduire votre RDD, dans un nombre plus restreint de partitions. Spark - repartition () vs coalesce () .
Par exemple:
data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions
diminuera le nombre de partitions du RDD appelé 'data' à 100, étant donné que ce RDD a plus de 100 partitions lorsqu'il a été lu par textFile()
.
Et de la même manière, si vous souhaitez avoir plus que le nombre actuel de partitions pour votre RDD, vous pouvez le faire (étant donné que votre RDD est distribué dans 200 partitions par exemple):
data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions
Règle de base sur le nombre de partitions
En règle générale, on voudrait que son RDD ait autant de partitions que le produit du nombre d'exécuteurs par le nombre de cœurs utilisés par 3 (ou peut-être 4). Bien sûr, c'est une heuristique et cela dépend vraiment de votre application, de votre jeu de données et de la configuration de votre cluster.
Exemple:
In [1]: data = sc.textFile(file)
In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
In [3]: data = data.coalesce(total_cores * 3)
Afficher le contenu RDD
Pour afficher le contenu d'un RDD, il faut l'imprimer:
myRDD.foreach(println)
Pour limiter le nombre de lignes imprimées:
myRDD.take(num_of_rows).foreach(println)