apache-spark
Funciones de ventana en Spark SQL
Buscar..
Introducción
Las funciones de ventana se utilizan para realizar operaciones (generalmente agregación) en un conjunto de filas llamadas colectivamente como ventana. Las funciones de la ventana funcionan en Spark 1.4 o posterior. Las funciones de la ventana proporcionan más operaciones que las funciones integradas o UDF, como substr o round (utilizadas ampliamente antes de Spark 1.4). Las funciones de la ventana permiten a los usuarios de Spark SQL calcular resultados como el rango de una fila dada o un promedio móvil en un rango de filas de entrada. Mejoran significativamente la expresividad de las API de SQL y DataFrame de Spark.
En su núcleo, una función de ventana calcula un valor de retorno para cada fila de entrada de una tabla basada en un grupo de filas, llamado Marco. Cada fila de entrada puede tener un marco único asociado a ella. Esta característica de las funciones de ventana las hace más poderosas que otras funciones. Los tipos de funciones de ventana son
- Funciones de clasificación
- Funciones analiticas
- Funciones agregadas
Para usar las funciones de la ventana, los usuarios deben marcar que una función se usa como una función de la ventana ya sea por
- Agregar una cláusula
OVER
después de una función compatible en SQL, por ejemplo,avg(revenue) OVER (...);
o - Llamar al método over en una función admitida en la API DataFrame, por ejemplo,
rank().over(...)
Overrank().over(...)
.
Esta documentación pretende demostrar algunas de esas funciones con ejemplo. Se supone que el lector tiene algún conocimiento sobre las operaciones básicas en Spark DataFrame como: agregar una nueva columna, cambiar el nombre de una columna, etc.
Leyendo un conjunto de datos de muestra:
val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")
Lista de declaraciones de importación requeridas:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
La primera declaración importa la Window Specification
. Una especificación de ventana contiene condiciones / especificaciones que indican qué filas deben incluirse en la ventana.
scala> sampleData.show
+------+---------+------+
| Name| Role|Salary|
+------+---------+------+
| bob|Developer|125000|
| mark|Developer|108000|
| carl| Tester| 70000|
| peter|Developer|185000|
| jon| Tester| 65000|
| roman| Tester| 82000|
| simon|Developer| 98000|
| eric|Developer|144000|
|carlos| Tester| 75000|
| henry|Developer|110000|
+------+---------+------+
Media móvil
Para calcular la media móvil del salario de los empleadores en función de su función:
val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
.over( Window.partitionBy("Role").rowsBetween(-1,1)) )
-
withColumn()
crea una nueva columna llamadamovingAverage
, que realiza unaverage
en la columnaSalary
-
over()
se utiliza para definir la especificación de la ventana. -
partitionBy()
particiona los datos sobre la columnaRole
-
rowsBetween(start, end)
Esta función define las filas que se incluirán en la ventana. Los parámetros (start
yend
) toman entradas numéricas,0
representa la fila actual,-1
es la fila anterior,1
es la fila siguiente y así sucesivamente. La función incluye todas las filas entre elstart
y elend
, por lo que en este ejemplo se incluyen tres filas (-1,0,1) en la ventana.
scala> movAvg.show
+------+---------+------+------------------+
| Name| Role|Salary| movingAverage|
+------+---------+------+------------------+
| bob|Developer|125000| 116500.0|
| mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
| eric|Developer|144000|117333.33333333333|
| henry|Developer|110000| 127000.0|
| carl| Tester| 70000| 67500.0|
| jon| Tester| 65000| 72333.33333333333|
| roman| Tester| 82000| 74000.0|
|carlos| Tester| 75000| 78500.0|
+------+---------+------+------------------+
Spark ignora automáticamente las filas anteriores y siguientes, si la fila actual es la primera y la última fila respectivamente.
En el ejemplo anterior, movingAverage de la primera fila es el promedio de la fila actual y la siguiente solamente, ya que la fila anterior no existe. De manera similar, la última fila de la partición (es decir, la sexta fila) es el promedio de la fila actual y anterior, ya que la siguiente fila no existe.
Suma acumulativa
Para calcular la media móvil del salario de los empleadores en función de su función:
val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
.over( Window.partitionBy("Role").orderBy("Salary")))
-
orderBy()
ordena la columna de salario y calcula la suma acumulada.
scala> cumSum.show +------+---------+------+-------------+ | Name| Role|Salary|cumulativeSum| +------+---------+------+-------------+ | simon|Developer| 98000| 98000| | mark|Developer|108000| 206000| | henry|Developer|110000| 316000| | bob|Developer|125000| 441000| | eric|Developer|144000| 585000| | peter|Developer|185000| 770000| | jon| Tester| 65000| 65000| | carl| Tester| 70000| 135000| |carlos| Tester| 75000| 210000| | roman| Tester| 82000| 292000| +------+---------+------+-------------+
Funciones de la ventana: clasificación, avance, desfase, clasificación, análisis de tendencias
Este tema muestra cómo usar funciones como con Columna, plomo, retraso, Nivel, etc. usando Spark. El marco de datos de Spark es una capa abstracta de sql en las funcionalidades de Spark Core Esto permite al usuario escribir SQL en datos distribuidos. Spark SQL es compatible con formatos de archivo heterogéneos, incluidos JSON, XML, CSV, TSV, etc.
En este blog, tenemos una descripción general rápida de cómo usar spark SQL y marcos de datos para casos de uso comunes en el mundo de SQL. Por simplicidad, trataremos un solo archivo con formato CSV. El archivo tiene cuatro campos, employeeID, employeeName ,larre, sueldoFecha
1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016
Guarde este archivo como emp.dat. En el primer paso, crearemos un marco de datos de chispa usando el paquete CSV de chispa de databricks.
val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
StructField("EmpName", StringType, false),
StructField("Salary", DoubleType, false),
StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("dateFormat", "MM/dd/yyyy")
.schema(schema)
.load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()
myDF es el marco de datos utilizado en el ejercicio restante. Debido a que myDF se usa repetidamente, se recomienda persistir para que no sea necesario reevaluarlo.
myDF.persist()
Salida de muestra de datos
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+
Agregar una nueva columna a dataframe
Dado que los marcos de datos de chispa son inmutables, agregar una nueva columna creará un nuevo marco de datos con una columna adicional. Para agregar una columna, use withColumn (columnName, Transformation). En la columna de ejemplo a continuación empName está formateada en mayúsculas.
withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()
+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+
Ordenar los datos en base a una columna
val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+
Orden descendiente
desc ("salario")
myDF.sort(desc("Salary")).show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+
Obtener y usar la fila anterior (Lag)
LAG es una función en SQL que se utiliza para acceder a los valores de las filas anteriores en la fila actual. Esto es útil cuando tenemos casos de uso como la comparación con el valor anterior. LAG en los marcos de datos de Spark está disponible en las funciones de Windows
lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()
+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+
Obtener y usar la siguiente fila (Lead)
LEAD es una función en SQL que se utiliza para acceder a los valores de la siguiente fila en la fila actual. Esto es útil cuando tenemos casos como la comparación con el siguiente valor. LEAD in Spark dataframes está disponible en las funciones de Windows
lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()
+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+
Análisis de tendencias con funciones de ventana Ahora, pongamos la función de ventana LAG para usar con un simple análisis de tendencias. Si el salario es menor que el mes anterior, lo marcaremos como "ABAJO", si el salario aumentó, entonces "ARRIBA". El código usa la función de la ventana para ordenar, retrasar y luego hacer una simple si no con WHEN.
val window = Window.orderBy("SalaryDate")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(trend_window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
myDF.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+