apache-spark
Apache Spark DataFrames का परिचय
खोज…
स्पार्क डाटाफ्रेम जेएवीए के साथ
एक DataFrame नाम स्तंभों में आयोजित डेटा का एक वितरित संग्रह है। यह एक संबंधपरक डेटाबेस में तालिका के समतुल्य है। DataFrames का निर्माण कई प्रकार के स्रोतों से किया जा सकता है जैसे: संरचित डेटा फ़ाइलें, हाइव में टेबल, बाहरी डेटाबेस या मौजूदा RDDs।
स्पार्क डेटा फ्रेम में एक Oracle RDBMS टेबल पढ़ना ::
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
अगर जरूरत पड़ी तो हम इस डेटा फ्रेम को वापस rdd में भी बदल सकते हैं:
JavaRDD<Row> rdd=df.javaRDD();
किसी फ़ाइल से डेटाफ़्रेम बनाएँ:
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
अब हमारे पास oracle से डेटा फ्रेम है और साथ ही फाइल से। इसी तरह हम हाइव से भी एक टेबल पढ़ सकते हैं। डेटा फ्रेम पर हम किसी भी कॉलम को ला सकते हैं जैसा कि हम rdbms में करते हैं। जैसे किसी कॉलम या अधिकतम मूल्य के लिए न्यूनतम मान प्राप्त करें। किसी स्तंभ के लिए माध्य / औसत की गणना कर सकते हैं। कुछ अन्य कार्य जैसे चयन, फ़िल्टर, एग, ग्रुपबी भी उपलब्ध हैं।
स्पार्क डेटाफ़्रेम ने समझाया
स्पार्क में, एक DataFrame नाम स्तंभों में आयोजित डेटा का एक वितरित संग्रह है। यह एक संबंधपरक डेटाबेस या आर / पायथन में एक डेटा फ्रेम की मेज के बराबर है, लेकिन हुड के तहत समृद्ध अनुकूलन के साथ। DataFrames का निर्माण विभिन्न प्रकार के स्रोतों से किया जा सकता है जैसे संरचित डेटा फ़ाइलें, हाइव में टेबल, बाहरी डेटाबेस या मौजूदा RDFs।
डेटाफ्रेम बनाने के तरीके
val data= spark.read.json("path to json")
val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
विकल्प फ़ील्ड में val df = spark.read.format("com.databricks.spark.csv").load("test.txt")
, आप हेडर, सीमांकक, चारसेट और बहुत कुछ प्रदान कर सकते हैं।
आप RDD से डेटाफ्रेम भी बना सकते हैं
val rdd = sc.parallelize(
Seq(
("first", Array(2.0, 1.0, 2.1, 5.4)),
("test", Array(1.5, 0.5, 0.9, 3.7)),
("choose", Array(8.0, 2.9, 9.1, 2.5))
)
)
val dfWithoutSchema = spark.createDataFrame(rdd)
यदि आप स्कीमा के साथ df बनाना चाहते हैं
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
अगर स्पार्क ने आरडीडी प्रदान किया है तो हमें डेटाफ्रेम की आवश्यकता क्यों है
एक RDD महज एक लचीला डिस्ट्रीब्यूटेड डेटासेट है जो कि डेटा के एक ब्लैकबॉक्स से अधिक है, जो कि इसके खिलाफ किए जाने वाले ऑपरेशन के रूप में अनुकूलित नहीं किया जा सकता है, विवश नहीं हैं।
कोई इनबिल्ट ऑप्टिमाइज़ेशन इंजन नहीं: संरचित डेटा के साथ काम करने पर, RDD उत्प्रेरक उत्प्रेरक अनुकूलक और टंगस्टन निष्पादन इंजन सहित स्पार्क के उन्नत ऑप्टिमाइज़र का लाभ नहीं ले सकता है। डेवलपर्स को अपनी विशेषताओं के आधार पर प्रत्येक RDD का अनुकूलन करने की आवश्यकता होती है। संरचित डेटा को संभालना: डेटाफ्रेम और डेटासेट के विपरीत, RDD अंतर्ग्रहण डेटा के स्कीमा का अनुमान नहीं लगाते हैं और उपयोगकर्ता को इसे निर्दिष्ट करने की आवश्यकता होती है।
स्पार्क में डेटाफ्रेम का निष्पादन स्वचालित रूप से एक क्वेरी ऑप्टिमाइज़र द्वारा अनुकूलित होता है। किसी DataFrame पर कोई गणना शुरू होने से पहले, उत्प्रेरक ऑप्टिमाइज़र उन ऑपरेशनों को संकलित करता है जिनका उपयोग DataFrame को निष्पादन के लिए एक भौतिक योजना में बनाने के लिए किया गया था। क्योंकि ऑप्टिमाइज़र डेटा के संचालन और संरचना के शब्दार्थ को समझता है, यह कम्प्यूटेशन को गति देने के लिए बुद्धिमान निर्णय ले सकता है।
डेटाफ्रेम की सीमा
कंपाइल-टाइम टाइप सेफ्टी: डेटाफ्रेम एपीआई कंपाइल टाइम सेफ्टी का समर्थन नहीं करता है, जो स्ट्रक्चर का पता नहीं चलने पर आपको डेटा में हेरफेर करने से बचाता है।