수색…


소개

창 함수는 창으로 집합 적으로 호출되는 행 집합에 대한 연산 (일반적으로 집계)을 수행하는 데 사용됩니다. 창 함수는 Spark 1.4 이상에서 작동합니다. Window 함수는 substr 또는 round (Spark 1.4 이전에 광범위하게 사용됨)와 같은 내장 함수 또는 UDF보다 많은 연산을 제공합니다. Spark SQL 사용자는 창 함수를 사용하여 주어진 행의 순위 또는 입력 행 범위에 대한 이동 평균과 같은 결과를 계산할 수 있습니다. Spark의 SQL 및 DataFrame API의 표현력을 크게 향상시킵니다.

윈도우 함수는 프레임이라는 행 그룹을 기반으로 테이블의 모든 입력 행에 대한 반환 값을 계산합니다. 모든 입력 행에는 고유 한 프레임이 연관 될 수 있습니다. 창 함수의 이러한 특성은 다른 함수보다 더 강력합니다. 창 함수의 유형은 다음과 같습니다.

  • 순위 기능
  • 분석 함수
  • 집계 함수

창 함수를 사용하려면 사용자가 함수를 창 함수로 사용하거나

  • SQL에서 지원되는 함수 뒤에 OVER 절을 추가합니다 avg(revenue) OVER (...); 예 : avg(revenue) OVER (...); 또는
  • DataFrame API에서 지원되는 함수 rank().over(...) 예 : rank().over(...) 에서 over 메서드를 호출합니다.

이 문서는 예제를 통해 이러한 기능 중 일부를 보여줍니다. 독자가 Spark DataFrame의 기본 작업에 대해 몇 가지 지식을 갖고 있다고 가정합니다 : 새 열 추가, 열 이름 바꾸기 등.

샘플 데이터 세트 읽기 :

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

필요한 수입 명세서 목록 :

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

첫 번째 문은 Window Specification 가져옵니다. Window Specification은 어떤 행이 윈도우에 포함되는지를 나타내는 조건 / 사양을 포함합니다.

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

이동 평균

역할에 따라 고용주의 급여 이동 평균을 계산하려면 :

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn()movingAverage 라는 새 열을 만들고 Salary 열에서 average 를 수행합니다.
  • over() 는 윈도우 스펙을 정의하는 데 사용됩니다.
  • partitionBy() 는 열을 통해 데이터를 분할합니다. Role
  • rowsBetween(start, end) 이 함수는 윈도우에 포함될 행을 정의합니다. 매개 변수 ( startend )는 숫자 입력을 취하고 0 은 현재 행을 나타내고 -1 은 이전 행을 나타내며 1 은 다음 행을 나타냅니다. 이 함수에는 startend 사이의 모든 행이 포함되므로이 예제에서는 3 개의 행 (-1,0,1)이 창에 포함됩니다.
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

Spark은 현재 행이 각각 첫 행과 마지막 행인 경우 이전 행과 다음 행을 자동으로 무시합니다.

위의 예제에서 첫 번째 행의 movingAverage는 이전 행이 존재하지 않으므로 현재 행과 다음 행의 평균값입니다. 마찬가지로 파티션의 마지막 행 (즉, 6 행)은 다음 행이 존재하지 않기 때문에 현재 & 이전 행의 평균입니다.

누적 합계

역할에 따라 고용주의 급여 이동 평균을 계산하려면 :

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() 는 급여 열을 정렬하고 누적 합계를 계산합니다.
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

창 함수 - 정렬, 리드, 지연, 순위, 추세 분석

이 항목에서는 Spark를 사용하여 withColumn, lead, lag, Level 등의 함수를 사용하는 방법을 보여줍니다. 스파크 데이터 프레임은 스파크 코어 기능에 대한 SQL 추상 레이어입니다. 이를 통해 사용자는 분산 데이터에 SQL을 쓸 수 있습니다. Spark SQL은 JSON, XML, CSV, TSV 등의 파일 형식을 지원합니다.

이 블로그에서 우리는 SQL 세계에서 일반적인 사용 사례에 대해 spark SQL 및 데이터 프레임을 사용하는 방법에 대한 간단한 개요를 가지고 있습니다. 단순화를 위해 CSV 형식의 단일 파일을 다루겠습니다. 파일에는 employeeID, employeeName, salary, salaryDate 네 개의 필드가 있습니다.

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

이 파일을 emp.dat로 저장하십시오. 첫 번째 단계에서는 databricks에서 spark CSV 패키지를 사용하여 스파크 데이터 프레임을 만듭니다.

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF는 남은 운동에 사용되는 데이터 프레임입니다. myDF가 반복적으로 사용되기 때문에 재사용 할 필요가 없도록 myDF를 지속하는 것이 좋습니다.

 myDF.persist()

데이터 프레임 출력보기

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

데이터 프레임에 새 열 추가

스파크 데이터 프레임은 불변이므로 새로운 열을 추가하면 열이 추가 된 새로운 데이터 프레임이 생성됩니다. 열을 추가하려면 withColumn (columnName, Transformation)과 함께 사용하십시오. 아래 예제 열에서 empName은 대문자로 지정됩니다.

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

열을 기반으로 데이터 정렬

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

내림차순 정렬

desc ( "급여")

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

이전 행 가져 오기 및 사용 (지연)

LAG는 현재 행의 이전 행 값에 액세스하는 데 사용되는 SQL 함수입니다. 이전 값과의 비교와 같은 사용 사례가있는 경우 유용합니다. Spark 데이터 프레임의 LAG는 Window 함수에서 사용할 수 있습니다.

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

다음 행 가져 오기 및 사용 (리드)

LEAD는 SQL에서 현재 행의 다음 행 값에 액세스하는 데 사용되는 함수입니다. 이것은 우리가 다음 가치와의 비교와 같은 활용법을 가지고있을 때 유용합니다. Spark 데이터 프레임의 LEAD는 Window 함수에서 사용할 수 있습니다.

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

윈도우 함수와 추세 분석 이제, 우리는 간단한 경향 분석과 함께 사용하는 윈도우 함수의 LAG를 만들어 보자. 급여가 지난 달보다 작 으면 급여가 "UP"으로 증가하면 "DOWN"으로 표시합니다. 코드는 Window 함수를 사용하여 순서를 지정하고, 지연 한 다음 WHEN을 사용하여 다른 경우 간단한 작업을 수행합니다.

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow