apache-spark
Gedeelde variabelen
Zoeken…
Variabelen uitzenden
SparkContext.broadcast
zijn alleen-lezen gedeelde objecten die kunnen worden gemaakt met de SparkContext.broadcast
methode:
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
en lees met behulp van value
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
accumulatoren
Accumulatoren zijn alleen-schrijven variabelen die kunnen worden gemaakt met SparkContext.accumulator
:
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
gewijzigd met +=
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
en toegankelijk met value
:
accumulator.value // 'value' is now equal to 10
Het gebruik van accumulatoren wordt bemoeilijkt door Spark's minimaal één keer garantie voor transformaties. Als een transformatie om welke reden dan ook opnieuw moet worden berekend, wordt de accumulator-updates tijdens die transformatie herhaald. Dit betekent dat de accumulatiewaarden heel anders kunnen zijn dan wanneer een taak slechts eenmaal was uitgevoerd.
Notitie:
- Uitvoerders kunnen de waarde van de accumulator niet lezen. Alleen het stuurprogramma kan de waarde van de accumulator lezen met behulp van de waardemethode.
- Het is bijna hetzelfde als teller in Java / MapReduce. U kunt dus accumulatoren aan tellers relateren om het gemakkelijk te begrijpen
Door de gebruiker gedefinieerde accumulator in Scala
Definieer AccumulatorParam
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
Gebruik:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Door de gebruiker gedefinieerde accumulator in Python
Definieer AccumulatorParam
:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)