Buscar..


Variables de difusión

Las variables de difusión son solo objetos compartidos de lectura que se pueden crear con el método SparkContext.broadcast :

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

y leer usando el método de value :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

Acumuladores

Los acumuladores son variables de solo escritura que se pueden crear con SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

modificado con += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

y accedido con método de value :

accumulator.value // 'value' is now equal to 10

El uso de acumuladores se complica por la garantía de Spark de ejecutar por lo menos una vez para transformaciones. Si es necesario volver a calcular una transformación por cualquier motivo, las actualizaciones del acumulador durante esa transformación se repetirán. Esto significa que los valores del acumulador pueden ser muy diferentes de lo que serían si las tareas se ejecutaran solo una vez.


Nota:

  1. Los ejecutores no pueden leer el valor del acumulador. Solo el programa controlador puede leer el valor del acumulador, utilizando su método de valor.
  2. Es casi similar a contrarrestar en Java / MapReduce. Para que puedas relacionar los acumuladores con los contadores y entenderlos fácilmente.

Acumulador definido por el usuario en Scala

Define AccumulatorParam

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Utilizar:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Acumulador definido por el usuario en Python

Define AccumulatorParam :

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow