R Language
スパークAPI(SparkR)
サーチ…
備考
SparkR
パッケージを使用すると、 Sparkクラスタの上に分散データフレームをSparkR
ことができます 。これにより、非常に大きなデータセットでの選択、フィルタリング、集約などの操作を実行できます。 SparkRの概要 SparkRのパッケージドキュメント
スパークコンテキストの設定
RでSparkコンテキストを設定する
Sparks分散データフレームの作業を開始するには、Rプログラムを既存のSpark Clusterに接続する必要があります。
library(SparkR)
sc <- sparkR.init() # connection to Spark context
sqlContext <- sparkRSQL.init(sc) # connection to SQL context
ここに関する情報ですスパーククラスタにあなたのIDEを接続する方法。
スパーククラスタを取得する
インストール手順については、 Apache Sparkの紹介トピックがあります。基本的には、Spark ClusterをJava( 命令を参照 )または(無料ではない)クラウドアプリケーション( Microsoft Azure [トピックサイト] 、 IBMなど )を使用してローカルに採用することができます。
データをキャッシュする
何:
キャッシングはSparkで計算を最適化できます。キャッシングはデータをメモリに格納し、永続性の特殊なケースです。 ここでは 、SparkでRDDをキャッシュするとどうなるか説明します。
なぜ:
基本的には、キャッシュは、元のデータの中間結果(通常は変換後)を保存します。したがって、キャッシュされたRDDを使用すると、以前の変換を再計算することなく、メモリからすでに変換されたデータにアクセスできます。
どうやって:
ここでは、大容量データ(ここでは3GBの大容量csv)を、メモリ内のストレージから一度アクセスするとすぐにそのデータにすばやくアクセスする方法の例を示します。
library(SparkR)
# next line is needed for direct csv import:
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"')
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
# loading 3 GB big csv file:
train <- read.df(sqlContext, "/train.csv", source = "com.databricks.spark.csv", inferSchema = "true")
cache(train)
system.time(head(train))
# output: time elapsed: 125 s. This action invokes the caching at this point.
system.time(head(train))
# output: time elapsed: 0.2 s (!!)
RDDの作成(復元力のある分散データセット)
データフレームから:
mtrdd <- createDataFrame(sqlContext, mtcars)
csvから:
csvの場合、Sparkコンテキストを開始する前に、環境にcsvパッケージを追加する必要があります。
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.4.0" "sparkr-shell"') # context for csv import read csv ->
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
次に、列のデータのデータスキーマを推測することによってcsvをロードすることができます。
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", inferSchema = "true")
または、事前にデータスキーマを指定することによって:
customSchema <- structType(
structField("margin", "integer"),
structField("gross", "integer"),
structField("name", "string"))
train <- read.df(sqlContext, "/train.csv", header= "true", source = "com.databricks.spark.csv", schema = customSchema)