Szukaj…


Spark DataFrames za pomocą JAVA

DataFrame to rozproszony zbiór danych zorganizowany w nazwane kolumny. Jest to koncepcyjnie odpowiednik tabeli w relacyjnej bazie danych. Ramki danych można konstruować z szerokiej gamy źródeł, takich jak: pliki danych strukturalnych, tabele w gałęzi, zewnętrzne bazy danych lub istniejące RDD.

Odczytywanie tabeli Oracle RDBMS do ramki danych Spark:

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

W razie potrzeby możemy również przekonwertować tę ramkę danych z powrotem na format rdd:

JavaRDD<Row> rdd=df.javaRDD();

Utwórz ramkę danych z pliku:

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

Teraz mamy ramkę danych z Oracle, a także z pliku. Podobnie możemy odczytać tabelę z ula. W ramce danych możemy pobrać dowolną kolumnę, tak jak w rdbms. Jak uzyskać wartość minimalną dla kolumny lub wartość maksymalną. Może obliczyć średnią / średnią dla kolumny. Dostępne są również inne funkcje, takie jak select, filter, agg, groupBy.

Spark Dataframe wyjaśnione

W Spark DataFrame to rozproszony zbiór danych zorganizowany w nazwane kolumny. Jest koncepcyjnie równoważny tabeli w relacyjnej bazie danych lub ramce danych w R / Python, ale z bogatszymi optymalizacjami pod maską. Ramki danych można konstruować z wielu różnych źródeł, takich jak pliki danych strukturalnych, tabele w gałęzi, zewnętrzne bazy danych lub istniejące RDD.

Sposoby tworzenia Dataframe

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") w polu opcji możesz podać nagłówek, separator, zestaw znaków i wiele więcej

możesz również utworzyć ramkę danych z RDD

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

Jeśli chcesz utworzyć df ze schematem

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Dlaczego potrzebujemy Dataframe, jeśli Spark dostarczył RDD

RDD jest jedynie odpornym rozproszonym zestawem danych, który jest bardziej czarną skrzynką danych, której nie można zoptymalizować, ponieważ operacje, które można na nim wykonać, nie są tak ograniczone.

Brak wbudowanego silnika optymalizacyjnego: podczas pracy z danymi strukturalnymi RDD nie mogą korzystać z zaawansowanych optymalizatorów Spark, w tym optymalizatora katalizatora i silnika wykonawczego wolframu. Programiści muszą zoptymalizować każdy RDD na podstawie jego atrybutów. Obsługa danych strukturalnych: W przeciwieństwie do ramek danych i zestawów danych, RDD nie wywnioskują schematu pobieranych danych i wymagają od użytkownika ich określenia.

Dane w ramkach Data Spark są automatycznie optymalizowane przez optymalizator zapytań. Przed rozpoczęciem jakichkolwiek obliczeń w DataFrame optymalizator Catalyst kompiluje operacje użyte do zbudowania DataFrame w fizycznym planie wykonania. Ponieważ optymalizator rozumie semantykę operacji i strukturę danych, może podejmować inteligentne decyzje w celu przyspieszenia obliczeń.

Ograniczenie DataFrame

Bezpieczeństwo typu kompilacji: Interfejs API Dataframe nie obsługuje bezpieczeństwa czasu kompilacji, co ogranicza manipulowanie danymi, gdy struktura nie jest znana.



Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow