apache-spark
Apache Spark DataFrames 소개
수색…
자바로 스파크 DataFrames
DataFrame은 명명 된 열로 구성된 데이터의 분산 된 컬렉션입니다. 관계형 데이터베이스의 테이블과 개념적으로 동일합니다. DataFrames는 구조화 된 데이터 파일, Hive의 테이블, 외부 데이터베이스 또는 기존 RDD와 같은 다양한 소스로 구성 할 수 있습니다.
Oracle RDBMS 테이블을 스파크 데이터 프레임으로 읽는 방법 ::
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
필요한 경우이 데이터 프레임을 다시 rdd로 변환 할 수도 있습니다.
JavaRDD<Row> rdd=df.javaRDD();
파일에서 데이터 프레임 만들기 :
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
이제 우리는 오라클의 데이터 프레임을 파일에서 가져 왔습니다. 유사하게 우리는 하이브에서 테이블을 읽을 수 있습니다. 데이터 프레임에서 우리는 rdbms에서하는 것처럼 모든 컬럼을 가져올 수 있습니다. 열 또는 최대 값에 대해 최소값을 얻는 것과 같습니다. 열에 대한 평균 / 평균을 계산할 수 있습니다. select, filter, agg, groupBy와 같은 다른 함수도 사용할 수 있습니다.
Spark Dataframe 설명
Spark에서 DataFrame은 명명 된 열로 구성된 데이터의 분산 컬렉션입니다. 관계형 데이터베이스의 테이블이나 R / Python의 데이터 프레임과 개념적으로는 동일하지만 더 자세한 최적화가 필요합니다. DataFrames는 구조화 된 데이터 파일, Hive의 테이블, 외부 데이터베이스 또는 기존 RDD와 같은 다양한 소스로 구성 할 수 있습니다.
데이터 프레임 생성 방법
val data= spark.read.json("path to json")
val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
옵션 필드에 헤더, 구분 기호, 문자 세트 등을 제공 할 수 있습니다.
RDD에서 Dataframe을 생성 할 수도 있습니다.
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
val dfWithoutSchema = spark.createDataFrame(rdd)
스키마로 df를 생성하려면
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Spark에서 RDD를 제공 한 경우 Dataframe이 필요한 이유
RDD는 단지 탄력적 인 분산 데이터 세트로, 데이터에 대해 수행 할 수있는 작업이 제한되어 있지 않기 때문에 최적화 할 수없는 데이터의 블랙 박스에 가깝습니다.
inbuilt 최적화 엔진 없음 : 구조화 된 데이터로 작업 할 때 RDD는 Catalyst Optimizer 및 Tungsten 실행 엔진을 포함한 Spark의 고급 옵티 마이저를 이용할 수 없습니다. 개발자는 각 특성을 기반으로 각 RDD를 최적화해야합니다. 구조화 된 데이터 처리 : RDD는 데이터 프레임 및 데이터 세트와 달리 가져온 데이터의 스키마를 유추하지 않으며 사용자가 지정해야합니다.
Spark의 DataFrames는 쿼리 최적화 프로그램에 의해 자동으로 최적화되어 실행됩니다. DataFrame에 대한 계산이 시작되기 전에 Catalyst Optimizer는 DataFrame을 구축하는 데 사용 된 작업을 실제 계획으로 컴파일하여 실행합니다. 옵티마이 저는 조작의 의미 및 데이터 구조를 이해하므로 계산 속도를 높이기 위해 지능적인 결정을 내릴 수 있습니다.
DataFrame의 제한
컴파일 타임 유형 안전 : Dataframe API는 구조가 알려지지 않은 경우 데이터 조작을 제한하는 컴파일 타임 안전을 지원하지 않습니다.