R Language
Spark API (SparkR)
Buscar..
Observaciones
El paquete SparkR
permite trabajar con marcos de datos distribuidos en la parte superior de un clúster Spark . Estos le permiten realizar operaciones como la selección, el filtrado y la agregación en conjuntos de datos muy grandes. Descripción general de SparkR Documentación del paquete SparkR
Configurar el contexto de Spark
Configurar el contexto de Spark en R
Para comenzar a trabajar con los marcos de datos distribuidos de Sparks, debe conectar su programa R con un clúster Spark existente.
library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context
Aquí hay información sobre cómo conectar su IDE a un clúster Spark.
Obtener Spark Cluster
Hay un tema de introducción de Apache Spark con instrucciones de instalación. Básicamente, puede emplear un Spark Cluster localmente a través de java ( consulte las instrucciones ) o usar aplicaciones en la nube (no gratuitas) (por ejemplo, Microsoft Azure [sitio del tema] , IBM ).
Datos de caché
Qué:
El almacenamiento en caché puede optimizar el cálculo en Spark. El almacenamiento en caché almacena los datos en la memoria y es un caso especial de persistencia. Aquí se explica lo que sucede cuando almacena en caché un RDD en Spark.
Por qué:
Básicamente, el almacenamiento en caché guarda un resultado parcial provisional, generalmente después de las transformaciones, de sus datos originales. Por lo tanto, cuando usa el RDD en caché, se accede a los datos ya transformados de la memoria sin volver a calcular las transformaciones anteriores.
Cómo:
Aquí hay un ejemplo de cómo acceder rápidamente a datos grandes (aquí, 3 GB csv grande) desde el almacenamiento en memoria cuando se accede a ellos más de una vez:
library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
# loading 3 GB big csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)
Crear RDDs (Conjuntos de Datos Distribuidos Resistentes)
Desde el marco de datos:
mtrdd <- createDataFrame(sqlContext, mtcars)
Desde csv:
Para los csv, debe agregar el paquete csv al entorno antes de iniciar el contexto Spark:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv ->
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
Luego, puede cargar el csv deduciendo el esquema de datos de los datos en las columnas:
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")
O especificando el esquema de datos de antemano:
customSchema <- structType(
structField("margin", "integer"),
structField("gross", "integer"),
structField("name", "string"))
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)