apache-spark
Introduktion till Apache Spark DataFrames
Sök…
Spark DataFrames med JAVA
En DataFrame är en distribuerad samling av data organiserade i namngivna kolumner. Det är konceptuellt ekvivalent med en tabell i en relationsdatabas. DataFrames kan konstrueras från ett brett utbud av källor som: strukturerade datafiler, tabeller i Hive, externa databaser eller befintliga RDD: er.
Läsa en Oracle RDBMS-tabell i gnistdataram:
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
Vi kan också konvertera denna dataram till rdd om det behövs:
JavaRDD<Row> rdd=df.javaRDD();
Skapa en dataframe från en fil:
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
Nu har vi dataram från oracle och från en fil. På liknande sätt kan vi också läsa en tabell från bikupa. På dataram kan vi hämta valfri kolumn som vi gör i rdbms. Som att få ett minvärde för en kolumn eller maxvärde. Kan beräkna ett medelvärde / medelvärde för en kolumn. Vissa andra funktioner som select, filter, agg, groupBy finns också tillgängliga.
Spark Dataframe förklarade
I Spark är en DataFrame en distribuerad samling av data organiserade i namngivna kolumner. Det är konceptuellt ekvivalent med en tabell i en relationsdatabas eller en dataram i R / Python, men med rikare optimeringar under huven. DataFrames kan konstrueras från en mängd olika källor som strukturerade datafiler, tabeller i Hive, externa databaser eller befintliga RDD: er.
Sätt att skapa Dataframe
val data= spark.read.json("path to json")
val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
i alternativfältet, du kan tillhandahålla rubrik, avgränsare, charset och mycket mer
Du kan också skapa Dataframe från en RDD
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
val dfWithoutSchema = spark.createDataFrame(rdd)
Om du vill skapa df med schema
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Varför vi behöver Dataframe om Spark har tillhandahållit RDD
En RDD är bara en fjädrande distribuerad databas som mer är en blackbox med data som inte kan optimeras eftersom de operationer som kan utföras mot den inte är lika begränsade.
Ingen inbyggd optimeringsmotor: När man arbetar med strukturerad data kan RDD: er inte dra fördel av Sparks avancerade optimisatorer inklusive katalysatoroptimerare och Tungsten exekveringsmotor. Utvecklare måste optimera varje RDD baserat på dess attribut. Hantera strukturerade data: Till skillnad från Dataframe och datasätt drar RDD inte ut schemat för den intagna informationen och kräver att användaren anger den.
DataFrames i Spark optimeras automatiskt med en frågaoptimerare. Innan någon beräkning på en DataFrame startar, sammanställer Catalyst-optimatorn de operationer som användes för att bygga DataFrame till en fysisk plan för exekvering. Eftersom optimeringsprogrammet förstår semantiken i operationerna och strukturen för data, kan den fatta intelligenta beslut för att påskynda beräkningen.
Begränsning av DataFrame
Säkerhet för kompileringstid: Dataframe API stöder inte kompilering av tidssäkerhet som begränsar dig från att manipulera data när strukturen inte är känd.