Поиск…


Вступление

Функции окна используются для выполнения операций (как правило, агрегации) в наборе строк, коллективно называемых как окно. Функции окна работают в Spark 1.4 или новее. Функции окна предоставляют больше операций, чем встроенные функции или UDF, такие как substr или round (широко используются до Spark 1.4). Функции окна позволяют пользователям Spark SQL вычислять такие результаты, как ранг данной строки или скользящего среднего по ряду входных строк. Они значительно улучшают выразительность API Spark и API DataFrame.

По своей сути, функция окна вычисляет возвращаемое значение для каждой строки ввода таблицы на основе группы строк, называемой Frame. Каждая строка ввода может иметь уникальный фрейм, связанный с ним. Эта особенность оконных функций делает их более мощными, чем другие функции. Типы оконных функций

  • Функции ранжирования
  • Аналитические функции
  • Совокупные функции

Чтобы использовать функции окна, пользователям нужно отметить, что функция используется как функция окна либо

  • Добавление предложения OVER после поддерживаемой функции в SQL, например avg(revenue) OVER (...); или же
  • Вызов метода over на поддерживаемой функции в API DataFrame, например rank().over(...) .

Эта документация призвана продемонстрировать некоторые из этих функций с примером. Предполагается, что читатель имеет некоторые знания по основным операциям в Spark DataFrame, например: добавление нового столбца, переименование столбца и т. Д.

Чтение образца набора данных:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

Список требуемых импортных заявлений:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

Первый оператор импортирует Window Specification . Спецификация окна содержит условия / спецификации, указывающие, какие строки должны быть включены в окно.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

Скользящее среднее

Для расчета скользящей средней заработной платы работодателей на основе их роли:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() создает новый столбец с именем movingAverage , выполняющий average в столбце « Salary
  • over() используется для определения спецификации окна.
  • partitionBy() разделяет данные по столбцу Role
  • rowsBetween(start, end) Эта функция определяет строки, которые должны быть включены в окно. Параметры ( start и end ) принимают числовые входы, 0 представляет текущую строку, -1 - предыдущую строку, 1 - следующую строку и так далее. Функция включает все строки между start и end , поэтому в этом примере три окна (-1,0,1) включены в окно.
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark автоматически игнорирует предыдущую и следующую строки, если текущая строка - первая и последняя строка соответственно.

В приведенном выше примере moveAverage первой строки является средним только для текущей и следующей строки, поскольку предыдущая строка не существует. Точно так же последняя строка раздела (т.е. 6-я строка) является средней по текущей и предыдущей строке, так как следующая строка не существует.

Суммарная сумма

Для расчета скользящей средней заработной платы работодателей на основе их роли:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() сортирует столбец зарплаты и вычисляет суммарную сумму.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

Функции окна - сортировка, ведение, запаздывание, ранжирование, анализ тенденций

В этом разделе показано, как использовать функции, такие как withColumn, lead, lag, Level и т. Д., Используя Spark. Spark dataframe является абстрактным слоем sql по функциональным возможностям искрового ядра. Это позволяет пользователю писать SQL на распределенных данных. Spark SQL поддерживает hetrogenous форматы файлов, включая JSON, XML, CSV, TSV и т. Д.

В этом блоге мы кратко рассмотрим, как использовать искру SQL и dataframes для распространенных случаев использования в мире SQL. Для простоты мы рассмотрим один файл, который является CSV-форматом. Файл имеет четыре поля, employeeID, employeeName, зарплату, payDate

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

Сохраните этот файл как emp.dat. На первом этапе мы создадим искровую фреймворк, используя искровой CSV-пакет из databricks.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF - это dataframe, используемый в оставшихся упражнениях. Поскольку myDF используется повторно, рекомендуется продолжать его, чтобы его не нужно переоценивать.

 myDF.persist()

Вывод отображения данных

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

Добавить новый столбец в dataframe

Поскольку искровые информационные фреймы неизменяемы, добавление нового столбца создаст новый фрейм данных с добавленным столбцом. Чтобы добавить использование столбца с помощью столбца (columnName, Transformation). В приведенном ниже примере столбец empName форматируется в верхнем регистре.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

Сортировка данных на основе столбца

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

Сортировать по убыванию

убывание ( «Зарплата»)

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

Получите и используйте предыдущую строку (Lag)

LAG - это функция в SQL, которая используется для доступа к предыдущим значениям строк в текущей строке. Это полезно, когда мы используем такие случаи, как сравнение с предыдущим значением. LAG в пакетах данных Spark доступна в функциях Window

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

Получить и использовать следующую строку (Lead)

LEAD - это функция в SQL, которая используется для доступа к следующим значениям строк в текущей строке. Это полезно, когда у нас есть такие случаи, как сравнение со следующим значением. LEAD в Spark dataframes доступен в функциях Window

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

Анализ тренда с помощью оконных функций. Теперь давайте использовать функцию LAG окна с использованием простого анализа тренда. Если зарплата меньше, чем в предыдущем месяце, мы будем отмечать ее как «DOWN», если зарплата увеличилась до «UP». Код использует функцию окна для упорядочивания, лаги, а затем делает простую, если else с WHEN.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow