apache-spark
Apache Spark DataFramesの紹介
サーチ…
JAVAを使用したデータフレームのスパーク
DataFrameは、名前付き列にまとめられたデータの分散コレクションです。概念的には、リレーショナルデータベースのテーブルと同等です。データフレームは、構造化データファイル、Hive内のテーブル、外部データベース、または既存のRDDなど、さまざまなソースから構築できます。
Oracle RDBMSテーブルをsparkデータフレームに読み込む::
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
必要に応じて、このデータフレームをrddに戻すこともできます。
JavaRDD<Row> rdd=df.javaRDD();
ファイルからデータフレームを作成する:
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
今度はOracleからのデータフレームとファイルからのデータフレームがあります。同様に、ハイブからテーブルを読み取ることもできます。データフレームでは、rdbmsのように任意のカラムを取得できます。列や最大値の最小値を取得するのと同じです。列の平均/平均を計算できます。 select、filter、agg、groupByのような他の関数も利用できます。
Spark Dataframe説明
Sparkでは、DataFrameは名前付き列にまとめられたデータの分散コレクションです。リレーショナルデータベースのテーブルやR / Pythonのデータフレームと概念的には同等ですが、フードの下ではより豊富な最適化が行われています。データフレームは、構造化データファイル、ハイブのテーブル、外部データベース、または既存のRDDなど、幅広いソースから構築できます。
データフレームの作成方法
val data= spark.read.json("path to json")
オプションフィールドにval df = spark.read.format("com.databricks.spark.csv").load("test.txt")
を追加すると、ヘッダ、区切り文字、文字セットなどを提供することができます
RDDからDataframeを作成することもできます
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
val dfWithoutSchema = spark.createDataFrame(rdd)
スキーマでdfを作成する場合
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
SparkがRDDを提供している場合、Dataframeが必要な理由
RDDは、反復分散データセットであり、それに対して実行できる操作が制約されていないため、最適化できないデータのブラックボックスである。
ビルドされた最適化エンジンはありません:構造化されたデータを扱う場合、RDDはCatalyst OptimizerやTungsten実行エンジンなどのSparkの高度なオプティマイザを利用できません。開発者は、その属性に基づいて各RDDを最適化する必要があります。構造化データの処理:RDDは、データフレームやデータセットとは異なり、取り込まれたデータのスキーマを推測せず、ユーザーが指定する必要があります。
SparkのDataFramesは、クエリオプティマイザによって自動的に最適化されて実行されます。 DataFrameの計算が開始される前に、CatalystオプティマイザはDataFrameを構築するために使用された操作を実行のための物理計画にコンパイルします。オプティマイザは操作のセマンティクスとデータの構造を理解するため、計算を高速化するためにインテリジェントな決定を行うことができます。
DataFrameの制限
コンパイル時の型安全性:Dataframe APIは、構造がわからないときにデータを操作することを制限するコンパイル時の安全性をサポートしていません。