Поиск…


Широковещательные переменные

Широковещательные переменные - это только общедоступные объекты, доступные только для чтения, которые могут быть созданы с SparkContext.broadcast метода SparkContext.broadcast :

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

и читать с использованием метода value :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

Аккумуляторы

Аккумуляторы являются переменными только для записи, которые могут быть созданы с помощью SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

изменено с помощью += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

и доступ с помощью метода value :

accumulator.value // 'value' is now equal to 10

Использование аккумуляторов усложняется гарантией Spark по меньшей мере для преобразований. Если преобразование необходимо перерасчитать по любой причине, обновления аккумулятора в течение этого преобразования будут повторяться. Это означает, что значения аккумулятора могут быть очень разными, чем если бы задачи выполнялись только один раз.


Замечания:

  1. Исполнители не могут читать значение аккумулятора. Только программа драйвера может считывать значение аккумулятора, используя его метод значений.
  2. Он почти похож на счетчик в Java / MapReduce. Таким образом, вы можете связать аккумуляторы с счетчиками, чтобы понять это легко

Пользовательский Аккумулятор в Scala

Определить AccumulatorParam

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Использование:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Пользовательский Аккумулятор в Python

Определить AccumulatorParam :

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow