サーチ…


ブロードキャスト変数

ブロードキャスト変数は、 SparkContext.broadcastメソッドをSparkContext.broadcast作成できる読み取り専用の共有オブジェクトです。

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

valueメソッドを使用して読み取る:

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

アキュムレータ

アキュムレータは書き込み専用の変数で、 SparkContext.accumulatorで作成できます。

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

+=修正されました:

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

valueメソッドでアクセスしvalue

accumulator.value // 'value' is now equal to 10

アキュムレータを使用することは、最低1回の変換のためのSparkの実行保証によって複雑になります。何らかの理由で変換を再計算する必要がある場合、その変換中にアキュムレータが更新されます。つまり、アキュムレータの値は、タスクが1回だけ実行された場合とは大きく異なる場合があります。


注意:

  1. エグゼキュータアキュムレータの値を読み取ることができません 。ドライバプログラムのみが、そのvalueメソッドを使用してアキュムレータの値を読み取ることができます。
  2. Java / MapReduceのcounterとほぼ同じです。だからあなたは、アキュムレータをカウンターに関連させて、簡単に理解できるようにすることができます

Scalaのユーザー定義アキュムレータ

AccumulatorParam定義する

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

つかいます:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Pythonでのユーザ定義アキュムレータ

AccumulatorParam定義する:

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow