apache-spark
Variabili condivise
Ricerca…
Variabili trasmesse
Le variabili broadcast sono letti solo oggetti condivisi che possono essere creati con il metodo SparkContext.broadcast
:
val broadcastVariable = sc.broadcast(Array(1, 2, 3))
e leggere usando il metodo del value
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.map(
i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)
accumulatori
Gli accumulatori sono variabili di sola scrittura che possono essere create con SparkContext.accumulator
:
val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional
modificato con +=
:
val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)
e accesso con il metodo del value
:
accumulator.value // 'value' is now equal to 10
L'uso degli accumulatori è complicato dalla garanzia "per lo meno una volta" di Spark per le trasformazioni. Se una trasformazione deve essere ricalcolata per qualsiasi motivo, gli aggiornamenti dell'accumulatore durante tale trasformazione verranno ripetuti. Ciò significa che i valori dell'accumulatore potrebbero essere molto diversi da quelli che sarebbero se le attività fossero eseguite una sola volta.
Nota:
- Gli esecutori non possono leggere il valore dell'accumulatore. Solo il programma driver può leggere il valore dell'accumulatore, usando il suo metodo di valore.
- È quasi simile al contatore in Java / MapReduce. Quindi puoi collegare gli accumulatori ai contatori per comprenderlo facilmente
Accumulatore definito dall'utente in Scala
Definisci AccumulatorParam
import org.apache.spark.AccumulatorParam
object StringAccumulator extends AccumulatorParam[String] {
def zero(s: String): String = s
def addInPlace(s1: String, s2: String)= s1 + s2
}
Uso:
val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)
Accumulatore definito dall'utente in Python
Definisci AccumulatorParam
:
from pyspark import AccumulatorParam
class StringAccumulator(AccumulatorParam):
def zero(self, s):
return s
def addInPlace(self, s1, s2):
return s1 + s2
accumulator = sc.accumulator("", StringAccumulator())
def add(x):
global accumulator
accumulator += x
sc.parallelize(["a", "b", "c"]).foreach(add)