Поиск…


ртар

pmap принимает функцию (которую вы указываете) и применяет ее ко всем элементам массива. Эта работа разделена среди доступных работников. pmap затем возвращает результаты этой функции в другой массив.

addprocs(3)
sqrts = pmap(sqrt, 1:10)

если вы используете несколько аргументов, вы можете предоставить несколько векторов для pmap

dots = pmap(dot, 1:10, 11:20)

Однако, как и в случае @parallel , если функция, заданная для pmap , не находится в базе Julia (т.е. она определяется пользователем или определена в пакете), вы должны убедиться, что функция доступна всем сотрудникам:

@everywhere begin
    function rand_det(n)
        det(rand(n,n))
    end
end

determinants = pmap(rand_det, 1:10)

См. Также этот вопрос Q & A.

@параллельно

@parallel можно использовать для параллелизации цикла, делящего шаги цикла на разных работников. В качестве очень простого примера:

addprocs(3)

a = collect(1:10)

for idx = 1:10
    println(a[idx])
end

Рассмотрим несколько более сложный пример:

@time begin
    @sync begin
        @parallel for idx in 1:length(a)
            sleep(a[idx])
        end
    end
end
27.023411 seconds (13.48 k allocations: 762.532 KB)
julia> sum(a)
55

Таким образом, мы видим, что если бы мы выполнили этот цикл без @parallel , для выполнения потребовалось бы 55 секунд, а не 27.

Мы также можем предоставить оператора сокращения для макроса @parallel . Предположим, что у нас есть массив, мы хотим суммировать каждый столбец массива, а затем умножать эти суммы друг на друга:

A = rand(100,100);

@parallel (*) for idx = 1:size(A,1)
    sum(A[:,idx])
end

Есть несколько важных вещей, которые следует иметь в виду при использовании @parallel чтобы избежать неожиданного поведения.

Во-первых: если вы хотите использовать любые функции в своих циклах, которые не находятся в базе Julia (например, любые функции, которые вы определяете в своем скрипте или импортируете из пакетов), вы должны сделать эти функции доступными для рабочих. Таким образом, например, следующее не будет работать:

myprint(x) = println(x)
for idx = 1:10
    myprint(a[idx])
end

Вместо этого нам нужно будет использовать:

@everywhere begin
    function myprint(x) 
        println(x)
    end
end

@parallel for idx in 1:length(a)
    myprint(a[idx])
end

Во-вторых, хотя каждый рабочий сможет получить доступ к объектам в объеме контроллера, они не смогут их модифицировать. таким образом

a = collect(1:10)
@parallel for idx = 1:length(a)
   a[idx] += 1
end

julia> a'
1x10 Array{Int64,2}:
 1  2  3  4  5  6  7  8  9  10

Если бы мы выполнили цикл с @parallel, он бы успешно модифицировал массив a .

Чтобы сообщить a этом, мы можем вместо этого создать SharedArray типа SharedArray чтобы каждый рабочий мог его получить и изменить:

a = convert(SharedArray{Float64,1}, collect(1:10))
@parallel for idx = 1:length(a)
    a[idx] += 1
end

julia> a'
1x10 Array{Float64,2}:
 2.0  3.0  4.0  5.0  6.0  7.0  8.0  9.0  10.0  11.0

@spawn и @spawnat

@spawn и @spawnat - два из инструментов, которые Джулия предоставляет для назначения задач рабочим. Вот пример:

julia> @spawnat 2 println("hello world")
RemoteRef{Channel{Any}}(2,1,3)

julia>  From worker 2:  hello world

Оба этих макроса будут оценивать выражение в рабочем процессе. Единственное различие между ними состоит в том, что @spawnat позволяет вам выбирать, какой рабочий будет оценивать выражение (в примере выше рабочий 2 указан), тогда как с @spawn рабочий будет автоматически выбран в зависимости от доступности.

В приведенном выше примере у нас просто был рабочий 2, выполняющий функцию println. Не было ничего интересного, чтобы вернуться или получить от этого. Часто, однако, выражение, которое мы отправили работнику, даст то, что мы хотим получить. Обратите внимание на приведенный выше пример, когда мы вызвали @spawnat , прежде чем мы получили распечатку с рабочего 2, мы увидели следующее:

RemoteRef{Channel{Any}}(2,1,3)

Это указывает на то, что @spawnat макрос будет возвращать RemoteRef объект типа. Этот объект, в свою очередь, будет содержать возвращаемые значения из нашего выражения, которое отправляется работнику. Если мы хотим получить эти значения, мы можем сначала назначить RemoteRef который @spawnat вернется к объекту, а затем, а затем использовать функцию fetch() которая работает с RemoteRef типа RemoteRef , для получения результатов, сохраненных в результате оценки, выполненной на рабочий.

julia> result = @spawnat 2 2 + 5
RemoteRef{Channel{Any}}(2,1,26)

julia> fetch(result)
7

Ключом к эффективному использованию @spawn является понимание природы выражений , на которых он работает. Использование @spawn для отправки команд для работников немного сложнее, чем просто набирать непосредственно то, что вы набираете, если вы используете «интерпретатор» для одного из рабочих или выполняете код изначально на них. Например, предположим, что мы хотели использовать @spawnat для назначения значения переменной для рабочего. Мы могли бы попробовать:

@spawnat 2 a = 5
RemoteRef{Channel{Any}}(2,1,2)

Это сработало? Ну, давайте посмотрим, попробуем ли работника 2 напечатать a .

julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,4)

julia> 

Ничего не случилось. Зачем? Мы можем исследовать это, используя вышеприведенную функцию fetch() . fetch() может быть очень удобной, поскольку она будет извлекать не только успешные результаты, но и сообщения об ошибках. Без этого мы могли бы даже не знать, что что-то пошло не так.

julia> result = @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,5)

julia> fetch(result)
ERROR: On worker 2:
UndefVarError: a not defined

В сообщении об ошибке указано, что a не определено для рабочего 2. Но почему это? Причина в том, что нам нужно превратить нашу операцию назначения в выражение, которое мы затем используем @spawn чтобы сообщить работнику оценить. Ниже приведен пример, поясняющий следующее:

julia> @spawnat 2 eval(:(a = 2))
RemoteRef{Channel{Any}}(2,1,7)

julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,8)

julia>  From worker 2:  2

Синтаксис :() - это то, что Джулия использует для обозначения выражений . Затем мы используем функцию eval() в Julia, которая вычисляет выражение, и мы используем макрос @spawnat чтобы @spawnat что выражение будет оцениваться на рабочем 2.

Мы могли бы достичь такого же результата, как:

julia> @spawnat(2, eval(parse("c = 5")))
RemoteRef{Channel{Any}}(2,1,9)

julia> @spawnat 2 println(c)
RemoteRef{Channel{Any}}(2,1,10)

julia>  From worker 2:  5

Этот пример демонстрирует два дополнительных понятия. Во-первых, мы видим, что мы также можем создать выражение, используя функцию parse() вызываемую в строке. Во-вторых, мы видим, что мы можем использовать круглые скобки при вызове @spawnat , в ситуациях, когда это может сделать наш синтаксис более понятным и управляемым.

Когда использовать @parallel vpmap

Юлия документация сообщает , что

pmap () предназначен для случая, когда каждый вызов функции выполняет большой объем работы. Напротив, @parallel for может обрабатывать ситуации, когда каждая итерация крошечная, возможно, просто суммируя два числа.

На это есть несколько причин. Во-первых, pmap берет на себя большие затраты на запуск рабочих мест. Таким образом, если задания очень малы, эти затраты на запуск могут стать неэффективными. Напротив, однако, pmap делает «умную» работу по распределению рабочих мест среди работников. В частности, он создает очередь заданий и отправляет новое задание каждому работнику всякий раз, когда этот рабочий становится доступным. @parallel , напротив, разворачивает всю работу, которая должна выполняться среди рабочих, когда она называется. Таким образом, если некоторые работники занимают больше времени на своих рабочих местах, чем другие, вы можете столкнуться с ситуацией, когда большинство ваших работников закончили и простаивают, в то время как некоторые из них остаются активными в течение чрезмерного количества времени, заканчивая свою работу. Однако такая ситуация реже встречается с очень маленькими и простыми рабочими местами.

Ниже показано следующее: предположим, что у нас есть два рабочих, один из которых медленный, а другой из них в два раза быстрее. В идеале мы хотели бы дать быструю рабочую работу в два раза больше работы, чем медленный рабочий. (или мы могли бы иметь быструю и медленную работу, но главное - то же самое). pmap выполнит это, но @parallel не будет.

Для каждого теста мы инициализируем следующее:

addprocs(2)

@everywhere begin
    function parallel_func(idx)
        workernum = myid() - 1 
        sleep(workernum)
        println("job $idx")
    end
end

Теперь, для теста @parallel , мы запускаем следующее:

@parallel for idx = 1:12
    parallel_func(idx)
end

И верните распечатку:

julia>     From worker 2:    job 1
    From worker 3:    job 7
    From worker 2:    job 2
    From worker 2:    job 3
    From worker 3:    job 8
    From worker 2:    job 4
    From worker 2:    job 5
    From worker 3:    job 9
    From worker 2:    job 6
    From worker 3:    job 10
    From worker 3:    job 11
    From worker 3:    job 12

Это почти сладкое. Рабочие «разделили» работу равномерно. Обратите внимание, что каждый рабочий выполнил 6 заданий, хотя рабочий 2 в два раза быстрее, чем рабочий 3. Он может касаться, но он неэффективен.

Для теста pmap я запускаю следующее:

pmap(parallel_func, 1:12)

и получить результат:

From worker 2:    job 1
From worker 3:    job 2
From worker 2:    job 3
From worker 2:    job 5
From worker 3:    job 4
From worker 2:    job 6
From worker 2:    job 8
From worker 3:    job 7
From worker 2:    job 9
From worker 2:    job 11
From worker 3:    job 10
From worker 2:    job 12

Теперь обратите внимание, что работник 2 выполнил 8 заданий, а работник 3 выполнил 4. Это точно соответствует их скорости и тому, что мы хотим для оптимальной эффективности. pmap - это сложный мастер задачи - от каждого в зависимости от их способности.

@async и @sync

Согласно документации под ?@async , « @async завершает выражение в задаче». Это означает, что для того, что попадает в сферу его охвата, Джулия начнет выполнение этой задачи, а затем перейдет к тому, что будет дальше в скрипте, не дожидаясь завершения задачи. Так, например, без макроса вы получите:

julia> @time sleep(2)
  2.005766 seconds (13 allocations: 624 bytes)

Но с макросом вы получаете:

julia> @time @async sleep(2)
  0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0

julia> 

Таким образом, Julia позволяет сценарию продолжить (и макрос @time полностью выполнить), не дожидаясь завершения задачи (в данном случае, спать в течение двух секунд).

Макрос @sync , напротив, будет «Подождать, пока все динамически закрытые приложения @async , @spawn , @spawnat и @parallel будут завершены». (в соответствии с документацией под ?@sync ). Таким образом, мы видим:

julia> @time @sync @async sleep(2)
  2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00

В этом простом примере нет смысла включать один экземпляр @async и @sync вместе. Но, где @sync может быть полезным, вы можете использовать @async для нескольких операций, которые вы хотите разрешить всем, запускать сразу, не дожидаясь завершения каждого из них.

Например, предположим, что у нас есть несколько сотрудников, и мы бы хотели, чтобы каждый из них работал над задачей одновременно, а затем извлекал результаты из этих задач. Первоначальная (но некорректная) попытка может быть:

addprocs(2)
@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 4.011576 seconds (177 allocations: 9.734 KB)

Проблема заключается в том, что цикл ждет remotecall_fetch() каждой remotecall_fetch() , то есть для каждого процесса завершить свою работу (в этом случае спать в течение 2 секунд), прежде чем продолжить выполнение следующей remotecall_fetch() . Что касается практической ситуации, мы не получаем преимуществ параллелизма здесь, так как наши процессы не выполняют свою работу (т.е. спать) одновременно.

Однако мы можем исправить это, используя комбинацию макросов @async и @sync :

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 2.009416 seconds (274 allocations: 25.592 KB)

Теперь, если мы посчитаем каждый шаг цикла как отдельную операцию, мы видим, что есть два отдельных операции, которым предшествует макрос @async . Макрос позволяет каждому из них запускаться, а код продолжать (в этом случае на следующий шаг цикла) до каждого завершения. Но использование макроса @sync , область видимости которого охватывает весь цикл, означает, что мы не допустим, чтобы сценарий прошел этот цикл до тех пор, пока все операции, предшествующие @async не будут завершены.

Можно получить еще более четкое представление о работе этих макросов путем дальнейшей настройки приведенного выше примера, чтобы увидеть, как он изменяется при определенных модификациях. Например, предположим, что мы просто имеем @async без @sync :

@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 0.001429 seconds (27 allocations: 2.234 KB)

Здесь макрос @async позволяет нам продолжить работу в нашем цикле еще до remotecall_fetch() каждой remotecall_fetch() . Но, к лучшему или худшему, у нас нет макроса @sync чтобы предотвратить продолжение кода после этого цикла до тех пор, пока все операции remotecall_fetch() закончатся.

Тем не менее, каждая remotecall_fetch() все еще работает параллельно, даже если мы продолжим. Мы можем это видеть, потому что если мы подождем две секунды, то массив a, содержащий результаты, будет содержать:

sleep(2)
julia> a
2-element Array{Any,1}:
 nothing
 nothing

(Элемент «ничего» является результатом успешной выборки результатов функции сна, которая не возвращает никаких значений)

Мы также видим, что две remotecall_fetch() начинаются практически в одно и то же время, потому что команды print которые предшествуют им, также выполняются с быстрой последовательностью (вывод этих команд не показан здесь). Сравните это со следующим примером, когда команды print выполняются с интервалом в 2 секунды друг от друга:

Если мы @async макрос @async во весь цикл (а не только на его внутренний шаг), то снова наш скрипт будет продолжен немедленно, не дожидаясь завершения remotecall_fetch() . Однако теперь мы разрешаем сценарию продолжать цикл в целом. Мы не разрешаем каждому отдельному шагу цикла запускать предыдущий. Таким образом, в отличие от приведенного выше примера, через две секунды после того, как сценарий продолжается после цикла, массив results прежнему имеет один элемент в качестве #undef указывающий, что вторая remotecall_fetch() прежнему не завершена.

@time begin
    a = cell(nworkers())
    @async for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to

sleep(2)

a
2-element Array{Any,1}:
    nothing
 #undef    

И неудивительно, что если мы поместим @sync и @async рядом друг с другом, мы получим, что каждый remotecall_fetch() работает последовательно (а не одновременно), но мы не продолжаем в коде до тех пор, пока все не закончится. Другими словами, это было бы фактически эквивалентно тому, если бы у нас не было макроса на месте, так же как sleep(2) ведет себя по существу идентично @sync @async sleep(2)

@time begin
    a = cell(nworkers())
    @sync @async for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10

Также обратите внимание, что в @async макроса @async можно выполнять более сложные операции. В документации приведен пример, содержащий полный цикл в области @async .

Напомним, что справка для макросов синхронизации утверждает, что она будет «Подождать, пока все динамически закрытые приложения @async , @spawn , @spawnat и @parallel будут завершены». Для целей, которые считаются «полными», важно, как вы определяете задачи в рамках макросов @sync и @async . Рассмотрим приведенный ниже пример, который является небольшим изменением на одном из приведенных выше примеров:

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall(pid, sleep, 2)
    end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)

julia> a
2-element Array{Any,1}:
 RemoteRef{Channel{Any}}(2,1,3)
 RemoteRef{Channel{Any}}(3,1,4)

Более ранний пример занял примерно 2 секунды для выполнения, что указывает на то, что две задачи выполнялись параллельно и что сценарий ожидает завершения каждого выполнения своих функций перед продолжением. Однако этот пример имеет гораздо более низкую оценку времени. Причина в том, что для целей @sync remotecall() была «завершена», как только она отправила работнику работу. (Обратите внимание, что результирующий массив, здесь, просто содержит RemoteRef объектов RemoteRef , которые просто указывают на то, что происходит что-то происходящее с конкретным процессом, который теоретически может быть получен в какой-то момент в будущем). Напротив, remotecall_fetch() только «закончена», когда получает сообщение от работника о завершении своей задачи.

Таким образом, если вы ищете способы обеспечения того, чтобы определенные операции с работниками были завершены до перехода в ваш сценарий (как, например, обсуждается в этом сообщении ), необходимо тщательно подумать о том, что считается «полным» и как вы будете измерьте, а затем примените это в своем сценарии.

Добавление рабочих

Когда вы впервые запускаете Julia, по умолчанию будет работать только один процесс и доступен для работы. Вы можете проверить это, используя:

julia> nprocs()
1

Чтобы воспользоваться параллельной обработкой, вы должны сначала добавить дополнительных работников, которые затем будут доступны для выполнения работы, которую вы им назначаете. Вы можете сделать это в своем скрипте (или из интерпретатора), используя: addprocs(n) где n - количество процессов, которые вы хотите использовать.

Кроме того, вы можете добавлять процессы, когда вы запускаете Julia из командной строки, используя:

$ julia -p n

где n - количество дополнительных процессов, которые вы хотите добавить. Таким образом, если мы начнем Джулию с

$ julia -p 2

Когда Джулия начнет, мы получим:

julia> nprocs()
3


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow