R Language
Spark API (SparkR)
Ricerca…
Osservazioni
Il pacchetto SparkR
ti consente di lavorare con frame di dati distribuiti su un cluster Spark . Questi ti permettono di fare operazioni come selezione, filtraggio, aggregazione su dataset molto grandi. SparkR panoramica sulla documentazione del pacchetto SparkR
Imposta il contesto Spark
Imposta contesto Spark in R
Per iniziare a lavorare con Sparks distribuiti con i dataframes, è necessario collegare il programma R con uno Spark Cluster esistente.
library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context
Ecco come collegare l'IDE a un cluster Spark.
Ottieni Spark Cluster
C'è un argomento introduttivo di Apache Spark con le istruzioni di installazione. In sostanza, è possibile utilizzare Spark Cluster localmente tramite java ( consultare le istruzioni ) o utilizzare applicazioni cloud (non libere) (ad es. Microsoft Azure [sito argomento] , IBM ).
Cache data
Che cosa:
Il caching può ottimizzare il calcolo in Spark. Il caching memorizza i dati in memoria ed è un caso speciale di persistenza. Qui viene spiegato cosa succede quando si memorizza nella cache un RDD in Spark.
Perché:
In sostanza, la memorizzazione nella cache consente di salvare un risultato parziale provvisorio, solitamente dopo le trasformazioni, dei dati originali. Pertanto, quando si utilizza l'RDD memorizzato nella cache, è possibile accedere ai dati già trasformati dalla memoria senza dover ricalcolare le trasformazioni precedenti.
Come:
Ecco un esempio di come accedere rapidamente ai dati di grandi dimensioni (qui 3 GB di grande csv) dalla memoria in memoria quando si accede più di una volta:
library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
# loading 3 GB big csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)
Creare RDD (dataset distribuiti resilienti)
Dal dataframe:
mtrdd <- createDataFrame(sqlContext, mtcars)
Da csv:
Per csv, è necessario aggiungere il pacchetto csv all'ambiente prima di iniziare il contesto Spark:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv ->
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
Quindi, è possibile caricare il CSV o inferendo lo schema di dati dei dati nelle colonne:
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")
O specificando in anticipo lo schema dei dati:
customSchema <- structType(
structField("margin", "integer"),
structField("gross", "integer"),
structField("name", "string"))
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)