Julia Language
並列処理
サーチ…
pmap
pmap
は(あなたが指定した)関数をとり、それを配列のすべての要素に適用します。この作品は利用可能な労働者の間で分かれています。 pmap
はその関数の結果を別の配列に戻します。
addprocs(3)
sqrts = pmap(sqrt, 1:10)
関数に複数の引数がある場合は、複数のベクトルをpmap
ことができます
dots = pmap(dot, 1:10, 11:20)
しかし、 @parallel
と@parallel
、 pmap
与えられた関数がJulia基底にない場合(つまり、ユーザ定義であるかパッケージで定義されている場合)、関数がすべてのワーカーに最初に利用可能であることを確認する必要があります。
@everywhere begin
function rand_det(n)
det(rand(n,n))
end
end
determinants = pmap(rand_det, 1:10)
この SO Q&Aも参照してください。
@平行
@parallelを使ってループを並列化し、ループのステップを別のワーカーに分割することができます。非常に簡単な例として、
addprocs(3)
a = collect(1:10)
for idx = 1:10
println(a[idx])
end
もう少し複雑な例を考えてみましょう:
@time begin
@sync begin
@parallel for idx in 1:length(a)
sleep(a[idx])
end
end
end
27.023411 seconds (13.48 k allocations: 762.532 KB)
julia> sum(a)
55
したがって、私たちが@parallel
なしでこのループを実行した場合、27秒ではなく55秒を実行することになります。
@parallel
マクロのための縮小演算子を与えることもできます。配列があるとします。配列の各列を合計し、これらの合計を相互に掛けたいとします。
A = rand(100,100);
@parallel (*) for idx = 1:size(A,1)
sum(A[:,idx])
end
予期せぬ動作を避けるために@parallel
を使用するときに留意すべきいくつかの重要なことがあります。
第一に 、あなたのループ内でJuliaにない関数(スクリプトで定義する関数やパッケージからインポートする関数など)を使用する場合は、その関数をワーカーがアクセスできるようにする必要があります。したがって、たとえば、次のようには動作しません 。
myprint(x) = println(x)
for idx = 1:10
myprint(a[idx])
end
代わりに、次のものを使用する必要があります。
@everywhere begin
function myprint(x)
println(x)
end
end
@parallel for idx in 1:length(a)
myprint(a[idx])
end
第2に、各作業者はコントローラーの範囲内のオブジェクトにアクセスできますが、変更することはできません 。従って
a = collect(1:10)
@parallel for idx = 1:length(a)
a[idx] += 1
end
julia> a'
1x10 Array{Int64,2}:
1 2 3 4 5 6 7 8 9 10
一方、@parallelを使ってループを実行した場合、配列a
正常に変更できましa
。
これにSharedArray
するためa
、 SharedArray
型オブジェクトを作成して、各ワーカーがアクセスして変更できるようにすることができます。
a = convert(SharedArray{Float64,1}, collect(1:10))
@parallel for idx = 1:length(a)
a[idx] += 1
end
julia> a'
1x10 Array{Float64,2}:
2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0 11.0
@spawnと@spawnat
マクロ@spawn
と@spawnat
は、Juliaが作業を作業者に割り当てるために使用できるツールの2つです。次に例を示します。
julia> @spawnat 2 println("hello world")
RemoteRef{Channel{Any}}(2,1,3)
julia> From worker 2: hello world
これらのマクロは両方とも、ワーカープロセス上の式を評価します。この2つの唯一の違いは、 @spawnat
使用すると、表現式を評価するワーカー(ワーカー2の上の例では例を指定)を選択できる一方、 @spawn
ではワーカーは自動的に選択されます。
上の例では、worker2にprintln関数を実行させるだけでした。そこから戻ったり回収したりするのに興味のあるものは何もなかった。しかし、しばしば私たちが労働者に送った表現は、私たちが取り出そうとしているものをもたらすでしょう。上記の例では、 @spawnat
、worker 2からの出力を得る前に、次のことが分かりました:
RemoteRef{Channel{Any}}(2,1,3)
これは、 @spawnat
マクロがRemoteRef
型オブジェクトを返すことを示します。このオブジェクトには、ワーカーに送信される式の戻り値が含まれます。これらの値を取得する場合は、最初に@spawnat
が返すRemoteRef
をオブジェクトに割り当ててから、次にRemoteRef
型オブジェクトで動作するfetch()
関数を使用して、労働者。
julia> result = @spawnat 2 2 + 5
RemoteRef{Channel{Any}}(2,1,26)
julia> fetch(result)
7
効果的に@spawn
を使用するための鍵は、それが動作する式の背後にある性質を理解することです。 @spawn
を使って作業者にコマンドを送るのは、作業者の一人に「インタプリタ」を実行している場合や、コードをネイティブに実行している場合に入力するものを直接入力することよりも少し複雑です。たとえば、 @spawnat
を使用してワーカーの変数に値を割り当てることを考えたとします。私たちは試みるかもしれない:
@spawnat 2 a = 5
RemoteRef{Channel{Any}}(2,1,2)
それは動作しましたか?さて、作業員2にa
を印刷させてみましょう。
julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,4)
julia>
何も起こらなかった。どうして?上記のようにfetch()
を使用すると、これ以上調べることができます。 fetch()
は成功した結果だけでなくエラーメッセージも取り出すので非常に便利です。それがなければ、何かが間違っていることを知ることさえできないかもしれません。
julia> result = @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,5)
julia> fetch(result)
ERROR: On worker 2:
UndefVarError: a not defined
エラーメッセージには、ワーカー2にはa
が定義されていないと表示されますが、これはなぜですか?その理由は、私たちが@spawn
を使って評価するように作業者に指示する式に代入演算をラップする必要があるからです。以下は説明の例です。
julia> @spawnat 2 eval(:(a = 2))
RemoteRef{Channel{Any}}(2,1,7)
julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,8)
julia> From worker 2: 2
:()
構文は、Juliaが式を指定するために使用する構文です。 Juliaで式を評価するeval()
関数を使用し、 @spawnat
マクロを使用して式がワーカー2で評価されるように指示します。
同じ結果を得ることもできます:
julia> @spawnat(2, eval(parse("c = 5")))
RemoteRef{Channel{Any}}(2,1,9)
julia> @spawnat 2 println(c)
RemoteRef{Channel{Any}}(2,1,10)
julia> From worker 2: 5
この例は、2つの追加の概念を示しています。まず、文字列に対して呼び出されるparse()
関数を使用して式を作成できることがわかります。次に、 @spawnat
呼び出すときに括弧を使用できることが@spawnat
ます。これにより、構文がより明確で扱いやすいものになる場合があります。
@parallelとpmapの使い分け
Juliaの文書には、
pmap()は、各関数呼び出しが大量の作業を行う場合に設計されています。対照的に、@ for parallelは、各反復が小さい状況を扱うことができます。恐らく2つの数値を合計するだけです。
これにはいくつかの理由があります。第一に、 pmap
は労働者の仕事を開始するための起動コストが大きくなります。したがって、ジョブが非常に小さい場合、これらの起動コストは非効率になる可能性があります。逆に、 pmap
は作業者にジョブを割り当てるよりスマートな仕事をしています。特に、それはジョブのキューを構築し、そのワーカーが利用可能になるたびに新しいジョブを各ワーカーに送信します。 @parallel
対照的には、それが呼び出された労働者の間で行われるすべての作業をdivvies。そのため、一部の労働者が他の労働者よりも自分の仕事で時間がかかる場合、あなたの労働者の大部分が仕事を終え、アイドル状態に陥ることがあります。しかし、このような状況は、非常に小さく簡単な仕事では起こりにくい。
これは次のとおりです:2人の労働者がいて、そのうちの1人が遅く、もう1人が2倍の労働者を抱えているとします。理想的には、速い労働者に遅い労働者の2倍の仕事を与えたいと考えています。 (あるいは、私たちは仕事が遅くて遅いかもしれませんが、元本はまったく同じです)。 pmap
はこれを達成しますが、 @parallel
はそうしません。
各テストについて、次のように初期化します。
addprocs(2)
@everywhere begin
function parallel_func(idx)
workernum = myid() - 1
sleep(workernum)
println("job $idx")
end
end
さて、 @parallel
テストのために、以下を実行します:
@parallel for idx = 1:12
parallel_func(idx)
end
そして、印刷出力を取り戻す:
julia> From worker 2: job 1
From worker 3: job 7
From worker 2: job 2
From worker 2: job 3
From worker 3: job 8
From worker 2: job 4
From worker 2: job 5
From worker 3: job 9
From worker 2: job 6
From worker 3: job 10
From worker 3: job 11
From worker 3: job 12
それはほとんど甘いです。労働者は仕事を均等に「共有」しています。労働者2は労働者3の2倍の速さであっても、各労働者は6つの仕事を完了していることに注意してください。接触しているかもしれませんが、非効率です。
pmap
テストのために、私は以下を実行します:
pmap(parallel_func, 1:12)
出力を取得する:
From worker 2: job 1
From worker 3: job 2
From worker 2: job 3
From worker 2: job 5
From worker 3: job 4
From worker 2: job 6
From worker 2: job 8
From worker 3: job 7
From worker 2: job 9
From worker 2: job 11
From worker 3: job 10
From worker 2: job 12
ここで、作業員2は8つのジョブを実行し、作業者3は4を実行したことに注意してください。これは、速度に比例しており、効率を最適化するためです。 pmap
はそれぞれの能力に応じて、ハードタスクマスターです。
@asyncと@sync
?@async
のドキュメントによれば、「 @async
、式をタスクにラップします。これが意味することは、その範囲内にあるものについては、ジュリアはこのタスクを実行し始めますが、タスクが完了するのを待たずにスクリプトの次のものに進むことです。たとえば、マクロがなければ、次のようになります:
julia> @time sleep(2)
2.005766 seconds (13 allocations: 624 bytes)
しかし、マクロを使用すると、次のようになります。
julia> @time @async sleep(2)
0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0
julia>
Juliaは、タスク(この場合は2秒間スリープする)が完了するのを待つことなく、スクリプトを進める(そして@time
マクロを完全に実行させる)ことができます。
対照的に、 @sync
マクロは、 @async
、 @spawn
、 @spawnat
および@parallel
すべての動的に囲まれた使用が@async
するまで待機します。 (「 ?@sync
」の文書に従って)。したがって、我々は以下を参照する:
julia> @time @sync @async sleep(2)
2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00
この単純な例では、 @async
と@sync
1つのインスタンスを一緒に含める必要はありません。しかし、 @sync
が便利なところは、 @async
を複数の操作に適用して、それぞれが完了するのを待たずにすぐにすべてを開始できるようにすることです。
たとえば、複数の従業員がいるとします。それぞれの従業員を同時に作業を開始し、それらのタスクから結果を取得したいとします。最初の(しかし間違った)試みは次のようなものです:
addprocs(2)
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 4.011576 seconds (177 allocations: 9.734 KB)
ここで問題となるのは、各remotecall_fetch()
処理が完了するまでループが待機することです。つまり、次のremotecall_fetch()
操作を続行する前に各プロセスが処理を完了する(この場合は2秒間スリープしますremotecall_fetch()
。現実的な状況では、私たちのプロセスは自分の仕事(すなわち睡眠)を同時に行っていないので、ここでは並列性の利点は得られません。
私たちは、の組み合わせで使用することによって、しかし、これを修正することができます@async
と@sync
マクロを:
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 2.009416 seconds (274 allocations: 25.592 KB)
ここで、ループの各ステップを別々の操作としてカウントすると、 @async
マクロの前に2つの別々の操作があることが@async
ます。マクロは、これらのそれぞれが起動し、コードが終了する前に(この場合はループの次のステップに)進むことができます。しかし、使用@sync
範囲全体のループを含むマクロは、我々はスクリプトがが先行するすべての操作まで、そのループを過ぎて進むことができないことを意味します@async
完了しました。
これらのマクロの動作をさらに明確に理解するには、上記の例をさらに微調整して、特定の変更の下でどのように変化するかを確認することができます。例えば、 @async
持たずに@sync
持っているとします。
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
println("sending work to $pid")
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 0.001429 seconds (27 allocations: 2.234 KB)
ここで、 @async
マクロを@async
と、 @async
remotecall_fetch()
操作の実行が終了する前でもループを継続できます。しかし、 @sync
ながら、 remotecall_fetch()
オペレーションがすべて終了するまで、コードがこのループを過ぎないようにするための@sync
マクロはありません。
それにもかかわらず、各remotecall_fetch()
操作は、いったん実行しても、並行して実行されます。 2秒間待つと、結果を含む配列aには次のものが含まれていることが分かります。
sleep(2)
julia> a
2-element Array{Any,1}:
nothing
nothing
(「nothing」要素は、スリープ関数の結果を正常にフェッチした結果であり、値を返さない)
また、2つのremotecall_fetch()
操作は、それらの前にあるprint
コマンドも速く連続して実行されるため、本質的に同時に開始することがremotecall_fetch()
ます(これらのコマンドの出力はここには表示されません)。これを、次の例と対照すると、 print
コマンドは互いに2秒遅れて実行されます。
@async
マクロを(内部のステップではなく)ループ全体に置くと、 remotecall_fetch()
操作が終了するのを待つことなくスクリプトが直ちに続行されます。しかし、今度は、スクリプトがループ全体を越えて進むことを許可します。前のループが完了する前にループの個々のステップを開始することはできません。したがって、上記の例とは異なり、スクリプトがループの後に続く2秒後、 results
配列には2つ目のremotecall_fetch()
操作がまだ完了していないことを示す#undef
という要素が1つあります。
@time begin
a = cell(nworkers())
@async for (idx, pid) in enumerate(workers())
println("sending work to $pid")
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to
sleep(2)
a
2-element Array{Any,1}:
nothing
#undef
驚くことではないが、私たちが@sync
と@async
置くと、各remotecall_fetch()
は(同時にではなくremotecall_fetch()
順番に実行されますが、それぞれが終了するまでコード内で継続しません。言い換えれば、 sleep(2)
が@sync @async sleep(2)
と本質的に同じように動作するのと同じように、
@time begin
a = cell(nworkers())
@sync @async for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10
また、 @async
マクロのスコープ内でより複雑な操作を行うことも可能です。このドキュメントでは、 @async
の範囲内にあるループ全体を含むサンプルを@async
ます。
同期のマクロのヘルプは、それがすることを述べていることを思い出してください「のすべての動的に囲まれた用途まで待って@async
、 @spawn
、 @spawnat
と@parallel
完了しています。」 「完全」と@async
目的のために、 @sync
@async
マクロと@async
マクロの範囲内でタスクを定義する方法が重要です。上記の例の1つにわずかなバリエーションがあります。
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall(pid, sleep, 2)
end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)
julia> a
2-element Array{Any,1}:
RemoteRef{Channel{Any}}(2,1,3)
RemoteRef{Channel{Any}}(3,1,4)
前の例では、実行に約2秒かかっていました。これは、2つのタスクが並行して実行されていることを示しています。しかし、この例では、時間評価がずっと短くなっています。その理由は、 @sync
の目的のために@sync
remotecall()
操作は作業者に作業を送った後に「終了」しているからです。 (結果として得られる配列aにはRemoteRef
オブジェクト型が含まれているにすぎないことに注意してください。これは、将来、ある時点でフェッチされる可能性のある特定のプロセスで起こっていることを示しています)。対照的に、 remotecall_fetch()
操作は、作業が完了したというメッセージをワーカーから取得した場合にのみ「終了」します。
このように、あなたのスクリプトで作業を進める前に、特定の作業が完了していることを確認する方法を探しているなら(例えば、 この記事で議論されているように)、「完全」とみなして考える方法と、それを測定し、それをスクリプトで操作してください。
労働者の追加
あなたが最初にJuliaを起動すると、デフォルトでは、実行中の単一のプロセスしか存在せず、作業を行うことができません。これを確認するには、
julia> nprocs()
1
並列処理を利用するには、最初に追加のワーカーを追加し、次に割り当てられた作業を実行できるようにする必要があります。 addprocs(n)
を使用して、スクリプト内(またはインタプリタ側)でこれを行うことができますn
は使用するプロセスの数です。
または、次のコマンドラインを使用してJuliaを起動するときに、プロセスを追加することもできます。
$ julia -p n
n
は追加するプロセスの追加数です。したがって、Juliaを
$ julia -p 2
ジュリアが始まると、私たちは次のようになるでしょう:
julia> nprocs()
3