apache-spark
Spark SQLのウィンドウ関数
サーチ…
前書き
ウィンドウ関数は、集合的にウィンドウと呼ばれる一連の行に対して操作(通常は集約)を行うために使用されます。ウィンドウ関数は、Spark 1.4以降で動作します。 Window関数は、substrまたはround(Spark 1.4より前に広く使用されている)などの組込み関数またはUDFより多くの操作を提供します。ウィンドウ関数を使用すると、Spark SQLのユーザーは、特定の行のランクや入力行の範囲にわたる移動平均などの結果を計算できます。 SparkのSQLおよびDataFrame APIの表現力を大幅に向上させます。
その中心にあるウィンドウ関数は、フレームと呼ばれる行のグループに基づいてテーブルの各入力行の戻り値を計算します。すべての入力行には、固有のフレームが関連付けられています。ウィンドウ関数のこの特性は、他の関数よりも強力です。ウィンドウ関数のタイプは次のとおりです。
- ランキング関数
- 分析関数
- 集計関数
ウィンドウ関数を使用するには、関数がウィンドウ関数として使用されることをユーザーがマークする必要があります
- SQLでサポートされている関数の後に
OVER
句を追加します。たとえば、avg(revenue) OVER (...);
または -
rank().over(...)
、DataFrame APIでサポートされている関数のoverメソッドを呼び出します。
このドキュメントは、これらの関数のいくつかを例で示すことを目的としています。読者は、Spark DataFrameの基本的な操作について、新しい列の追加、列の名前の変更などの知識があると仮定しています。
サンプルデータセットの読み込み:
val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")
必要なインポートステートメントのリスト:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
最初のステートメントはWindow Specification
をインポートします。ウィンドウ仕様には、どの行がウィンドウに含まれるかを示す条件/仕様が含まれています。
scala> sampleData.show
+------+---------+------+
| Name| Role|Salary|
+------+---------+------+
| bob|Developer|125000|
| mark|Developer|108000|
| carl| Tester| 70000|
| peter|Developer|185000|
| jon| Tester| 65000|
| roman| Tester| 82000|
| simon|Developer| 98000|
| eric|Developer|144000|
|carlos| Tester| 75000|
| henry|Developer|110000|
+------+---------+------+
移動平均
職種に基づいて雇用主の給与の移動平均を計算するには:
val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
.over( Window.partitionBy("Role").rowsBetween(-1,1)) )
-
withColumn()
という名前の新しい列を作成movingAverage
行い、average
上Salary
コラム -
over()
は、ウィンドウ指定を定義するために使用されます。 -
partitionBy()
列を介してデータを分割Role
-
rowsBetween(start, end)
この関数は、ウィンドウに含める行を定義します。パラメータ(start
とend
)は数値入力を取り、0
は現在の行を表し、-1
は前の行を表し、1
は次の行などを表します。この関数には、start
とend
間のすべての行が含まれているため、この例では3つの行(-1,0,1)がウィンドウに含まれています。
scala> movAvg.show
+------+---------+------+------------------+
| Name| Role|Salary| movingAverage|
+------+---------+------+------------------+
| bob|Developer|125000| 116500.0|
| mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
| eric|Developer|144000|117333.33333333333|
| henry|Developer|110000| 127000.0|
| carl| Tester| 70000| 67500.0|
| jon| Tester| 65000| 72333.33333333333|
| roman| Tester| 82000| 74000.0|
|carlos| Tester| 75000| 78500.0|
+------+---------+------+------------------+
現在の行がそれぞれ最初の行と最後の行である場合、Sparkは自動的に前の行と次の行を無視します。
上記の例では、最初の行のmovingAverageは、前の行が存在しないため、現在の行と次の行のみの平均です。同様に、パーティションの最後の行(つまり6行目)は、次の行が存在しないため、現在の行と前の行の平均です。
累積合計
職種に基づいて雇用主の給与の移動平均を計算するには:
val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
.over( Window.partitionBy("Role").orderBy("Salary")))
-
orderBy()
salaryカラムをソートし、累積合計を計算します。
scala> cumSum.show +------+---------+------+-------------+ | Name| Role|Salary|cumulativeSum| +------+---------+------+-------------+ | simon|Developer| 98000| 98000| | mark|Developer|108000| 206000| | henry|Developer|110000| 316000| | bob|Developer|125000| 441000| | eric|Developer|144000| 585000| | peter|Developer|185000| 770000| | jon| Tester| 65000| 65000| | carl| Tester| 70000| 135000| |carlos| Tester| 75000| 210000| | roman| Tester| 82000| 292000| +------+---------+------+-------------+
ウィンドウ関数 - ソート、リード、ラグ、ランク、傾向分析
このトピックでは、Sparkを使用してwithColumn、lead、lag、Levelなどの関数を使用する方法を示します。 Sparkのデータフレームは、スパークのコア機能のSQL抽象レイヤです。これにより、ユーザーは分散データにSQLを書き込むことができます。 Spark SQLは、JSON、XML、CSV、TSVなどのファイル形式をサポートしています。
このブログでは、スパークSQLとデータフレームをSQLワールドの一般的な使用例に使用する方法を簡単に紹介します。簡単にするために、CSV形式の1つのファイルを扱います。ファイルには、employeeID、employeeName、salary、salaryDateの4つのフィールドがあります。
1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016
このファイルをemp.datという名前で保存します。最初のステップでは、databricksからspark CSVパッケージを使用してスパークデータフレームを作成します。
val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
StructField("EmpName", StringType, false),
StructField("Salary", DoubleType, false),
StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("dateFormat", "MM/dd/yyyy")
.schema(schema)
.load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()
myDFは、残りのエクササイズで使用されるデータフレームです。 myDFは繰り返し使用されるため、再評価する必要がないように永続化することをお勧めします。
myDF.persist()
データフレームの出力
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+
データフレームに新しい列を追加する
スパークのデータフレームは不変なので、新しい列を追加すると、列が追加された新しいデータフレームが作成されます。列を追加するには、withColumn(columnName、Transformation)を使用します。以下の例の列では、empNameは大文字にフォーマットされています。
withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()
+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+
列に基づいてデータをソートする
val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+
降順ソート
desc( "給料")
myDF.sort(desc("Salary")).show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+
前の行を取得して使用する(遅れ)
LAGはSQLの関数で、現在の行の前の行の値にアクセスするために使用されます。これは、以前の値との比較などの使用例がある場合に便利です。 SparkデータフレームのLAGは、ウィンドウ関数で利用可能です
lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()
+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+
次の行を取得して使用する(Lead)
LEADは、現在の行の次の行の値にアクセスするために使用されるSQLの関数です。これは、次の値との比較のような用途がある場合に便利です。 SparkデータフレームのLEADは、ウィンドウ関数で使用できます
lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()
+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+
ウィンドウ関数を使用したトレンド分析今度は、ウィンドウ関数LAGを単純なトレンド分析で使用するようにしましょう。給与が前月よりも低い場合、給与が「UP」になった場合は「DOWN」と表示します。このコードでは、WHENを使用して順序付け、遅延、単純化を行うためにWindow関数を使用します。
val window = Window.orderBy("SalaryDate")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(trend_window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
myDF.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+