apache-spark
Введение в Apache Spark DataFrames
Поиск…
Spark DataFrames с JAVA
DataFrame представляет собой распределенный сбор данных, организованный в именованные столбцы. Он концептуально эквивалентен таблице в реляционной базе данных. DataFrames может быть построен из широкого спектра источников, таких как: структурированные файлы данных, таблицы в Hive, внешние базы данных или существующие RDD.
Чтение таблицы RDBMS Oracle в кадр свечной информации ::
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
Мы также можем преобразовать этот кадр данных в rdd, если это необходимо:
JavaRDD<Row> rdd=df.javaRDD();
Создайте фрейм данных из файла:
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
Теперь у нас есть кадр данных из oracle, а также из файла. Точно так же мы можем прочитать таблицу и из улья. В кадре данных мы можем получить любой столбец, как в rdbms. Как получить минимальное значение для столбца или максимального значения. Можно вычислить среднее значение / avg для столбца. Также доступны некоторые другие функции, такие как select, filter, agg, groupBy.
Spark Dataframe объяснил
В Spark DataFrame представляет собой распределенный сбор данных, организованный в именованные столбцы. Он концептуально эквивалентен таблице в реляционной базе данных или кадре данных в R / Python, но с более богатыми оптимизациями под капотом. DataFrames может быть построен из широкого спектра источников, таких как структурированные файлы данных, таблицы в Hive, внешние базы данных или существующие RDD.
Способы создания Dataframe
val data= spark.read.json("path to json")
val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
в поле параметров, вы можете предоставить заголовок, разделитель, кодировку и многое другое
вы также можете создать Dataframe из RDD
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
val dfWithoutSchema = spark.createDataFrame(rdd)
Если вы хотите создать df со схемой
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Почему нам нужен Dataframe, если Spark предоставил RDD
RDD - это просто Resilient Distributed Dataset, который представляет собой более черный ящик данных, которые нельзя оптимизировать, поскольку операции, которые могут быть выполнены против него, не так ограничены.
Нет встроенного механизма оптимизации: при работе со структурированными данными RDD не могут воспользоваться преимуществами усовершенствованных оптимизаторов Spark, включая оптимизатор катализатора и механизм выполнения вольфрама. Разработчикам необходимо оптимизировать каждый RDD на основе его атрибутов. Обработка структурированных данных. В отличие от Dataframe и наборов данных, RDD не выводят схему проглатываемых данных и требуют от пользователя ее указания.
DataFrames в Spark автоматически выполняет оптимизацию с помощью оптимизатора запросов. Перед началом любых вычислений в DataFrame оптимизатор Catalyst компилирует операции, которые были использованы для построения DataFrame в физический план для выполнения. Поскольку оптимизатор понимает семантику операций и структуру данных, он может принимать интеллектуальные решения для ускорения вычислений.
Ограничение DataFrame
Безопасность типа компиляции: API Dataframe не поддерживает безопасность времени компиляции, которая ограничивает вас от манипулирования данными, когда структура неизвестна.