Recherche…


introduction

Les fonctions de fenêtre sont utilisées pour effectuer des opérations (généralement des agrégations) sur un ensemble de lignes appelées collectivement comme fenêtre. Les fonctions de fenêtre fonctionnent dans Spark 1.4 ou version ultérieure. Les fonctions de fenêtre fournissent plus d'opérations que les fonctions intégrées ou UDF, telles que substr ou round (largement utilisées avant Spark 1.4). Les fonctions de fenêtre permettent aux utilisateurs de Spark SQL de calculer des résultats tels que le rang d'une ligne donnée ou une moyenne mobile sur une plage de lignes en entrée. Ils améliorent considérablement l'expressivité des API SQL et DataFrame de Spark.

À la base, une fonction de fenêtre calcule une valeur de retour pour chaque ligne d'entrée d'un tableau en fonction d'un groupe de lignes, appelé Frame. Chaque ligne d'entrée peut être associée à une image unique. Cette caractéristique des fonctions de fenêtre les rend plus puissantes que d’autres fonctions. Les types de fonctions de fenêtre sont

  • Fonctions de classement
  • Fonctions analytiques
  • Fonctions d'agrégat

Pour utiliser les fonctions de fenêtre, les utilisateurs doivent indiquer qu'une fonction est utilisée comme fonction de fenêtre

  • Ajouter une clause OVER après une fonction prise en charge dans SQL, par exemple avg(revenue) OVER (...); ou
  • Appel de la méthode over sur une fonction prise en charge dans l'API DataFrame, par exemple rank().over(...) Over rank().over(...) .

Cette documentation vise à démontrer certaines de ces fonctions avec un exemple. On suppose que le lecteur a quelques connaissances sur les opérations de base sur Spark DataFrame, telles que: ajouter une nouvelle colonne, renommer une colonne, etc.

Lecture d'un échantillon de données:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

Liste des instructions d'importation requises:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

La première instruction importe la Window Specification . Une spécification de fenêtre contient des conditions / spécifications indiquant quelles lignes doivent être incluses dans la fenêtre.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

Moyenne mobile

Pour calculer la moyenne mobile du salaire des employeurs en fonction de leur rôle:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() crée une nouvelle colonne nommée movingAverage , effectuant une average sur la colonne Salary
  • over() est utilisé pour définir la spécification de la fenêtre.
  • partitionBy() partitionne les données sur la colonne Role
  • rowsBetween(start, end) Cette fonction définit les lignes à inclure dans la fenêtre. Les paramètres ( start et end ) prennent des entrées numériques, 0 représente la ligne actuelle, -1 la ligne précédente, 1 la ligne suivante, etc. La fonction inclut toutes les lignes entre start et end . Ainsi, dans cet exemple, trois lignes (-1,0,1) sont incluses dans la fenêtre.
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark ignore automatiquement les lignes précédentes et suivantes, si la ligne en cours est respectivement la première et la dernière ligne.

Dans l'exemple ci-dessus, movingAverage de la première ligne est la moyenne de la ligne actuelle et de la ligne suivante uniquement, car la ligne précédente n'existe pas. De même, la dernière ligne de la partition (c'est-à-dire la 6ème ligne) est la moyenne de la ligne actuelle et précédente, car la ligne suivante n'existe pas.

Somme cumulée

Pour calculer la moyenne mobile du salaire des employeurs en fonction de leur rôle:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() trie la colonne de salaire et calcule la somme cumulée.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

Fonctions de fenêtre - Tri, Lead, Lag, Rank, Analyse des tendances

Cette rubrique montre comment utiliser des fonctions telles que withColumn, lead, lag, Level, etc. avec Spark. Spark dataframe est une couche abstraite SQL sur les fonctionnalités de base. Cela permet à l'utilisateur d'écrire du code SQL sur des données distribuées. Spark SQL prend en charge les formats de fichiers hétérogènes, notamment JSON, XML, CSV, TSV, etc.

Dans ce blog, nous avons un aperçu rapide de la façon d'utiliser SQL et des dataframes d'étincelles pour les cas d'usage courants dans SQL world. Le fichier a quatre champs, employeeID, employeeName, salaire, salaireDate

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

Enregistrez ce fichier sous le nom emp.dat. Dans un premier temps, nous allons créer une image de référence à l'aide du paquetage CSV, spark, à partir des databricks.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF est le dataframe utilisé dans les exercices restants. Comme myDF est utilisé de manière répétée, il est recommandé de le persister pour qu'il ne soit pas nécessaire de le réévaluer.

 myDF.persist()

Sortie de dataframe show

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

Ajouter une nouvelle colonne à dataframe

Comme les diagrammes d'étincelles sont immuables, l'ajout d'une nouvelle colonne créera un nouveau cadre de données avec une colonne ajoutée. Pour ajouter une colonne, utilisez withColumn (columnName, Transformation). Dans l'exemple ci-dessous, la colonne empName est formatée en majuscule.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

Trier les données en fonction d'une colonne

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

Trier par ordre décroissant

desc ("Salary")

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

Obtenir et utiliser la ligne précédente (Lag)

LAG est une fonction en SQL qui permet d'accéder aux valeurs de ligne précédentes dans la ligne en cours. Ceci est utile lorsque nous avons des cas d'utilisation comme la comparaison avec la valeur précédente. LAG in data spark est disponible dans les fonctions Window

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

Obtenir et utiliser la ligne suivante (Lead)

LEAD est une fonction en SQL qui permet d'accéder aux valeurs de ligne suivantes dans la ligne en cours. Ceci est utile lorsque nous avons des casinos comme la comparaison avec la valeur suivante. LEAD in Spark dataframes est disponible dans les fonctions Window

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

Analyse de tendance avec fonctions de fenêtre Maintenant, mettons la fonction de fenêtre LAG à utiliser avec une analyse de tendance simple. Si le salaire est inférieur au mois précédent, nous le marquerons comme «DOWN», si le salaire a augmenté, puis «UP». Le code utilise la fonction Window pour ordonner by, puis faire simple avec WHEN.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow