Szukaj…


Zmienne rozgłoszeniowe

Zmienne rozgłoszeniowe są obiektami współdzielonymi tylko do odczytu, które można utworzyć za SparkContext.broadcast metody SparkContext.broadcast :

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

i czytaj używając metody value :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

Akumulatory

Akumulatory to zmienne tylko do zapisu, które można tworzyć za pomocą SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

zmodyfikowano za pomocą += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

i dostępne za pomocą metody value :

accumulator.value // 'value' is now equal to 10

Korzystanie z akumulatorów jest skomplikowane dzięki gwarancji Spark na uruchomienie przynajmniej raz. Jeśli z jakiegoś powodu konieczne jest ponowne obliczenie transformacji, aktualizacje akumulatora podczas tej transformacji zostaną powtórzone. Oznacza to, że wartości akumulatorów mogą być bardzo różne niż byłyby, gdyby zadania zostały uruchomione tylko raz.


Uwaga:

  1. Wykonawcy nie mogą odczytać wartości akumulatora. Tylko program sterownika może odczytać wartość akumulatora przy użyciu metody jego wartości.
  2. Jest prawie podobny do licznika w Javie / MapReduce. Abyś mógł łatwo powiązać akumulatory z licznikami

Akumulator zdefiniowany przez użytkownika w Scali

Zdefiniuj AccumulatorParam

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Posługiwać się:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Akumulator zdefiniowany przez użytkownika w języku Python

Zdefiniuj AccumulatorParam :

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow