apache-spark チュートリアル
apache-sparkを使い始める
サーチ…
備考
Apache Sparkは、速度、使いやすさ、洗練された分析を基盤に構築されたオープンソースの大規模データ処理フレームワークです。開発者は、大量のデータを処理するときに使用する必要があります。大量のデータは、通常、メモリの制限や処理に時間がかかることを意味します。
また、apache-spark内の大きなテーマについても言及し、関連するトピックにリンクする必要があります。 apache-sparkのドキュメンテーションは新しいものなので、それらの関連トピックの初期バージョンを作成する必要があります。
バージョン
バージョン | 発売日 |
---|---|
2.2.0 | 2017-07-11 |
2.1.1 | 2017-05-02 |
2.1.0 | 2016-12-28 |
2.0.1 | 2016年10月03日 |
2.0.0 | 2016-07-26 |
1.6.0 | 2016年1月4日 |
1.5.0 | 2015-09-09 |
1.4.0 | 2015-06-11 |
1.3.0 | 2015-03-13 |
1.2.0 | 2014年12月18日 |
1.1.0 | 2014-09-11 |
1.0.0 | 2014-05-30 |
0.9.0 | 2014-02-02 |
0.8.0 | 2013-09-25 |
0.7.0 | 2013-02-27 |
0.6.0 | 2012-10-15 |
前書き
プロトタイプ :
集約(zeroValue、seqOp、combOp)
説明 :
aggregate()
使用すると、RDDを使用して元のRDDに格納されていたものとは異なるタイプの単一の値を生成できます。
パラメータ :
-
zeroValue
:結果の初期値。目的の形式で指定します。 -
seqOp
:RDDレコードに適用する操作。パーティション内のすべてのレコードに対して1回実行されます。 -
combOp
:結果オブジェクト(各パーティションに1つ)がどのように組み合わされるかを定義します。
例 :
リストとそのリストの長さの合計を計算します。結果を
(sum, length)
ペアで返します。
Sparkシェルでは、2つのパーティションを持つ4つの要素からなるリストを作成します。
listRDD = sc.parallelize([1,2,3,4], 2)
次に、 seqOpを定義します。
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
次に、 combOpを定義します。
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
次に、
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
最初のパーティションにはサブリスト[1、2]があります。これにより、そのリストの各要素にseqOpが適用され、ローカル結果が生成されます。結果をローカルに反映する(sum, length)
ペア。最初のパーティションのみ。
local_result
は、 zeroValue
られたzeroValue
パラメータaggregate()
に初期化されます。たとえば、(0、0)およびlist_element
はリストの最初の要素です。
0 + 1 = 1
0 + 1 = 1
ローカル結果は(1,1)です。つまり、最初の要素のみを処理した後の合計は1で、最初のパーティションの長さは1です。 local_result
は(0、0)から( local_result
に更新されます。
1 + 2 = 3
1 + 1 = 2
ローカル結果は(3,2)になり、第1パーティションのサブリストの他の要素ではないため、第1パーティションの最終結果となります。 2番目のパーティションで同じ処理を実行すると(7,2)が返されます。
各ローカル結果にcombOpを適用して、最終的なグローバル結果を作成します。
(3,2) + (7,2) = (10, 4)
'figure'に記載されている例:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
トランスフォーメーション対アクション
Sparkは遅延評価を使用します。それは本当にしなければ、それは何の仕事もしないことを意味します。このアプローチにより、不要なメモリ使用を避けることができ、大きなデータを扱うことができます。
変換が遅延評価され、 アクションが発生したときに実際の作業が行われます。
例:
In [1]: lines = sc.textFile(file) // will run instantly, regardless file's size
In [2]: errors = lines.filter(lambda line: line.startsWith("error")) // run instantly
In [3]: errorCount = errors.count() // an action occurred, let the party start!
Out[3]: 0 // no line with 'error', in this example
だから、中に[1]
我々はRDD、という名前にファイルを読み取るためにスパークを語ったlines
。スパークは私たちのことを聞いて、私に言った: "はい私はそれをやる"しかし、実際にはまだファイルを読んでいない。
[2]では、私たちは、その内容がでマークされているエラーのある行含まれていると仮定して、ファイルの行をフィルタリングしているerror
彼らのスタートでは。そこでSparkに、 errors
と呼ばれる新しいRDDを作成するように指示します。これはRDD lines
要素を持ち、 error
時にerror
します。
今では[3]
我々はすなわちRDDと呼ばれる要素の数カウントし、 エラーを カウントするスパークを尋ねるerrors
持っています。 count()
はアクションであり、Sparkには選択肢を残すことなく、実際に操作を行うためcount()
結果が整数になります。
その結果、 [3]
に達すると、 [1]
と[2]
が実際に実行されます。すなわち、 [3]
に達したとき、
ファイルは
textFile()
読み込まれtextFile()
[1]
ため)lines
はfilter()
されfilter()
[2]
ため)[3]
ためにcount()
が実行され[3]
デバッグのヒント:Sparkは[3]
に達するまで実際の作業を行わないので、 [1]
や[2]
にエラーが存在する場合は表示されません。 [3]
実際の仕事をするために火花を発する。たとえば、ファイル内のデータが私が使用したstartsWith()
をサポートしていない場合、 [2]
はSparkによって正しく受け入れられ、エラーは発生しませんが、 [3]
が送信され、実際にSpark両方の評価[1]
と[2]
その後だけにして、それは何かがと正しくないことを理解するであろう[2]
と記述エラーを生成します。
その結果、エラーをしたときにトリガすることができる[3]
実行されたが、それはエラーがの声明の中で存在しなければならないことを意味するわけではない[3]
注意すべき点は、 [3]
あとでlines
やerrors
がメモリに保存されないことです。それらは一連の処理命令としてのみ存在し続けます。これらのRDDのいずれかで複数のアクションが実行される場合、sparkはデータを複数回読み取ってフィルタリングします。 1つのRDDで複数のアクションを実行する際の操作の重複を避けるため、 cache
を使用してデータをメモリに保存すると便利です。
Sparkのドキュメントでは、より多くの変換/アクションを見ることができます。
スパークのバージョンを確認する
spark-shell
:
sc.version
一般的にプログラムでは:
SparkContext.version
spark-submit
を使用spark-submit
:
spark-submit --version