Suche…


Broadcast-Variablen

Broadcast-Variablen sind schreibgeschützte, gemeinsam genutzte Objekte, die mit der SparkContext.broadcast Methode erstellt werden können:

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

und mit value lesen:

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

Akkumulatoren

Akkumulatoren sind schreibgeschützte Variablen, die mit SparkContext.accumulator erstellt werden SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

modifiziert mit += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

und mit value aufgerufen:

accumulator.value // 'value' is now equal to 10

Die Verwendung von Akkumulatoren wird durch die Run-least-Once-Garantie von Spark für Transformationen kompliziert. Wenn eine Umwandlung aus irgendeinem Grund neu berechnet werden muss, werden die Aktualisierungen des Akkumulators während dieser Umwandlung wiederholt. Dies bedeutet, dass die Akkumulatorwerte sehr unterschiedlich sein können, wenn die Aufgaben nur einmal ausgeführt wurden.


Hinweis:

  1. Ausführende können den Wert des Akkumulators nicht lesen. Nur das Treiberprogramm kann den Wert des Akkumulators mit seiner Wertemethode lesen.
  2. Es ist fast ähnlich wie in Java / MapReduce. So können Sie Akkumulatoren mit Zählern verknüpfen, um sie leicht zu verstehen

Benutzerdefinierter Akkumulator in Scala

AccumulatorParam definieren

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Benutzen:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Benutzerdefinierter Akkumulator in Python

AccumulatorParam definieren:

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow