Julia Language
Parallelle verwerking
Zoeken…
pmap
pmap
neemt een functie (die u opgeeft) en past deze toe op alle elementen in een array. Dit werk is verdeeld over de beschikbare werknemers. pmap
retourneert vervolgens de resultaten van die functie in een andere array.
addprocs(3)
sqrts = pmap(sqrt, 1:10)
als u meerdere argumenten gebruikt, kunt u meerdere vectoren aan pmap
dots = pmap(dot, 1:10, 11:20)
Net als bij @parallel
moet u er echter voor zorgen dat als de functie die aan pmap
wordt gegeven zich niet in base Julia bevindt (dat wil zeggen, door de gebruiker is gedefinieerd of in een pakket is gedefinieerd), deze functie eerst voor alle werknemers beschikbaar is:
@everywhere begin
function rand_det(n)
det(rand(n,n))
end
end
determinants = pmap(rand_det, 1:10)
Zie ook deze SO Q&A.
@parallel
@parallel kan worden gebruikt om een lus te parallelliseren, waarbij stappen van de lus worden verdeeld over verschillende werkers. Als een heel eenvoudig voorbeeld:
addprocs(3)
a = collect(1:10)
for idx = 1:10
println(a[idx])
end
Overweeg voor een iets complexer voorbeeld:
@time begin
@sync begin
@parallel for idx in 1:length(a)
sleep(a[idx])
end
end
end
27.023411 seconds (13.48 k allocations: 762.532 KB)
julia> sum(a)
55
We zien dus dat als we deze lus zonder @parallel
hadden uitgevoerd, het 55 seconden zou hebben @parallel
, in plaats van 27, om het uit te voeren.
We kunnen ook een reductie-operator leveren voor de macro @parallel
. Stel dat we een array hebben, we willen elke kolom van de array optellen en deze bedragen vervolgens met elkaar vermenigvuldigen:
A = rand(100,100);
@parallel (*) for idx = 1:size(A,1)
sum(A[:,idx])
end
Er zijn verschillende belangrijke dingen om in gedachten te houden wanneer u @parallel
gebruikt om onverwacht gedrag te voorkomen.
Ten eerste: als u functies in uw lussen wilt gebruiken die zich niet in base Julia bevinden (bijvoorbeeld functies die u in uw script definieert of die u uit pakketten importeert), moet u die functies toegankelijk maken voor de werkers. Het volgende zou bijvoorbeeld niet werken:
myprint(x) = println(x)
for idx = 1:10
myprint(a[idx])
end
In plaats daarvan zouden we moeten gebruiken:
@everywhere begin
function myprint(x)
println(x)
end
end
@parallel for idx in 1:length(a)
myprint(a[idx])
end
Ten tweede Hoewel elke werknemer toegang heeft tot de objecten in het bereik van de controller, kunnen ze deze niet wijzigen. Dus
a = collect(1:10)
@parallel for idx = 1:length(a)
a[idx] += 1
end
julia> a'
1x10 Array{Int64,2}:
1 2 3 4 5 6 7 8 9 10
Terwijl, als we de lus zonder @parallel hadden uitgevoerd, deze met succes de array a
zou hebben aangepast.
Dit aan te pakken, kunnen we in plaats daarvan maken a
een SharedArray
soort object, zodat iedere werknemer kan openen en wijzigen:
a = convert(SharedArray{Float64,1}, collect(1:10))
@parallel for idx = 1:length(a)
a[idx] += 1
end
julia> a'
1x10 Array{Float64,2}:
2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 10.0 11.0
@spawn en @spawnat
De macro's @spawn
en @spawnat
zijn twee van de hulpmiddelen die Julia beschikbaar stelt om taken aan werknemers toe te wijzen. Hier is een voorbeeld:
julia> @spawnat 2 println("hello world")
RemoteRef{Channel{Any}}(2,1,3)
julia> From worker 2: hello world
Beide macro's evalueren een uitdrukking op een werkproces. Het enige verschil tussen de twee is dat @spawnat
toestaat om te kiezen welke werknemer de uitdrukking zal evalueren (in het bovenstaande voorbeeld is werknemer 2 gespecificeerd), terwijl bij @spawn
automatisch een werknemer wordt gekozen, op basis van beschikbaarheid.
In het bovenstaande voorbeeld hebben we gewoon werknemer 2 de println-functie laten uitvoeren. Hiervan was niets interessants om terug te keren of op te halen. Vaak levert de uitdrukking die we naar de werker hebben gestuurd echter iets op dat we willen ophalen. Let op in het bovenstaande voorbeeld, toen we @spawnat
, voordat we de afdruk van werknemer 2 kregen, zagen we het volgende:
RemoteRef{Channel{Any}}(2,1,3)
Dit geeft aan dat de macro RemoteRef
een RemoteRef
type @spawnat
zal retourneren. Dit object bevat op zijn beurt de retourwaarden van onze expressie die naar de werker worden verzonden. Als we die waarden willen ophalen, kunnen we eerst de RemoteRef
toewijzen die @spawnat
retourneert naar een object en vervolgens de functie fetch()
die op een RemoteRef
type RemoteRef
werkt, om de resultaten op te halen die zijn opgeslagen uit een evaluatie die is uitgevoerd op een werknemer.
julia> result = @spawnat 2 2 + 5
RemoteRef{Channel{Any}}(2,1,26)
julia> fetch(result)
7
De sleutel tot het effectief kunnen gebruiken van @spawn
is het begrijpen van de aard achter de uitdrukkingen waarop het werkt. Het gebruik van @spawn
om opdrachten naar werknemers te sturen, is iets ingewikkelder dan alleen direct typen wat u zou typen als u een "interpreter" op een van de werknemers zou uitvoeren of native code op hen zou uitvoeren. Stel bijvoorbeeld dat we @spawnat
wilden gebruiken om een waarde toe te kennen aan een variabele op een werknemer. We kunnen proberen:
@spawnat 2 a = 5
RemoteRef{Channel{Any}}(2,1,2)
Werkte het? Nou, laten we zien door het hebben van de werknemer 2 proberen om af te drukken a
.
julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,4)
julia>
Er is niks gebeurd. Waarom? We kunnen dit meer onderzoeken door fetch()
zoals hierboven. fetch()
kan erg handig zijn omdat het niet alleen succesvolle resultaten maar ook foutmeldingen ophaalt. Zonder dat weten we misschien niet eens dat er iets mis is gegaan.
julia> result = @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,5)
julia> fetch(result)
ERROR: On worker 2:
UndefVarError: a not defined
De foutmelding zegt dat a
niet is gedefinieerd voor werknemer 2. Maar waarom is dit? De reden is dat we onze toewijzingsbewerking moeten omzetten in een uitdrukking die we vervolgens gebruiken met @spawn
om de medewerker te laten evalueren. Hieronder is een voorbeeld, met de volgende uitleg:
julia> @spawnat 2 eval(:(a = 2))
RemoteRef{Channel{Any}}(2,1,7)
julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,8)
julia> From worker 2: 2
De :()
syntaxis is wat Julia gebruikt om uitdrukkingen aan te duiden . We gebruiken vervolgens de functie eval()
in Julia, die een expressie evalueert, en we gebruiken de macro @spawnat
om te instrueren dat de expressie wordt geëvalueerd op werknemer 2.
We kunnen ook hetzelfde resultaat bereiken als:
julia> @spawnat(2, eval(parse("c = 5")))
RemoteRef{Channel{Any}}(2,1,9)
julia> @spawnat 2 println(c)
RemoteRef{Channel{Any}}(2,1,10)
julia> From worker 2: 5
Dit voorbeeld toont twee extra noties. Eerst zien we dat we ook een uitdrukking kunnen maken met behulp van de functie parse()
die op een string wordt aangeroepen. Ten tweede zien we dat we haakjes kunnen gebruiken wanneer we @spawnat
aanroepen, in situaties waarin dit onze syntaxis duidelijker en beter beheersbaar zou kunnen maken.
Wanneer @parallel vs. pmap te gebruiken
De Julia- documentatie adviseert dat
pmap () is ontworpen voor het geval waarin elke functieaanroep veel werk doet. Daarentegen kan @parallel for situaties aan waarbij elke iteratie klein is, misschien slechts twee getallen.
Daar zijn verschillende redenen voor. Ten eerste brengt pmap
hogere opstartkosten met zich mee waardoor banen voor werknemers worden geïnitieerd. Dus als de taken erg klein zijn, kunnen deze opstartkosten inefficiënt worden. Omgekeerd doet pmap
een "slimmere" taak van het toewijzen van banen aan werknemers. Het bouwt met name een wachtrij met taken en stuurt een nieuwe taak naar elke werknemer wanneer die werknemer beschikbaar komt. @parallel
daarentegen verdeelt al het werk dat onder de arbeiders moet worden gedaan wanneer het wordt opgeroepen. Als sommige werknemers meer tijd nodig hebben om te werken dan anderen, kunt u een situatie krijgen waarbij de meeste van uw werknemers klaar zijn en inactief zijn, terwijl enkelen voor een buitensporige hoeveelheid tijd actief blijven en hun taken afmaken. Een dergelijke situatie is echter minder waarschijnlijk bij zeer kleine en eenvoudige taken.
Het volgende illustreert dit: stel dat we twee werknemers hebben, waarvan er één langzaam is en de andere twee keer zo snel. In het ideale geval zouden we de snelle werker twee keer zoveel werk willen geven als de trage werker. (of we kunnen snel en langzaam werken, maar het principe is exact hetzelfde). pmap
zal dit bereiken, maar @parallel
niet.
Voor elke test initialiseren we het volgende:
addprocs(2)
@everywhere begin
function parallel_func(idx)
workernum = myid() - 1
sleep(workernum)
println("job $idx")
end
end
Voor de @parallel
test voeren we het volgende uit:
@parallel for idx = 1:12
parallel_func(idx)
end
En krijg terug printoutput:
julia> From worker 2: job 1
From worker 3: job 7
From worker 2: job 2
From worker 2: job 3
From worker 3: job 8
From worker 2: job 4
From worker 2: job 5
From worker 3: job 9
From worker 2: job 6
From worker 3: job 10
From worker 3: job 11
From worker 3: job 12
Het is bijna lief. De werknemers hebben het werk gelijkmatig "gedeeld". Merk op dat elke werknemer 6 taken heeft voltooid, hoewel werknemer 2 twee keer zo snel is als werknemer 3. Het kan raken, maar het is inefficiënt.
Voor de pmap
test voer ik het volgende uit:
pmap(parallel_func, 1:12)
en krijg de output:
From worker 2: job 1
From worker 3: job 2
From worker 2: job 3
From worker 2: job 5
From worker 3: job 4
From worker 2: job 6
From worker 2: job 8
From worker 3: job 7
From worker 2: job 9
From worker 2: job 11
From worker 3: job 10
From worker 2: job 12
Merk nu op dat werknemer 2 8 taken heeft uitgevoerd en werknemer 3 heeft 4 uitgevoerd. Dit is precies in verhouding tot hun snelheid en wat we willen voor optimale efficiëntie. pmap
is een harde pmap
- van elk volgens hun vermogen.
@async en @sync
Volgens de documentatie onder ?@async
, " @async
verpakt een uitdrukking in een taak." Wat dit betekent is dat voor alles wat binnen het toepassingsgebied valt, Julia deze taak zal starten, maar dan verder gaat met wat daarna komt in het script zonder te wachten tot de taak is voltooid. Zo krijgt u bijvoorbeeld zonder de macro:
julia> @time sleep(2)
2.005766 seconds (13 allocations: 624 bytes)
Maar met de macro krijg je:
julia> @time @async sleep(2)
0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0
julia>
Julia laat het script dus doorgaan (en de macro @time
volledig uitvoeren) zonder te wachten tot de taak (in dit geval twee seconden slapen) is voltooid.
De macro @sync
daarentegen, "wacht totdat alle dynamisch ingesloten toepassingen van @async
, @spawn
, @spawnat
en @parallel
zijn voltooid." (volgens de documentatie onder ?@sync
). Zo zien we:
julia> @time @sync @async sleep(2)
2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00
In dit eenvoudige voorbeeld heeft het dus geen zin om een enkel exemplaar van @async
en @sync
samen op te nemen. Maar waar @sync
nuttig kan zijn, is dat u @async
toegepast op meerdere bewerkingen die u allemaal wilt toestaan om allemaal tegelijk te starten zonder te wachten tot ze zijn voltooid.
Stel bijvoorbeeld dat we meerdere werknemers hebben en we willen elk van hen tegelijkertijd aan een taak laten werken en dan de resultaten van die taken ophalen. Een eerste (maar onjuiste) poging kan zijn:
addprocs(2)
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 4.011576 seconds (177 allocations: 9.734 KB)
Het probleem hier is dat de lus wacht tot elke remotecall_fetch()
-bewerking is voltooid, dat wil zeggen dat elk proces zijn werk voltooit (in dit geval 2 seconden slapen) voordat de volgende remotecall_fetch()
-bewerking wordt gestart. In termen van een praktische situatie, krijgen we hier niet de voordelen van parallellisme, omdat onze processen hun werk (dwz slapen) niet tegelijkertijd doen.
We kunnen dit echter corrigeren door een combinatie van de macro's @async
en @sync
:
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 2.009416 seconds (274 allocations: 25.592 KB)
Als we nu elke stap van de lus als een afzonderlijke bewerking tellen, zien we dat er twee afzonderlijke bewerkingen worden voorafgegaan door de macro @async
. Met de macro kan elk van deze worden opgestart en de code doorgaan (in dit geval naar de volgende stap van de lus) voor elke finish. Maar het gebruik van de macro @sync
, waarvan de reikwijdte de hele lus omvat, betekent dat we het script niet voorbij die lus laten gaan totdat alle bewerkingen zijn voorafgegaan door @async
zijn voltooid.
Het is mogelijk om een nog duidelijker inzicht te krijgen in de werking van deze macro's door het bovenstaande voorbeeld verder aan te passen om te zien hoe het verandert onder bepaalde wijzigingen. Stel bijvoorbeeld dat we gewoon de @async
zonder de @sync
:
@time begin
a = cell(nworkers())
for (idx, pid) in enumerate(workers())
println("sending work to $pid")
@async a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
## 0.001429 seconds (27 allocations: 2.234 KB)
Hier stelt de macro @async
ons in staat door te gaan in onze lus, zelfs voordat elke remotecall_fetch()
is voltooid. Maar, voor beter of slechter, we hebben geen @sync
macro om te voorkomen dat de code voorbij deze lus gaat totdat alle bewerkingen van remotecall_fetch()
.
Niettemin wordt elke remotecall_fetch()
nog steeds parallel uitgevoerd, zelfs als we doorgaan. We kunnen dat zien, want als we twee seconden wachten, zal de array a, met de resultaten, het volgende bevatten:
sleep(2)
julia> a
2-element Array{Any,1}:
nothing
nothing
(Het element "niets" is het resultaat van een succesvolle ophaalactie van de resultaten van de slaapfunctie, die geen waarden retourneert)
We kunnen ook zien dat de twee remotecall_fetch()
in wezen op hetzelfde moment starten omdat de print
die eraan voorafgaan ook snel worden uitgevoerd (uitvoer van deze opdrachten die hier niet worden getoond). Vergelijk dit met het volgende voorbeeld waarbij de print
met een vertraging van 2 seconden ten opzichte van elkaar worden uitgevoerd:
Als we de macro @async
op de hele lus plaatsen (in plaats van alleen de binnenste stap ervan), dan zal ons script weer onmiddellijk doorgaan zonder te wachten tot de remotecall_fetch()
. Nu staan we echter alleen toe dat het script voorbij de lus als geheel doorgaat. We staan niet toe dat elke afzonderlijke stap van de lus begint voordat de vorige is voltooid. Als zodanig, in tegenstelling tot in het bovenstaande voorbeeld, heeft de results
twee seconden nadat het script verdergaat na de lus nog steeds één element als #undef
wat aangeeft dat de tweede remotecall_fetch()
nog steeds niet is voltooid.
@time begin
a = cell(nworkers())
@async for (idx, pid) in enumerate(workers())
println("sending work to $pid")
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to
sleep(2)
a
2-element Array{Any,1}:
nothing
#undef
En, niet verrassend, als we @sync
en @async
naast elkaar plaatsen, krijgen we dat elke remotecall_fetch()
opeenvolgend (in plaats van gelijktijdig) wordt uitgevoerd, maar we gaan niet verder in de code totdat elk is voltooid. Met andere woorden, dit zou in wezen het equivalent zijn van als we geen van beide macro's hadden, net zoals sleep(2)
zich in wezen identiek gedraagt als @sync @async sleep(2)
@time begin
a = cell(nworkers())
@sync @async for (idx, pid) in enumerate(workers())
a[idx] = remotecall_fetch(pid, sleep, 2)
end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10
Merk ook op dat het mogelijk is om meer gecompliceerde bewerkingen binnen het bereik van de macro @async
. De documentatie geeft een voorbeeld met een volledige lus binnen het bereik van @async
.
Bedenk dat de hulp voor de synchronisatiemacro's aangeeft dat het "Wacht tot alle dynamisch ingesloten toepassingen van @async
, @spawn
, @spawnat
en @parallel
zijn voltooid." Voor wat als "voltooid" geldt, is het van belang hoe u de taken definieert in het kader van de macro's @sync
en @async
. Overweeg het onderstaande voorbeeld, dat een kleine variatie is op een van de bovenstaande voorbeelden:
@time begin
a = cell(nworkers())
@sync for (idx, pid) in enumerate(workers())
@async a[idx] = remotecall(pid, sleep, 2)
end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)
julia> a
2-element Array{Any,1}:
RemoteRef{Channel{Any}}(2,1,3)
RemoteRef{Channel{Any}}(3,1,4)
Het eerdere voorbeeld duurde ongeveer 2 seconden om uit te voeren, wat aangeeft dat de twee taken parallel werden uitgevoerd en dat het script wachtte op elk om de uitvoering van hun functies te voltooien alvorens verder te gaan. Dit voorbeeld heeft echter een veel kortere tijdevaluatie. De reden is dat voor de toepassing van @sync
de remotecall()
is "voltooid" zodra de werker de taak heeft verzonden. (Merk op dat de resulterende array, hier, alleen RemoteRef
objecttypen bevat, die alleen aangeven dat er iets aan de hand is met een bepaald proces dat in theorie in de toekomst kan worden opgehaald). De bewerking remotecall_fetch()
is daarentegen alleen "voltooid" wanneer de medewerker de melding krijgt dat de taak is voltooid.
Dus als u op zoek bent naar manieren om ervoor te zorgen dat bepaalde bewerkingen met werknemers zijn voltooid voordat u verder gaat in uw script (zoals bijvoorbeeld in dit bericht wordt besproken), is het noodzakelijk om goed na te denken over wat als "voltooid" telt en hoe u meten en vervolgens operationeel maken in uw script.
Werknemers toevoegen
Wanneer u Julia voor het eerst start, is er standaard slechts één proces actief en beschikbaar om aan te werken. U kunt dit verifiëren met:
julia> nprocs()
1
Om te profiteren van parallelle verwerking, moet u eerst extra medewerkers toevoegen die dan beschikbaar zijn voor het werk dat u aan hen toewijst. U kunt dit doen in uw script (of vanuit de interpreter) met behulp van: addprocs(n)
waarbij n
het aantal processen is dat u wilt gebruiken.
U kunt ook processen toevoegen wanneer u Julia start vanaf de opdrachtregel met:
$ julia -p n
waar n
is hoeveel extra processen u wilt toevoegen. Dus als we Julia beginnen met
$ julia -p 2
Wanneer Julia begint, krijgen we:
julia> nprocs()
3