Zoeken…


Invoering

Vensterfuncties worden gebruikt om bewerkingen uit te voeren (meestal aggregatie) op een reeks rijen die gezamenlijk als venster worden genoemd. Vensterfuncties werken in Spark 1.4 of hoger. Vensterfuncties bieden meer bewerkingen dan de ingebouwde functies of UDF's, zoals substr of round (veel gebruikt voor Spark 1.4). Met vensterfuncties kunnen gebruikers van Spark SQL resultaten berekenen, zoals de rang van een bepaalde rij of een voortschrijdend gemiddelde over een bereik van invoerrijen. Ze verbeteren de expressiviteit van Spark's SQL- en DataFrame-API's aanzienlijk.

In de kern berekent een vensterfunctie een retourwaarde voor elke invoerrij van een tabel op basis van een groep rijen, het frame genoemd. Aan elke invoerrij kan een uniek frame worden gekoppeld. Deze eigenschap van vensterfuncties maakt ze krachtiger dan andere functies. De soorten vensterfuncties zijn

  • Rangorde functies
  • Analytische functies
  • Geaggregeerde functies

Om vensterfuncties te gebruiken, moeten gebruikers markeren dat een functie door beide wordt gebruikt als vensterfunctie

  • Een OVER clausule toevoegen na een ondersteunde functie in SQL, bijv. avg(revenue) OVER (...); of
  • De over-methode aanroepen voor een ondersteunde functie in de DataFrame API, bijvoorbeeld rank().over(...) .

Deze documentatie is bedoeld om enkele van die functies aan de hand van voorbeelden te demonstreren. Er wordt aangenomen dat de lezer enige kennis heeft van basisbewerkingen op Spark DataFrame zoals: een nieuwe kolom toevoegen, een kolom hernoemen, enz.

Voorbeeldgegevensset lezen:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

Lijst met vereiste invoerverklaringen:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

De eerste instructie importeert Window Specification . Een vensterspecificatie bevat voorwaarden / specificaties die aangeven welke rijen in het venster moeten worden opgenomen.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

Voortschrijdend gemiddelde

Het voortschrijdend gemiddelde van het salaris van de werkgevers berekenen op basis van hun rol:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() maakt een nieuwe kolom met de naam movingAverage , het uitvoeren van average op Salary kolom
  • over() wordt gebruikt om de vensterspecificatie te definiëren.
  • partitionBy() verdeelt de gegevens over de kolom Role
  • rowsBetween(start, end) Deze functie definieert de rijen die in het venster moeten worden opgenomen. De parameters ( start en end ) hebben numerieke invoer, 0 vertegenwoordigt de huidige rij, -1 is de vorige rij, 1 is de volgende rij enzovoort. De functie omvat alle rijen tussen start en end , dus in dit voorbeeld zijn drie rijen (-1,0,1) in het venster opgenomen.
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark negeert automatisch vorige en volgende rijen, als de huidige rij respectievelijk de eerste en de laatste rij is.

In het bovenstaande voorbeeld is het verplaatsen van Gemiddeld van de eerste rij alleen het gemiddelde van de huidige & volgende rij, omdat de vorige rij niet bestaat. Evenzo is de laatste rij van de partitie (dwz 6e rij) het gemiddelde van de huidige en vorige rij, omdat de volgende rij niet bestaat.

Cumulatieve som

Het voortschrijdend gemiddelde van het salaris van de werkgevers berekenen op basis van hun rol:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() sorteert de salariskolom en berekent de cumulatieve som.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

Vensterfuncties - Sorteren, Lead, Lag, Rank, Trendanalyse

Dit onderwerp laat zien hoe u functies zoals WithColumn, Lead, Lag, Level enz. Kunt gebruiken met Spark. Spark dataframe is een sql abstract layer over spark core functionaliteiten. Hiermee kan de gebruiker SQL op gedistribueerde gegevens schrijven. Spark SQL ondersteunt hetrogene bestandsformaten, waaronder JSON, XML, CSV, TSV etc.

In deze blog hebben we een snel overzicht van het gebruik van spark SQL en dataframes voor veelvoorkomende gebruikssituaties in SQL-wereld. Voor de eenvoud zullen we een enkel bestand behandelen in CSV-formaat. Bestand heeft vier velden, employeeID, employeeName, salaris, salarisDatum

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

Sla dit bestand op als emp.dat. In de eerste stap zullen we een vonkendataframe maken met behulp van het vonken CSV-pakket van databricks.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF is het dataframe dat wordt gebruikt in de resterende oefening. Omdat myDF herhaaldelijk wordt gebruikt, wordt aanbevolen om het aan te houden zodat het niet opnieuw hoeft te worden geëvalueerd.

 myDF.persist()

Uitvoer van dataframeshow

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

Voeg een nieuwe kolom toe aan het dataframe

Aangezien spark-dataframes onveranderlijk zijn, maakt het toevoegen van een nieuwe kolom een nieuw dataframe met toegevoegde kolom. Gebruik een kolom met Kolom (kolomnaam, transformatie) om een kolom toe te voegen. In het onderstaande voorbeeld is de kolom empName opgemaakt in hoofdletters.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

Gegevens sorteren op basis van een kolom

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

Aflopend sorteren

desc ( "Salaris")

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

Vorige rij ophalen en gebruiken (Lag)

LAG is een functie in SQL die wordt gebruikt om toegang te krijgen tot vorige rijwaarden in de huidige rij. Dit is handig wanneer we gebruikssituaties zoals vergelijking met vorige waarde hebben. LAG in Spark-dataframes is beschikbaar in Window-functies

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

Volgende rij ophalen en gebruiken (Lead)

LEAD is een functie in SQL die wordt gebruikt om toegang te krijgen tot de volgende rijwaarden in de huidige rij. Dit is handig wanneer we gebruik hebben zoals vergelijking met de volgende waarde. LEAD in Spark-dataframes is beschikbaar in Window-functies

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

Trendanalyse met vensterfuncties Laten we nu de vensterfunctie LAG gebruiken voor een eenvoudige trendanalyse. Als het salaris lager is dan vorige maand, markeren we het als "DOWN", als het salaris is gestegen, dan "UP". De code gebruikt de Window-functie om te bestellen, te vertragen en vervolgens een eenvoudige als anders te doen met WHEN.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow