apache-spark
Freigegebene Variablen
Suche…
Broadcast-Variablen
Broadcast-Variablen sind schreibgeschützte, gemeinsam genutzte Objekte, die mit der SparkContext.broadcast
Methode erstellt werden können:
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
und mit value
lesen:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
Akkumulatoren
Akkumulatoren sind schreibgeschützte Variablen, die mit SparkContext.accumulator
erstellt werden SparkContext.accumulator
:
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
modifiziert mit +=
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
und mit value
aufgerufen:
accumulator.value // 'value' is now equal to 10
Die Verwendung von Akkumulatoren wird durch die Run-least-Once-Garantie von Spark für Transformationen kompliziert. Wenn eine Umwandlung aus irgendeinem Grund neu berechnet werden muss, werden die Aktualisierungen des Akkumulators während dieser Umwandlung wiederholt. Dies bedeutet, dass die Akkumulatorwerte sehr unterschiedlich sein können, wenn die Aufgaben nur einmal ausgeführt wurden.
Hinweis:
- Ausführende können den Wert des Akkumulators nicht lesen. Nur das Treiberprogramm kann den Wert des Akkumulators mit seiner Wertemethode lesen.
- Es ist fast ähnlich wie in Java / MapReduce. So können Sie Akkumulatoren mit Zählern verknüpfen, um sie leicht zu verstehen
Benutzerdefinierter Akkumulator in Scala
AccumulatorParam
definieren
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
Benutzen:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Benutzerdefinierter Akkumulator in Python
AccumulatorParam
definieren:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)