apache-spark
Fönsterfunktioner i Spark SQL
Sök…
Introduktion
Fönsterfunktioner används för att utföra operationer (vanligtvis aggregering) på en uppsättning rader som tillsammans kallas fönster. Fönsterfunktioner fungerar i Spark 1.4 eller senare. Fönsterfunktioner ger fler funktioner än de inbyggda funktionerna eller UDF: erna som underlag eller runda (som används mycket före Spark 1.4). Fönsterfunktioner gör det möjligt för användare av Spark SQL att beräkna resultat som rangordningen för en given rad eller ett rörligt medelvärde över ett antal inmatningsrader. De förbättrar avsevärt uttrycket för Sparks SQL- och DataFrame-API: er.
I sin kärna beräknar en fönsterfunktion ett returvärde för varje inmatningsrad i en tabell baserat på en grupp rader, kallad ramen. Varje inmatningsrad kan ha en unik ram associerad med den. Denna egenskap hos fönsterfunktioner gör dem mer kraftfulla än andra funktioner. Typerna av fönsterfunktioner är
- Rankingfunktioner
- Analytiska funktioner
- Samlade funktioner
För att kunna använda fönsterfunktioner måste användare markera att en funktion används som fönsterfunktion av endera
- Lägga till en
OVER
klausul efter en stödd funktion i SQL, t.ex.avg(revenue) OVER (...);
eller - Att kalla över-metoden på en stödd funktion i DataFrame API, t.ex.
rank().over(...)
.
Denna dokumentation syftar till att visa några av dessa funktioner med exempel. Det antas att läsaren har viss kunskap om grundläggande funktioner på Spark DataFrame som: lägga till en ny kolumn, byta namn på en kolumn etc.
Läser ett exempeldatasats:
val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")
Lista över importmeddelanden som krävs:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
Det första uttalandet importerar Window Specification
. En fönsterspecifikation innehåller villkor / specifikationer som anger vilka rader som ska inkluderas i fönstret.
scala> sampleData.show
+------+---------+------+
| Name| Role|Salary|
+------+---------+------+
| bob|Developer|125000|
| mark|Developer|108000|
| carl| Tester| 70000|
| peter|Developer|185000|
| jon| Tester| 65000|
| roman| Tester| 82000|
| simon|Developer| 98000|
| eric|Developer|144000|
|carlos| Tester| 75000|
| henry|Developer|110000|
+------+---------+------+
Glidande medelvärde
För att beräkna rörligt genomsnitt av lönen för arbetsgivarna baserat på deras roll:
val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
.over( Window.partitionBy("Role").rowsBetween(-1,1)) )
-
withColumn()
skapar en ny kolumn med namnetmovingAverage
, utföraverage
påSalary
-
over()
används för att definiera fönsterspecifikation. -
partitionBy()
partitionerar data över kolumnenRole
-
rowsBetween(start, end)
här funktionen definierar raderna som ska inkluderas i fönstret. Parametrarna (start
ochend
) tar numeriska ingångar,0
representerar den aktuella raden,-1
är den föregående raden,1
är nästa rad och så vidare. Funktionen inkluderar alla rader mellanstart
ochend
, så i detta exempel ingår tre rader (-1,0,1) i fönstret.
scala> movAvg.show
+------+---------+------+------------------+
| Name| Role|Salary| movingAverage|
+------+---------+------+------------------+
| bob|Developer|125000| 116500.0|
| mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
| eric|Developer|144000|117333.33333333333|
| henry|Developer|110000| 127000.0|
| carl| Tester| 70000| 67500.0|
| jon| Tester| 65000| 72333.33333333333|
| roman| Tester| 82000| 74000.0|
|carlos| Tester| 75000| 78500.0|
+------+---------+------+------------------+
Spark ignorerar automatiskt föregående och nästa rader, om den aktuella raden är respektive den första raden.
I exemplet ovan är flyttning av den första raden genomsnittet av nuvarande och nästa rad, eftersom föregående rad inte finns. På liknande sätt är den sista raden i partitionen (dvs 6: e raden) genomsnittet för den aktuella och föregående raden, eftersom nästa rad inte finns.
Kumulativ summa
För att beräkna rörligt genomsnitt av lönen för arbetsgivarna baserat på deras roll:
val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
.over( Window.partitionBy("Role").orderBy("Salary")))
-
orderBy()
sorterar lönekolumnen och beräknar kumulativ summa.
scala> cumSum.show +------+---------+------+-------------+ | Name| Role|Salary|cumulativeSum| +------+---------+------+-------------+ | simon|Developer| 98000| 98000| | mark|Developer|108000| 206000| | henry|Developer|110000| 316000| | bob|Developer|125000| 441000| | eric|Developer|144000| 585000| | peter|Developer|185000| 770000| | jon| Tester| 65000| 65000| | carl| Tester| 70000| 135000| |carlos| Tester| 75000| 210000| | roman| Tester| 82000| 292000| +------+---------+------+-------------+
Fönsterfunktioner - Sortera, Lead, Lag, Rank, Trend Analys
Det här ämnet visar hur man använder funktioner som withColumn, lead, lag, Level etc med Spark. Spark dataframe är ett sql abstrakt lager på gnistkärnfunktionaliteter. Detta gör det möjligt för användare att skriva SQL på distribuerad data. Spark SQL stöder hetrogena filformat inklusive JSON, XML, CSV, TSV etc.
I den här bloggen har vi en snabb översikt över hur man använder gnist SQL och dataframe för vanliga användningsfall i SQL-världen. För enkelhetens skull kommer vi att hantera en enda fil som är CSV-format. Filen har fyra fält, medarbetare-ID, medarbetarnamn, lön, lönedatum
1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016
Spara den här filen som emp.dat. I det första steget kommer vi att skapa en gnista dataframe med, sparka CSV-paket från databricks.
val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
StructField("EmpName", StringType, false),
StructField("Salary", DoubleType, false),
StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("dateFormat", "MM/dd/yyyy")
.schema(schema)
.load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()
myDF är dataframe som används i återstående övningar. Eftersom myDF används upprepade gånger rekommenderas att fortsätta den så att den inte behöver omvärderas.
myDF.persist()
Utgång från dataframe-show
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+
Lägg till en ny kolumn i dataframe
Eftersom gnistdataframe är oföränderliga, kommer att lägga till en ny kolumn skapa en ny datafram med tillagd kolumn. För att lägga till en kolumn använder du med kolumn (kolumnnamn, transformation). I nedanstående exempel är kolumnen empName formaterad till versaler.
withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()
+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+
Sortera data baserat på en kolumn
val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+
Sortera fallande
desc ( "lön")
myDF.sort(desc("Salary")).show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+
Skaffa och använda föregående rad (Lag)
LAG är en funktion i SQL som används för att komma åt tidigare radvärden i den aktuella raden. Detta är användbart när vi använder fall som jämförelse med tidigare värde. LAG i Spark dataframes är tillgängliga i Window-funktioner
lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()
+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+
Skaffa och använda nästa rad (Lead)
LEAD är en funktion i SQL som används för att komma åt nästa radvärden i den aktuella raden. Detta är användbart när vi har usecases som jämförelse med nästa värde. LEAD i Spark dataframes är tillgängliga i Window-funktioner
lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()
+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+
Trendanalys med fönsterfunktioner Låt oss nu sätta fönsterfunktion LAG att använda med en enkel trendanalys. Om lönen är mindre än föregående månad kommer vi att markera den som "NER", om lönen har ökat så "UPP". Koden använder Fönsterfunktion för att beställa efter, lagra och sedan göra en enkel om annars med NÄR.
val window = Window.orderBy("SalaryDate")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(trend_window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
myDF.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+