R Language
Spark API (SparkR)
Zoeken…
Opmerkingen
Met het SparkR
pakket kunt u werken met gedistribueerde SparkR
bovenop een Spark-cluster . Hiermee kunt u bewerkingen uitvoeren zoals selecteren, filteren, aggregeren op zeer grote gegevenssets. SparkR overzicht SparkR-pakketdocumentatie
Spark context instellen
Spark-context instellen in R
Om met Sparks gedistribueerde dataframes te kunnen werken, moet u uw R-programma verbinden met een bestaand Spark Cluster.
library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context
Hier vindt u informatie over het aansluiten van uw IDE op een Spark-cluster.
Koop Spark Cluster
Er is een introductieonderwerp van Apache Spark met installatie-instructies. Kortom, u kunt lokaal een Spark Cluster gebruiken via Java ( zie instructies ) of (niet-gratis) cloud-applicaties gebruiken (bijvoorbeeld Microsoft Azure [topic site] , IBM ).
Cachegegevens
Wat:
Caching kan de berekening in Spark optimaliseren. Caching slaat gegevens op in het geheugen en is een speciaal geval van persistentie. Hier wordt uitgelegd wat er gebeurt als u een RDD in de cache opslaat.
Waarom:
Kort gezegd slaat caching een tussentijds gedeeltelijk resultaat op - meestal na transformaties - van uw oorspronkelijke gegevens. Wanneer u dus de in de cache opgeslagen RDD gebruikt, worden de reeds getransformeerde gegevens uit het geheugen geopend zonder de eerdere transformaties opnieuw te berekenen.
Hoe:
Hier is een voorbeeld van hoe u snel toegang kunt krijgen tot grote gegevens (hier 3 GB grote csv) vanuit geheugenopslag wanneer u er meerdere keren toegang toe hebt:
library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
# loading 3 GB big csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)
RDD's maken (veerkrachtige gedistribueerde gegevenssets)
Van dataframe:
mtrdd <- createDataFrame(sqlContext, mtcars)
Van csv:
Voor csv's moet u het csv-pakket aan de omgeving toevoegen voordat u de Spark-context start:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv ->
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
Vervolgens kunt u de csv laden door het gegevensschema van de gegevens in de kolommen af te leiden:
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")
Of door vooraf het gegevensschema op te geven:
customSchema <- structType(
structField("margin", "integer"),
structField("gross", "integer"),
structField("name", "string"))
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)