apache-spark
Общие переменные
Поиск…
Широковещательные переменные
Широковещательные переменные - это только общедоступные объекты, доступные только для чтения, которые могут быть созданы с SparkContext.broadcast
метода SparkContext.broadcast
:
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
и читать с использованием метода value
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
Аккумуляторы
Аккумуляторы являются переменными только для записи, которые могут быть созданы с помощью SparkContext.accumulator
:
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
изменено с помощью +=
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
и доступ с помощью метода value
:
accumulator.value // 'value' is now equal to 10
Использование аккумуляторов усложняется гарантией Spark по меньшей мере для преобразований. Если преобразование необходимо перерасчитать по любой причине, обновления аккумулятора в течение этого преобразования будут повторяться. Это означает, что значения аккумулятора могут быть очень разными, чем если бы задачи выполнялись только один раз.
Замечания:
- Исполнители не могут читать значение аккумулятора. Только программа драйвера может считывать значение аккумулятора, используя его метод значений.
- Он почти похож на счетчик в Java / MapReduce. Таким образом, вы можете связать аккумуляторы с счетчиками, чтобы понять это легко
Пользовательский Аккумулятор в Scala
Определить AccumulatorParam
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
Использование:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Пользовательский Аккумулятор в Python
Определить AccumulatorParam
:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)