apache-spark
Delade variabler
Sök…
Broadcast-variabler
Broadcast-variabler är bara SparkContext.broadcast
delade objekt som kan skapas med SparkContext.broadcast
metoden:
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
och läs med hjälp av value
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
ackumulatorer
Ackumulatorer är endast skrivvariabler som kan skapas med SparkContext.accumulator
:
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
modifierad med +=
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
och nås med value
:
accumulator.value // 'value' is now equal to 10
Att använda ackumulatorer är komplicerat av Sparks körning minst en gång-garanti för transformationer. Om en omvandling behöver beräknas av någon anledning, uppdateras ackumulatorn under den transformationen. Detta innebär att ackumulatorvärden kan vara mycket annorlunda än de skulle vara om uppgifterna bara hade körts en gång.
Notera:
- Exekutorer kan inte läsa ackumulatorns värde. Endast drivprogrammet kan läsa ackumulatorns värde med hjälp av dess värdemetod.
- Det liknar nästan räknaren i Java / MapReduce. Så du kan relatera ackumulatorer till räknare för att förstå det lätt
Användardefinierad ackumulator i Scala
Definiera AccumulatorParam
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
Använda sig av:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Användardefinierad ackumulator i Python
Definiera AccumulatorParam
:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)
Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow