Szukaj…


Wprowadzenie

Funkcje okna służą do wykonywania operacji (zwykle agregacji) na zestawie wierszy zwanych zbiorczo jako okno. Funkcje okien działają w Spark 1.4 lub nowszym. Funkcje okna zapewniają więcej operacji niż funkcje wbudowane lub UDF, takie jak substr lub round (szeroko stosowane przed Spark 1.4). Funkcje okien pozwalają użytkownikom Spark SQL na obliczanie wyników, takich jak ranga danego wiersza lub średnia ruchoma w zakresie wierszy wejściowych. Znacząco poprawiają one wyrazistość interfejsów API Spark i SQL DataFrame.

U podstaw funkcja okna oblicza wartość zwracaną dla każdego wiersza wejściowego tabeli na podstawie grupy wierszy, zwanej ramką. Każdy wiersz wejściowy może mieć przypisaną unikalną ramkę. Ta cecha funkcji okna sprawia, że są one bardziej wydajne niż inne funkcje. Rodzaje funkcji okna są

  • Funkcje rankingowe
  • Funkcje analityczne
  • Funkcje agregujące

Aby korzystać z funkcji okna, użytkownicy muszą zaznaczyć, że funkcja jest używana przez funkcję okna

  • Dodanie klauzuli OVER po obsługiwanej funkcji w SQL, np. avg(revenue) OVER (...); lub
  • Wywołanie metody over dla obsługiwanej funkcji w DataFrame API, np. rank().over(...) .

Ta dokumentacja ma na celu wykazanie niektórych z tych funkcji na przykładzie. Zakłada się, że czytelnik ma pewną wiedzę na temat podstawowych operacji na Spark DataFrame, takich jak: dodawanie nowej kolumny, zmiana nazwy kolumny itp.

Odczytywanie przykładowego zestawu danych:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

Wymagana lista wyciągów importowych:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

Pierwsza instrukcja importuje Window Specification . Specyfikacja okna zawiera warunki / specyfikacje wskazujące, które wiersze mają zostać uwzględnione w oknie.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

Średnia ruchoma

Aby obliczyć średnią ruchomą wynagrodzenia pracodawców na podstawie ich roli:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() tworzy nową kolumnę o nazwie movingAverage , wykonując average dla kolumny Salary
  • over() służy do definiowania specyfikacji okna.
  • partitionBy() dzieli dane na kolumnę Role
  • rowsBetween(start, end) Ta funkcja definiuje wiersze, które mają być zawarte w oknie. Parametry ( start i end ) przyjmują dane liczbowe, 0 oznacza bieżący wiersz, -1 to poprzedni wiersz, 1 to następny wiersz i tak dalej. Funkcja obejmuje wszystkie wiersze między start a end , dlatego w tym przykładzie w oknie znajdują się trzy wiersze (-1,0,1).
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark automatycznie ignoruje poprzednie i następne wiersze, jeśli bieżący wiersz jest odpowiednio pierwszym i ostatnim wierszem.

W powyższym przykładzie wartość ruchomaŚrednia pierwszego wiersza jest średnią tylko bieżącego i następnego wiersza, ponieważ poprzedni wiersz nie istnieje. Podobnie ostatni rząd partycji (tj. 6. rząd) jest średnią bieżącego i poprzedniego rzędu, ponieważ następny rząd nie istnieje.

Suma skumulowana

Aby obliczyć średnią ruchomą wynagrodzenia pracodawców na podstawie ich roli:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() sortuje kolumnę wynagrodzenia i oblicza sumę skumulowaną.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

Funkcje okna - Sort, Lead, Lag, Rank, Analiza trendów

W tym temacie pokazano, jak używać funkcji takich jak withColumn, lead, lag, Level itp. Przy użyciu Spark. Spark dataframe to sqlowa warstwa abstrakcyjna na temat funkcjonalności rdzenia iskrowego. To pozwala użytkownikowi pisać SQL na rozproszonych danych. Spark SQL obsługuje hetrogeniczne formaty plików, w tym JSON, XML, CSV, TSV itp.

W tym blogu mamy szybki przegląd tego, jak używać Spark SQL i ramek danych do typowych przypadków użycia w świecie SQL. Dla uproszczenia zajmiemy się pojedynczym plikiem w formacie CSV. Plik ma cztery pola, ID pracownika, imię i nazwisko pracownika, wynagrodzenie, data wynagrodzenia

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

Zapisz ten plik jako emp.dat. W pierwszym kroku utworzymy ramkę danych Spark za pomocą pakietu Spark CSV z paczek danych.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF to ramka danych używana w pozostałych ćwiczeniach. Ponieważ myDF jest używany wielokrotnie, zaleca się jego utrzymanie, aby nie trzeba go było ponownie oceniać.

 myDF.persist()

Dane wyjściowe pokazu ramki danych

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

Dodaj nową kolumnę do ramki danych

Ponieważ ramki danych Spark są niezmienne, dodanie nowej kolumny spowoduje utworzenie nowej ramki danych z dodaną kolumną. Aby dodać kolumnę, użyj withColumn (columnName, Transformation). W poniższym przykładzie kolumna empName jest sformatowana wielkimi literami.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

Sortuj dane według kolumny

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

Sortuj malejąco

desc („Wynagrodzenie”)

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

Pobierz i użyj poprzedniego wiersza (opóźnienie)

LAG to funkcja w języku SQL, która służy do uzyskiwania dostępu do poprzednich wartości wiersza w bieżącym wierszu. Jest to przydatne, gdy mamy przypadki użycia, takie jak porównanie z poprzednią wartością. LAG w ramkach danych Spark jest dostępny w funkcjach Windows

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

Zdobądź i użyj następnego rzędu (ołów)

LEAD to funkcja SQL, która służy do uzyskiwania dostępu do wartości następnego wiersza w bieżącym wierszu. Jest to przydatne, gdy mamy przypadki użycia, takie jak porównanie z następną wartością. LEAD w ramkach danych Spark jest dostępny w funkcjach Windows

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

Analiza trendów z funkcjami okna Teraz zastosujmy funkcję okna LAG do użycia z prostą analizą trendu. Jeśli wynagrodzenie jest niższe niż w poprzednim miesiącu, oznaczymy je jako „W DÓŁ”, jeśli wynagrodzenie wzrosło, wówczas „W GÓRĘ”. Kod korzysta z funkcji Window, aby uporządkować, opóźnić, a następnie wykonać proste, jeśli inaczej z KIEDY.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow