Buscar..


Spark DataFrames con JAVA

Un DataFrame es una colección distribuida de datos organizados en columnas nombradas. Conceptualmente es equivalente a una tabla en una base de datos relacional. Los DataFrames se pueden construir a partir de una amplia gama de fuentes, tales como: archivos de datos estructurados, tablas en Hive, bases de datos externas o RDD existentes.

Leyendo una tabla de Oracle RDBMS en el marco de datos de chispa ::

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

También podemos convertir este marco de datos de nuevo a rdd si es necesario:

JavaRDD<Row> rdd=df.javaRDD();

Crear un marco de datos a partir de un archivo:

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

Ahora tenemos el marco de datos de Oracle también de un archivo. Del mismo modo podemos leer una tabla de colmena también. En el marco de datos podemos obtener cualquier columna como lo hacemos en rdbms. Como obtener un valor mínimo para una columna o valor máximo. Puede calcular una media / promedio para una columna. Algunas otras funciones como seleccionar, filtrar, agregar, agrupar también están disponibles.

Spark Dataframe explicado

En Spark, un DataFrame es una colección distribuida de datos organizados en columnas con nombre. Conceptualmente es equivalente a una tabla en una base de datos relacional o un marco de datos en R / Python, pero con optimizaciones más ricas bajo el capó. Los DataFrames se pueden construir a partir de una amplia gama de fuentes, como archivos de datos estructurados, tablas en Hive, bases de datos externas o RDD existentes.

Formas de crear Dataframe

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") en el campo de opciones, puede proporcionar encabezado, delimitador, conjunto de caracteres y mucho más

También puede crear Dataframe desde un RDD

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

Si quieres crear df con esquema

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

¿Por qué necesitamos Dataframe si Spark ha proporcionado RDD?

Un RDD es simplemente un conjunto de datos distribuidos resilientes que es más bien una caja negra de datos que no pueden optimizarse, ya que las operaciones que se pueden realizar en su contra no están tan limitadas.

Ningún motor de optimización incorporado: cuando se trabaja con datos estructurados, los RDD no pueden aprovechar las ventajas de los optimizadores avanzados de Spark, como el optimizador de catalizador y el motor de ejecución de Tungsten. Los desarrolladores necesitan optimizar cada RDD en función de sus atributos. Manejo de datos estructurados: a diferencia de los marcos de datos y los conjuntos de datos, los RDD no deducen el esquema de los datos ingeridos y requieren que el usuario los especifique.

Los DataFrames en Spark tienen su ejecución optimizada automáticamente por un optimizador de consultas. Antes de que se inicie cualquier cálculo en un DataFrame, el optimizador de Catalyst compila las operaciones que se usaron para construir el DataFrame en un plan físico para su ejecución. Debido a que el optimizador entiende la semántica de las operaciones y la estructura de los datos, puede tomar decisiones inteligentes para acelerar el cálculo.

Limitación de DataFrame

Tipo de seguridad de tiempo de compilación: Dataframe API no admite la seguridad de tiempo de compilación, lo que le impide manipular datos cuando no se conoce la estructura.



Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow