apache-spark
Configurazione: Apache Spark SQL
Ricerca…
introduzione
Controllo delle partizioni Shuffle di Spark SQL
In Apache Spark mentre cogroup
operazioni di shuffle come join
e cogroup
molti dati vengono trasferiti attraverso la rete. Ora, per controllare il numero di partizioni su cui avviene lo shuffle può essere controllato dalle configurazioni fornite in Spark SQL. Quella configurazione è la seguente:
spark.sql.shuffle.partitions
Usando questa configurazione possiamo controllare il numero di partizioni delle operazioni shuffle. Per impostazione predefinita, il suo valore è 200
. Ma, 200 partizioni non ha senso se abbiamo file di pochi GB (s). Quindi, dovremmo cambiarli in base alla quantità di dati che dobbiamo elaborare tramite Spark SQL. Mi piace come segue:
In questo scenario abbiamo due tabelle da unire employee
e department
. Entrambe le tabelle contengono solo pochi record, ma è necessario unirle per conoscere il dipartimento di ciascun dipendente. Quindi, ci uniamo a loro usando Spark DataFrames come questo:
val conf = new SparkConf().setAppName("sample").setMaster("local")
val sc = new SparkContext(conf)
val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")
employeeDF.join(departmentDF, "employeeName").show()
Ora, il numero di partizioni che vengono create durante il join sono 200 per impostazione predefinita, il che è ovviamente troppo per questa grande quantità di dati.
Quindi, lascia cambiare questo valore in modo da ridurre il numero di operazioni shuffle.
val conf = new SparkConf().setAppName("sample").setMaster("local").set("spark.sql.shuffle.partitions", 2)
val sc = new SparkContext(conf)
val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")
employeeDF.join(departmentDF, "employeeName").show()
Ora, il numero di partizioni shuffle è ridotto a solo 2, il che non solo riduce il numero di operazioni di shuffling ma riduce anche il tempo necessario per unire i DataFrames da 0.878505 s
a 0.077847 s
.
Pertanto, configurare sempre il numero di partizioni per le operazioni shuffle in base ai dati in elaborazione.