apache-spark ट्यूटोरियल                
            अपाचे-स्पार्क के साथ शुरुआत करना
        
        
            
    खोज…
टिप्पणियों
Apache Spark एक खुला स्रोत बड़ा डेटा प्रोसेसिंग फ्रेमवर्क है जो गति, उपयोग में आसानी और परिष्कृत एनालिटिक्स के आसपास बनाया गया है। एक डेवलपर को इसका उपयोग तब करना चाहिए जब वह बड़ी मात्रा में डेटा को संभालता है, जो आमतौर पर मेमोरी की सीमाओं और / या निषेधात्मक प्रसंस्करण समय को प्रभावित करता है।
यह अपाचे-स्पार्क के भीतर किसी भी बड़े विषयों का उल्लेख करना चाहिए, और संबंधित विषयों के लिए लिंक करना चाहिए। चूंकि अपाचे-स्पार्क के लिए दस्तावेज़ीकरण नया है, इसलिए आपको उन संबंधित विषयों के प्रारंभिक संस्करण बनाने की आवश्यकता हो सकती है।
संस्करण
| संस्करण | रिलीज़ की तारीख | 
|---|---|
| 2.2.0 | 2017/07/11 | 
| 2.1.1 | 2017/05/02 | 
| 2.1.0 | 2016/12/28 | 
| 2.0.1 | 2016/10/03 | 
| 2.0.0 | 2016/07/26 | 
| 1.6.0 | 2016/01/04 | 
| 1.5.0 | 2015/09/09 | 
| 1.4.0 | 2015/06/11 | 
| 1.3.0 | 2015-03-13 | 
| 1.2.0 | 2014-12-18 | 
| 1.1.0 | 2014-09-11 | 
| 1.0.0 | 2014-05-30 | 
| 0.9.0 | 2014-02-02 | 
| 0.8.0 | 2013-09-25 | 
| 0.7.0 | 2013-02-27 | 
| 0.6.0 | 2012-10-15 | 
परिचय
प्रोटोटाइप :
कुल (शून्यव्यू, seqOp, combOp)
विवरण :
 aggregate() आपको आरडीडी लेने और एकल मूल्य उत्पन्न करने की सुविधा देता है जो मूल आरडीडी में संग्रहीत की गई तुलना में भिन्न प्रकार का होता है। 
पैरामीटर :
-  zeroValue: आरंभिक मूल्य, आपके परिणाम के लिए, वांछित प्रारूप में।
-  seqOp: आप जिस ऑपरेशन कोseqOpरिकॉर्ड पर लागू करना चाहते हैं। विभाजन में हर रिकॉर्ड के लिए एक बार चलता है।
-  combOp: परिभाषित करता है किcombOpवस्तुएं (प्रत्येक विभाजन के लिए एक), संयुक्त कैसे हो जाती हैं।
उदाहरण :
किसी सूची के योग और उस सूची की लंबाई की गणना करें। परिणाम
(sum, length)की एक जोड़ी में लौटें।
स्पार्क शेल में, 4 विभाजन के साथ एक सूची बनाएं, जिसमें 2 विभाजन हैं :
listRDD = sc.parallelize([1,2,3,4], 2)
फिर seqOp को परिभाषित करें :
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
फिर कंघी को परिभाषित करें :
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
फिर एकत्रित:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
 पहले विभाजन में सबलिस्ट [1, 2] है। यह उस सूची के प्रत्येक तत्व पर seqOp को लागू करता है, जो एक स्थानीय परिणाम पैदा करता है - एक जोड़ी (sum, length) जो स्थानीय रूप से परिणाम को प्रतिबिंबित करेगा, केवल उस पहले विभाजन में। 
 local_result को zeroValue पैरामीटर aggregate() साथ आरंभ किया गया है। उदाहरण के लिए, (0, 0) और list_element सूची का पहला तत्व है: 
0 + 1 = 1
0 + 1 = 1
 स्थानीय परिणाम (1, 1) है, जिसका अर्थ है कि योग 1 भाग के लिए 1 और लंबाई 1 है, केवल पहले तत्व को संसाधित करने के बाद। local_result (0, 0), से (1, 1) तक अपडेट हो जाता है। 
1 + 2 = 3
1 + 1 = 2
स्थानीय परिणाम अब (3, 2) है, जो 1 विभाजन से अंतिम परिणाम होगा, क्योंकि वे 1 विभाजन के सबलिस्ट में कोई अन्य तत्व नहीं हैं। 2 डी रिटर्न (7, 2) के लिए समान करना।
अंतिम, वैश्विक परिणाम बनाने के लिए प्रत्येक स्थानीय परिणाम पर combOp लागू करें:
(3,2) + (7,2) = (10, 4)
'आंकड़ा' में वर्णित उदाहरण:
            (0, 0) <-- zeroValue
