apache-spark
Introduzione a Apache Spark DataFrames
Ricerca…
Spark DataFrames con JAVA
Un DataFrame è una raccolta distribuita di dati organizzati in colonne nominate. È concettualmente equivalente a una tabella in un database relazionale. I DataFrames possono essere costruiti da una vasta gamma di fonti come: file di dati strutturati, tabelle in Hive, database esterni o RDD esistenti.
Lettura di una tabella Oracle RDBMS nel frame di dati spark ::
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
Possiamo anche convertire questo frame di dati in rdd se necessario:
JavaRDD<Row> rdd=df.javaRDD();
Crea un dataframe da un file:
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
Ora abbiamo dati frame da Oracle anche da un file. Allo stesso modo possiamo leggere un tavolo dall'alveare pure. Sul frame dei dati possiamo recuperare qualsiasi colonna come facciamo in rdbms. Come ottenere un valore minimo per una colonna o un valore massimo. Può calcolare una media / media per una colonna. Sono disponibili anche altre funzioni come select, filter, agg, groupBy.
Spark Dataframe ha spiegato
In Spark, DataFrame è una raccolta distribuita di dati organizzati in colonne nominate. È concettualmente equivalente a una tabella in un database relazionale o in un frame di dati in R / Python, ma con ottimizzazioni più ricche sotto il cofano. I DataFrames possono essere costruiti da una vasta gamma di fonti come file di dati strutturati, tabelle in Hive, database esterni o RDD esistenti.
Modi per creare Dataframe
val data= spark.read.json("path to json")
val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
nel campo delle opzioni, puoi fornire header, delimitatore, set di caratteri e molto altro
puoi anche creare Dataframe da un RDD
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
val dfWithoutSchema = spark.createDataFrame(rdd)
Se si desidera creare df con schema
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Perché abbiamo bisogno di Dataframe se Spark ha fornito RDD
Un RDD è semplicemente un dataset distribuito resiliente che è più di un blackbox di dati che non può essere ottimizzato in quanto le operazioni che possono essere eseguite contro di esso non sono così limitati.
Nessun motore di ottimizzazione integrato: quando si lavora con dati strutturati, gli RDD non possono trarre vantaggio dagli ottimizzatori avanzati di Spark, inclusi catalyst optimizer e motore di esecuzione di tungsteno. Gli sviluppatori devono ottimizzare ciascun RDD in base ai suoi attributi. Gestione dei dati strutturati: diversamente da Dataframe e set di dati, gli RDD non determinano lo schema dei dati acquisiti e richiede all'utente di specificarli.
DataFrames in Spark hanno la loro esecuzione automaticamente ottimizzata da un ottimizzatore di query. Prima dell'avvio di qualsiasi calcolo su DataFrame, l'ottimizzatore Catalyst compila le operazioni utilizzate per creare DataFrame in un piano fisico per l'esecuzione. Poiché l'ottimizzatore comprende la semantica delle operazioni e della struttura dei dati, può prendere decisioni intelligenti per accelerare il calcolo.
Limitazione di DataFrame
Sicurezza del tipo in fase di compilazione: l'API Dataframe non supporta la sicurezza in fase di compilazione che limita la manipolazione dei dati quando la struttura non è nota.