Buscar..


Introducción

Las funciones de ventana se utilizan para realizar operaciones (generalmente agregación) en un conjunto de filas llamadas colectivamente como ventana. Las funciones de la ventana funcionan en Spark 1.4 o posterior. Las funciones de la ventana proporcionan más operaciones que las funciones integradas o UDF, como substr o round (utilizadas ampliamente antes de Spark 1.4). Las funciones de la ventana permiten a los usuarios de Spark SQL calcular resultados como el rango de una fila dada o un promedio móvil en un rango de filas de entrada. Mejoran significativamente la expresividad de las API de SQL y DataFrame de Spark.

En su núcleo, una función de ventana calcula un valor de retorno para cada fila de entrada de una tabla basada en un grupo de filas, llamado Marco. Cada fila de entrada puede tener un marco único asociado a ella. Esta característica de las funciones de ventana las hace más poderosas que otras funciones. Los tipos de funciones de ventana son

  • Funciones de clasificación
  • Funciones analiticas
  • Funciones agregadas

Para usar las funciones de la ventana, los usuarios deben marcar que una función se usa como una función de la ventana ya sea por

  • Agregar una cláusula OVER después de una función compatible en SQL, por ejemplo, avg(revenue) OVER (...); o
  • Llamar al método over en una función admitida en la API DataFrame, por ejemplo, rank().over(...) Over rank().over(...) .

Esta documentación pretende demostrar algunas de esas funciones con ejemplo. Se supone que el lector tiene algún conocimiento sobre las operaciones básicas en Spark DataFrame como: agregar una nueva columna, cambiar el nombre de una columna, etc.

Leyendo un conjunto de datos de muestra:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

Lista de declaraciones de importación requeridas:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

La primera declaración importa la Window Specification . Una especificación de ventana contiene condiciones / especificaciones que indican qué filas deben incluirse en la ventana.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

Media móvil

Para calcular la media móvil del salario de los empleadores en función de su función:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() crea una nueva columna llamada movingAverage , que realiza un average en la columna Salary
  • over() se utiliza para definir la especificación de la ventana.
  • partitionBy() particiona los datos sobre la columna Role
  • rowsBetween(start, end) Esta función define las filas que se incluirán en la ventana. Los parámetros ( start y end ) toman entradas numéricas, 0 representa la fila actual, -1 es la fila anterior, 1 es la fila siguiente y así sucesivamente. La función incluye todas las filas entre el start y el end , por lo que en este ejemplo se incluyen tres filas (-1,0,1) en la ventana.
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark ignora automáticamente las filas anteriores y siguientes, si la fila actual es la primera y la última fila respectivamente.

En el ejemplo anterior, movingAverage de la primera fila es el promedio de la fila actual y la siguiente solamente, ya que la fila anterior no existe. De manera similar, la última fila de la partición (es decir, la sexta fila) es el promedio de la fila actual y anterior, ya que la siguiente fila no existe.

Suma acumulativa

Para calcular la media móvil del salario de los empleadores en función de su función:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() ordena la columna de salario y calcula la suma acumulada.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

Funciones de la ventana: clasificación, avance, desfase, clasificación, análisis de tendencias

Este tema muestra cómo usar funciones como con Columna, plomo, retraso, Nivel, etc. usando Spark. El marco de datos de Spark es una capa abstracta de sql en las funcionalidades de Spark Core Esto permite al usuario escribir SQL en datos distribuidos. Spark SQL es compatible con formatos de archivo heterogéneos, incluidos JSON, XML, CSV, TSV, etc.

En este blog, tenemos una descripción general rápida de cómo usar spark SQL y marcos de datos para casos de uso comunes en el mundo de SQL. Por simplicidad, trataremos un solo archivo con formato CSV. El archivo tiene cuatro campos, employeeID, employeeName ,larre, sueldoFecha

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

Guarde este archivo como emp.dat. En el primer paso, crearemos un marco de datos de chispa usando el paquete CSV de chispa de databricks.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF es el marco de datos utilizado en el ejercicio restante. Debido a que myDF se usa repetidamente, se recomienda persistir para que no sea necesario reevaluarlo.

 myDF.persist()

Salida de muestra de datos

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

Agregar una nueva columna a dataframe

Dado que los marcos de datos de chispa son inmutables, agregar una nueva columna creará un nuevo marco de datos con una columna adicional. Para agregar una columna, use withColumn (columnName, Transformation). En la columna de ejemplo a continuación empName está formateada en mayúsculas.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

Ordenar los datos en base a una columna

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

Orden descendiente

desc ("salario")

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

Obtener y usar la fila anterior (Lag)

LAG es una función en SQL que se utiliza para acceder a los valores de las filas anteriores en la fila actual. Esto es útil cuando tenemos casos de uso como la comparación con el valor anterior. LAG en los marcos de datos de Spark está disponible en las funciones de Windows

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

Obtener y usar la siguiente fila (Lead)

LEAD es una función en SQL que se utiliza para acceder a los valores de la siguiente fila en la fila actual. Esto es útil cuando tenemos casos como la comparación con el siguiente valor. LEAD in Spark dataframes está disponible en las funciones de Windows

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

Análisis de tendencias con funciones de ventana Ahora, pongamos la función de ventana LAG para usar con un simple análisis de tendencias. Si el salario es menor que el mes anterior, lo marcaremos como "ABAJO", si el salario aumentó, entonces "ARRIBA". El código usa la función de la ventana para ordenar, retrasar y luego hacer una simple si no con WHEN.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow