apache-spark
設定:Apache Spark SQL
サーチ…
前書き
このトピックでは、Sparkユーザーは、さまざまな構成のSpark SQLを見つけることができます。これは、Apache Sparkフレームワークで最もよく使用されるコンポーネントです。
Spark SQLシャッフルパーティションの制御
Apache Spark cogroup
、 join
やcogroup
などのシャッフル操作を行いながら、多くのデータがネットワークを介して転送されます。現在、シャッフルが発生するパーティションの数を制御するには、Spark SQLで指定された構成で制御できます。その構成は次のとおりです。
spark.sql.shuffle.partitions
この設定を使用して、シャッフル操作のパーティション数を制御できます。デフォルトでは、その値は200
です。しかし、我々は数GBのファイルを持っている場合、200のパーティションは意味をなさない。したがって、Spark SQLを使用して処理する必要があるデータの量に応じてそれらを変更する必要があります。次のように:
このシナリオでは、 employee
とdepartment
結合する2つのテーブルがあります。両方のテーブルにはほんのわずかのレコードしか含まれていませんが、各従業員の部門を知るためにこれらのテーブルに参加する必要があります。そこで、Spark DataFramesを使って次のように結合します。
val conf = new SparkConf().setAppName("sample").setMaster("local")
val sc = new SparkContext(conf)
val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")
employeeDF.join(departmentDF, "employeeName").show()
現在、結合を実行している間に作成されるパーティションの数は、デフォルトでは200ですが、この量のデータではあまりにも多くなります。
したがって、この値を変更して、シャッフル操作の回数を減らすことができます。
val conf = new SparkConf().setAppName("sample").setMaster("local").set("spark.sql.shuffle.partitions", 2)
val sc = new SparkContext(conf)
val employee = sc.parallelize(List("Bob", "Alice")).toDF("name")
val department = sc.parallelize(List(("Bob", "Accounts"), ("Alice", "Sales"))).toDF("name", "department")
employeeDF.join(departmentDF, "employeeName").show()
さて、シャッフル・パーティションの数は、2、シャッフル操作の数を減らすだけでなく、からのデータフレームを結合するためにかかる時間短縮だけでなく、これに還元される0.878505 s
に0.077847 s
。
したがって、シャッフル操作のパーティション数は、処理されるデータに従って常に設定してください。
Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow