Recherche…


Variables de diffusion

Les variables de diffusion sont des objets partagés en lecture seule qui peuvent être créés avec la méthode SparkContext.broadcast :

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

et lire en utilisant value méthode value :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

Accumulateurs

Les accumulateurs sont des variables en écriture seule pouvant être créées avec SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

modifié avec += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

et accessible avec value méthode value :

accumulator.value // 'value' is now equal to 10

L'utilisation des accumulateurs est compliquée par la garantie d'exécution de Spark pour les transformations. Si une transformation doit être recalculée pour une raison quelconque, les mises à jour de l'accumulateur pendant cette transformation seront répétées. Cela signifie que les valeurs de l’accumulateur peuvent être très différentes de ce qu’elles seraient si les tâches n’avaient été exécutées qu’une seule fois.


Remarque:

  1. Les exécuteurs ne peuvent pas lire la valeur de l'accumulateur. Seul le programme pilote peut lire la valeur de l'accumulateur, en utilisant sa méthode de valeur.
  2. Il est presque similaire à counter dans Java / MapReduce. Vous pouvez donc associer les accumulateurs aux compteurs pour les comprendre facilement

Accumulateur défini par l'utilisateur dans Scala

Définir AccumulatorParam

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Utilisation:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Accumulateur défini par l'utilisateur en Python

Définir AccumulatorParam :

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow