apache-spark
स्पार्क SQL में विंडो फ़ंक्शंस
खोज…
परिचय
विंडो फ़ंक्शंस का उपयोग ऑपरेशन (आम तौर पर एकत्रीकरण) को सामूहिक रूप से विंडो के रूप में कहा जाता है। स्पार्क 1.4 या बाद में विंडो फ़ंक्शंस काम करते हैं। विंडो फ़ंक्शंस अधिक संचालन प्रदान करता है फिर बिल्ट-इन फ़ंक्शंस या यूडीएफ, जैसे कि रूट या राउंड (स्पार्क 1.4 से पहले बड़े पैमाने पर उपयोग किया जाता है)। विंडो फ़ंक्शंस स्पार्क SQL के उपयोगकर्ताओं को किसी पंक्ति की श्रेणी या इनपुट पंक्तियों की एक चलती औसत जैसे परिणामों की गणना करने की अनुमति देते हैं। वे स्पार्क की एसक्यूएल और डेटाफ्रेम एपीआई की अभिव्यक्तियों में काफी सुधार करते हैं।
इसके मूल में, एक विंडो फ़ंक्शन एक तालिका के प्रत्येक इनपुट पंक्ति के लिए रिटर्न वैल्यू की गणना करता है, जिसे पंक्तियों के समूह के आधार पर फ़्रेम कहा जाता है। हर इनपुट पंक्ति में एक अद्वितीय फ्रेम जुड़ा हो सकता है। खिड़की के कार्यों की यह विशेषता उन्हें अन्य कार्यों की तुलना में अधिक शक्तिशाली बनाती है। विंडो फ़ंक्शंस के प्रकार हैं
- रैंकिंग कार्य
- विश्लेषणात्मक कार्य
- अलग-अलग कार्य
विंडो फ़ंक्शंस का उपयोग करने के लिए, उपयोगकर्ताओं को यह चिह्नित करने की आवश्यकता होती है कि फ़ंक्शन किसी विंडो फ़ंक्शन के रूप में उपयोग किया जाता है
- SQL में एक समर्थित फ़ंक्शन के बाद
OVER
क्लॉज जोड़ना, जैसेavg(revenue) OVER (...);
या - DataFrame API में एक समर्थित फ़ंक्शन पर ओवर मेथड को कॉल करना, जैसे
rank().over(...)
।
इस प्रलेखन का उद्देश्य उदाहरण के साथ उन कार्यों में से कुछ को प्रदर्शित करना है। यह माना जाता है कि पाठक को स्पार्क डेटाफ्रेम पर बुनियादी कार्यों के बारे में कुछ ज्ञान है जैसे: एक नया कॉलम जोड़ना, एक कॉलम का नाम बदलना आदि।
एक नमूना डेटासेट पढ़ना:
val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")
आयात विवरणों की सूची आवश्यक:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
पहला बयान Window Specification
आयात करता है। विंडो स्पेसिफिकेशन में ऐसी स्थितियां / स्पेसिफिकेशन्स होते हैं जो यह दर्शाते हैं कि विंडो में कौन सी पंक्तियाँ शामिल की जानी हैं।
scala> sampleData.show
+------+---------+------+
| Name| Role|Salary|
+------+---------+------+
| bob|Developer|125000|
| mark|Developer|108000|
| carl| Tester| 70000|
| peter|Developer|185000|
| jon| Tester| 65000|
| roman| Tester| 82000|
| simon|Developer| 98000|
| eric|Developer|144000|
|carlos| Tester| 75000|
| henry|Developer|110000|
+------+---------+------+
सामान्य गति
अपनी भूमिका के आधार पर नियोक्ताओं के वेतन के बढ़ते औसत की गणना करने के लिए:
val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
.over( Window.partitionBy("Role").rowsBetween(-1,1)) )
-
withColumn()
movingAverage
नामक एक नया कॉलम बनाता है, जोSalary
कॉलम परaverage
प्रदर्शन करता है -
over()
का उपयोग विंडो विनिर्देश को परिभाषित करने के लिए किया जाता है। -
partitionBy()
कॉलमRole
पर डेटा का विभाजन करता है -
rowsBetween(start, end)
यह फ़ंक्शन विंडो में शामिल होने वाली पंक्तियों को परिभाषित करता है। पैरामीटर (start
औरend
) संख्यात्मक इनपुट लेता है,0
वर्तमान पंक्ति का प्रतिनिधित्व करता है,-1
पिछली पंक्ति है,1
अगली पंक्ति है और इसी तरह। फ़ंक्शन मेंstart
औरend
बीच की सभी पंक्तियाँ शामिल हैं, इस प्रकार इस उदाहरण में तीन पंक्तियाँ (-1,0,1) विंडो में शामिल हैं।
scala> movAvg.show
+------+---------+------+------------------+
| Name| Role|Salary| movingAverage|
+------+---------+------+------------------+
| bob|Developer|125000| 116500.0|
| mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
| eric|Developer|144000|117333.33333333333|
| henry|Developer|110000| 127000.0|
| carl| Tester| 70000| 67500.0|
| jon| Tester| 65000| 72333.33333333333|
| roman| Tester| 82000| 74000.0|
|carlos| Tester| 75000| 78500.0|
+------+---------+------+------------------+
स्पार्क स्वचालित रूप से पिछली और अगली पंक्तियों को अनदेखा करता है, यदि वर्तमान पंक्ति क्रमशः पहली और अंतिम पंक्ति है।
उपरोक्त उदाहरण में, पहली पंक्ति का मूविंग एवरेज वर्तमान और अगली पंक्ति का औसत है, क्योंकि पिछली पंक्ति मौजूद नहीं है। इसी तरह विभाजन की अंतिम पंक्ति (यानी 6 वीं पंक्ति) वर्तमान और पिछली पंक्ति का औसत है, क्योंकि अगली पंक्ति मौजूद नहीं है।
संचयी योग
अपनी भूमिका के आधार पर नियोक्ताओं के वेतन के बढ़ते औसत की गणना करने के लिए:
val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
.over( Window.partitionBy("Role").orderBy("Salary")))
-
orderBy()
वेतन कॉलम को सॉर्ट करता है और संचयी योग की गणना करता है।
scala> cumSum.show +------+---------+------+-------------+ | Name| Role|Salary|cumulativeSum| +------+---------+------+-------------+ | simon|Developer| 98000| 98000| | mark|Developer|108000| 206000| | henry|Developer|110000| 316000| | bob|Developer|125000| 441000| | eric|Developer|144000| 585000| | peter|Developer|185000| 770000| | jon| Tester| 65000| 65000| | carl| Tester| 70000| 135000| |carlos| Tester| 75000| 210000| | roman| Tester| 82000| 292000| +------+---------+------+-------------+
विंडो फ़ंक्शंस - सॉर्ट, लीड, लैग, रैंक, ट्रेंड एनालिसिस
यह विषय दर्शाता है कि स्पार्क का उपयोग करके फ़ोकसुलेशन, लीड, लैग, लेवल आदि जैसे फ़ंक्शंस का उपयोग कैसे किया जाता है। स्पार्क डेटाफ्रेम स्पार्क कोर फंक्शनलिटीज पर एक sql अमूर्त परत है। यह वितरित डेटा पर SQL लिखने के लिए उपयोगकर्ता को सक्षम करता है। स्पार्क एसक्यूएल जेएसएन, एक्सएमएल, सीएसवी, टीएसवी आदि सहित विषम फ़ाइल स्वरूपों का समर्थन करता है।
इस ब्लॉग में हमने बताया कि एसक्यूएल दुनिया में आम उपयोग के मामलों के लिए स्पार्क एसक्यूएल और डेटाफ्रेम का उपयोग करने का एक त्वरित अवलोकन है। सादगी के लिए हम एक एकल फ़ाइल से निपटेंगे जो सीएसवी प्रारूप है। फ़ाइल के चार क्षेत्र हैं, कर्मचारी, कर्मचारी, वेतन, वेतन
1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016
इस फ़ाइल को emp.dat के रूप में सहेजें। पहले चरण में हम एक स्पार्क डेटाफ्रेम बनाकर उपयोग करेंगे, डेटाब्रीक से सीएसवी पैकेज को स्पार्क करेंगे।
val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
StructField("EmpName", StringType, false),
StructField("Salary", DoubleType, false),
StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("dateFormat", "MM/dd/yyyy")
.schema(schema)
.load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()
myDF शेष एक्सर्साइज़ में उपयोग किया जाने वाला डेटाफ्रेम है। चूंकि myDF का उपयोग बार-बार किया जाता है इसलिए इसे जारी रखने की सिफारिश की जाती है ताकि इसे पुनर्मूल्यांकित करने की आवश्यकता न हो।
myDF.persist()
डेटाफ्रेम शो का आउटपुट
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+
डेटाफ्रेम में एक नया कॉलम जोड़ें
चिंगारी डेटाफ़्रेम अपरिवर्तनीय होने के कारण, नए कॉलम को जोड़ने से जोड़े गए कॉलम के साथ एक नया डेटाफ़्रेम बन जाएगा। कॉलम कॉलम (कॉलमनाम, परिवर्तन) के साथ एक कॉलम उपयोग जोड़ने के लिए। नीचे के उदाहरण में स्तंभ का नाम अपरकेस पर स्वरूपित किया गया है।
withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()
+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+
एक कॉलम के आधार पर डेटा सॉर्ट करें
val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+
अवरोही क्रम
desc ( "वेतन")
myDF.sort(desc("Salary")).show()
+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+
पिछली पंक्ति (लाग) प्राप्त करें और उसका उपयोग करें
LAG SQL में एक फ़ंक्शन है जिसका उपयोग वर्तमान पंक्ति में पिछले पंक्ति मानों तक पहुंचने के लिए किया जाता है। यह उपयोगी है जब हमारे पास पिछले मूल्य के साथ तुलना जैसे मामलों का उपयोग होता है। स्पार्क डेटाफ्रेम में एलएजी विंडो कार्यों में उपलब्ध है
lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()
+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+
अगली पंक्ति (लीड) प्राप्त करें और उसका उपयोग करें
LEAD SQL में एक फ़ंक्शन है जो वर्तमान पंक्ति में अगली पंक्ति के मूल्यों तक पहुंचने के लिए उपयोग किया जाता है। यह उपयोगी है जब हमारे पास अगले मूल्य के साथ तुलना की तरह usecases है। स्पार्क डेटाफ्रेम में LEAD विंडो कार्यों में उपलब्ध है
lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.
import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()
+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+
विंडो फ़ंक्शंस के साथ ट्रेंड विश्लेषण अब, एक सरल ट्रेंड एनालिसिस के साथ उपयोग करने के लिए विंडो फ़ंक्शन LAG डालते हैं। यदि वेतन पिछले महीने से कम है, तो हम इसे "DOWN" के रूप में चिह्नित करेंगे, यदि वेतन में वृद्धि हुई है तो "UP"। कोड का उपयोग विंडो फ़ंक्शन द्वारा आदेश देने के लिए किया जाता है, अंतराल और उसके बाद एक सरल करें, जब अन्य के साथ।
val window = Window.orderBy("SalaryDate")
//Derive lag column for salary
val laggingCol = lag(col("Salary"), 1).over(trend_window)
//Use derived column LastSalary to find difference between current and previous row
val salaryDifference = col("Salary") - col("LastSalary")
//Calculate trend based on the difference
//IF ELSE / CASE can be written using when.otherwise in spark
val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
.when(col("SalaryDiff").>(0), "UP")
.otherwise("DOWN")
myDF.withColumn("LastSalary", laggingCol)
.withColumn("SalaryDiff",salaryDifference)
.withColumn("Trend", trend).show()
+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+