Buscar..


Observaciones

El paquete SparkR permite trabajar con marcos de datos distribuidos en la parte superior de un clúster Spark . Estos le permiten realizar operaciones como la selección, el filtrado y la agregación en conjuntos de datos muy grandes. Descripción general de SparkR Documentación del paquete SparkR

Configurar el contexto de Spark

Configurar el contexto de Spark en R

Para comenzar a trabajar con los marcos de datos distribuidos de Sparks, debe conectar su programa R con un clúster Spark existente.

library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context

Aquí hay información sobre cómo conectar su IDE a un clúster Spark.

Obtener Spark Cluster

Hay un tema de introducción de Apache Spark con instrucciones de instalación. Básicamente, puede emplear un Spark Cluster localmente a través de java ( consulte las instrucciones ) o usar aplicaciones en la nube (no gratuitas) (por ejemplo, Microsoft Azure [sitio del tema] , IBM ).

Datos de caché

Qué:

El almacenamiento en caché puede optimizar el cálculo en Spark. El almacenamiento en caché almacena los datos en la memoria y es un caso especial de persistencia. Aquí se explica lo que sucede cuando almacena en caché un RDD en Spark.

Por qué:

Básicamente, el almacenamiento en caché guarda un resultado parcial provisional, generalmente después de las transformaciones, de sus datos originales. Por lo tanto, cuando usa el RDD en caché, se accede a los datos ya transformados de la memoria sin volver a calcular las transformaciones anteriores.

Cómo:

Aquí hay un ejemplo de cómo acceder rápidamente a datos grandes (aquí, 3 GB csv grande) desde el almacenamiento en memoria cuando se accede a ellos más de una vez:

library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# loading 3 GB big csv file:  
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)

Crear RDDs (Conjuntos de Datos Distribuidos Resistentes)

Desde el marco de datos:

mtrdd <- createDataFrame(sqlContext, mtcars)

Desde csv:

Para los csv, debe agregar el paquete csv al entorno antes de iniciar el contexto Spark:

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv -> 
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

Luego, puede cargar el csv deduciendo el esquema de datos de los datos en las columnas:

train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")

O especificando el esquema de datos de antemano:

 customSchema <- structType(
    structField("margin", "integer"),
    structField("gross", "integer"),
    structField("name", "string"))

 train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)


Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow