apache-spark
Einführung in Apache Spark DataFrames
Suche…
Spark DataFrames mit JAVA
Ein DataFrame ist eine verteilte Sammlung von Daten, die in benannten Spalten organisiert sind. Sie entspricht konzeptionell einer Tabelle in einer relationalen Datenbank. DataFrames können aus einem breiten Spektrum von Quellen erstellt werden, z.
Lesen einer Oracle RDBMS-Tabelle in einen Spark-Datenrahmen:
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
Wir können diesen Datenrahmen bei Bedarf auch wieder in rdd konvertieren:
JavaRDD<Row> rdd=df.javaRDD();
Erstellen Sie einen Datenrahmen aus einer Datei:
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
Jetzt haben wir auch Datenrahmen von Oracle sowie von einer Datei. Ebenso können wir eine Tabelle aus dem Bienenstock lesen. Im Datenrahmen können wir jede Spalte abrufen, wie in rdbms. B. einen Mindestwert für eine Spalte oder einen Höchstwert erhalten. Kann einen Mittelwert / Durchschnitt für eine Spalte berechnen. Einige andere Funktionen wie select, filter, agg, groupBy sind ebenfalls verfügbar.
Spark Dataframe erklärt
In Spark ist ein DataFrame eine verteilte Sammlung von Daten, die in benannten Spalten organisiert sind. Es entspricht konzeptionell einer Tabelle in einer relationalen Datenbank oder einem Datenrahmen in R / Python, jedoch mit reichhaltigeren Optimierungen unter der Haube. DataFrames können aus einer Vielzahl von Quellen erstellt werden, z. B. strukturierten Datendateien, Tabellen in Hive, externen Datenbanken oder vorhandenen RDDs.
Möglichkeiten zum Erstellen von Dataframe
val data= spark.read.json("path to json")
val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
Im Optionsfeld können Sie Header, Trennzeichen, Zeichensatz und vieles mehr val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
Sie können Dataframe auch aus einer RDD erstellen
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
val dfWithoutSchema = spark.createDataFrame(rdd)
Wenn Sie df mit Schema erstellen möchten
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Warum brauchen wir Dataframe, wenn Spark RDD bereitgestellt hat
Eine RDD ist lediglich eine ausfallsichere verteilte Datenmenge, die eher eine Blackbox von Daten ist, die nicht optimiert werden kann, da die Vorgänge, die für sie ausgeführt werden können, nicht so eingeschränkt sind.
Keine eingebaute Optimierungsengine: Bei der Arbeit mit strukturierten Daten können RDDs nicht die Vorteile der fortschrittlichen Optimierer von Spark nutzen, einschließlich Katalysatoroptimierer und Tungsten-Ausführungsengine. Entwickler müssen jede RDD basierend auf ihren Attributen optimieren. Umgang mit strukturierten Daten: Im Gegensatz zu Dataframe und Datasets können RDDs nicht das Schema der aufgenommenen Daten ableiten und müssen vom Benutzer angegeben werden.
Die Ausführung von DataFrames in Spark wird von einem Abfrageoptimierer automatisch optimiert. Vor dem Start einer Berechnung für einen DataFrame kompiliert der Catalyst-Optimierer die Vorgänge, die zum Erstellen des DataFrame verwendet wurden, in einen physischen Plan zur Ausführung. Da der Optimierer die Semantik von Operationen und die Struktur der Daten versteht, kann er intelligente Entscheidungen treffen, um die Berechnung zu beschleunigen.
Einschränkung von DataFrame
Typensicherheit beim Kompilieren: Die Dataframe-API unterstützt keine Sicherheit beim Kompilieren, sodass Sie keine Daten bearbeiten können, wenn die Struktur nicht bekannt ist.