apache-spark
Wspólne zmienne
Szukaj…
Zmienne rozgłoszeniowe
Zmienne rozgłoszeniowe są obiektami współdzielonymi tylko do odczytu, które można utworzyć za SparkContext.broadcast
metody SparkContext.broadcast
:
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
i czytaj używając metody value
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
Akumulatory
Akumulatory to zmienne tylko do zapisu, które można tworzyć za pomocą SparkContext.accumulator
:
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
zmodyfikowano za pomocą +=
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
i dostępne za pomocą metody value
:
accumulator.value // 'value' is now equal to 10
Korzystanie z akumulatorów jest skomplikowane dzięki gwarancji Spark na uruchomienie przynajmniej raz. Jeśli z jakiegoś powodu konieczne jest ponowne obliczenie transformacji, aktualizacje akumulatora podczas tej transformacji zostaną powtórzone. Oznacza to, że wartości akumulatorów mogą być bardzo różne niż byłyby, gdyby zadania zostały uruchomione tylko raz.
Uwaga:
- Wykonawcy nie mogą odczytać wartości akumulatora. Tylko program sterownika może odczytać wartość akumulatora przy użyciu metody jego wartości.
- Jest prawie podobny do licznika w Javie / MapReduce. Abyś mógł łatwo powiązać akumulatory z licznikami
Akumulator zdefiniowany przez użytkownika w Scali
Zdefiniuj AccumulatorParam
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
Posługiwać się:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Akumulator zdefiniowany przez użytkownika w języku Python
Zdefiniuj AccumulatorParam
:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)