Sök…


Spark DataFrames med JAVA

En DataFrame är en distribuerad samling av data organiserade i namngivna kolumner. Det är konceptuellt ekvivalent med en tabell i en relationsdatabas. DataFrames kan konstrueras från ett brett utbud av källor som: strukturerade datafiler, tabeller i Hive, externa databaser eller befintliga RDD: er.

Läsa en Oracle RDBMS-tabell i gnistdataram:

SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

sparkConf.registerKryoClasses(new Class<?>[]{  
        Class.forName("org.apache.hadoop.io.Text"),
        Class.forName("packageName.className")
});

JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);

Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format

Vi kan också konvertera denna dataram till rdd om det behövs:

JavaRDD<Row> rdd=df.javaRDD();

Skapa en dataframe från en fil:

public class LoadSaveTextFile {

    //static schema class
    public static class Schema implements Serializable {

        public String getTimestamp() {
            return timestamp;
        }
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
        public String getMachId() {
            return machId;
        }
        public void setMachId(String machId) {
            this.machId = machId;
        }
        public String getSensorType() {
            return sensorType;
        }
        public void setSensorType(String sensorType) {
            this.sensorType = sensorType;
        }    
        
        //instance variables
        private String timestamp;
        private String machId;
        private String sensorType;
    }
    
    public static void main(String[] args) throws ClassNotFoundException {

        SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");

        sparkConf.registerKryoClasses(new Class<?>[]{  
                Class.forName("org.apache.hadoop.io.Text"),
                Class.forName("oracle.table.join.LoadSaveTextFile")
        });

        JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
        SQLContext sqlcontext= new SQLContext(sparkContext);

        //we have a file which ";" separated 
        String filePath=args[0];
    
        JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
                new Function<String, Schema>() {
                    public Schema call(String line) throws Exception {
                        String[] tokens=line.split(";");
                        Schema schema = new Schema();
                        schema.setMachId(tokens[0]);
                        schema.setSensorType(tokens[1]);
                        schema.setTimestamp(tokens[2]);
                        return schema;
                    }
                });

        DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
        df.show();
    }   
}

Nu har vi dataram från oracle och från en fil. På liknande sätt kan vi också läsa en tabell från bikupa. På dataram kan vi hämta valfri kolumn som vi gör i rdbms. Som att få ett minvärde för en kolumn eller maxvärde. Kan beräkna ett medelvärde / medelvärde för en kolumn. Vissa andra funktioner som select, filter, agg, groupBy finns också tillgängliga.

Spark Dataframe förklarade

I Spark är en DataFrame en distribuerad samling av data organiserade i namngivna kolumner. Det är konceptuellt ekvivalent med en tabell i en relationsdatabas eller en dataram i R / Python, men med rikare optimeringar under huven. DataFrames kan konstrueras från en mängd olika källor som strukturerade datafiler, tabeller i Hive, externa databaser eller befintliga RDD: er.

Sätt att skapa Dataframe

val data= spark.read.json("path to json")

val df = spark.read.format("com.databricks.spark.csv").load("test.txt") i alternativfältet, du kan tillhandahålla rubrik, avgränsare, charset och mycket mer

Du kan också skapa Dataframe från en RDD

val rdd = sc.parallelize(
  Seq(
    ("first", Array(2.0, 1.0, 2.1, 5.4)),
    ("test", Array(1.5, 0.5, 0.9, 3.7)),
    ("choose", Array(8.0, 2.9, 9.1, 2.5))
  )
)

val dfWithoutSchema = spark.createDataFrame(rdd)

Om du vill skapa df med schema

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

Varför vi behöver Dataframe om Spark har tillhandahållit RDD

En RDD är bara en fjädrande distribuerad databas som mer är en blackbox med data som inte kan optimeras eftersom de operationer som kan utföras mot den inte är lika begränsade.

Ingen inbyggd optimeringsmotor: När man arbetar med strukturerad data kan RDD: er inte dra fördel av Sparks avancerade optimisatorer inklusive katalysatoroptimerare och Tungsten exekveringsmotor. Utvecklare måste optimera varje RDD baserat på dess attribut. Hantera strukturerade data: Till skillnad från Dataframe och datasätt drar RDD inte ut schemat för den intagna informationen och kräver att användaren anger den.

DataFrames i Spark optimeras automatiskt med en frågaoptimerare. Innan någon beräkning på en DataFrame startar, sammanställer Catalyst-optimatorn de operationer som användes för att bygga DataFrame till en fysisk plan för exekvering. Eftersom optimeringsprogrammet förstår semantiken i operationerna och strukturen för data, kan den fatta intelligenta beslut för att påskynda beräkningen.

Begränsning av DataFrame

Säkerhet för kompileringstid: Dataframe API stöder inte kompilering av tidssäkerhet som begränsar dig från att manipulera data när strukturen inte är känd.



Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow