수색…


자바로 스파크 DataFrames

DataFrame은 명명 된 열로 구성된 데이터의 분산 된 컬렉션입니다. 관계형 데이터베이스의 테이블과 개념적으로 동일합니다. DataFrames는 구조화 된 데이터 파일, Hive의 테이블, 외부 데이터베이스 또는 기존 RDD와 같은 다양한 소스로 구성 할 수 있습니다.

Oracle RDBMS 테이블을 스파크 데이터 프레임으로 읽는 방법 ::

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

필요한 경우이 데이터 프레임을 다시 rdd로 변환 할 수도 있습니다.

JavaRDD<Row> rdd=df.javaRDD();

파일에서 데이터 프레임 만들기 :

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

이제 우리는 오라클의 데이터 프레임을 파일에서 가져 왔습니다. 유사하게 우리는 하이브에서 테이블을 읽을 수 있습니다. 데이터 프레임에서 우리는 rdbms에서하는 것처럼 모든 컬럼을 가져올 수 있습니다. 열 또는 최대 값에 대해 최소값을 얻는 것과 같습니다. 열에 대한 평균 / 평균을 계산할 수 있습니다. select, filter, agg, groupBy와 같은 다른 함수도 사용할 수 있습니다.

Spark Dataframe 설명

Spark에서 DataFrame은 명명 된 열로 구성된 데이터의 분산 컬렉션입니다. 관계형 데이터베이스의 테이블이나 R / Python의 데이터 프레임과 개념적으로는 동일하지만 더 자세한 최적화가 필요합니다. DataFrames는 구조화 된 데이터 파일, Hive의 테이블, 외부 데이터베이스 또는 기존 RDD와 같은 다양한 소스로 구성 할 수 있습니다.

데이터 프레임 생성 방법

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") 옵션 필드에 헤더, 구분 기호, 문자 세트 등을 제공 할 수 있습니다.

RDD에서 Dataframe을 생성 할 수도 있습니다.

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

스키마로 df를 생성하려면

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Spark에서 RDD를 제공 한 경우 Dataframe이 필요한 이유

RDD는 단지 탄력적 인 분산 데이터 세트로, 데이터에 대해 수행 할 수있는 작업이 제한되어 있지 않기 때문에 최적화 할 수없는 데이터의 블랙 박스에 가깝습니다.

inbuilt 최적화 엔진 없음 : 구조화 된 데이터로 작업 할 때 RDD는 Catalyst Optimizer 및 Tungsten 실행 엔진을 포함한 Spark의 고급 옵티 마이저를 이용할 수 없습니다. 개발자는 각 특성을 기반으로 각 RDD를 최적화해야합니다. 구조화 된 데이터 처리 : RDD는 데이터 프레임 및 데이터 세트와 달리 가져온 데이터의 스키마를 유추하지 않으며 사용자가 지정해야합니다.

Spark의 DataFrames는 쿼리 최적화 프로그램에 의해 자동으로 최적화되어 실행됩니다. DataFrame에 대한 계산이 시작되기 전에 Catalyst Optimizer는 DataFrame을 구축하는 데 사용 된 작업을 실제 계획으로 컴파일하여 실행합니다. 옵티마이 저는 조작의 의미 및 데이터 구조를 이해하므로 계산 속도를 높이기 위해 지능적인 결정을 내릴 수 있습니다.

DataFrame의 제한

컴파일 타임 유형 안전 : Dataframe API는 구조가 알려지지 않은 경우 데이터 조작을 제한하는 컴파일 타임 안전을 지원하지 않습니다.



Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow