R Language
Spark-API (SparkR)
Suche…
Bemerkungen
Mit dem SparkR
Paket können Sie mit verteilten Datenrahmen über einem Spark-Cluster arbeiten . Damit können Sie Vorgänge wie Auswahl, Filterung und Aggregation für sehr große Datensätze durchführen. SparkR-Übersicht SparkR-Paketdokumentation
Spark-Kontext einrichten
Spark-Kontext in R einrichten
Um mit verteilten Sparks-Datenrahmen arbeiten zu können, müssen Sie Ihr R-Programm mit einem vorhandenen Spark-Cluster verbinden.
library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context
Hier erfahren Sie, wie Sie Ihre IDE mit einem Spark-Cluster verbinden.
Spark-Cluster abrufen
Es gibt ein Einführungsthema von Apache Spark mit Installationsanweisungen. Grundsätzlich können Sie einen Spark - Cluster lokal über Java (verwenden siehe Anleitung ) oder verwenden (non-free) Cloud - Anwendungen (zB Microsoft Azure [Themenseite] , IBM ).
Daten zwischenspeichern
Was:
Durch das Caching kann die Berechnung in Spark optimiert werden. Die Zwischenspeicherung speichert Daten im Speicher und ist ein Sonderfall von Persistenz. Hier wird erklärt, was passiert, wenn Sie eine RDD in Spark zwischenspeichern.
Warum:
Grundsätzlich speichert das Zwischenspeichern ein Zwischenergebnis - normalerweise nach Transformationen - Ihrer Originaldaten. Wenn Sie also die zwischengespeicherte RDD verwenden, wird auf die bereits umgewandelten Daten aus dem Speicher zugegriffen, ohne die vorherigen Umwandlungen neu zu berechnen.
Wie:
Hier ein Beispiel, wie Sie schnell auf große Daten (hier 3 GB große csv) zugreifen können, wenn Sie mehrmals darauf zugreifen:
library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
# loading 3 GB big csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)
Erstellen Sie RDDs (Resilient Distributed Datasets)
Vom Datenrahmen:
mtrdd <- createDataFrame(sqlContext, mtcars)
Von csv:
Für csvs müssen Sie das csv-Paket zur Umgebung hinzufügen, bevor Sie den Spark-Kontext initiieren:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv ->
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
Dann können Sie die csv laden, indem Sie auf das Datenschema der Daten in den Spalten schließen:
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")
Oder indem Sie vorher das Datenschema angeben:
customSchema <- structType(
structField("margin", "integer"),
structField("gross", "integer"),
structField("name", "string"))
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)