[1, 2]                  [3, 4]
0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1
1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)
परिवर्तन बनाम कार्रवाई
स्पार्क आलसी मूल्यांकन का उपयोग करता है; इसका मतलब यह है कि यह कोई भी काम नहीं करेगा, जब तक कि वास्तव में ऐसा न हो। यह दृष्टिकोण हमें अनावश्यक स्मृति उपयोग से बचने की अनुमति देता है, इस प्रकार हमें बड़े डेटा के साथ काम करने में सक्षम बनाता है।
किसी परिवर्तन का आलसी मूल्यांकन किया जाता है और वास्तविक कार्य तब होता है, जब कोई क्रिया होती है।
उदाहरण:
In [1]: lines = sc.textFile(file)        // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count()    // an action occurred, let the party start!
Out[3]: 0                              // no line with 'error', in this example
 इसलिए, [1] हमने स्पार्क को RDD में एक फाइल पढ़ने के लिए कहा, जिसका नाम है lines । स्पार्क ने हमें सुना और हमें बताया: "हाँ, मैं इसे करूँगा", लेकिन वास्तव में यह अभी तक फ़ाइल नहीं पढ़ी थी। 
 [२] में, हम फ़ाइल की लाइनों को फ़िल्टर कर रहे हैं, यह मानते हुए कि इसकी सामग्री में त्रुटि के साथ लाइनें हैं जो उनके प्रारंभ में error के साथ चिह्नित हैं। तो हम स्पार्क को एक नया आरडीडी बनाने के लिए कहते errors , जिसे errors कहा जाता है, जिसमें आरडीडी lines के तत्व होंगे, जिनके प्रारंभ में शब्द error थी। 
 अब [3] , हम स्पार्क को त्रुटियों को गिनने के लिए कहते हैं , अर्थात RDD नामक तत्वों की संख्या गिनते errors । count() एक क्रिया है , जो स्पार्क के लिए कोई विकल्प नहीं छोड़ती है, लेकिन वास्तव में ऑपरेशन करने के लिए, ताकि यह count() का परिणाम पा सके count() , जो एक पूर्णांक होगा। 
 परिणामस्वरूप, जब [3] पहुंच जाता है, [1] और [2] वास्तव में प्रदर्शन किया जाएगा, अर्थात जब हम [3] तक पहुँचेंगे, तब और तब: 
- फ़ाइल को पाठ में पढ़ा जा रहा है - textFile()(- [1])
- lines- filter()होंगी- filter()- [2]कारण
- count()निष्पादित करेगा, क्योंकि- [3]
 डीबग टिप: चूँकि स्पार्क कोई भी वास्तविक कार्य तब तक नहीं करेगा जब तक [3] पहुँच नहीं जाता है, यह समझना महत्वपूर्ण है कि यदि कोई त्रुटि [1] और / या [2] , तो यह तब तक प्रकट नहीं होगा, जब तक कि कार्रवाई में [3] वास्तविक काम करने के लिए स्पार्क को ट्रिगर करता है। उदाहरण के लिए यदि फ़ाइल में आपका डेटा startsWith() उपयोग नहीं करता है, तो [2] स्पार्क द्वारा ठीक से स्वीकार किया जा रहा है और यह कोई त्रुटि नहीं बढ़ाएगा, लेकिन जब [3] सबमिट किया जाएगा, और स्पार्क वास्तव में दोनों [1] और [2] मूल्यांकन करता है, और उसके बाद ही यह समझेगा कि कुछ [2] साथ सही नहीं है और एक वर्णनात्मक त्रुटि उत्पन्न करता है। 
 परिणामस्वरूप, [3] निष्पादित होने पर एक त्रुटि उत्पन्न हो सकती है, लेकिन इसका मतलब यह नहीं है कि त्रुटि [3] के कथन में निहित है! 
 ध्यान दें, [3] बाद मेमोरी में न तो lines और न ही errors संग्रहीत की जाएंगी। वे प्रसंस्करण निर्देशों के एक सेट के रूप में मौजूद रहेंगे। यदि इन RDD में से किसी एक पर कई कार्य किए जाएंगे, तो स्पार्क कई बार डेटा को पढ़ेगा और फ़िल्टर करेगा। एक ही RDD पर कई क्रियाओं को करते समय डुप्लिकेट संचालन से बचने के लिए, अक्सर cache का उपयोग करके डेटा को मेमोरी में स्टोर करना उपयोगी होता है। 
आप स्पार्क डॉक्स में अधिक परिवर्तन / क्रियाएं देख सकते हैं।
स्पार्क संस्करण की जाँच करें
 spark-shell : 
sc.version
आम तौर पर एक कार्यक्रम में:
SparkContext.version
 spark-submit का उपयोग spark-submit : 
 spark-submit --version