apache-spark
Funzioni della finestra in Spark SQL
Ricerca…
introduzione
Le funzioni della finestra sono utilizzate per eseguire operazioni (generalmente di aggregazione) su un insieme di righe chiamate collettivamente come finestre. Le funzioni della finestra funzionano in Spark 1.4 o versioni successive. Le funzioni della finestra forniscono più operazioni quindi le funzioni integrate o le UDF, come substr o round (ampiamente utilizzate prima di Spark 1.4). Le funzioni della finestra consentono agli utenti di Spark SQL di calcolare risultati come il rango di una data riga o una media mobile su un intervallo di righe di input. Migliorano significativamente l'espressività delle API SQL e DataFrame di Spark.
Al suo interno, una funzione finestra calcola un valore di ritorno per ogni riga di input di una tabella in base a un gruppo di righe, chiamato Frame. Ad ogni riga di input può essere associata una cornice univoca. Questa caratteristica delle funzioni di finestra li rende più potenti di altre funzioni. I tipi di funzioni della finestra sono
- Funzioni di classificazione
- Funzioni analitiche
- Funzioni aggregate
Per utilizzare le funzioni della finestra, gli utenti devono contrassegnare che una funzione viene utilizzata come una funzione finestra da entrambi
- Aggiunta di una clausola
OVER
dopo una funzione supportata in SQL, ad es.avg(revenue) OVER (...);
o - Chiamare il metodo over su una funzione supportata nell'API DataFrame, ad esempio
rank().over(...)
.
Questa documentazione mira a dimostrare alcune di queste funzioni con l'esempio. Si presume che il lettore abbia una certa conoscenza delle operazioni di base su Spark DataFrame come: aggiunta di una nuova colonna, ridenominazione di una colonna, ecc.
Lettura di un set di dati di esempio:
val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")
Elenco delle dichiarazioni di importazione richieste:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
La prima istruzione importa le Window Specification
. Una specifica della finestra contiene condizioni / specifiche che indicano quali righe devono essere incluse nella finestra.
scala> sampleData.show
+------+---------+------+
| Name| Role|Salary|
+------+---------+------+
| bob|Developer|125000|
| mark|Developer|108000|
| carl| Tester| 70000|
| peter|Developer|185000|
| jon| Tester| 65000|
| roman| Tester| 82000|
| simon|Developer| 98000|
| eric|Developer|144000|
|carlos| Tester| 75000|
| henry|Developer|110000|
+------+---------+------+
Media mobile
Per calcolare la media mobile dello stipendio dei datori di lavoro in base al loro ruolo:
val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
.over( Window.partitionBy("Role").rowsBetween(-1,1)) )
-
withColumn()
crea una nuova colonna denominatamovingAverage
, eseguendo laaverage
sulla colonnaSalary
-
over()
è usato per definire le specifiche della finestra. -
partitionBy()
partiziona i dati sulla colonnaRole
-
rowsBetween(start, end)
Questa funzione definisce le righe che devono essere incluse nella finestra. I parametri (start
eend
) prendono input numerici,0
rappresenta la riga corrente,-1
è la riga precedente,1
è la riga successiva e così via. La funzione include tutte le righe trastart
eend
, quindi in questo esempio sono incluse tre righe (-1,0,1) nella finestra.
scala> movAvg.show
+------+---------+------+------------------+
| Name| Role|Salary| movingAverage|
+------+---------+------+------------------+
| bob|Developer|125000| 116500.0|
| mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
| eric|Developer|144000|117333.33333333333|
| henry|Developer|110000| 127000.0|
| carl| Tester| 70000| 67500.0|
| jon| Tester| 65000| 72333.33333333333|
| roman| Tester| 82000| 74000.0|
|carlos| Tester| 75000| 78500.0|
+------+---------+------+------------------+
Spark ignora automaticamente le righe precedenti e successive, se la riga corrente è la prima e l'ultima riga rispettivamente.
Nell'esempio precedente, movingAverage della prima riga è la media della riga corrente e successiva, poiché la riga precedente non esiste. Allo stesso modo l'ultima riga della partizione (cioè la sesta riga) è la media della riga corrente e precedente, poiché la riga successiva non esiste.
Somma cumulativa
Per calcolare la media mobile dello stipendio dei datori di lavoro in base al loro ruolo:
val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
.over( Window.partitionBy("Role").orderBy("Salary")))
-
orderBy()
ordina la colonna salariale e calcola la somma cumulativa.
scala> cumSum.show +------+---------+------+-------------+ | Name| Role|Salary|cumulativeSum| +------+---------+------+-------------+ | simon|Developer| 98000| 98000| | mark|Developer|108000| 206000| | henry|Developer|110000| 316000| | bob|Developer|125000| 441000| | eric|Developer|144000| 585000| | peter|Developer|185000| 770000| | jon| Tester| 65000| 65000| | carl| Tester| 70000| 135000| |carlos| Tester| 75000| 210000| | roman| Tester| 82000| 292000| +------+---------+------+-------------+
Funzioni della finestra: ordinamento, derivazione, ritardo, posizione, analisi tendenza
Questo argomento dimostra come usare funzioni come conColumn, lead, lag, Level etc usando Spark. Spark dataframe è un livello astratto di sql sulle funzionalità di spark core. Ciò consente all'utente di scrivere SQL su dati distribuiti. Spark SQL supporta i formati di file eterogenei inclusi JSON, XML, CSV, TSV ecc.
In questo blog abbiamo una rapida panoramica su come usare spark SQL e dataframes per casi di uso comune nel mondo SQL. Per semplicità ci occuperemo di un singolo file che è in formato CSV. Il file ha quattro campi, ID dipendente, NomeDipendente, Stipendio, Stipendio
1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016
Salva questo file come emp.dat. Nel primo passaggio creeremo una scintilla dataframe utilizzando, scintilla pacchetto CSV da databricks.
val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
StructField("EmpName", StringType, false),
StructField("Salary", DoubleType, false),
StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("dateFormat", "MM/dd/yyyy")
.schema(schema)
.load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()
myDF è il dataframe utilizzato nell'esercizio restante. Poiché myDF è usato ripetutamente, si raccomanda di mantenerlo in modo che non debba essere rivalutato.
myDF.persist()
Uscita dello spettacolo di dataframe
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+
Aggiungi una nuova colonna al dataframe
Poiché i datafram spark sono immutabili, l'aggiunta di una nuova colonna creerà un nuovo dataframe con colonna aggiunta. Per aggiungere una colonna utilizzare withColumn (columnName, Transformation). Nella colonna di esempio sottostante empName è formattato in maiuscolo.
withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()
+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+
Ordina i dati in base a una colonna
val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+
Ordine decrescente
disc ( "Stipendio")
myDF.sort(desc("Salary")).show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+
Ottieni e usa la riga precedente (Lag)
Il LAG è una funzione in SQL che viene utilizzata per accedere ai valori di riga precedenti nella riga corrente. Questo è utile quando abbiamo casi d'uso come il confronto con il valore precedente. Il LAG in Spark dataframes è disponibile nelle funzioni Window
lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()
+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+
Ottieni e usa la riga successiva (Lead)
LEAD è una funzione in SQL che viene utilizzata per accedere ai valori della riga successiva nella riga corrente. Questo è utile quando disponiamo di casi come il confronto con il valore successivo. LEAD in Spark dataframes è disponibile nelle funzioni Window
lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()
+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+
Analisi delle tendenze con funzioni di finestra Ora, poniamo la funzione di finestra LAG da utilizzare con una semplice analisi delle tendenze. Se lo stipendio è inferiore al mese precedente, lo contrassegneremo come "GIÙ", se lo stipendio è aumentato, quindi "SU". Il codice usa la funzione Window per ordinare by, lag e poi fa un semplice if else con WHEN.
val window = Window.orderBy("SalaryDate")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(trend_window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
myDF.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+