Поиск…


Spark DataFrames с JAVA

DataFrame представляет собой распределенный сбор данных, организованный в именованные столбцы. Он концептуально эквивалентен таблице в реляционной базе данных. DataFrames может быть построен из широкого спектра источников, таких как: структурированные файлы данных, таблицы в Hive, внешние базы данных или существующие RDD.

Чтение таблицы RDBMS Oracle в кадр свечной информации ::

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

Мы также можем преобразовать этот кадр данных в rdd, если это необходимо:

JavaRDD<Row> rdd=df.javaRDD();

Создайте фрейм данных из файла:

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

Теперь у нас есть кадр данных из oracle, а также из файла. Точно так же мы можем прочитать таблицу и из улья. В кадре данных мы можем получить любой столбец, как в rdbms. Как получить минимальное значение для столбца или максимального значения. Можно вычислить среднее значение / avg для столбца. Также доступны некоторые другие функции, такие как select, filter, agg, groupBy.

Spark Dataframe объяснил

В Spark DataFrame представляет собой распределенный сбор данных, организованный в именованные столбцы. Он концептуально эквивалентен таблице в реляционной базе данных или кадре данных в R / Python, но с более богатыми оптимизациями под капотом. DataFrames может быть построен из широкого спектра источников, таких как структурированные файлы данных, таблицы в Hive, внешние базы данных или существующие RDD.

Способы создания Dataframe

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") в поле параметров, вы можете предоставить заголовок, разделитель, кодировку и многое другое

вы также можете создать Dataframe из RDD

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

Если вы хотите создать df со схемой

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Почему нам нужен Dataframe, если Spark предоставил RDD

RDD - это просто Resilient Distributed Dataset, который представляет собой более черный ящик данных, которые нельзя оптимизировать, поскольку операции, которые могут быть выполнены против него, не так ограничены.

Нет встроенного механизма оптимизации: при работе со структурированными данными RDD не могут воспользоваться преимуществами усовершенствованных оптимизаторов Spark, включая оптимизатор катализатора и механизм выполнения вольфрама. Разработчикам необходимо оптимизировать каждый RDD на основе его атрибутов. Обработка структурированных данных. В отличие от Dataframe и наборов данных, RDD не выводят схему проглатываемых данных и требуют от пользователя ее указания.

DataFrames в Spark автоматически выполняет оптимизацию с помощью оптимизатора запросов. Перед началом любых вычислений в DataFrame оптимизатор Catalyst компилирует операции, которые были использованы для построения DataFrame в физический план для выполнения. Поскольку оптимизатор понимает семантику операций и структуру данных, он может принимать интеллектуальные решения для ускорения вычислений.

Ограничение DataFrame

Безопасность типа компиляции: API Dataframe не поддерживает безопасность времени компиляции, которая ограничивает вас от манипулирования данными, когда структура неизвестна.



Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow