サーチ…


備考

パーティションの数は、アプリケーションのパフォーマンスおよび/または正常終了のために重要です。

Resilient Distributed Dataset(RDD)はSparkの主な抽象です。 RDDはパーティションに分割されます。つまり、パーティションはデータセットの一部、スライス、つまりそのチャンクです。

パーティションの数が多いほど、各パーティションのサイズは小さくなります。

しかし、多数のパーティションがHadoop Distributed File System(HDFS)に多くの負荷をかけることに注意してください.HDOSは大量のメタデータを保持しなければなりません。

パーティションの数はメモリ使用量に関連しており、memoryOverheadの問題はこの数( 個人的な経験 )に関連している可能性があります。


新しいユーザーにとっての一般的な落とし穴は、RDDを1つのパーティションでRDDに変換することです。通常は次のようになります。

data = sc.textFile(file)
data = data.coalesce(1) 

これは通常、非常に悪い考えです.Sparkにすべてのデータを1つのパーティションにするように指示しているからです!覚えていること:

Sparkのステージは、一度に1つのパーティションで動作します(そのパーティションのデータをメモリにロードします)。

その結果、Sparkにすべてのデータを一度に処理させるように指示します。通常は、メモリ関連のエラー(メモリ不足など)やnullポインタ例外が発生します。

だから、あなたが何をしているのか分からなければ、1つのパーティションでRDDを再分割しないでください!

パーティションイントロ

RDDはどのように分割されますか?

デフォルトでは、HDFSパーティションごとにパーティションが作成されます。デフォルトでは64MBです。詳細はこちらをご覧ください

パーティション間でデータのバランスを取る方法は?

まず、データを再分割する3つの方法を見てみましょう。

  1. あなたのRDDに必要な最小のパーティション数である2番目のパラメータをtextFile()渡しますが、注意してください:

    [14]では:lines = sc.textFile( "data")

    In [15]:lines.getNumPartitions()Out [15]:1000

    [16]では、lines = sc.textFile( "data"、500)

    In [17]:lines.getNumPartitions()Out [17]:1434

    [18]で:lines = sc.textFile( "data"、5000)

    In [19]:lines.getNumPartitions()Out [19]:5926

ご覧のように、 [16]は、RDDのパーティション数がすでに要求されているパーティションの最小数よりも大きいため、期待することはしません。

  1. 次のように、 repartition()を使用します。

    [22]で:lines = lines.repartition(10)

    [23]で:lines.getNumPartitions()Out [23]:10

警告:これはシャッフルを呼び出すため、RDDのパーティション数を増やしたい場合に使用してください。

ドキュメントから:

シャッフルは、パーティション間でグループ分けされるように、データを再配布するためのSparkの仕組みです。これには通常、エグゼキュータおよびマシン間でデータをコピーする必要があるため、シャッフルは複雑でコストのかかる操作になります。

  1. 次のようにcoalesce()を使用ます。

    [25]で:lines = lines.coalesce(2)

    In [26]:lines.getNumPartitions()Out [26]:2

スパークは、RDDを縮小し、それを利用することを認識しています。 repartition()とcoalesce()の詳細を読む。


しかし、これですべてのデータがパーティション間で完全にバランスされることが保証されますか?実際には、パーティション間でデータのバランスを取る方法で経験したように?

RDDのパーティション

「解説」で述べたように、パーティションはRDDのパーツ/スライス/チャンクです。以下は、RDDの最小パーティション数を要求する方法の最小限の例です。

In [1]: mylistRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

In [2]: mylistRDD.getNumPartitions()
Out[2]: 2

[1]でparallelize() 2番目のパラメータとして2を渡したことに注目してください。このパラメータは、私たちのRDDに少なくとも2つのパーティションが必要であることを示しています。

RDDを再分割する

時には、私たちが作成していないファイルから来ているなど、RDDを再分割する必要があります。作成者から定義されたパーティションの数は、私たちが望むものではありません。

これを達成するための2つの最もよく知られている関数は次のとおりです。

repartition(numPartitions)

そして:

coalesce(numPartitions, shuffle=False)

経験則として、より多くのパーティションでRDDを再分割する場合は最初のパーティションを使用し、パーティションの数を減らしてRDDを減らす場合は、最初のパーティションを使用してください。 spark - repartition()vs coalesce()

例えば:

data = sc.textFile(file)
data = data.coalesce(100) // requested number of #partitions

このRDDがtextFile()によって読み込まれたときに100以上のパーティションを持つことを考えると、 'data'というRDDのパーティション数を100にtextFile()ます。

同様の方法で、RDDの現在のパーティション数を超えたい場合は、次のようにします(RDDがたとえば200パーティションに分散されている場合)。

data = sc.textFile(file)
data = data.repartition(300) // requested number of #partitions

パーティションの数についてのThumbの規則

経験則として、RDDにはエグゼキュータの数と使用済みコアの数を3(または多分4)の積で割ったものが必要です。もちろん、これはヒューリスティックであり、アプリケーション、データセット、およびクラスタ構成に大きく依存します。

例:

In [1]: data  = sc.textFile(file)

In [2]: total_cores = int(sc._conf.get('spark.executor.instances')) * int(sc._conf.get('spark.executor.cores'))

In [3]: data = data.coalesce(total_cores * 3)      

RDDコンテンツを表示する

RDDの内容を表示するには、印刷する必要があります。

myRDD.foreach(println)

印刷する行数を制限するには:

myRDD.take(num_of_rows).foreach(println)


Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow