apache-spark
Variables partagées
Recherche…
Variables de diffusion
Les variables de diffusion sont des objets partagés en lecture seule qui peuvent être créés avec la méthode SparkContext.broadcast
:
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
et lire en utilisant value
méthode value
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
Accumulateurs
Les accumulateurs sont des variables en écriture seule pouvant être créées avec SparkContext.accumulator
:
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
modifié avec +=
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
et accessible avec value
méthode value
:
accumulator.value // 'value' is now equal to 10
L'utilisation des accumulateurs est compliquée par la garantie d'exécution de Spark pour les transformations. Si une transformation doit être recalculée pour une raison quelconque, les mises à jour de l'accumulateur pendant cette transformation seront répétées. Cela signifie que les valeurs de l’accumulateur peuvent être très différentes de ce qu’elles seraient si les tâches n’avaient été exécutées qu’une seule fois.
Remarque:
- Les exécuteurs ne peuvent pas lire la valeur de l'accumulateur. Seul le programme pilote peut lire la valeur de l'accumulateur, en utilisant sa méthode de valeur.
- Il est presque similaire à counter dans Java / MapReduce. Vous pouvez donc associer les accumulateurs aux compteurs pour les comprendre facilement
Accumulateur défini par l'utilisateur dans Scala
Définir AccumulatorParam
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
Utilisation:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Accumulateur défini par l'utilisateur en Python
Définir AccumulatorParam
:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)