Ricerca…


Spark DataFrames con JAVA

Un DataFrame è una raccolta distribuita di dati organizzati in colonne nominate. È concettualmente equivalente a una tabella in un database relazionale. I DataFrames possono essere costruiti da una vasta gamma di fonti come: file di dati strutturati, tabelle in Hive, database esterni o RDD esistenti.

Lettura di una tabella Oracle RDBMS nel frame di dati spark ::

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

Possiamo anche convertire questo frame di dati in rdd se necessario:

JavaRDD<Row> rdd=df.javaRDD();

Crea un dataframe da un file:

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

Ora abbiamo dati frame da Oracle anche da un file. Allo stesso modo possiamo leggere un tavolo dall'alveare pure. Sul frame dei dati possiamo recuperare qualsiasi colonna come facciamo in rdbms. Come ottenere un valore minimo per una colonna o un valore massimo. Può calcolare una media / media per una colonna. Sono disponibili anche altre funzioni come select, filter, agg, groupBy.

Spark Dataframe ha spiegato

In Spark, DataFrame è una raccolta distribuita di dati organizzati in colonne nominate. È concettualmente equivalente a una tabella in un database relazionale o in un frame di dati in R / Python, ma con ottimizzazioni più ricche sotto il cofano. I DataFrames possono essere costruiti da una vasta gamma di fonti come file di dati strutturati, tabelle in Hive, database esterni o RDD esistenti.

Modi per creare Dataframe

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") nel campo delle opzioni, puoi fornire header, delimitatore, set di caratteri e molto altro

puoi anche creare Dataframe da un RDD

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

Se si desidera creare df con schema

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Perché abbiamo bisogno di Dataframe se Spark ha fornito RDD

Un RDD è semplicemente un dataset distribuito resiliente che è più di un blackbox di dati che non può essere ottimizzato in quanto le operazioni che possono essere eseguite contro di esso non sono così limitati.

Nessun motore di ottimizzazione integrato: quando si lavora con dati strutturati, gli RDD non possono trarre vantaggio dagli ottimizzatori avanzati di Spark, inclusi catalyst optimizer e motore di esecuzione di tungsteno. Gli sviluppatori devono ottimizzare ciascun RDD in base ai suoi attributi. Gestione dei dati strutturati: diversamente da Dataframe e set di dati, gli RDD non determinano lo schema dei dati acquisiti e richiede all'utente di specificarli.

DataFrames in Spark hanno la loro esecuzione automaticamente ottimizzata da un ottimizzatore di query. Prima dell'avvio di qualsiasi calcolo su DataFrame, l'ottimizzatore Catalyst compila le operazioni utilizzate per creare DataFrame in un piano fisico per l'esecuzione. Poiché l'ottimizzatore comprende la semantica delle operazioni e della struttura dei dati, può prendere decisioni intelligenti per accelerare il calcolo.

Limitazione di DataFrame

Sicurezza del tipo in fase di compilazione: l'API Dataframe non supporta la sicurezza in fase di compilazione che limita la manipolazione dei dati quando la struttura non è nota.



Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow