खोज…


परिचय

विंडो फ़ंक्शंस का उपयोग ऑपरेशन (आम तौर पर एकत्रीकरण) को सामूहिक रूप से विंडो के रूप में कहा जाता है। स्पार्क 1.4 या बाद में विंडो फ़ंक्शंस काम करते हैं। विंडो फ़ंक्शंस अधिक संचालन प्रदान करता है फिर बिल्ट-इन फ़ंक्शंस या यूडीएफ, जैसे कि रूट या राउंड (स्पार्क 1.4 से पहले बड़े पैमाने पर उपयोग किया जाता है)। विंडो फ़ंक्शंस स्पार्क SQL के उपयोगकर्ताओं को किसी पंक्ति की श्रेणी या इनपुट पंक्तियों की एक चलती औसत जैसे परिणामों की गणना करने की अनुमति देते हैं। वे स्पार्क की एसक्यूएल और डेटाफ्रेम एपीआई की अभिव्यक्तियों में काफी सुधार करते हैं।

इसके मूल में, एक विंडो फ़ंक्शन एक तालिका के प्रत्येक इनपुट पंक्ति के लिए रिटर्न वैल्यू की गणना करता है, जिसे पंक्तियों के समूह के आधार पर फ़्रेम कहा जाता है। हर इनपुट पंक्ति में एक अद्वितीय फ्रेम जुड़ा हो सकता है। खिड़की के कार्यों की यह विशेषता उन्हें अन्य कार्यों की तुलना में अधिक शक्तिशाली बनाती है। विंडो फ़ंक्शंस के प्रकार हैं

  • रैंकिंग कार्य
  • विश्लेषणात्मक कार्य
  • अलग-अलग कार्य

विंडो फ़ंक्शंस का उपयोग करने के लिए, उपयोगकर्ताओं को यह चिह्नित करने की आवश्यकता होती है कि फ़ंक्शन किसी विंडो फ़ंक्शन के रूप में उपयोग किया जाता है

  • SQL में एक समर्थित फ़ंक्शन के बाद OVER क्लॉज जोड़ना, जैसे avg(revenue) OVER (...); या
  • DataFrame API में एक समर्थित फ़ंक्शन पर ओवर मेथड को कॉल करना, जैसे rank().over(...)

इस प्रलेखन का उद्देश्य उदाहरण के साथ उन कार्यों में से कुछ को प्रदर्शित करना है। यह माना जाता है कि पाठक को स्पार्क डेटाफ्रेम पर बुनियादी कार्यों के बारे में कुछ ज्ञान है जैसे: एक नया कॉलम जोड़ना, एक कॉलम का नाम बदलना आदि।

एक नमूना डेटासेट पढ़ना:

val sampleData = Seq( ("bob","Developer",125000),("mark","Developer",108000),("carl","Tester",70000),("peter","Developer",185000),("jon","Tester",65000),("roman","Tester",82000),("simon","Developer",98000),("eric","Developer",144000),("carlos","Tester",75000),("henry","Developer",110000)).toDF("Name","Role","Salary")

आयात विवरणों की सूची आवश्यक:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

पहला बयान Window Specification आयात करता है। विंडो स्पेसिफिकेशन में ऐसी स्थितियां / स्पेसिफिकेशन्स होते हैं जो यह दर्शाते हैं कि विंडो में कौन सी पंक्तियाँ शामिल की जानी हैं।

scala> sampleData.show
+------+---------+------+
|  Name|     Role|Salary|
+------+---------+------+
|   bob|Developer|125000|
|  mark|Developer|108000|
|  carl|   Tester| 70000|
| peter|Developer|185000|
|   jon|   Tester| 65000|
| roman|   Tester| 82000|
| simon|Developer| 98000|
|  eric|Developer|144000|
|carlos|   Tester| 75000|
| henry|Developer|110000|
+------+---------+------+

सामान्य गति

अपनी भूमिका के आधार पर नियोक्ताओं के वेतन के बढ़ते औसत की गणना करने के लिए:

val movAvg = sampleData.withColumn("movingAverage", avg(sampleData("Salary"))
             .over( Window.partitionBy("Role").rowsBetween(-1,1)) )
  • withColumn() movingAverage नामक एक नया कॉलम बनाता है, जो Salary कॉलम पर average प्रदर्शन करता है
  • over() का उपयोग विंडो विनिर्देश को परिभाषित करने के लिए किया जाता है।
  • partitionBy() कॉलम Role पर डेटा का विभाजन करता है
  • rowsBetween(start, end) यह फ़ंक्शन विंडो में शामिल होने वाली पंक्तियों को परिभाषित करता है। पैरामीटर ( start और end ) संख्यात्मक इनपुट लेता है, 0 वर्तमान पंक्ति का प्रतिनिधित्व करता है, -1 पिछली पंक्ति है, 1 अगली पंक्ति है और इसी तरह। फ़ंक्शन में start और end बीच की सभी पंक्तियाँ शामिल हैं, इस प्रकार इस उदाहरण में तीन पंक्तियाँ (-1,0,1) विंडो में शामिल हैं।
    scala> movAvg.show
+------+---------+------+------------------+
|  Name|     Role|Salary|     movingAverage|
+------+---------+------+------------------+
|   bob|Developer|125000|          116500.0|
|  mark|Developer|108000|139333.33333333334|
| peter|Developer|185000|130333.33333333333|
| simon|Developer| 98000|142333.33333333334|
|  eric|Developer|144000|117333.33333333333|
| henry|Developer|110000|          127000.0|
|  carl|   Tester| 70000|           67500.0|
|   jon|   Tester| 65000| 72333.33333333333|
| roman|   Tester| 82000|           74000.0|
|carlos|   Tester| 75000|           78500.0|
+------+---------+------+------------------+

स्पार्क स्वचालित रूप से पिछली और अगली पंक्तियों को अनदेखा करता है, यदि वर्तमान पंक्ति क्रमशः पहली और अंतिम पंक्ति है।

उपरोक्त उदाहरण में, पहली पंक्ति का मूविंग एवरेज वर्तमान और अगली पंक्ति का औसत है, क्योंकि पिछली पंक्ति मौजूद नहीं है। इसी तरह विभाजन की अंतिम पंक्ति (यानी 6 वीं पंक्ति) वर्तमान और पिछली पंक्ति का औसत है, क्योंकि अगली पंक्ति मौजूद नहीं है।

संचयी योग

अपनी भूमिका के आधार पर नियोक्ताओं के वेतन के बढ़ते औसत की गणना करने के लिए:

val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))
  • orderBy() वेतन कॉलम को सॉर्ट करता है और संचयी योग की गणना करता है।
scala> cumSum.show
+------+---------+------+-------------+                                         
|  Name|     Role|Salary|cumulativeSum|
+------+---------+------+-------------+
| simon|Developer| 98000|        98000|
|  mark|Developer|108000|       206000|
| henry|Developer|110000|       316000|
|   bob|Developer|125000|       441000|
|  eric|Developer|144000|       585000|
| peter|Developer|185000|       770000|
|   jon|   Tester| 65000|        65000|
|  carl|   Tester| 70000|       135000|
|carlos|   Tester| 75000|       210000|
| roman|   Tester| 82000|       292000|
+------+---------+------+-------------+

विंडो फ़ंक्शंस - सॉर्ट, लीड, लैग, रैंक, ट्रेंड एनालिसिस

यह विषय दर्शाता है कि स्पार्क का उपयोग करके फ़ोकसुलेशन, लीड, लैग, लेवल आदि जैसे फ़ंक्शंस का उपयोग कैसे किया जाता है। स्पार्क डेटाफ्रेम स्पार्क कोर फंक्शनलिटीज पर एक sql अमूर्त परत है। यह वितरित डेटा पर SQL लिखने के लिए उपयोगकर्ता को सक्षम करता है। स्पार्क एसक्यूएल जेएसएन, एक्सएमएल, सीएसवी, टीएसवी आदि सहित विषम फ़ाइल स्वरूपों का समर्थन करता है।

इस ब्लॉग में हमने बताया कि एसक्यूएल दुनिया में आम उपयोग के मामलों के लिए स्पार्क एसक्यूएल और डेटाफ्रेम का उपयोग करने का एक त्वरित अवलोकन है। सादगी के लिए हम एक एकल फ़ाइल से निपटेंगे जो सीएसवी प्रारूप है। फ़ाइल के चार क्षेत्र हैं, कर्मचारी, कर्मचारी, वेतन, वेतन

1,John,1000,01/01/2016
1,John,2000,02/01/2016
1,John,1000,03/01/2016
1,John,2000,04/01/2016
1,John,3000,05/01/2016
1,John,1000,06/01/2016

इस फ़ाइल को emp.dat के रूप में सहेजें। पहले चरण में हम एक स्पार्क डेटाफ्रेम बनाकर उपयोग करेंगे, डेटाब्रीक से सीएसवी पैकेज को स्पार्क करेंगे।

val sqlCont = new HiveContext(sc)
//Define a schema for file
val schema = StructType(Array(StructField("EmpId", IntegerType, false),
          StructField("EmpName", StringType, false),
          StructField("Salary", DoubleType, false),
          StructField("SalaryDate", DateType, false)))
//Apply Shema and read data to a dataframe
val myDF = sqlCont.read.format("com.databricks.spark.csv")
          .option("header", "false")
          .option("dateFormat", "MM/dd/yyyy")
          .schema(schema)
          .load("src/resources/data/employee_salary.dat")
//Show dataframe
myDF.show()

myDF शेष एक्सर्साइज़ में उपयोग किया जाने वाला डेटाफ्रेम है। चूंकि myDF का उपयोग बार-बार किया जाता है इसलिए इसे जारी रखने की सिफारिश की जाती है ताकि इसे पुनर्मूल्यांकित करने की आवश्यकता न हो।

 myDF.persist()

डेटाफ्रेम शो का आउटपुट

+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|1000.0|2016-03-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
| 1| John|1000.0|2016-06-01|
+-----+-------+------+----------+

डेटाफ्रेम में एक नया कॉलम जोड़ें

चिंगारी डेटाफ़्रेम अपरिवर्तनीय होने के कारण, नए कॉलम को जोड़ने से जोड़े गए कॉलम के साथ एक नया डेटाफ़्रेम बन जाएगा। कॉलम कॉलम (कॉलमनाम, परिवर्तन) के साथ एक कॉलम उपयोग जोड़ने के लिए। नीचे के उदाहरण में स्तंभ का नाम अपरकेस पर स्वरूपित किया गया है।

withColumn(columnName,transformation)
myDF.withColumn("FormatedName", upper(col("EmpName"))).show()


+-----+-------+------+----------+------------+
|EmpId|EmpName|Salary|SalaryDate|FormatedName|
+-----+-------+------+----------+------------+
| 1| John|1000.0|2016-01-01| JOHN|
| 1| John|2000.0|2016-02-01| JOHN|
| 1| John|1000.0|2016-03-01| JOHN|
| 1| John|2000.0|2016-04-01| JOHN|
| 1| John|3000.0|2016-05-01| JOHN|
| 1| John|1000.0|2016-06-01| JOHN|
+-----+-------+------+----------+------------+

एक कॉलम के आधार पर डेटा सॉर्ट करें

val sortedDf = myDF.sort(myDF.col("Salary"))
sortedDf.show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|1000.0|2016-03-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|3000.0|2016-05-01|
+-----+-------+------+----------+

अवरोही क्रम

desc ( "वेतन")

 myDF.sort(desc("Salary")).show()


+-----+-------+------+----------+
|EmpId|EmpName|Salary|SalaryDate|
+-----+-------+------+----------+
| 1| John|3000.0|2016-05-01|
| 1| John|2000.0|2016-02-01|
| 1| John|2000.0|2016-04-01|
| 1| John|1000.0|2016-06-01|
| 1| John|1000.0|2016-01-01|
| 1| John|1000.0|2016-03-01|
+-----+-------+------+----------+

पिछली पंक्ति (लाग) प्राप्त करें और उसका उपयोग करें

LAG SQL में एक फ़ंक्शन है जिसका उपयोग वर्तमान पंक्ति में पिछले पंक्ति मानों तक पहुंचने के लिए किया जाता है। यह उपयोगी है जब हमारे पास पिछले मूल्य के साथ तुलना जैसे मामलों का उपयोग होता है। स्पार्क डेटाफ्रेम में एलएजी विंडो कार्यों में उपलब्ध है

lag(Column e, int offset)
Window function: returns the value that is offset rows before the current row, and null if there is less than offset rows before the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary.
//For first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val lagCol = lag(col("Salary"), 1).over(window)
myDF.withColumn("LagCol", lagCol).show()

+-----+-------+------+----------+------+
|EmpId|EmpName|Salary|SalaryDate|LagCol|
+-----+-------+------+----------+------+
| 1| John|1000.0|2016-01-01| null|
| 1| John|2000.0|2016-02-01|1000.0|
| 1| John|1000.0|2016-03-01|2000.0|
| 1| John|2000.0|2016-04-01|1000.0|
| 1| John|3000.0|2016-05-01|2000.0|
| 1| John|1000.0|2016-06-01|3000.0|
+-----+-------+------+----------+------+

अगली पंक्ति (लीड) प्राप्त करें और उसका उपयोग करें

LEAD SQL में एक फ़ंक्शन है जो वर्तमान पंक्ति में अगली पंक्ति के मूल्यों तक पहुंचने के लिए उपयोग किया जाता है। यह उपयोगी है जब हमारे पास अगले मूल्य के साथ तुलना की तरह usecases है। स्पार्क डेटाफ्रेम में LEAD विंडो कार्यों में उपलब्ध है

lead(Column e, int offset)
Window function: returns the value that is offset rows after the current row, and null if there is less than offset rows after the current row.


import org.apache.spark.sql.expressions.Window
//order by Salary Date to get previous salary. F
//or first row we will get NULL
val window = Window.orderBy("SalaryDate")
//use lag to get previous row value for salary, 1 is the offset
val leadCol = lead(col("Salary"), 1).over(window)
myDF.withColumn("LeadCol", leadCol).show()





+-----+-------+------+----------+-------+
|EmpId|EmpName|Salary|SalaryDate|LeadCol|
+-----+-------+------+----------+-------+
| 1| John|1000.0|2016-01-01| 1000.0|
| 1| John|1000.0|2016-03-01| 1000.0|
| 1| John|1000.0|2016-06-01| 2000.0|
| 1| John|2000.0|2016-02-01| 2000.0|
| 1| John|2000.0|2016-04-01| 3000.0|
| 1| John|3000.0|2016-05-01| null|
+-----+-------+------+----------+-------+

विंडो फ़ंक्शंस के साथ ट्रेंड विश्लेषण अब, एक सरल ट्रेंड एनालिसिस के साथ उपयोग करने के लिए विंडो फ़ंक्शन LAG डालते हैं। यदि वेतन पिछले महीने से कम है, तो हम इसे "DOWN" के रूप में चिह्नित करेंगे, यदि वेतन में वृद्धि हुई है तो "UP"। कोड का उपयोग विंडो फ़ंक्शन द्वारा आदेश देने के लिए किया जाता है, अंतराल और उसके बाद एक सरल करें, जब अन्य के साथ।

   val window = Window.orderBy("SalaryDate")
    //Derive lag column for salary
    val laggingCol = lag(col("Salary"), 1).over(trend_window)
    //Use derived column LastSalary to find difference between current and previous row
    val salaryDifference = col("Salary") - col("LastSalary")
    //Calculate trend based on the difference
    //IF ELSE / CASE can be written using when.otherwise in spark
    val trend = when(col("SalaryDiff").isNull || col("SalaryDiff").===(0), "SAME")
    .when(col("SalaryDiff").>(0), "UP")
    .otherwise("DOWN")
    myDF.withColumn("LastSalary", laggingCol)
    .withColumn("SalaryDiff",salaryDifference)
   .withColumn("Trend", trend).show()

+-----+-------+------+----------+----------+----------+-----+
|EmpId|EmpName|Salary|SalaryDate|LastSalary|SalaryDiff|Trend|
+-----+-------+------+----------+----------+----------+-----+
| 1| John|1000.0|2016-01-01| null| null| SAME|
| 1| John|2000.0|2016-02-01| 1000.0| 1000.0| UP|
| 1| John|1000.0|2016-03-01| 2000.0| -1000.0| DOWN|
| 1| John|2000.0|2016-04-01| 1000.0| 1000.0| UP|
| 1| John|3000.0|2016-05-01| 2000.0| 1000.0| UP|
| 1| John|1000.0|2016-06-01| 3000.0| -2000.0| DOWN|
+-----+-------+------+----------+----------+----------+-----+


Modified text is an extract of the original Stack Overflow Documentation
के तहत लाइसेंस प्राप्त है CC BY-SA 3.0
से संबद्ध नहीं है Stack Overflow