Ricerca…


Osservazioni

Il pacchetto SparkR ti consente di lavorare con frame di dati distribuiti su un cluster Spark . Questi ti permettono di fare operazioni come selezione, filtraggio, aggregazione su dataset molto grandi. SparkR panoramica sulla documentazione del pacchetto SparkR

Imposta il contesto Spark

Imposta contesto Spark in R

Per iniziare a lavorare con Sparks distribuiti con i dataframes, è necessario collegare il programma R con uno Spark Cluster esistente.

library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context

Ecco come collegare l'IDE a un cluster Spark.

Ottieni Spark Cluster

C'è un argomento introduttivo di Apache Spark con le istruzioni di installazione. In sostanza, è possibile utilizzare Spark Cluster localmente tramite java ( consultare le istruzioni ) o utilizzare applicazioni cloud (non libere) (ad es. Microsoft Azure [sito argomento] , IBM ).

Cache data

Che cosa:

Il caching può ottimizzare il calcolo in Spark. Il caching memorizza i dati in memoria ed è un caso speciale di persistenza. Qui viene spiegato cosa succede quando si memorizza nella cache un RDD in Spark.

Perché:

In sostanza, la memorizzazione nella cache consente di salvare un risultato parziale provvisorio, solitamente dopo le trasformazioni, dei dati originali. Pertanto, quando si utilizza l'RDD memorizzato nella cache, è possibile accedere ai dati già trasformati dalla memoria senza dover ricalcolare le trasformazioni precedenti.

Come:

Ecco un esempio di come accedere rapidamente ai dati di grandi dimensioni (qui 3 GB di grande csv) dalla memoria in memoria quando si accede più di una volta:

library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# loading 3 GB big csv file:  
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)

Creare RDD (dataset distribuiti resilienti)

Dal dataframe:

mtrdd <- createDataFrame(sqlContext, mtcars)

Da csv:

Per csv, è necessario aggiungere il pacchetto csv all'ambiente prima di iniziare il contesto Spark:

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv -> 
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

Quindi, è possibile caricare il CSV o inferendo lo schema di dati dei dati nelle colonne:

train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")

O specificando in anticipo lo schema dei dati:

 customSchema <- structType(
    structField("margin", "integer"),
    structField("gross", "integer"),
    structField("name", "string"))

 train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow