Zoeken…


Variabelen uitzenden

SparkContext.broadcast zijn alleen-lezen gedeelde objecten die kunnen worden gemaakt met de SparkContext.broadcast methode:

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

en lees met behulp van value :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

accumulatoren

Accumulatoren zijn alleen-schrijven variabelen die kunnen worden gemaakt met SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

gewijzigd met += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

en toegankelijk met value :

accumulator.value // 'value' is now equal to 10

Het gebruik van accumulatoren wordt bemoeilijkt door Spark's minimaal één keer garantie voor transformaties. Als een transformatie om welke reden dan ook opnieuw moet worden berekend, wordt de accumulator-updates tijdens die transformatie herhaald. Dit betekent dat de accumulatiewaarden heel anders kunnen zijn dan wanneer een taak slechts eenmaal was uitgevoerd.


Notitie:

  1. Uitvoerders kunnen de waarde van de accumulator niet lezen. Alleen het stuurprogramma kan de waarde van de accumulator lezen met behulp van de waardemethode.
  2. Het is bijna hetzelfde als teller in Java / MapReduce. U kunt dus accumulatoren aan tellers relateren om het gemakkelijk te begrijpen

Door de gebruiker gedefinieerde accumulator in Scala

Definieer AccumulatorParam

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Gebruik:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Door de gebruiker gedefinieerde accumulator in Python

Definieer AccumulatorParam :

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow