apache-spark
共有変数
サーチ…
ブロードキャスト変数
ブロードキャスト変数は、 SparkContext.broadcast
メソッドをSparkContext.broadcast
作成できる読み取り専用の共有オブジェクトです。
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
value
メソッドを使用して読み取る:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
アキュムレータ
アキュムレータは書き込み専用の変数で、 SparkContext.accumulator
で作成できます。
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
+=
修正されました:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
value
メソッドでアクセスしvalue
。
accumulator.value // 'value' is now equal to 10
アキュムレータを使用することは、最低1回の変換のためのSparkの実行保証によって複雑になります。何らかの理由で変換を再計算する必要がある場合、その変換中にアキュムレータが更新されます。つまり、アキュムレータの値は、タスクが1回だけ実行された場合とは大きく異なる場合があります。
注意:
- エグゼキュータはアキュムレータの値を読み取ることができません 。ドライバプログラムのみが、そのvalueメソッドを使用してアキュムレータの値を読み取ることができます。
- Java / MapReduceのcounterとほぼ同じです。だからあなたは、アキュムレータをカウンターに関連させて、簡単に理解できるようにすることができます
Scalaのユーザー定義アキュムレータ
AccumulatorParam
定義する
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
つかいます:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Pythonでのユーザ定義アキュムレータ
AccumulatorParam
定義する:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)
Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow