Ricerca…


Osservazioni

Il numero di partizioni è fondamentale per le prestazioni di un'applicazione e / o per la terminazione corretta.

Un RDD (Resilient Distributed Dataset) è l'astrazione principale di Spark. Un RDD è suddiviso in partizioni, il che significa che una partizione è una parte del set di dati, una porzione di esso o, in altre parole, una parte di esso.

Maggiore è il numero di partizioni, minore è la dimensione di ciascuna partizione.

Tuttavia, si noti che un gran numero di partizioni esercita molta pressione su Hadoop Distributed File System (HDFS), che deve mantenere una quantità significativa di metadati.

Il numero di partizioni è correlato all'utilizzo della memoria e un problema di MemoryOverhead può essere correlato a questo numero ( esperienza personale ).


Una trappola comune per i nuovi utenti è quella di trasformare il loro RDD in un RDD con una sola partizione, che di solito assomiglia a questo:

data = sc.textFile(file)
data = data.coalesce(1) 

Di solito è una pessima idea, dato che stai dicendo a Spark di mettere tutti i dati in una sola partizione! Ricordatelo:

Uno stage in Spark funzionerà su una partizione alla volta (e caricherà i dati in quella partizione in memoria).

Di conseguenza, comunichi a Spark di gestire tutti i dati contemporaneamente, il che di solito comporta errori relativi alla memoria (ad esempio, Memoria esaurita) o un'eccezione di puntatore nullo.

Quindi, se non sai cosa stai facendo, evita di ripartizionare il tuo RDD in una sola partizione!

Introduzione alle partizioni

Come viene diviso un RDD?

Per impostazione predefinita viene creata una partizione per ciascuna partizione HDFS, che per impostazione predefinita è 64 MB. Leggi di più qui .

Come bilanciare i miei dati attraverso le partizioni?

Innanzitutto, dai uno sguardo ai tre modi in cui è possibile ripartizionare i dati:

  1. Passa un secondo parametro, il numero minimo desiderato di partizioni per il tuo RDD, in textFile () , ma fai attenzione:

    In [14]: lines = sc.textFile ("data")

    In [15]: lines.getNumPartitions () Out [15]: 1000

    In [16]: lines = sc.textFile ("data", 500)

    In [17]: lines.getNumPartitions () Out [17]: 1434

    In [18]: lines = sc.textFile ("data", 5000)

    In [19]: lines.getNumPartitions () Out [19]: 5926

Come potete vedere, [16] non fa ciò che ci si aspetterebbe, poiché il numero di partizioni del RDD è già maggiore del numero minimo di partizioni richiesto.

  1. Usa ripartizione () , come questo:

    In [22]: lines = lines.repartition (10)

    In [23]: lines.getNumPartitions () Out [23]: 10

Attenzione: invocherà un shuffle e dovrebbe essere usato quando si desidera aumentare il numero di partizioni del proprio RDD.

Dai documenti :

Il shuffle è il meccanismo di Spark per ridistribuire i dati in modo che siano raggruppati in modo diverso tra le partizioni. Questo in genere implica la copia dei dati tra esecutori e macchine, rendendo la shuffle un'operazione complessa e costosa.

  1. Usa coalesce () , come questo:

    In [25]: lines = lines.coalesce (2)

    In [26]: lines.getNumPartitions () Out [26]: 2

Qui, Spark sa che ridurrete il RDD e ne trarrete vantaggio. Maggiori informazioni su ripartizione () vs coalesce () .


Ma tutto questo garantirà che i tuoi dati saranno perfettamente bilanciati tra le tue partizioni? Non proprio, come ho sperimentato in Come bilanciare i miei dati attraverso le partizioni?

Partizioni di un RDD

Come menzionato in "Osservazioni", una partizione è una parte / slice / chunk di un RDD. Di seguito è riportato un esempio minimo su come richiedere un numero minimo di partizioni per il tuo RDD:

In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

In [2]: mylistRDD.getNumPartitions()
Out[2]: 2

Nota in [1] come abbiamo passato 2 come secondo parametro di parallelize() . Questo parametro dice che vogliamo che il nostro RDD abbia almeno 2 partizioni.

Repartition an RDD

A volte vogliamo ripartizionare un RDD, ad esempio perché proviene da un file che non è stato creato da noi e il numero di partizioni definito dal creatore non è quello che vogliamo.

Le due funzioni più conosciute per ottenere questo sono:

repartition(numPartitions)

e:

coalesce(numPartitions, shuffle=False)

Come regola generale, utilizzare il primo quando si desidera ripartizionare il proprio RDD in un numero maggiore di partizioni e il secondo per ridurre il RDD, in un numero inferiore di partizioni. Spark - repartition () vs coalesce () .

Per esempio:

data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions

diminuirà il numero di partizioni del RDD chiamato 'data' su 100, dato che questo RDD ha più di 100 partizioni quando viene letto da textFile() .

E in un modo simile, se vuoi avere più del numero attuale di partizioni per il tuo RDD, puoi farlo (dato che il tuo RDD è distribuito in 200 partizioni per esempio):

data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions

Regola generale sul numero di partizioni

Come regola generale, si vorrebbe che il suo RDD avesse tante partizioni quante il prodotto del numero di esecutori per il numero di core utilizzati per 3 (o forse 4). Ovviamente, è un euristico e dipende in realtà dall'applicazione, dal set di dati e dalla configurazione del cluster.

Esempio:

In [1]: data  = sc.textFile(file)

In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))

In [3]: data = data.coalesce(total_cores * 3)      

Mostra contenuti RDD

Per mostrare il contenuto di un RDD, deve essere stampato:

myRDD.foreach(println)

Per limitare il numero di righe stampate:

myRDD.take(num_of_rows).foreach(println)


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow