Ricerca…


Variabili trasmesse

Le variabili broadcast sono letti solo oggetti condivisi che possono essere creati con il metodo SparkContext.broadcast :

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

e leggere usando il metodo del value :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

accumulatori

Gli accumulatori sono variabili di sola scrittura che possono essere create con SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

modificato con += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

e accesso con il metodo del value :

accumulator.value // 'value' is now equal to 10

L'uso degli accumulatori è complicato dalla garanzia "per lo meno una volta" di Spark per le trasformazioni. Se una trasformazione deve essere ricalcolata per qualsiasi motivo, gli aggiornamenti dell'accumulatore durante tale trasformazione verranno ripetuti. Ciò significa che i valori dell'accumulatore potrebbero essere molto diversi da quelli che sarebbero se le attività fossero eseguite una sola volta.


Nota:

  1. Gli esecutori non possono leggere il valore dell'accumulatore. Solo il programma driver può leggere il valore dell'accumulatore, usando il suo metodo di valore.
  2. È quasi simile al contatore in Java / MapReduce. Quindi puoi collegare gli accumulatori ai contatori per comprenderlo facilmente

Accumulatore definito dall'utente in Scala

Definisci AccumulatorParam

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Uso:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Accumulatore definito dall'utente in Python

Definisci AccumulatorParam :

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow