Zoeken…


Spark DataFrames met JAVA

Een DataFrame is een gedistribueerde verzameling gegevens georganiseerd in benoemde kolommen. Het is conceptueel equivalent aan een tabel in een relationele database. DataFrames kunnen worden opgebouwd uit een breed scala aan bronnen, zoals: gestructureerde gegevensbestanden, tabellen in Hive, externe databases of bestaande RDD's.

Een Oracle RDBMS-tabel in een vonkendataframe lezen:

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

We kunnen dit dataframe ook terugzetten naar rdd indien nodig:

JavaRDD<Row> rdd=df.javaRDD();

Maak een dataframe van een bestand:

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

Nu hebben we een gegevensframe van Oracle en een bestand. Op dezelfde manier kunnen we ook een tabel van bijenkorf lezen. Op dataframe kunnen we elke kolom ophalen zoals we doen in rdbms. Krijg bijvoorbeeld een minimumwaarde voor een kolom of een maximumwaarde. Kan een gemiddelde / gemiddelde voor een kolom berekenen. Enkele andere functies zoals selecteren, filteren, agg, groupBy zijn ook beschikbaar.

Spark Dataframe uitgelegd

In Spark is een DataFrame een gedistribueerde verzameling gegevens georganiseerd in benoemde kolommen. Het is conceptueel equivalent aan een tabel in een relationele database of een gegevensframe in R / Python, maar met rijkere optimalisaties onder de motorkap. DataFrames kunnen worden opgebouwd uit een breed scala aan bronnen, zoals gestructureerde gegevensbestanden, tabellen in Hive, externe databases of bestaande RDD's.

Manieren om Dataframe te maken

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") in het val df = spark.read.format("com.databricks.spark.csv").load("test.txt") , kunt u een header, scheidingsteken, tekenset en nog veel meer opgeven

u kunt ook Dataframe maken van een RDD

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

Als u df met schema wilt maken

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Waarom we Dataframe nodig hebben als Spark RDD heeft geleverd

Een RDD is slechts een veerkrachtige gedistribueerde gegevensset die meer een blackbox met gegevens is die niet kan worden geoptimaliseerd omdat de bewerkingen die erop kunnen worden uitgevoerd, niet zo beperkt zijn.

Geen ingebouwde optimalisatie-engine: bij het werken met gestructureerde gegevens kunnen RDD's niet profiteren van de geavanceerde optimizers van Spark, waaronder katalysator-optimizer en Tungsten-engine. Ontwikkelaars moeten elke RDD optimaliseren op basis van de kenmerken. Omgaan met gestructureerde gegevens: in tegenstelling tot Dataframe en datasets, leiden RDD's het schema van de ingenomen gegevens niet af en moeten de gebruikers deze specificeren.

DataFrames in Spark worden automatisch geoptimaliseerd door een query-optimizer. Voordat een berekening op een DataFrame begint, compileert de Catalyst-optimalisatie de bewerkingen die werden gebruikt om het DataFrame in een fysiek uitvoeringsplan op te bouwen. Omdat de optimizer de semantiek van bewerkingen en de structuur van de gegevens begrijpt, kan hij intelligente beslissingen nemen om de berekening te versnellen.

Beperking van DataFrame

Compilatie-type beveiliging: Dataframe API biedt geen ondersteuning voor compilatie-tijdbeveiliging die u beperkt in het manipuleren van gegevens wanneer de structuur niet bekend is.



Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow