apache-spark
Funkcje okien w Spark SQL
Szukaj…
Wprowadzenie
Funkcje okna służą do wykonywania operacji (zwykle agregacji) na zestawie wierszy zwanych zbiorczo jako okno. Funkcje okien działają w Spark 1.4 lub nowszym. Funkcje okna zapewniają więcej operacji niż funkcje wbudowane lub UDF, takie jak substr lub round (szeroko stosowane przed Spark 1.4). Funkcje okien pozwalają użytkownikom Spark SQL na obliczanie wyników, takich jak ranga danego wiersza lub średnia ruchoma w zakresie wierszy wejściowych. Znacząco poprawiają one wyrazistość interfejsów API Spark i SQL DataFrame.
U podstaw funkcja okna oblicza wartość zwracaną dla każdego wiersza wejściowego tabeli na podstawie grupy wierszy, zwanej ramką. Każdy wiersz wejściowy może mieć przypisaną unikalną ramkę. Ta cecha funkcji okna sprawia, że są one bardziej wydajne niż inne funkcje. Rodzaje funkcji okna są
- Funkcje rankingowe
- Funkcje analityczne
- Funkcje agregujące
Aby korzystać z funkcji okna, użytkownicy muszą zaznaczyć, że funkcja jest używana przez funkcję okna
- Dodanie klauzuli
OVER
po obsługiwanej funkcji w SQL, np.avg(revenue) OVER (...);
lub - Wywołanie metody over dla obsługiwanej funkcji w DataFrame API, np.
rank().over(...)
.
Ta dokumentacja ma na celu wykazanie niektórych z tych funkcji na przykładzie. Zakłada się, że czytelnik ma pewną wiedzę na temat podstawowych operacji na Spark DataFrame, takich jak: dodawanie nowej kolumny, zmiana nazwy kolumny itp.
Odczytywanie przykładowego zestawu danych:
val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")
Wymagana lista wyciągów importowych:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
Pierwsza instrukcja importuje Window Specification
. Specyfikacja okna zawiera warunki / specyfikacje wskazujące, które wiersze mają zostać uwzględnione w oknie.
scala> sampleData.show
+------+---------+------+
| Name| Role|Salary|
+------+---------+------+
| bob|Developer|125000|
| mark|Developer|108000|
| carl| Tester| 70000|
| peter|Developer|185000|
| jon| Tester| 65000|
| roman| Tester| 82000|
| simon|Developer| 98000|
| eric|Developer|144000|
|carlos| Tester| 75000|
| henry|Developer|110000|
+------+---------+------+
Średnia ruchoma
Aby obliczyć średnią ruchomą wynagrodzenia pracodawców na podstawie ich roli:
val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
.over( Window.partitionBy("Role").rowsBetween(-1,1)) )
-
withColumn()
tworzy nową kolumnę o nazwiemovingAverage
, wykonującaverage
dla kolumnySalary
-
over()
służy do definiowania specyfikacji okna. -
partitionBy()
dzieli dane na kolumnęRole
-
rowsBetween(start, end)
Ta funkcja definiuje wiersze, które mają być zawarte w oknie. Parametry (start
iend
) przyjmują dane liczbowe,0
oznacza bieżący wiersz,-1
to poprzedni wiersz,1
to następny wiersz i tak dalej. Funkcja obejmuje wszystkie wiersze międzystart
aend
, dlatego w tym przykładzie w oknie znajdują się trzy wiersze (-1,0,1).
scala> movAvg.show
+------+---------+------+------------------+
| Name| Role|Salary| movingAverage|
+------+---------+------+------------------+
| bob|Developer|125000| 116500.0|
| mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
| eric|Developer|144000|117333.33333333333|
| henry|Developer|110000| 127000.0|
| carl| Tester| 70000| 67500.0|
| jon| Tester| 65000| 72333.33333333333|
| roman| Tester| 82000| 74000.0|
|carlos| Tester| 75000| 78500.0|
+------+---------+------+------------------+
Spark automatycznie ignoruje poprzednie i następne wiersze, jeśli bieżący wiersz jest odpowiednio pierwszym i ostatnim wierszem.
W powyższym przykładzie wartość ruchomaŚrednia pierwszego wiersza jest średnią tylko bieżącego i następnego wiersza, ponieważ poprzedni wiersz nie istnieje. Podobnie ostatni rząd partycji (tj. 6. rząd) jest średnią bieżącego i poprzedniego rzędu, ponieważ następny rząd nie istnieje.
Suma skumulowana
Aby obliczyć średnią ruchomą wynagrodzenia pracodawców na podstawie ich roli:
val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
.over( Window.partitionBy("Role").orderBy("Salary")))
-
orderBy()
sortuje kolumnę wynagrodzenia i oblicza sumę skumulowaną.
scala> cumSum.show +------+---------+------+-------------+ | Name| Role|Salary|cumulativeSum| +------+---------+------+-------------+ | simon|Developer| 98000| 98000| | mark|Developer|108000| 206000| | henry|Developer|110000| 316000| | bob|Developer|125000| 441000| | eric|Developer|144000| 585000| | peter|Developer|185000| 770000| | jon| Tester| 65000| 65000| | carl| Tester| 70000| 135000| |carlos| Tester| 75000| 210000| | roman| Tester| 82000| 292000| +------+---------+------+-------------+
Funkcje okna - Sort, Lead, Lag, Rank, Analiza trendów
W tym temacie pokazano, jak używać funkcji takich jak withColumn, lead, lag, Level itp. Przy użyciu Spark. Spark dataframe to sqlowa warstwa abstrakcyjna na temat funkcjonalności rdzenia iskrowego. To pozwala użytkownikowi pisać SQL na rozproszonych danych. Spark SQL obsługuje hetrogeniczne formaty plików, w tym JSON, XML, CSV, TSV itp.
W tym blogu mamy szybki przegląd tego, jak używać Spark SQL i ramek danych do typowych przypadków użycia w świecie SQL. Dla uproszczenia zajmiemy się pojedynczym plikiem w formacie CSV. Plik ma cztery pola, ID pracownika, imię i nazwisko pracownika, wynagrodzenie, data wynagrodzenia
1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016
Zapisz ten plik jako emp.dat. W pierwszym kroku utworzymy ramkę danych Spark za pomocą pakietu Spark CSV z paczek danych.
val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
StructField("EmpName", StringType, false),
StructField("Salary", DoubleType, false),
StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("dateFormat", "MM/dd/yyyy")
.schema(schema)
.load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()
myDF to ramka danych używana w pozostałych ćwiczeniach. Ponieważ myDF jest używany wielokrotnie, zaleca się jego utrzymanie, aby nie trzeba go było ponownie oceniać.
myDF.persist()
Dane wyjściowe pokazu ramki danych
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+
Dodaj nową kolumnę do ramki danych
Ponieważ ramki danych Spark są niezmienne, dodanie nowej kolumny spowoduje utworzenie nowej ramki danych z dodaną kolumną. Aby dodać kolumnę, użyj withColumn (columnName, Transformation). W poniższym przykładzie kolumna empName jest sformatowana wielkimi literami.
withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()
+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+
Sortuj dane według kolumny
val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+
Sortuj malejąco
desc („Wynagrodzenie”)
myDF.sort(desc("Salary")).show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+
Pobierz i użyj poprzedniego wiersza (opóźnienie)
LAG to funkcja w języku SQL, która służy do uzyskiwania dostępu do poprzednich wartości wiersza w bieżącym wierszu. Jest to przydatne, gdy mamy przypadki użycia, takie jak porównanie z poprzednią wartością. LAG w ramkach danych Spark jest dostępny w funkcjach Windows
lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()
+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+
Zdobądź i użyj następnego rzędu (ołów)
LEAD to funkcja SQL, która służy do uzyskiwania dostępu do wartości następnego wiersza w bieżącym wierszu. Jest to przydatne, gdy mamy przypadki użycia, takie jak porównanie z następną wartością. LEAD w ramkach danych Spark jest dostępny w funkcjach Windows
lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()
+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+
Analiza trendów z funkcjami okna Teraz zastosujmy funkcję okna LAG do użycia z prostą analizą trendu. Jeśli wynagrodzenie jest niższe niż w poprzednim miesiącu, oznaczymy je jako „W DÓŁ”, jeśli wynagrodzenie wzrosło, wówczas „W GÓRĘ”. Kod korzysta z funkcji Window, aby uporządkować, opóźnić, a następnie wykonać proste, jeśli inaczej z KIEDY.
val window = Window.orderBy("SalaryDate")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(trend_window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
myDF.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+