Buscar..


Observaciones

El número de particiones es crítico para el rendimiento de una aplicación y / o la terminación exitosa.

Un conjunto de datos distribuido resistente (RDD) es la abstracción principal de Spark. Un RDD se divide en particiones, lo que significa que una partición es una parte del conjunto de datos, una porción de ella, o en otras palabras, una parte de ella.

Cuanto mayor sea el número de particiones, menor será el tamaño de cada partición.

Sin embargo, tenga en cuenta que un gran número de particiones ejerce mucha presión sobre el Sistema de archivos distribuidos de Hadoop (HDFS), que debe mantener una cantidad significativa de metadatos.

El número de particiones está relacionado con el uso de la memoria, y un problema de MemoryOverhead se puede relacionar con este número ( experiencia personal ).


Un error común para los nuevos usuarios es transformar su RDD en un RDD con una sola partición, que generalmente se ve así:

data = sc.textFile(file)
data = data.coalesce(1) 

¡Por lo general, es una muy mala idea, ya que le está diciendo a Spark que ponga todos los datos en una sola partición! Recuérdalo:

Una etapa en Spark operará en una partición a la vez (y cargará los datos en esa partición en la memoria).

Como resultado, le dice a Spark que maneje todos los datos a la vez, lo que generalmente resulta en errores relacionados con la memoria (memoria insuficiente, por ejemplo), o incluso una excepción de puntero nulo.

Por lo tanto, a menos que sepa lo que está haciendo, ¡evite volver a particionar su RDD en una sola partición!

Particiones Intro

¿Cómo se divide un RDD?

De forma predeterminada, se crea una partición para cada partición HDFS, que de forma predeterminada es de 64 MB. Lea más aquí .

¿Cómo equilibrar mis datos a través de particiones?

Primero, eche un vistazo a las tres formas en que uno puede repartir sus datos:

  1. Pase un segundo parámetro, el número mínimo deseado de particiones para su RDD, en textFile () , pero tenga cuidado:

    En [14]: líneas = sc.textFile ("datos")

    En [15]: lines.getNumPartitions () Out [15]: 1000

    En [16]: líneas = sc.textFile ("datos", 500)

    En [17]: lines.getNumPartitions () Out [17]: 1434

    En [18]: líneas = sc.textFile ("datos", 5000)

    En [19]: lines.getNumPartitions () Out [19]: 5926

Como puede ver, [16] no hace lo que uno esperaría, ya que la cantidad de particiones que tiene el RDD ya es mayor que la cantidad mínima de particiones que solicitamos.

  1. Use repartition () , así:

    En [22]: líneas = líneas. Reparto (10)

    En [23]: lines.getNumPartitions () Out [23]: 10

Advertencia: Esto invocará un orden aleatorio y debe usarse cuando desee aumentar el número de particiones que tiene su RDD.

De la documentación :

El orden aleatorio es el mecanismo de Spark para redistribuir los datos de modo que se agrupen de manera diferente entre las particiones. Por lo general, esto implica copiar datos a través de ejecutores y máquinas, lo que hace que el orden aleatorio sea una operación compleja y costosa.

  1. Use coalesce () , así:

    En [25]: líneas = lines.coalesce (2)

    En [26]: lines.getNumPartitions () Out [26]: 2

Aquí, Spark sabe que va a reducir el RDD y se aprovecha de él. Lea más acerca de repartition () vs coalesce () .


¿Pero todo esto garantizará que sus datos estarán perfectamente equilibrados en sus particiones? Realmente no, como lo experimenté en ¿Cómo equilibrar mis datos en las particiones?

Particiones de un RDD

Como se mencionó en "Comentarios", una partición es una parte / sector / segmento de un RDD. A continuación se muestra un ejemplo mínimo sobre cómo solicitar un número mínimo de particiones para su RDD:

In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

In [2]: mylistRDD.getNumPartitions()
Out[2]: 2

Observe en [1] cómo pasamos 2 como segundo parámetro de parallelize() . Ese parámetro dice que queremos que nuestro RDD tenga al menos 2 particiones.

Repartir una RDD

A veces queremos repartir una RDD, por ejemplo, porque proviene de un archivo que no creamos nosotros, y la cantidad de particiones definidas por el creador no es la que queremos.

Las dos funciones más conocidas para lograrlo son:

repartition(numPartitions)

y:

coalesce(numPartitions, shuffle=False)

Como regla general, use el primero cuando desee volver a particionar su RDD en un mayor número de particiones y el segundo para reducir su RDD, en un número menor de particiones. Spark - repartition () vs coalesce () .

Por ejemplo:

data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions

disminuirá el número de particiones del RDD llamado 'datos' a 100, dado que este RDD tiene más de 100 particiones cuando fue leído por textFile() .

Y de manera similar, si desea tener más del número actual de particiones para su RDD, podría hacerlo (dado que su RDD se distribuye en 200 particiones, por ejemplo):

data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions

Regla de oro sobre el número de particiones

Como regla general, uno querría que su RDD tenga tantas particiones como el producto del número de ejecutores por el número de núcleos utilizados por 3 (o quizás 4). Por supuesto, eso es una heurística y realmente depende de su aplicación, conjunto de datos y configuración de clúster.

Ejemplo:

In [1]: data  = sc.textFile(file)

In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))

In [3]: data = data.coalesce(total_cores * 3)      

Mostrar contenidos RDD

Para mostrar el contenido de un RDD, se debe imprimir:

myRDD.foreach(println)

Para limitar el número de filas impresas:

myRDD.take(num_of_rows).foreach(println)


Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow