Sök…


Broadcast-variabler

Broadcast-variabler är bara SparkContext.broadcast delade objekt som kan skapas med SparkContext.broadcast metoden:

val broadcastVariable = sc.broadcast(Array(1, 2, 3))

och läs med hjälp av value :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))

someRDD.map(
    i => broadcastVariable.value.apply(i % broadcastVariable.value.size)
)

ackumulatorer

Ackumulatorer är endast skrivvariabler som kan skapas med SparkContext.accumulator :

val accumulator = sc.accumulator(0, name = "My accumulator") // name is optional

modifierad med += :

val someRDD = sc.parallelize(Array(1, 2, 3, 4))
someRDD.foreach(element => accumulator += element)

och nås med value :

accumulator.value // 'value' is now equal to 10

Att använda ackumulatorer är komplicerat av Sparks körning minst en gång-garanti för transformationer. Om en omvandling behöver beräknas av någon anledning, uppdateras ackumulatorn under den transformationen. Detta innebär att ackumulatorvärden kan vara mycket annorlunda än de skulle vara om uppgifterna bara hade körts en gång.


Notera:

  1. Exekutorer kan inte läsa ackumulatorns värde. Endast drivprogrammet kan läsa ackumulatorns värde med hjälp av dess värdemetod.
  2. Det liknar nästan räknaren i Java / MapReduce. Så du kan relatera ackumulatorer till räknare för att förstå det lätt

Användardefinierad ackumulator i Scala

Definiera AccumulatorParam

import org.apache.spark.AccumulatorParam

object StringAccumulator extends AccumulatorParam[String] {
  def zero(s: String): String = s
  def addInPlace(s1: String, s2: String)=  s1 + s2
}

Använda sig av:

val accumulator = sc.accumulator("")(StringAccumulator)
sc.parallelize(Array("a", "b", "c")).foreach(accumulator += _)

Användardefinierad ackumulator i Python

Definiera AccumulatorParam :

from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    def zero(self, s):
        return s
    def addInPlace(self, s1, s2):
        return s1 + s2

accumulator = sc.accumulator("", StringAccumulator())

def add(x): 
    global accumulator
    accumulator += x

sc.parallelize(["a", "b", "c"]).foreach(add)


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow