Julia Language
Parallelverarbeitung
Suche…
pmap
pmap
übernimmt eine Funktion (die Sie angeben) und wendet diese auf alle Elemente in einem Array an. Diese Arbeit wird unter den verfügbaren Arbeitern aufgeteilt. pmap
dann die Ergebnisse dieser Funktion in ein anderes Array zurück.
addprocs(3)
sqrts = pmap(sqrt, 1:10)
Wenn Sie mehrere Argumente verwenden, können Sie pmap
mehrere Vektoren zur Verfügung pmap
dots = pmap(dot, 1:10, 11:20)
Wie bei @parallel
jedoch, wenn die Funktion gegeben pmap
(ist also benutzerdefiniert oder in einem Paket definiert ist ) ist nicht in der Basis Julia dann müssen Sie sicherstellen, dass Funktion für alle Arbeitnehmer zur Verfügung steht zuerst:
@everywhere begin
function rand_det(n)
det(rand(n,n))
end
end
determinants = pmap(rand_det, 1:10)
Siehe auch diese SO Q & A.
@parallel
Mit @parallel können Sie eine Schleife parallelisieren, indem Sie die Schritte der Schleife auf verschiedene Worker verteilen. Als sehr einfaches Beispiel:
addprocs(3)
a = collect(1:10)
for idx = 1:10
println(a[idx])
end
Für ein etwas komplexeres Beispiel sollten Sie Folgendes berücksichtigen:
@time begin
@sync begin
@parallel for idx in 1:length(a)
sleep(a[idx])
end
end
end
27.023411 seconds (13.48 k allocations: 762.532 KB)
julia> sum(a)
55
Wir sehen also, wenn wir diese Schleife ohne @parallel
hätten, hätte es 55 statt 27 Sekunden @parallel
.
Wir können auch einen Reduktionsoperator für das @parallel
Makro @parallel
. Angenommen, wir haben ein Array, wir wollen jede Spalte des Arrays summieren und diese Summen dann mit einander multiplizieren:
A = rand(100,100);
@parallel (*) for idx = 1:size(A,1)
sum(A[:,idx])
end
Bei der Verwendung von @parallel
einige wichtige @parallel
, um unerwartetes Verhalten zu vermeiden.
Erstens: Wenn Sie Funktionen in Ihren Schleifen verwenden möchten, die sich nicht in der Basis Julia befinden (z. B. eine von Ihnen in Ihrem Skript definierte Funktion oder die Sie aus Paketen importieren), müssen Sie diese Funktionen den Arbeitern zugänglich machen. Folgendes würde beispielsweise nicht funktionieren:
myprint(x) = println(x)
for idx = 1:10
myprint(a[idx])
end
Stattdessen müssten wir Folgendes verwenden:
@everywhere begin
function myprint(x)
println(x)
end
end
@parallel for idx in 1:length(a)
myprint(a[idx])
end
Zweitens Obwohl jeder Arbeiter auf die Objekte im Bereich des Controllers zugreifen kann, kann er sie nicht ändern. Somit
a = collect(1:10)
@parallel for idx = 1:length(a)
a[idx] += 1
end
julia> a'
1x10 Array{Int64,2}:
1 2 3 4 5 6 7 8 9 10
Wenn wir jedoch die Schleife ohne @parallel ausgeführt hätten, wäre das Array a
erfolgreich geändert worden.
Um dem abzuhelfen, können wir stattdessen machen a
ein SharedArray
Typ - Objekt , sodass jeder Arbeiter zugreifen können und ändern Sie es:
a = convert(SharedArray{Float64,1}, collect(1:10))
@parallel for idx = 1:length(a)
a[idx] += 1
end
julia> a'
1x10 Array{Float64,2}:
2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0 11.0
@ Spawn und @ Spawnat
Die Makros @spawn
und @spawnat
sind zwei der Werkzeuge, die Julia zur Verfügung stellt, um Arbeitern Aufgaben zuzuweisen. Hier ist ein Beispiel:
julia> @spawnat 2 println("hello world")
RemoteRef{Channel{Any}}(2,1,3)
julia> From worker 2: hello world
Beide Makros bewerten einen Ausdruck in einem Arbeitsprozess. Der einzige Unterschied zwischen den beiden besteht darin, dass Sie in @spawnat
auswählen können, welcher Worker den Ausdruck auswerten soll (im obigen Beispiel wurde Worker 2 angegeben), während mit @spawn
ein Worker automatisch @spawn
von der Verfügbarkeit ausgewählt wird.
In dem obigen Beispiel mussten wir einfach Arbeiter 2 die println-Funktion ausführen. Es gab nichts Interessantes, um davon zurückzukommen. Oft wird jedoch der Ausdruck, den wir dem Arbeiter senden, etwas ergeben, das wir abrufen möchten. Beachten Sie im obigen Beispiel, als wir @spawnat
, bevor wir den Ausdruck von Worker 2 erhielten, wir Folgendes gesehen haben:
RemoteRef{Channel{Any}}(2,1,3)
Dies zeigt an, dass das @spawnat
Makro ein RemoteRef
Objekt RemoteRef
. Dieses Objekt enthält wiederum die Rückgabewerte aus unserem Ausdruck, die an den Worker gesendet werden. Wenn wir diese Werte abrufen möchten, können wir zuerst das von RemoteRef
@spawnat
einem Objekt zuweisen und dann die Funktion fetch()
verwenden, die ein RemoteRef
Objekt RemoteRef
, um die Ergebnisse einer Auswertung RemoteRef
, für die eine Auswertung durchgeführt wurde ein Arbeiter.
julia> result = @spawnat 2 2 + 5
RemoteRef{Channel{Any}}(2,1,26)
julia> fetch(result)
7
Der Schlüssel für die effektive Verwendung von @spawn
ist das Verständnis der Natur hinter den Ausdrücken , mit denen es arbeitet. Die Verwendung von @spawn
zum Senden von Befehlen an Worker ist etwas komplizierter als nur das direkte Eingeben, was Sie eingeben würden, wenn Sie einen "Interpreter" auf einem der Worker ausführen oder Code nativ auf ihnen ausführen würden. Nehmen wir zum Beispiel an, wir wollten @spawnat
, um einer Variablen eines Arbeiters einen Wert zuzuweisen. Wir könnten versuchen:
@spawnat 2 a = 5
RemoteRef{Channel{Any}}(2,1,2)
Hat es funktioniert? Nun, lass uns sehen, indem Arbeiter 2 versucht, a
zu drucken.
julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,4)
julia>
Nichts ist passiert. Warum? Wir können dies mehr mithilfe von fetch()
wie oben untersuchen. fetch()
kann sehr praktisch sein, da es nicht nur erfolgreiche Ergebnisse, sondern auch Fehlermeldungen abruft. Ohne sie wissen wir vielleicht gar nicht, dass etwas schief gelaufen ist.
julia> result = @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,5)
julia> fetch(result)
ERROR: On worker 2:
UndefVarError: a not defined
Die Fehlermeldung besagt, dass a
nicht für Worker 2 definiert ist. Aber warum ist das so? Der Grund ist, dass wir unsere Zuweisungsoperation in einen Ausdruck @spawn
, den wir dann @spawn
, um den Worker mit der Auswertung zu beauftragen. Unten ist ein Beispiel mit der folgenden Erklärung:
julia> @spawnat 2 eval(:(a = 2))
RemoteRef{Channel{Any}}(2,1,7)
julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,8)
julia> From worker 2: 2
Die Syntax :()
verwendet Julia, um Ausdrücke zu bezeichnen. Wir verwenden dann die Funktion eval()
in Julia, die einen Ausdruck auswertet, und verwenden das @spawnat
Makro, um anzuweisen, dass der Ausdruck auf Worker 2 ausgewertet wird.
Wir könnten auch das gleiche Ergebnis erzielen wie:
julia> @spawnat(2, eval(parse("c = 5")))
RemoteRef{Channel{Any}}(2,1,9)
julia> @spawnat 2 println(c)
RemoteRef{Channel{Any}}(2,1,10)
julia> From worker 2: 5
Dieses Beispiel zeigt zwei zusätzliche Begriffe. Zunächst sehen wir, dass wir auch einen Ausdruck erstellen können, indem Sie die Funktion parse()
verwenden, die für einen String aufgerufen wird. Zweitens sehen wir, dass wir beim Aufruf von @spawnat
Klammern verwenden können, wenn @spawnat
unsere Syntax klarer und handhabbarer wird.
Wann Sie @parallel vs. pmap verwenden sollten
Die Julia- Dokumentation weist darauf hin
pmap () ist für den Fall konzipiert, in dem jeder Funktionsaufruf viel Arbeit verrichtet. Im Gegensatz dazu kann @parallel for Situationen bewältigen, in denen jede Iteration klein ist und vielleicht nur zwei Zahlen summiert.
Dafür gibt es mehrere Gründe. pmap
verursacht pmap
höhere Anlaufkosten für die pmap
Arbeitsplätzen. Wenn die Jobs sehr klein sind, können diese Anlaufkosten ineffizient werden. Umgekehrt pmap
jedoch die Aufgabe, Arbeitsplätze unter den Arbeitnehmern zu pmap
. Insbesondere wird eine Warteschlange mit Aufträgen erstellt und jedes Mal, wenn er verfügbar ist, ein neuer Auftrag an jeden Mitarbeiter gesendet. @parallel
dazu gibt es alle Arbeiten, die unter den Arbeitern erledigt werden müssen, wenn sie aufgerufen werden. Wenn also einige Arbeiter länger arbeiten als andere, können Sie in einer Situation enden, in der die meisten Ihrer Mitarbeiter beendet sind und im Leerlauf sind, während einige für eine übermäßig lange Zeit aktiv bleiben und ihre Jobs beenden. Es ist jedoch weniger wahrscheinlich, dass eine solche Situation bei sehr kleinen und einfachen Jobs auftritt.
Das Folgende veranschaulicht dies: Angenommen, wir haben zwei Arbeiter, von denen einer langsam und der andere doppelt so schnell ist. Idealerweise möchten wir dem schnellen Arbeiter doppelt so viel Arbeit geben wie dem langsamen Arbeiter. (oder wir könnten schnelle und langsame Jobs haben, aber das Prinzip ist das gleiche). pmap
dies, aber @parallel
nicht.
Für jeden Test initialisieren wir Folgendes:
addprocs(2)
@everywhere begin
function parallel_func(idx)
workernum = myid() - 1
sleep(workernum)
println("job $idx")
end
end
Beim @parallel
Test führen wir nun Folgendes aus:
@parallel for idx = 1:12
parallel_func(idx)
end
Und Druckausgabe zurückbekommen:
julia> From worker 2: job 1
From worker 3: job 7
From worker 2: job 2
From worker 2: job 3
From worker 3: job 8
From worker 2: job 4
From worker 2: job 5
From worker 3: job 9
From worker 2: job 6
From worker 3: job 10
From worker 3: job 11
From worker 3: job 12
Es ist fast süß. Die Arbeiter haben die Arbeit gleichmäßig "geteilt". Beachten Sie, dass jeder Arbeiter 6 Jobs ausgeführt hat, obwohl Arbeiter 2 doppelt so schnell wie Arbeiter 3 ist. Er kann sich berühren, ist aber ineffizient.
Für den pmap
Test pmap
ich Folgendes aus:
pmap(parallel_func, 1:12)
und erhalte die Ausgabe:
From worker 2: job 1
From worker 3: job 2
From worker 2: job 3
From worker 2: job 5
From worker 3: job 4
From worker 2: job 6
From worker 2: job 8
From worker 3: job 7
From worker 2: job 9
From worker 2: job 11
From worker 3: job 10
From worker 2: job 12
Nun ist zu beachten, dass Arbeiter 2 acht Jobs und Arbeiter 3 4 ausgeführt hat. Dies ist genau im Verhältnis zu ihrer Geschwindigkeit und dem, was wir für eine optimale Effizienz wünschen. pmap
ist ein harter Aufgabenmeister - von jedem nach seinen Fähigkeiten.
@async und @sync
Gemäß der Dokumentation unter ?@async
" @async
einen Ausdruck in einer Task." Dies bedeutet, dass Julia diese Aufgabe für das, was in ihren Geltungsbereich fällt, startet, aber dann mit dem nächsten Schritt im Skript fortfährt, ohne auf den Abschluss der Aufgabe zu warten. So erhalten Sie beispielsweise ohne das Makro:
julia> @time sleep(2)
2.005766 seconds (13 allocations: 624 bytes)
Mit dem Makro erhalten Sie jedoch:
julia> @time @async sleep(2)
0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0
julia>
Julia erlaubt dem Skript daher, fortzufahren (und das @time
Makro vollständig auszuführen), ohne darauf zu warten, dass die Aufgabe (in diesem Fall zwei Sekunden im @time
) abgeschlossen ist.
Im Gegensatz dazu wird das @sync
Makro "warten, bis alle dynamisch eingeschlossenen Verwendungen von @async
, @spawn
, @spawnat
und @parallel
sind." (gemäß der Dokumentation unter ?@sync
). So sehen wir:
julia> @time @sync @async sleep(2)
2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00
In diesem einfachen Beispiel ist es nicht @async
eine einzelne Instanz von @async
und @sync
zusammenzufügen. @sync
kann jedoch nützlich sein, wenn @async
auf mehrere Vorgänge angewendet werden soll, die alle gleichzeitig starten sollen, ohne darauf zu warten, bis die einzelnen Operationen abgeschlossen sind.
Angenommen, wir haben mehrere Mitarbeiter, und wir möchten, dass jeder von ihnen gleichzeitig an einer Aufgabe arbeitet und dann die Ergebnisse aus diesen Aufgaben abruft. Ein erster (aber falscher) Versuch könnte sein:
addprocs(2)
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 4.011576 seconds (177 allocations: 9.734 KB)
Das Problem hierbei ist, dass die Schleife auf die remotecall_fetch()
jeder remotecall_fetch()
Operation wartet, dh dass jeder Prozess seine Arbeit beendet (in diesem Fall für 2 Sekunden remotecall_fetch()
), bevor die nächste remotecall_fetch()
Operation fortgesetzt wird. In einer praktischen Situation haben wir hier nicht die Vorteile der Parallelität, da unsere Prozesse nicht gleichzeitig arbeiten (dh schlafen).
Wir können dies jedoch korrigieren, indem Sie eine Kombination der @async
und @sync
Makros verwenden:
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 2.009416 seconds (274 allocations: 25.592 KB)
Wenn wir nun jeden Schritt der Schleife als separate Operation zählen, sehen wir, dass dem @async
Makro zwei separate Operationen vorangestellt sind. Das Makro ermöglicht das Starten jedes dieser Elemente, und der Code wird fortgesetzt (in diesem Fall bis zum nächsten Schritt der Schleife), bevor er beendet wird. Die Verwendung des @sync
Makros, dessen Gültigkeitsbereich die gesamte Schleife umfasst, bedeutet jedoch, dass das Skript nicht an dieser Schleife vorbeigehen kann, bis alle Vorgänge, denen @async
, abgeschlossen sind.
Sie können die Funktionsweise dieser Makros noch genauer verstehen, indem Sie das obige Beispiel weiter anpassen, um zu sehen, wie es sich bei bestimmten Modifikationen ändert. Angenommen, wir haben nur @async
ohne @sync
:
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
println("sending work to $pid")
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 0.001429 seconds (27 allocations: 2.234 KB)
Mit dem @async
Makro können wir in unserer Schleife remotecall_fetch()
auch bevor die Ausführung von remotecall_fetch()
. Wir haben jedoch kein @sync
Makro, um zu verhindern, dass der Code nach dieser Schleife fortgesetzt wird, bis alle remotecall_fetch()
.
Trotzdem remotecall_fetch()
jede remotecall_fetch()
Operation immer noch parallel, auch wenn wir remotecall_fetch()
. Wir können das sehen, denn wenn wir zwei Sekunden warten, enthält das Array a, das die Ergebnisse enthält, Folgendes:
sleep(2)
julia> a
2-element Array{Any,1}:
nothing
nothing
(Das Element "nothing" ist das Ergebnis eines erfolgreichen Abrufs der Ergebnisse der Sleep-Funktion, die keine Werte zurückgibt.)
Sehen wir auch , dass die beiden remotecall_fetch()
Operationen im wesentlichen zur gleichen Zeit gestartet werden, da die print
, die sie auch in schnellen Folge ausführen vorausgehen (Ausgabe von diesen Befehlen hier nicht dargestellt). Man vergleiche dies mit dem nächsten Beispiel , bei dem die print
in einer 2 Sekunde Verzögerung voneinander auszuführen:
Wenn wir das @async
Makro in die gesamte Schleife @async
(anstatt nur den inneren Schritt), wird unser Skript sofort fortgesetzt, ohne zu warten, bis die remotecall_fetch()
. Jetzt erlauben wir jedoch nur, dass das Skript über die gesamte Schleife hinausgeht. Wir lassen nicht zu, dass jeder einzelne Schritt der Schleife beginnt, bevor der vorherige abgeschlossen ist. Anders als im obigen Beispiel hat das results
Array zwei Sekunden, nachdem das Skript nach der Schleife #undef
, noch ein Element als #undef
, das #undef
hinweist, dass die zweite remotecall_fetch()
noch nicht abgeschlossen ist.
@time begin
a = cell(nworkers())
@async for (idx, pid) in enumerate(workers())
println("sending work to $pid")
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to
sleep(2)
a
2-element Array{Any,1}:
nothing
#undef
Und nicht überraschend, wenn wir @sync
und @async
direkt nebeneinander stellen, wird @sync
, dass jeder remotecall_fetch()
sequentiell (und nicht gleichzeitig) ausgeführt wird. Der Code wird jedoch erst fortgesetzt, wenn er fertig ist. Mit anderen Worten, dies wäre im Wesentlichen das Äquivalent dazu, wenn wir keines der Makros hätten, genauso wie sich sleep(2)
Wesentlichen identisch zu @sync @async sleep(2)
verhält.
@time begin
a = cell(nworkers())
@sync @async for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10
Beachten Sie auch, dass im @async
Makro @async
Operationen möglich sind. Die Dokumentation enthält ein Beispiel, das eine gesamte Schleife im Rahmen von @async
.
Es sei daran erinnert, dass die Hilfe für die Synchronisationsmakros besagt, dass "es warten wird, bis alle dynamisch eingeschlossenen Anwendungen von @async
, @spawn
, @spawnat
und @parallel
vollständig sind." Für das, was als "abgeschlossen" gilt, ist es wichtig, wie Sie die Aufgaben im Rahmen der @sync
und @async
Makros definieren. Betrachten Sie das folgende Beispiel, das eine geringfügige Abweichung von einem der oben angegebenen Beispiele darstellt:
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall(pid, sleep, 2)
end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)
julia> a
2-element Array{Any,1}:
RemoteRef{Channel{Any}}(2,1,3)
RemoteRef{Channel{Any}}(3,1,4)
Die Ausführung des vorherigen Beispiels dauerte ungefähr zwei Sekunden. Dies zeigt an, dass die beiden Tasks parallel ausgeführt wurden und das Skript darauf wartete, dass die Ausführung der Funktionen abgeschlossen war, bevor es fortfuhr. Dieses Beispiel hat jedoch eine wesentlich geringere Zeitauswertung. Der Grund dafür ist, dass die @sync
remotecall()
für @sync
"beendet" ist, sobald der Arbeiter den Job gesendet hat. (Beachten Sie, dass das resultierende Array, a, hier nur RemoteRef
Objekttypen enthält, die nur darauf hinweisen, dass bei einem bestimmten Prozess etwas RemoteRef
, das theoretisch irgendwann in der Zukunft abgerufen werden könnte.) Im Gegensatz dazu ist der remotecall_fetch()
" remotecall_fetch()
" nur "beendet", wenn er die Nachricht vom Worker erhält, dass seine Aufgabe abgeschlossen ist.
Wenn Sie also nach Möglichkeiten suchen, sicherzustellen, dass bestimmte Operationen mit Arbeitern abgeschlossen sind, bevor Sie mit Ihrem Skript fortfahren (wie zum Beispiel in diesem Beitrag beschrieben ), müssen Sie sorgfältig darüber nachdenken, was als "abgeschlossen" gilt und wie Sie dies tun werden Messen und dann operationalisieren Sie das in Ihrem Skript.
Arbeiter hinzufügen
Wenn Sie Julia zum ersten Mal starten, wird standardmäßig nur ein einziger Prozess ausgeführt, der für die Arbeit verfügbar ist. Sie können dies überprüfen mit:
julia> nprocs()
1
Um die parallele Verarbeitung zu nutzen, müssen Sie zunächst weitere Worker hinzufügen, die dann für die ihnen zugewiesene Arbeit zur Verfügung stehen. Sie können dies in Ihrem Skript (oder über den Interpreter) mithilfe von: addprocs(n)
wobei n
die Anzahl der Prozesse ist, die Sie verwenden möchten.
Alternativ können Sie Prozesse hinzufügen, wenn Sie Julia von der Befehlszeile aus starten:
$ julia -p n
Dabei ist n
die Anzahl der zusätzlichen Prozesse, die Sie hinzufügen möchten. Also, wenn wir mit Julia anfangen
$ julia -p 2
Wenn Julia anfängt, bekommen wir:
julia> nprocs()
3