Recherche…


Spark DataFrames avec JAVA

Un DataFrame est une collection distribuée de données organisées en colonnes nommées. Il est conceptuellement équivalent à une table dans une base de données relationnelle. Les DataFrames peuvent être construits à partir d'un large éventail de sources telles que: les fichiers de données structurés, les tables dans Hive, les bases de données externes ou les RDD existants.

Lire une table Oracle RDBMS dans le bloc de données spark:

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

Nous pouvons également convertir ce bloc de données en rdd si besoin est:

JavaRDD<Row> rdd=df.javaRDD();

Créez un dataframe à partir d'un fichier:

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

Maintenant, nous avons aussi un bloc de données d'Oracle à partir d'un fichier. De même, nous pouvons également lire un tableau de ruche. Sur les trames de données, nous pouvons récupérer n'importe quelle colonne comme nous le faisons dans les rdbms. Comme pour obtenir une valeur min pour une colonne ou une valeur maximale. Peut calculer une moyenne / moyenne pour une colonne. Certaines autres fonctions comme select, filter, agg, groupBy sont également disponibles.

Spark Dataframe expliqué

Dans Spark, un DataFrame est une collection distribuée de données organisées en colonnes nommées. Il est conceptuellement équivalent à une table dans une base de données relationnelle ou un bloc de données dans R / Python, mais avec des optimisations plus riches sous le capot. Les DataFrames peuvent être construits à partir d'un large éventail de sources telles que des fichiers de données structurés, des tables dans Hive, des bases de données externes ou des RDD existants.

Façons de créer Dataframe

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") dans le champ d'options, vous pouvez fournir en-tête, délimiteur, jeu de caractères et bien plus encore

vous pouvez également créer un Dataframe à partir d'un RDD

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

Si vous voulez créer df avec le schéma

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Pourquoi nous avons besoin de Dataframe si Spark a fourni RDD

Un RDD est simplement un ensemble de données distribué résilient qui est davantage une boîte noire de données qui ne peut pas être optimisée car les opérations qui peuvent être effectuées sur ce dernier ne sont pas aussi limitées.

Aucun moteur d'optimisation intégré: Lorsque vous travaillez avec des données structurées, les RDD ne peuvent pas tirer parti des optimiseurs avancés de Spark, notamment de l'optimiseur de catalyseur et du moteur d'exécution Tungsten. Les développeurs doivent optimiser chaque RDD en fonction de ses attributs. Gestion des données structurées: contrairement aux Dataframe et aux jeux de données, les RDD ne déduisent pas le schéma des données ingérées et exigent que l'utilisateur le spécifie.

Les DataFrames dans Spark ont ​​leur exécution optimisée automatiquement par un optimiseur de requête. Avant tout calcul sur un DataFrame, l'optimiseur Catalyst compile les opérations utilisées pour créer le DataFrame dans un plan physique à exécuter. Comme l'optimiseur comprend la sémantique des opérations et la structure des données, il peut prendre des décisions intelligentes pour accélérer le calcul.

Limitation de DataFrame

Sécurité du type au moment de la compilation: l'API Dataframe ne prend pas en charge la sécurité au moment de la compilation, ce qui vous empêche de manipuler des données lorsque la structure n'est pas connue.



Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow