Suche…


Einführung

Fensterfunktionen werden verwendet, um Operationen (im Allgemeinen Aggregation) an einem Satz von Zeilen auszuführen, die gemeinsam als Fenster bezeichnet werden. Fensterfunktionen funktionieren in Spark 1.4 oder höher. Die Fensterfunktionen bieten mehr Funktionen als die integrierten Funktionen oder UDFs, z. B. substr oder round (vor Spark 1.4 ausgiebig verwendet). Mit Fensterfunktionen können Benutzer von Spark SQL Ergebnisse berechnen, z. B. den Rang einer bestimmten Zeile oder einen gleitenden Durchschnitt über einen Bereich von Eingabezeilen. Sie verbessern die Ausdruckskraft der SQL- und DataFrame-APIs von Spark erheblich.

Im Kern berechnet eine Fensterfunktion einen Rückgabewert für jede Eingabezeile einer Tabelle auf der Grundlage einer Gruppe von Zeilen, die als Frame bezeichnet wird. Jeder Eingabezeile kann ein eindeutiger Rahmen zugeordnet sein. Diese Eigenschaft der Fensterfunktionen macht sie leistungsfähiger als andere Funktionen. Die Arten von Fensterfunktionen sind

  • Ranking-Funktionen
  • Analytische Funktionen
  • Aggregatfunktionen

Um Fensterfunktionen verwenden zu können, müssen Benutzer markieren, dass eine Funktion als Fensterfunktion verwendet wird

  • Hinzufügen einer OVER Klausel nach einer unterstützten Funktion in SQL, z. B. avg(revenue) OVER (...); oder
  • Aufrufen der over-Methode für eine unterstützte Funktion in der DataFrame-API, z rank().over(...) B. rank().over(...) .

Diese Dokumentation soll einige dieser Funktionen anhand von Beispielen veranschaulichen. Es wird davon ausgegangen, dass der Leser einige Kenntnisse über grundlegende Vorgänge in Spark DataFrame besitzt, z. B. das Hinzufügen einer neuen Spalte, das Umbenennen einer Spalte usw.

Lesen eines Beispieldatensatzes:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

Liste der erforderlichen Importanweisungen:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

Die erste Anweisung importiert die Window Specification . Eine Fensterspezifikation enthält Bedingungen / Angaben, die angeben, welche Zeilen in das Fenster aufgenommen werden sollen.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

Gleitender Durchschnitt

Um den gleitenden Durchschnitt des Gehalts der Arbeitgeber basierend auf ihrer Rolle zu berechnen:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() erstellt eine neue Spalte mit dem Namen movingAverage , die den average Salary
  • over() dient zur Definition der Fensterspezifikation.
  • partitionBy() partitioniert die Daten über die Spalte Role
  • rowsBetween(start, end) Diese Funktion definiert die Zeilen, die in das Fenster aufgenommen werden sollen. Die Parameter ( start und end ) nehmen numerische Eingaben an, 0 für die aktuelle Zeile, -1 für die vorherige Zeile, 1 für die nächste Zeile und so weiter. Die Funktion enthält alle Zeilen zwischen start und end . In diesem Beispiel sind also drei Zeilen (-1,0,1) im Fenster enthalten.
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark ignoriert vorherige und nächste Zeilen automatisch, wenn die aktuelle Zeile die erste bzw. letzte Zeile ist.

In dem obigen Beispiel ist movingAverage der ersten Zeile nur der Durchschnitt der aktuellen und der nächsten Zeile, da die vorherige Zeile nicht vorhanden ist. Entsprechend ist die letzte Zeile der Partition (dh die sechste Zeile) der Durchschnitt der aktuellen und der vorherigen Zeile, da die nächste Zeile nicht vorhanden ist.

Kumulierte Summe

Um den gleitenden Durchschnitt des Gehalts der Arbeitgeber basierend auf ihrer Rolle zu berechnen:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() sortiert die Gehaltsspalte und berechnet die kumulierte Summe.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

Fensterfunktionen - Sortieren, Lead, Lag, Rang, Trendanalyse

In diesem Thema wird veranschaulicht, wie Sie Funktionen wie withColumn, Lead, Lag, Level usw. mit Spark verwenden. Spark-Datenrahmen ist eine abstrakte SQL-Ebene zu Funkenkernfunktionen. Dies ermöglicht dem Benutzer das Schreiben von SQL für verteilte Daten. Spark SQL unterstützt heterogene Dateiformate wie JSON, XML, CSV, TSV usw.

In diesem Blog erhalten Sie einen schnellen Überblick über die Verwendung von Spark SQL und Datenrahmen für allgemeine Anwendungsfälle in der SQL-Welt. Der Einfachheit halber behandeln wir eine einzige Datei im CSV-Format. Die Datei hat vier Felder: EmployeeID, EmployeeName, Gehalt, Gehaltsdatum

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

Speichern Sie diese Datei unter dem Namen emp.dat. Im ersten Schritt erstellen wir ein Funken-Datenframe mit einem Spark-CSV-Paket aus Datenabricks.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF ist der Datenrahmen, der in der verbleibenden Übung verwendet wird. Da myDF wiederholt verwendet wird, wird empfohlen, es beizubehalten, damit es nicht erneut bewertet werden muss.

 myDF.persist()

Ausgabe des Datenrahmens anzeigen

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

Fügen Sie dem Datenrahmen eine neue Spalte hinzu

Da Spark-Datenframes unveränderlich sind, wird durch das Hinzufügen einer neuen Spalte ein neues Datenframe mit hinzugefügter Spalte erstellt. Um eine Spalte hinzuzufügen, verwenden Sie withColumn (columnName, Transformation). In der folgenden Beispielspalte wird empName in Großbuchstaben formatiert.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

Daten basierend auf einer Spalte sortieren

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

Absteigend sortieren

desc ("Gehalt")

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

Vorherige Zeile abrufen und verwenden (Lag)

LAG ist eine Funktion in SQL, mit der auf vorherige Zeilenwerte in der aktuellen Zeile zugegriffen wird. Dies ist nützlich, wenn wir Anwendungsfälle wie den Vergleich mit dem vorherigen Wert haben. LAG in Spark-Datenrahmen ist in den Fensterfunktionen verfügbar

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

Nächste Reihe holen und benutzen (Lead)

LEAD ist eine Funktion in SQL, mit der auf die nächsten Zeilenwerte in der aktuellen Zeile zugegriffen wird. Dies ist nützlich, wenn wir Beispiele wie einen Vergleich mit dem nächsten Wert haben. LEAD in Spark-Datenrahmen ist in den Fensterfunktionen verfügbar

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

Trendanalyse mit Fensterfunktionen Lassen Sie uns nun die Fensterfunktion LAG für eine einfache Trendanalyse verwenden. Wenn das Gehalt unter dem Vormonat liegt, markieren wir es als "DOWN", wenn das Gehalt gestiegen ist, dann "UP". Der Code verwendet die Window-Funktion, um zu ordnen, zu verschieben und dann mit WHEN ein einfaches If else auszuführen.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow