apache-spark ट्यूटोरियल
अपाचे-स्पार्क के साथ शुरुआत करना
खोज…
टिप्पणियों
Apache Spark एक खुला स्रोत बड़ा डेटा प्रोसेसिंग फ्रेमवर्क है जो गति, उपयोग में आसानी और परिष्कृत एनालिटिक्स के आसपास बनाया गया है। एक डेवलपर को इसका उपयोग तब करना चाहिए जब वह बड़ी मात्रा में डेटा को संभालता है, जो आमतौर पर मेमोरी की सीमाओं और / या निषेधात्मक प्रसंस्करण समय को प्रभावित करता है।
यह अपाचे-स्पार्क के भीतर किसी भी बड़े विषयों का उल्लेख करना चाहिए, और संबंधित विषयों के लिए लिंक करना चाहिए। चूंकि अपाचे-स्पार्क के लिए दस्तावेज़ीकरण नया है, इसलिए आपको उन संबंधित विषयों के प्रारंभिक संस्करण बनाने की आवश्यकता हो सकती है।
संस्करण
संस्करण | रिलीज़ की तारीख |
---|---|
2.2.0 | 2017/07/11 |
2.1.1 | 2017/05/02 |
2.1.0 | 2016/12/28 |
2.0.1 | 2016/10/03 |
2.0.0 | 2016/07/26 |
1.6.0 | 2016/01/04 |
1.5.0 | 2015/09/09 |
1.4.0 | 2015/06/11 |
1.3.0 | 2015-03-13 |
1.2.0 | 2014-12-18 |
1.1.0 | 2014-09-11 |
1.0.0 | 2014-05-30 |
0.9.0 | 2014-02-02 |
0.8.0 | 2013-09-25 |
0.7.0 | 2013-02-27 |
0.6.0 | 2012-10-15 |
परिचय
प्रोटोटाइप :
कुल (शून्यव्यू, seqOp, combOp)
विवरण :
aggregate()
आपको आरडीडी लेने और एकल मूल्य उत्पन्न करने की सुविधा देता है जो मूल आरडीडी में संग्रहीत की गई तुलना में भिन्न प्रकार का होता है।
पैरामीटर :
-
zeroValue
: आरंभिक मूल्य, आपके परिणाम के लिए, वांछित प्रारूप में। -
seqOp
: आप जिस ऑपरेशन कोseqOp
रिकॉर्ड पर लागू करना चाहते हैं। विभाजन में हर रिकॉर्ड के लिए एक बार चलता है। -
combOp
: परिभाषित करता है किcombOp
वस्तुएं (प्रत्येक विभाजन के लिए एक), संयुक्त कैसे हो जाती हैं।
उदाहरण :
किसी सूची के योग और उस सूची की लंबाई की गणना करें। परिणाम
(sum, length)
की एक जोड़ी में लौटें।
स्पार्क शेल में, 4 विभाजन के साथ एक सूची बनाएं, जिसमें 2 विभाजन हैं :
listRDD = sc.parallelize([1,2,3,4], 2)
फिर seqOp को परिभाषित करें :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
फिर कंघी को परिभाषित करें :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
फिर एकत्रित:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
पहले विभाजन में सबलिस्ट [1, 2] है। यह उस सूची के प्रत्येक तत्व पर seqOp को लागू करता है, जो एक स्थानीय परिणाम पैदा करता है - एक जोड़ी (sum, length)
जो स्थानीय रूप से परिणाम को प्रतिबिंबित करेगा, केवल उस पहले विभाजन में।
local_result
को zeroValue
पैरामीटर aggregate()
साथ आरंभ किया गया है। उदाहरण के लिए, (0, 0) और list_element
सूची का पहला तत्व है:
0 + 1 = 1
0 + 1 = 1
स्थानीय परिणाम (1, 1) है, जिसका अर्थ है कि योग 1 भाग के लिए 1 और लंबाई 1 है, केवल पहले तत्व को संसाधित करने के बाद। local_result
(0, 0), से (1, 1) तक अपडेट हो जाता है।
1 + 2 = 3
1 + 1 = 2
स्थानीय परिणाम अब (3, 2) है, जो 1 विभाजन से अंतिम परिणाम होगा, क्योंकि वे 1 विभाजन के सबलिस्ट में कोई अन्य तत्व नहीं हैं। 2 डी रिटर्न (7, 2) के लिए समान करना।
अंतिम, वैश्विक परिणाम बनाने के लिए प्रत्येक स्थानीय परिणाम पर combOp लागू करें:
(3,2) + (7,2) = (10, 4)
'आंकड़ा' में वर्णित उदाहरण:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
परिवर्तन बनाम कार्रवाई
स्पार्क आलसी मूल्यांकन का उपयोग करता है; इसका मतलब यह है कि यह कोई भी काम नहीं करेगा, जब तक कि वास्तव में ऐसा न हो। यह दृष्टिकोण हमें अनावश्यक स्मृति उपयोग से बचने की अनुमति देता है, इस प्रकार हमें बड़े डेटा के साथ काम करने में सक्षम बनाता है।
किसी परिवर्तन का आलसी मूल्यांकन किया जाता है और वास्तविक कार्य तब होता है, जब कोई क्रिया होती है।
उदाहरण:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
इसलिए, [1]
हमने स्पार्क को RDD में एक फाइल पढ़ने के लिए कहा, जिसका नाम है lines
। स्पार्क ने हमें सुना और हमें बताया: "हाँ, मैं इसे करूँगा", लेकिन वास्तव में यह अभी तक फ़ाइल नहीं पढ़ी थी।
[२] में, हम फ़ाइल की लाइनों को फ़िल्टर कर रहे हैं, यह मानते हुए कि इसकी सामग्री में त्रुटि के साथ लाइनें हैं जो उनके प्रारंभ में error
के साथ चिह्नित हैं। तो हम स्पार्क को एक नया आरडीडी बनाने के लिए कहते errors
, जिसे errors
कहा जाता है, जिसमें आरडीडी lines
के तत्व होंगे, जिनके प्रारंभ में शब्द error
थी।
अब [3]
, हम स्पार्क को त्रुटियों को गिनने के लिए कहते हैं , अर्थात RDD नामक तत्वों की संख्या गिनते errors
। count()
एक क्रिया है , जो स्पार्क के लिए कोई विकल्प नहीं छोड़ती है, लेकिन वास्तव में ऑपरेशन करने के लिए, ताकि यह count()
का परिणाम पा सके count()
, जो एक पूर्णांक होगा।
परिणामस्वरूप, जब [3]
पहुंच जाता है, [1]
और [2]
वास्तव में प्रदर्शन किया जाएगा, अर्थात जब हम [3]
तक पहुँचेंगे, तब और तब:
फ़ाइल को पाठ में पढ़ा जा रहा है
textFile()
([1]
)lines
filter()
होंगीfilter()
[2]
कारणcount()
निष्पादित करेगा, क्योंकि[3]
डीबग टिप: चूँकि स्पार्क कोई भी वास्तविक कार्य तब तक नहीं करेगा जब तक [3]
पहुँच नहीं जाता है, यह समझना महत्वपूर्ण है कि यदि कोई त्रुटि [1]
और / या [2]
, तो यह तब तक प्रकट नहीं होगा, जब तक कि कार्रवाई में [3]
वास्तविक काम करने के लिए स्पार्क को ट्रिगर करता है। उदाहरण के लिए यदि फ़ाइल में आपका डेटा startsWith()
उपयोग नहीं करता है, तो [2]
स्पार्क द्वारा ठीक से स्वीकार किया जा रहा है और यह कोई त्रुटि नहीं बढ़ाएगा, लेकिन जब [3]
सबमिट किया जाएगा, और स्पार्क वास्तव में दोनों [1]
और [2]
मूल्यांकन करता है, और उसके बाद ही यह समझेगा कि कुछ [2]
साथ सही नहीं है और एक वर्णनात्मक त्रुटि उत्पन्न करता है।
परिणामस्वरूप, [3]
निष्पादित होने पर एक त्रुटि उत्पन्न हो सकती है, लेकिन इसका मतलब यह नहीं है कि त्रुटि [3]
के कथन में निहित है!
ध्यान दें, [3]
बाद मेमोरी में न तो lines
और न ही errors
संग्रहीत की जाएंगी। वे प्रसंस्करण निर्देशों के एक सेट के रूप में मौजूद रहेंगे। यदि इन RDD में से किसी एक पर कई कार्य किए जाएंगे, तो स्पार्क कई बार डेटा को पढ़ेगा और फ़िल्टर करेगा। एक ही RDD पर कई क्रियाओं को करते समय डुप्लिकेट संचालन से बचने के लिए, अक्सर cache
का उपयोग करके डेटा को मेमोरी में स्टोर करना उपयोगी होता है।
आप स्पार्क डॉक्स में अधिक परिवर्तन / क्रियाएं देख सकते हैं।
स्पार्क संस्करण की जाँच करें
spark-shell
:
sc.version
आम तौर पर एक कार्यक्रम में:
SparkContext.version
spark-submit
का उपयोग spark-submit
:
spark-submit --version