サーチ…


pmap

pmapは(あなたが指定した)関数をとり、それを配列のすべての要素に適用します。この作品は利用可能な労働者の間で分かれています。 pmapはその関数の結果を別の配列に戻します。

addprocs(3)
sqrts = pmap(sqrt, 1:10)

関数に複数の引数がある場合は、複数のベクトルをpmapことができます

dots = pmap(dot, 1:10, 11:20)

しかし、 @parallel@parallelpmap与えられた関数がJulia基底にない場合(つまり、ユーザ定義であるかパッケージで定義されている場合)、関数がすべてのワーカーに最初に利用可能であることを確認する必要があります。

@everywhere begin
    function rand_det(n)
        det(rand(n,n))
    end
end

determinants = pmap(rand_det, 1:10)

この SO Q&Aも参照してください。

@平行

@parallelを使ってループを並列化し、ループのステップを別のワーカーに分割することができます。非常に簡単な例として、

addprocs(3)

a = collect(1:10)

for idx = 1:10
    println(a[idx])
end

もう少し複雑な例を考えてみましょう:

@time begin
    @sync begin
        @parallel for idx in 1:length(a)
            sleep(a[idx])
        end
    end
end
27.023411 seconds (13.48 k allocations: 762.532 KB)
julia> sum(a)
55

したがって、私たちが@parallelなしでこのループを実行した場合、27秒ではなく55秒を実行することになります。

@parallelマクロのための縮小演算子を与えることもできます。配列があるとします。配列の各列を合計し、これらの合計を相互に掛けたいとします。

A = rand(100,100);

@parallel (*) for idx = 1:size(A,1)
    sum(A[:,idx])
end

予期せぬ動作を避けるために@parallelを使用するときに留意すべきいくつかの重要なことがあります。

第一に 、あなたのループ内でJuliaにない関数(スクリプトで定義する関数やパッケージからインポートする関数など)を使用する場合は、その関数をワーカーがアクセスできるようにする必要があります。したがって、たとえば、次のようには動作しません

myprint(x) = println(x)
for idx = 1:10
    myprint(a[idx])
end

代わりに、次のものを使用する必要があります。

@everywhere begin
    function myprint(x) 
        println(x)
    end
end

@parallel for idx in 1:length(a)
    myprint(a[idx])
end

第2に、各作業者はコントローラーの範囲内のオブジェクトにアクセスできますが、変更することはできません 。従って

a = collect(1:10)
@parallel for idx = 1:length(a)
   a[idx] += 1
end

julia> a'
1x10 Array{Int64,2}:
 1  2  3  4  5  6  7  8  9  10

一方、@parallelを使ってループを実行した場合、配列a正常に変更できましa

これにSharedArrayするためaSharedArray型オブジェクトを作成して、各ワーカーがアクセスして変更できるようにすることができます。

a = convert(SharedArray{Float64,1}, collect(1:10))
@parallel for idx = 1:length(a)
    a[idx] += 1
end

julia> a'
1x10 Array{Float64,2}:
 2.0  3.0  4.0  5.0  6.0  7.0  8.0  9.0  10.0  11.0

@spawnと@spawnat

マクロ@spawn@spawnatは、Juliaが作業を作業者に割り当てるために使用できるツールの2つです。次に例を示します。

julia> @spawnat 2 println("hello world")
RemoteRef{Channel{Any}}(2,1,3)

julia>  From worker 2:  hello world

これらのマクロは両方とも、ワーカープロセス上のを評価します。この2つの唯一の違いは、 @spawnat使用すると、表現式を評価するワーカー(ワーカー2の上の例では例を指定)を選択できる一方、 @spawnではワーカーは自動的に選択されます。

上の例では、worker2にprintln関数を実行させるだけでした。そこから戻ったり回収したりするのに興味のあるものは何もなかった。しかし、しばしば私たちが労働者に送った表現は、私たちが取り出そうとしているものをもたらすでしょう。上記の例では、 @spawnat 、worker 2からの出力を得る前に、次のことが分かりました:

RemoteRef{Channel{Any}}(2,1,3)

これは、 @spawnatマクロがRemoteRef型オブジェクトを返すことを示します。このオブジェクトには、ワーカーに送信される式の戻り値が含まれます。これらの値を取得する場合は、最初に@spawnatが返すRemoteRefをオブジェクトに割り当ててから、次にRemoteRef型オブジェクトで動作するfetch()関数を使用して、労働者。

julia> result = @spawnat 2 2 + 5
RemoteRef{Channel{Any}}(2,1,26)

julia> fetch(result)
7

効果的に@spawnを使用するための鍵は、それが動作する式の背後にある性質を理解することです。 @spawnを使って作業者にコマンドを送るのは、作業者の一人に「インタプリタ」を実行している場合や、コードをネイティブに実行している場合に入力するものを直接入力することよりも少し複雑です。たとえば、 @spawnatを使用してワーカーの変数に値を割り当てることを考えたとします。私たちは試みるかもしれない:

@spawnat 2 a = 5
RemoteRef{Channel{Any}}(2,1,2)

それは動作しましたか?さて、作業員2にaを印刷させてみましょう。

julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,4)

julia> 

何も起こらなかった。どうして?上記のようにfetch()を使用すると、これ以上調べることができます。 fetch()は成功した結果だけでなくエラーメッセージも取り出すので非常に便利です。それがなければ、何かが間違っていることを知ることさえできないかもしれません。

julia> result = @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,5)

julia> fetch(result)
ERROR: On worker 2:
UndefVarError: a not defined

エラーメッセージには、ワーカー2にはaが定義されていないと表示されますが、これはなぜですか?その理由は、私たちが@spawnを使って評価するように作業者に指示する式に代入演算をラップする必要があるからです。以下は説明の例です。

julia> @spawnat 2 eval(:(a = 2))
RemoteRef{Channel{Any}}(2,1,7)

julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,8)

julia>  From worker 2:  2

:()構文は、Juliaがを指定するために使用する構文です。 Juliaで式を評価するeval()関数を使用し、 @spawnatマクロを使用して式がワーカー2で評価されるように指示します。

同じ結果を得ることもできます:

julia> @spawnat(2, eval(parse("c = 5")))
RemoteRef{Channel{Any}}(2,1,9)

julia> @spawnat 2 println(c)
RemoteRef{Channel{Any}}(2,1,10)

julia>  From worker 2:  5

この例は、2つの追加の概念を示しています。まず、文字列に対して呼び出されるparse()関数を使用して式を作成できることがわかります。次に、 @spawnat呼び出すときに括弧を使用できることが@spawnatます。これにより、構文がより明確で扱いやすいものになる場合があります。

@parallelとpmapの使い分け

Juliaの文書には、

pmap()は、各関数呼び出しが大量の作業を行う場合に設計されています。対照的に、@ for parallelは、各反復が小さい状況を扱うことができます。恐らく2つの数値を合計するだけです。

これにはいくつかの理由があります。第一に、 pmapは労働者の仕事を開始するための起動コストが大きくなります。したがって、ジョブが非常に小さい場合、これらの起動コストは非効率になる可能性があります。逆に、 pmapは作業者にジョブを割り当てるよりスマートな仕事をしています。特に、それはジョブのキューを構築し、そのワーカーが利用可能になるたびに新しいジョブを各ワーカーに送信します。 @parallel対照的には、それが呼び出された労働者の間で行われるすべての作業をdivvies。そのため、一部の労働者が他の労働者よりも自分の仕事で時間がかかる場合、あなたの労働者の大部分が仕事を終え、アイドル状態に陥ることがあります。しかし、このような状況は、非常に小さく簡単な仕事では起こりにくい。

これは次のとおりです:2人の労働者がいて、そのうちの1人が遅く、もう1人が2倍の労働者を抱えているとします。理想的には、速い労働者に遅い労働者の2倍の仕事を与えたいと考えています。 (あるいは、私たちは仕事が遅くて遅いかもしれませんが、元本はまったく同じです)。 pmapはこれを達成しますが、 @parallelはそうしません。

各テストについて、次のように初期化します。

addprocs(2)

@everywhere begin
    function parallel_func(idx)
        workernum = myid() - 1 
        sleep(workernum)
        println("job $idx")
    end
end

さて、 @parallelテストのために、以下を実行します:

@parallel for idx = 1:12
    parallel_func(idx)
end

そして、印刷出力を取り戻す:

julia>     From worker 2:    job 1
    From worker 3:    job 7
    From worker 2:    job 2
    From worker 2:    job 3
    From worker 3:    job 8
    From worker 2:    job 4
    From worker 2:    job 5
    From worker 3:    job 9
    From worker 2:    job 6
    From worker 3:    job 10
    From worker 3:    job 11
    From worker 3:    job 12

それはほとんど甘いです。労働者は仕事を均等に「共有」しています。労働者2は労働者3の2倍の速さであっても、各労働者は6つの仕事を完了していることに注意してください。接触しているかもしれませんが、非効率です。

pmapテストのために、私は以下を実行します:

pmap(parallel_func, 1:12)

出力を取得する:

From worker 2:    job 1
From worker 3:    job 2
From worker 2:    job 3
From worker 2:    job 5
From worker 3:    job 4
From worker 2:    job 6
From worker 2:    job 8
From worker 3:    job 7
From worker 2:    job 9
From worker 2:    job 11
From worker 3:    job 10
From worker 2:    job 12

ここで、作業員2は8つのジョブを実行し、作業者3は4を実行したことに注意してください。これは、速度に比例しており、効率を最適化するためです。 pmapはそれぞれの能力に応じて、ハードタスクマスターです。

@asyncと@sync

?@asyncのドキュメントによれば、「 @async 、式をタスクにラップします。これが意味することは、その範囲内にあるものについては、ジュリアはこのタスクを実行し始めますが、タスクが完了するのを待たずにスクリプトの次のものに進むことです。たとえば、マクロがなければ、次のようになります:

julia> @time sleep(2)
  2.005766 seconds (13 allocations: 624 bytes)

しかし、マクロを使用すると、次のようになります。

julia> @time @async sleep(2)
  0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0

julia> 

Juliaは、タスク(この場合は2秒間スリープする)が完了するのを待つことなく、スクリプトを進める(そして@timeマクロを完全に実行させる)ことができます。

対照的に、 @syncマクロは、 @async@spawn@spawnatおよび@parallelすべての動的に囲まれた使用が@asyncするまで待機します。 (「 ?@sync 」の文書に従って)。したがって、我々は以下を参照する:

julia> @time @sync @async sleep(2)
  2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00

この単純な例では、 @async@sync 1つのインスタンスを一緒に含める必要はありません。しかし、 @syncが便利なところは、 @asyncを複数の操作に適用して、それぞれが完了するのを待たずにすぐにすべてを開始できるようにすることです。

たとえば、複数の従業員がいるとします。それぞれの従業員を同時に作業を開始し、それらのタスクから結果を取得したいとします。最初の(しかし間違った)試みは次のようなものです:

addprocs(2)
@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 4.011576 seconds (177 allocations: 9.734 KB)

ここで問題となるのは、各remotecall_fetch()処理が完了するまでループが待機することです。つまり、次のremotecall_fetch()操作を続行する前に各プロセスが処理を完了する(この場合は2秒間スリープしますremotecall_fetch() 。現実的な状況では、私たちのプロセスは自分の仕事(すなわち睡眠)を同時に行っていないので、ここでは並列性の利点は得られません。

私たちは、の組み合わせで使用することによって、しかし、これを修正することができます@async@syncマクロを:

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 2.009416 seconds (274 allocations: 25.592 KB)

ここで、ループの各ステップを別々の操作としてカウントすると、 @asyncマクロの前に2つの別々の操作があることが@asyncます。マクロは、これらのそれぞれが起動し、コードが終了する前に(この場合はループの次のステップに)進むことができます。しかし、使用@sync範囲全体のループを含むマクロは、我々はスクリプトがが先行するすべての操作まで、そのループを過ぎて進むことができないことを意味します@async完了しました。

これらのマクロの動作をさらに明確に理解するには、上記の例をさらに微調整して、特定の変更の下でどのように変化するかを確認することができます。例えば、 @async持たずに@sync持っているとします。

@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 0.001429 seconds (27 allocations: 2.234 KB)

ここで、 @asyncマクロを@asyncと、 @async remotecall_fetch()操作の実行が終了する前でもループを継続できます。しかし、 @syncながら、 remotecall_fetch()オペレーションがすべて終了するまで、コードがこのループを過ぎないようにするための@syncマクロはありません。

それにもかかわらず、各remotecall_fetch()操作は、いったん実行しても、並行して実行されます。 2秒間待つと、結果を含む配列aには次のものが含まれていることが分かります。

sleep(2)
julia> a
2-element Array{Any,1}:
 nothing
 nothing

(「nothing」要素は、スリープ関数の結果を正常にフェッチした結果であり、値を返さない)

また、2つのremotecall_fetch()操作は、それらの前にあるprintコマンドも速く連続して実行されるため、本質的に同時に開始することがremotecall_fetch()ます(これらのコマンドの出力はここには表示されません)。これを、次の例と対照すると、 printコマンドは互いに2秒遅れて実行されます。

@asyncマクロを(内部のステップではなく)ループ全体に置くと、 remotecall_fetch()操作が終了するのを待つことなくスクリプトが直ちに続行されます。しかし、今度は、スクリプトがループ全体を越えて進むことを許可します。前のループが完了する前にループの個々のステップを開始することはできません。したがって、上記の例とは異なり、スクリプトがループの後に続く2秒後、 results配列には2つ目のremotecall_fetch()操作がまだ完了していないことを示す#undefという要素が1つあります。

@time begin
    a = cell(nworkers())
    @async for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to

sleep(2)

a
2-element Array{Any,1}:
    nothing
 #undef    

驚くことではないが、私たちが@sync@async置くと、各remotecall_fetch()は(同時にではなくremotecall_fetch()順番に実行されますが、それぞれが終了するまでコード内で継続しません。言い換えれば、 sleep(2)@sync @async sleep(2)と本質的に同じように動作するのと同じように、

@time begin
    a = cell(nworkers())
    @sync @async for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10

また、 @asyncマクロのスコープ内でより複雑な操作を行うことも可能です。このドキュメントでは、 @asyncの範囲内にあるループ全体を含むサンプルを@asyncます。

同期のマクロのヘルプは、それがすることを述べていることを思い出してください「のすべての動的に囲まれた用途まで待って@async@spawn@spawnat@parallel完了しています。」 「完全」と@async目的のために、 @sync @asyncマクロと@asyncマクロの範囲内でタスクを定義する方法が重要です。上記の例の1つにわずかなバリエーションがあります。

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall(pid, sleep, 2)
    end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)

julia> a
2-element Array{Any,1}:
 RemoteRef{Channel{Any}}(2,1,3)
 RemoteRef{Channel{Any}}(3,1,4)

前の例では、実行に約2秒かかっていました。これは、2つのタスクが並行して実行されていることを示しています。しかし、この例では、時間評価がずっと短くなっています。その理由は、 @syncの目的のために@sync remotecall()操作は作業者に作業を送った後に「終了」しているからです。 (結果として得られる配列aにはRemoteRefオブジェクト型が含まれているにすぎないことに注意してください。これは、将来、ある時点でフェッチされる可能性のある特定のプロセスで起こっていることを示しています)。対照的に、 remotecall_fetch()操作は、作業が完了したというメッセージをワーカーから取得した場合にのみ「終了」します。

このように、あなたのスクリプトで作業を進める前に、特定の作業が完了していることを確認する方法を探しているなら(例えば、 この記事で議論されいるように)、「完全」とみなして考える方法と、それを測定し、それをスクリプトで操作してください。

労働者の追加

あなたが最初にJuliaを起動すると、デフォルトでは、実行中の単一のプロセスしか存在せず、作業を行うことができません。これを確認するには、

julia> nprocs()
1

並列処理を利用するには、最初に追加のワーカーを追加し、次に割り当てられた作業を実行できるようにする必要があります。 addprocs(n)を使用して、スクリプト内(またはインタプリタ側)でこれを行うことができますnは使用するプロセスの数です。

または、次のコマンドラインを使用してJuliaを起動するときに、プロセスを追加することもできます。

$ julia -p n

n追加するプロセスの追加数です。したがって、Juliaを

$ julia -p 2

ジュリアが始まると、私たちは次のようになるでしょう:

julia> nprocs()
3


Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow