apache-spark
パーティション
サーチ…
備考
パーティションの数は、アプリケーションのパフォーマンスおよび/または正常終了のために重要です。
Resilient Distributed Dataset(RDD)はSparkの主な抽象です。 RDDはパーティションに分割されます。つまり、パーティションはデータセットの一部、スライス、つまりそのチャンクです。
パーティションの数が多いほど、各パーティションのサイズは小さくなります。
しかし、多数のパーティションがHadoop Distributed File System(HDFS)に多くの負荷をかけることに注意してください.HDOSは大量のメタデータを保持しなければなりません。
パーティションの数はメモリ使用量に関連しており、memoryOverheadの問題はこの数( 個人的な経験 )に関連している可能性があります。
新しいユーザーにとっての一般的な落とし穴は、RDDを1つのパーティションでRDDに変換することです。通常は次のようになります。
data = sc.textFile(file)
data = data.coalesce(1)
これは通常、非常に悪い考えです.Sparkにすべてのデータを1つのパーティションにするように指示しているからです!覚えていること:
Sparkのステージは、一度に1つのパーティションで動作します(そのパーティションのデータをメモリにロードします)。
その結果、Sparkにすべてのデータを一度に処理させるように指示します。通常は、メモリ関連のエラー(メモリ不足など)やnullポインタ例外が発生します。
だから、あなたが何をしているのか分からなければ、1つのパーティションでRDDを再分割しないでください!
パーティションイントロ
RDDはどのように分割されますか?
デフォルトでは、HDFSパーティションごとにパーティションが作成されます。デフォルトでは64MBです。詳細はこちらをご覧ください 。
パーティション間でデータのバランスを取る方法は?
まず、データを再分割する3つの方法を見てみましょう。
あなたのRDDに必要な最小のパーティション数である2番目のパラメータをtextFile()に渡しますが、注意してください:
[14]では:lines = sc.textFile( "data")
In [15]:lines.getNumPartitions()Out [15]:1000
[16]では、lines = sc.textFile( "data"、500)
In [17]:lines.getNumPartitions()Out [17]:1434
[18]で:lines = sc.textFile( "data"、5000)
In [19]:lines.getNumPartitions()Out [19]:5926
ご覧のように、 [16]
は、RDDのパーティション数がすでに要求されているパーティションの最小数よりも大きいため、期待することはしません。
次のように、 repartition()を使用します。
[22]で:lines = lines.repartition(10)
[23]で:lines.getNumPartitions()Out [23]:10
警告:これはシャッフルを呼び出すため、RDDのパーティション数を増やしたい場合に使用してください。
ドキュメントから:
シャッフルは、パーティション間でグループ分けされるように、データを再配布するためのSparkの仕組みです。これには通常、エグゼキュータおよびマシン間でデータをコピーする必要があるため、シャッフルは複雑でコストのかかる操作になります。
次のようにcoalesce()を使用します。
[25]で:lines = lines.coalesce(2)
In [26]:lines.getNumPartitions()Out [26]:2
スパークは、RDDを縮小し、それを利用することを認識しています。 repartition()とcoalesce()の詳細を読む。
しかし、これですべてのデータがパーティション間で完全にバランスされることが保証されますか?実際には、パーティション間でデータのバランスを取る方法で経験したように?
RDDのパーティション
「解説」で述べたように、パーティションはRDDのパーツ/スライス/チャンクです。以下は、RDDの最小パーティション数を要求する方法の最小限の例です。
In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
In [2]: mylistRDD.getNumPartitions()
Out[2]: 2
[1]でparallelize()
2番目のパラメータとして2を渡したことに注目してください。このパラメータは、私たちのRDDに少なくとも2つのパーティションが必要であることを示しています。
RDDを再分割する
時には、私たちが作成していないファイルから来ているなど、RDDを再分割する必要があります。作成者から定義されたパーティションの数は、私たちが望むものではありません。
これを達成するための2つの最もよく知られている関数は次のとおりです。
repartition(numPartitions)
そして:
coalesce(numPartitions, shuffle=False)
経験則として、より多くのパーティションでRDDを再分割する場合は最初のパーティションを使用し、パーティションの数を減らしてRDDを減らす場合は、最初のパーティションを使用してください。 spark - repartition()vs coalesce() 。
例えば:
data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions
このRDDがtextFile()
によって読み込まれたときに100以上のパーティションを持つことを考えると、 'data'というRDDのパーティション数を100にtextFile()
ます。
同様の方法で、RDDの現在のパーティション数を超えたい場合は、次のようにします(RDDがたとえば200パーティションに分散されている場合)。
data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions
パーティションの数についてのThumbの規則
経験則として、RDDにはエグゼキュータの数と使用済みコアの数を3(または多分4)の積で割ったものが必要です。もちろん、これはヒューリスティックであり、アプリケーション、データセット、およびクラスタ構成に大きく依存します。
例:
In [1]: data = sc.textFile(file)
In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))
In [3]: data = data.coalesce(total_cores * 3)
RDDコンテンツを表示する
RDDの内容を表示するには、印刷する必要があります。
myRDD.foreach(println)
印刷する行数を制限するには:
myRDD.take(num_of_rows).foreach(println)