Ricerca…


introduzione

Le funzioni della finestra sono utilizzate per eseguire operazioni (generalmente di aggregazione) su un insieme di righe chiamate collettivamente come finestre. Le funzioni della finestra funzionano in Spark 1.4 o versioni successive. Le funzioni della finestra forniscono più operazioni quindi le funzioni integrate o le UDF, come substr o round (ampiamente utilizzate prima di Spark 1.4). Le funzioni della finestra consentono agli utenti di Spark SQL di calcolare risultati come il rango di una data riga o una media mobile su un intervallo di righe di input. Migliorano significativamente l'espressività delle API SQL e DataFrame di Spark.

Al suo interno, una funzione finestra calcola un valore di ritorno per ogni riga di input di una tabella in base a un gruppo di righe, chiamato Frame. Ad ogni riga di input può essere associata una cornice univoca. Questa caratteristica delle funzioni di finestra li rende più potenti di altre funzioni. I tipi di funzioni della finestra sono

  • Funzioni di classificazione
  • Funzioni analitiche
  • Funzioni aggregate

Per utilizzare le funzioni della finestra, gli utenti devono contrassegnare che una funzione viene utilizzata come una funzione finestra da entrambi

  • Aggiunta di una clausola OVER dopo una funzione supportata in SQL, ad es. avg(revenue) OVER (...); o
  • Chiamare il metodo over su una funzione supportata nell'API DataFrame, ad esempio rank().over(...) .

Questa documentazione mira a dimostrare alcune di queste funzioni con l'esempio. Si presume che il lettore abbia una certa conoscenza delle operazioni di base su Spark DataFrame come: aggiunta di una nuova colonna, ridenominazione di una colonna, ecc.

Lettura di un set di dati di esempio:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

Elenco delle dichiarazioni di importazione richieste:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

La prima istruzione importa le Window Specification . Una specifica della finestra contiene condizioni / specifiche che indicano quali righe devono essere incluse nella finestra.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

Media mobile

Per calcolare la media mobile dello stipendio dei datori di lavoro in base al loro ruolo:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() crea una nuova colonna denominata movingAverage , eseguendo la average sulla colonna Salary
  • over() è usato per definire le specifiche della finestra.
  • partitionBy() partiziona i dati sulla colonna Role
  • rowsBetween(start, end) Questa funzione definisce le righe che devono essere incluse nella finestra. I parametri ( start e end ) prendono input numerici, 0 rappresenta la riga corrente, -1 è la riga precedente, 1 è la riga successiva e così via. La funzione include tutte le righe tra start e end , quindi in questo esempio sono incluse tre righe (-1,0,1) nella finestra.
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark ignora automaticamente le righe precedenti e successive, se la riga corrente è la prima e l'ultima riga rispettivamente.

Nell'esempio precedente, movingAverage della prima riga è la media della riga corrente e successiva, poiché la riga precedente non esiste. Allo stesso modo l'ultima riga della partizione (cioè la sesta riga) è la media della riga corrente e precedente, poiché la riga successiva non esiste.

Somma cumulativa

Per calcolare la media mobile dello stipendio dei datori di lavoro in base al loro ruolo:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() ordina la colonna salariale e calcola la somma cumulativa.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

Funzioni della finestra: ordinamento, derivazione, ritardo, posizione, analisi tendenza

Questo argomento dimostra come usare funzioni come conColumn, lead, lag, Level etc usando Spark. Spark dataframe è un livello astratto di sql sulle funzionalità di spark core. Ciò consente all'utente di scrivere SQL su dati distribuiti. Spark SQL supporta i formati di file eterogenei inclusi JSON, XML, CSV, TSV ecc.

In questo blog abbiamo una rapida panoramica su come usare spark SQL e dataframes per casi di uso comune nel mondo SQL. Per semplicità ci occuperemo di un singolo file che è in formato CSV. Il file ha quattro campi, ID dipendente, NomeDipendente, Stipendio, Stipendio

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

Salva questo file come emp.dat. Nel primo passaggio creeremo una scintilla dataframe utilizzando, scintilla pacchetto CSV da databricks.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF è il dataframe utilizzato nell'esercizio restante. Poiché myDF è usato ripetutamente, si raccomanda di mantenerlo in modo che non debba essere rivalutato.

 myDF.persist()

Uscita dello spettacolo di dataframe

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

Aggiungi una nuova colonna al dataframe

Poiché i datafram spark sono immutabili, l'aggiunta di una nuova colonna creerà un nuovo dataframe con colonna aggiunta. Per aggiungere una colonna utilizzare withColumn (columnName, Transformation). Nella colonna di esempio sottostante empName è formattato in maiuscolo.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

Ordina i dati in base a una colonna

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

Ordine decrescente

disc ( "Stipendio")

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

Ottieni e usa la riga precedente (Lag)

Il LAG è una funzione in SQL che viene utilizzata per accedere ai valori di riga precedenti nella riga corrente. Questo è utile quando abbiamo casi d'uso come il confronto con il valore precedente. Il LAG in Spark dataframes è disponibile nelle funzioni Window

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

Ottieni e usa la riga successiva (Lead)

LEAD è una funzione in SQL che viene utilizzata per accedere ai valori della riga successiva nella riga corrente. Questo è utile quando disponiamo di casi come il confronto con il valore successivo. LEAD in Spark dataframes è disponibile nelle funzioni Window

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

Analisi delle tendenze con funzioni di finestra Ora, poniamo la funzione di finestra LAG da utilizzare con una semplice analisi delle tendenze. Se lo stipendio è inferiore al mese precedente, lo contrassegneremo come "GIÙ", se lo stipendio è aumentato, quindi "SU". Il codice usa la funzione Window per ordinare by, lag e poi fa un semplice if else con WHEN.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow