Buscar..


pmap

pmap toma una función (que usted especifica) y la aplica a todos los elementos de una matriz. Este trabajo se divide entre los trabajadores disponibles. Luego, pmap devuelve los resultados de esa función a otra matriz.

addprocs(3)
sqrts = pmap(sqrt, 1:10)

Si la función toma múltiples argumentos, puede suministrar múltiples vectores a pmap

dots = pmap(dot, 1:10, 11:20)

@parallel embargo, al igual que con @parallel , si la función dada a pmap no está en la base Julia (es decir, está definida por el usuario o definida en un paquete), primero debe asegurarse de que la función esté disponible para todos los trabajadores:

@everywhere begin
    function rand_det(n)
        det(rand(n,n))
    end
end

determinants = pmap(rand_det, 1:10)

Vea también este SO Q&A.

@paralela

@parallel se puede usar para paralelizar un bucle, dividiendo los pasos del bucle entre diferentes trabajadores. Como un ejemplo muy simple:

addprocs(3)

a = collect(1:10)

for idx = 1:10
    println(a[idx])
end

Para un ejemplo un poco más complejo, considere:

@time begin
    @sync begin
        @parallel for idx in 1:length(a)
            sleep(a[idx])
        end
    end
end
27.023411 seconds (13.48 k allocations: 762.532 KB)
julia> sum(a)
55

Por lo tanto, vemos que si hubiéramos ejecutado este bucle sin @parallel , habrían tardado 55 segundos, en lugar de 27, en ejecutarse.

También podemos suministrar un operador de reducción para la macro @parallel . Supongamos que tenemos una matriz, queremos sumar cada columna de la matriz y luego multiplicar estas sumas entre sí:

A = rand(100,100);

@parallel (*) for idx = 1:size(A,1)
    sum(A[:,idx])
end

Hay varias cosas importantes que se deben tener en cuenta al usar @parallel para evitar comportamientos inesperados.

Primero: si desea usar cualquier función en sus bucles que no esté en la base Julia (por ejemplo, cualquiera de las funciones que defina en su script o que importe desde paquetes), debe hacer que esas funciones estén disponibles para los trabajadores. Así, por ejemplo, lo siguiente no funcionaría:

myprint(x) = println(x)
for idx = 1:10
    myprint(a[idx])
end

En su lugar, necesitaríamos usar:

@everywhere begin
    function myprint(x) 
        println(x)
    end
end

@parallel for idx in 1:length(a)
    myprint(a[idx])
end

Segundo Aunque cada trabajador podrá acceder a los objetos en el alcance del controlador, no podrá modificarlos. Así

a = collect(1:10)
@parallel for idx = 1:length(a)
   a[idx] += 1
end

julia> a'
1x10 Array{Int64,2}:
 1  2  3  4  5  6  7  8  9  10

Considerando que, si hubiéramos ejecutado el bucle sin el @paralelo, habría modificado con éxito la matriz a .

PARA ABORDAR ESTO, podemos hacer a objeto de tipo SharedArray para que cada trabajador pueda acceder y modificarlo:

a = convert(SharedArray{Float64,1}, collect(1:10))
@parallel for idx = 1:length(a)
    a[idx] += 1
end

julia> a'
1x10 Array{Float64,2}:
 2.0  3.0  4.0  5.0  6.0  7.0  8.0  9.0  10.0  11.0

@spawn y @spawnat

Las macros @spawn y @spawnat son dos de las herramientas que Julia pone a disposición para asignar tareas a los trabajadores. Aquí hay un ejemplo:

julia> @spawnat 2 println("hello world")
RemoteRef{Channel{Any}}(2,1,3)

julia>  From worker 2:  hello world

Ambas macros evaluarán una expresión en un proceso de trabajo. La única diferencia entre los dos es que @spawnat permite elegir qué trabajador evaluará la expresión (en el ejemplo anterior se especifica el trabajador 2), mientras que con @spawn se elegirá automáticamente un trabajador, según la disponibilidad.

En el ejemplo anterior, simplemente el trabajador 2 ejecutó la función println. No había nada de interés para volver o recuperar de esto. Sin embargo, a menudo, la expresión que enviamos al trabajador producirá algo que deseamos recuperar. Observe que en el ejemplo anterior, cuando llamamos a @spawnat , antes de que @spawnat la impresión del trabajador 2, vimos lo siguiente:

RemoteRef{Channel{Any}}(2,1,3)

Esto indica que la macro @spawnat devolverá un objeto de tipo RemoteRef . Este objeto, a su vez, contendrá los valores de retorno de nuestra expresión que se envía al trabajador. Si queremos recuperar esos valores, lo primero que puede asignar el RemoteRef que @spawnat vuelve a un objeto y, a continuación, y luego usar el fetch() la función que opera en un RemoteRef objeto de tipo, para recuperar los resultados almacenados a partir de una evaluación realizada en un trabajador.

julia> result = @spawnat 2 2 + 5
RemoteRef{Channel{Any}}(2,1,26)

julia> fetch(result)
7

La clave para poder usar @spawn efectiva es entender la naturaleza detrás de las expresiones en las que opera. Usar @spawn para enviar comandos a los trabajadores es un poco más complicado que solo escribir directamente lo que escribiría si estuviera ejecutando un "intérprete" en uno de los trabajadores o ejecutando el código de forma nativa en ellos. Por ejemplo, supongamos que deseamos usar @spawnat para asignar un valor a una variable en un trabajador. Podríamos intentar:

@spawnat 2 a = 5
RemoteRef{Channel{Any}}(2,1,2)

¿Funcionó? Bueno, veamos si el trabajador 2 intenta imprimir a archivo.

julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,4)

julia> 

No pasó nada. ¿Por qué? Podemos investigar esto más usando fetch() como anteriormente. fetch() puede ser muy útil porque no solo recuperará resultados exitosos sino también mensajes de error. Sin él, tal vez ni siquiera sepamos que algo ha ido mal.

julia> result = @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,5)

julia> fetch(result)
ERROR: On worker 2:
UndefVarError: a not defined

El mensaje de error dice que a no está definido en el trabajador 2. Pero, ¿por qué es esto? La razón es que necesitamos incluir nuestra operación de asignación en una expresión que luego usamos @spawn para decirle al trabajador que evalúe. A continuación se muestra un ejemplo, con la siguiente explicación:

julia> @spawnat 2 eval(:(a = 2))
RemoteRef{Channel{Any}}(2,1,7)

julia> @spawnat 2 println(a)
RemoteRef{Channel{Any}}(2,1,8)

julia>  From worker 2:  2

La sintaxis :() es lo que Julia usa para designar expresiones . Luego usamos la función eval() en Julia, que evalúa una expresión, y usamos la macro @spawnat para indicar que la expresión se evalúe en el trabajador 2.

También podríamos lograr el mismo resultado que:

julia> @spawnat(2, eval(parse("c = 5")))
RemoteRef{Channel{Any}}(2,1,9)

julia> @spawnat 2 println(c)
RemoteRef{Channel{Any}}(2,1,10)

julia>  From worker 2:  5

Este ejemplo demuestra dos nociones adicionales. Primero, vemos que también podemos crear una expresión usando la función parse() llamada en una cadena. En segundo lugar, vemos que podemos usar paréntesis al llamar a @spawnat , en situaciones donde esto podría hacer que nuestra sintaxis sea más clara y manejable.

Cuándo usar @parallel vs. pmap

La documentación de Julia informa que

pmap () está diseñado para el caso en el que cada llamada de función realiza una gran cantidad de trabajo. En contraste, @parallel for puede manejar situaciones donde cada iteración es pequeña, tal vez simplemente sumando dos números.

Hay varias razones para esto. Primero, pmap incurre en mayores costos de inicio, lo que pmap empleos en los trabajadores. Por lo tanto, si los trabajos son muy pequeños, estos costos de inicio pueden volverse ineficientes. Sin embargo, a la pmap , pmap hace un trabajo "más inteligente" al asignar trabajos entre los trabajadores. En particular, crea una cola de trabajos y envía un nuevo trabajo a cada trabajador siempre que ese trabajador esté disponible. @parallel en contraste, divide todo el trabajo que se debe hacer entre los trabajadores cuando se llama. Como tal, si algunos trabajadores tardan más en sus trabajos que otros, puede terminar con una situación en la que la mayoría de sus trabajadores han terminado y están inactivos, mientras que unos pocos permanecen activos durante un tiempo excesivo, terminando sus trabajos. Sin embargo, tal situación es menos probable que ocurra con trabajos muy pequeños y simples.

Lo siguiente ilustra esto: supongamos que tenemos dos trabajadores, uno de los cuales es lento y el otro es el doble de rápido. Idealmente, queremos dar al trabajador rápido el doble de trabajo que al trabajador lento. (O, podríamos tener trabajos rápidos y lentos, pero el principal es exactamente el mismo). pmap logrará esto, pero @parallel no lo hará.

Para cada prueba, inicializamos lo siguiente:

addprocs(2)

@everywhere begin
    function parallel_func(idx)
        workernum = myid() - 1 
        sleep(workernum)
        println("job $idx")
    end
end

Ahora, para la prueba @parallel , ejecutamos lo siguiente:

@parallel for idx = 1:12
    parallel_func(idx)
end

Y vuelve la salida de impresión:

julia>     From worker 2:    job 1
    From worker 3:    job 7
    From worker 2:    job 2
    From worker 2:    job 3
    From worker 3:    job 8
    From worker 2:    job 4
    From worker 2:    job 5
    From worker 3:    job 9
    From worker 2:    job 6
    From worker 3:    job 10
    From worker 3:    job 11
    From worker 3:    job 12

Es casi dulce. Los trabajadores han "compartido" el trabajo de manera equitativa. Tenga en cuenta que cada trabajador ha completado 6 trabajos, aunque el trabajador 2 es dos veces más rápido que el trabajador 3. Puede ser conmovedor, pero ineficiente.

Para la prueba de pmap , ejecuto lo siguiente:

pmap(parallel_func, 1:12)

y obtener la salida:

From worker 2:    job 1
From worker 3:    job 2
From worker 2:    job 3
From worker 2:    job 5
From worker 3:    job 4
From worker 2:    job 6
From worker 2:    job 8
From worker 3:    job 7
From worker 2:    job 9
From worker 2:    job 11
From worker 3:    job 10
From worker 2:    job 12

Ahora, tenga en cuenta que el trabajador 2 ha realizado 8 trabajos y el trabajador 3 ha realizado 4. Esto es exactamente proporcional a su velocidad y lo que queremos para una eficiencia óptima. pmap es un maestro de tareas difíciles, de cada uno según su capacidad.

@async y @sync

Según la documentación en ?@async , " @async envuelve una expresión en una tarea". Lo que esto significa es que para cualquier cosa que se encuentre dentro de su alcance, Julia iniciará esta tarea ejecutándose, pero luego procederá a lo que sigue en el script sin esperar a que la tarea se complete. Así, por ejemplo, sin la macro obtendrás:

julia> @time sleep(2)
  2.005766 seconds (13 allocations: 624 bytes)

Pero con la macro, obtienes:

julia> @time @async sleep(2)
  0.000021 seconds (7 allocations: 657 bytes)
Task (waiting) @0x0000000112a65ba0

julia> 

De este modo, Julia permite que el script continúe (y que la macro @time se ejecute por completo) sin esperar a que la tarea (en este caso, durmiendo durante dos segundos) se complete.

La macro @sync , por el contrario, "esperará hasta que se completen todos los usos dinámicamente cerrados de @async , @spawn , @spawnat y @parallel ". (De acuerdo con la documentación bajo ?@sync ). Así, vemos:

julia> @time @sync @async sleep(2)
  2.002899 seconds (47 allocations: 2.986 KB)
Task (done) @0x0000000112bd2e00

En este simple ejemplo, entonces, no tiene sentido incluir una sola instancia de @async y @sync juntas. Pero, donde @sync puede ser útil es donde tiene @async aplicado a múltiples operaciones que desea permitir que comiencen todas al mismo tiempo sin esperar a que se complete cada una.

Por ejemplo, supongamos que tenemos varios trabajadores y nos gustaría comenzar cada uno de ellos trabajando en una tarea simultáneamente y luego obtener los resultados de esas tareas. Un intento inicial (pero incorrecto) podría ser:

addprocs(2)
@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 4.011576 seconds (177 allocations: 9.734 KB)

El problema aquí es que el bucle espera a que remotecall_fetch() cada operación remotecall_fetch() , es decir, para que cada proceso complete su trabajo (en este caso durante 2 segundos) antes de continuar para comenzar la próxima operación remotecall_fetch() . En términos de una situación práctica, no estamos obteniendo los beneficios del paralelismo aquí, ya que nuestros procesos no están haciendo su trabajo (es decir, durmiendo) simultáneamente.

Sin embargo, podemos corregir esto utilizando una combinación de las macros @async y @sync :

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 2.009416 seconds (274 allocations: 25.592 KB)

Ahora, si contamos cada paso del bucle como una operación separada, vemos que hay dos operaciones separadas precedidas por la macro @async . La macro permite que se inicie cada uno de estos, y que el código continúe (en este caso hasta el siguiente paso del bucle) antes de que finalice cada uno. Sin embargo, el uso de la macro @sync , cuyo alcance abarca todo el bucle, significa que no permitiremos que el script @async ese bucle hasta que todas las operaciones precedidas por @async hayan finalizado.

Es posible obtener una comprensión aún más clara del funcionamiento de estas macros ajustando aún más el ejemplo anterior para ver cómo cambia bajo ciertas modificaciones. Por ejemplo, supongamos que solo tenemos @async sin @sync :

@time begin
    a = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        @async a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
## 0.001429 seconds (27 allocations: 2.234 KB)

Aquí, la macro @async nos permite continuar en nuestro bucle incluso antes de que cada operación remotecall_fetch() termine de ejecutarse. Pero, para bien o para mal, no tenemos una macro @sync para evitar que el código continúe más allá de este bucle hasta que todas las operaciones remotecall_fetch() terminen.

Sin embargo, cada operación remotecall_fetch() todavía se ejecuta en paralelo, incluso una vez que continuamos. Podemos ver eso porque si esperamos dos segundos, entonces la matriz a, que contiene los resultados, contendrá:

sleep(2)
julia> a
2-element Array{Any,1}:
 nothing
 nothing

(El elemento "nada" es el resultado de una recuperación exitosa de los resultados de la función de suspensión, que no devuelve ningún valor)

También podemos ver que las dos operaciones remotecall_fetch() comienzan esencialmente al mismo tiempo porque los comandos de print que los preceden también se ejecutan en rápida sucesión (la salida de estos comandos no se muestra aquí). Contraste esto con el siguiente ejemplo donde los comandos de print ejecutan a un intervalo de 2 segundos entre sí:

Si colocamos la macro @async en todo el bucle (en lugar de solo el paso interno), nuevamente nuestra secuencia de comandos continuará inmediatamente sin esperar a que remotecall_fetch() operaciones remotecall_fetch() . Ahora, sin embargo, solo permitimos que el script continúe más allá del bucle en su totalidad. No permitimos que cada paso individual del bucle comience antes de que termine el anterior. Como tal, a diferencia del ejemplo anterior, dos segundos después de que el script continúe después del bucle, la matriz de results aún tiene un elemento como #undef que indica que la segunda operación remotecall_fetch() aún no se ha completado.

@time begin
    a = cell(nworkers())
    @async for (idx, pid) in enumerate(workers())
        println("sending work to $pid")
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 0.001279 seconds (328 allocations: 21.354 KB)
# Task (waiting) @0x0000000115ec9120
## This also allows us to continue to

sleep(2)

a
2-element Array{Any,1}:
    nothing
 #undef    

Y, como es @sync , si colocamos @sync y @async uno al lado del otro, conseguimos que cada remotecall_fetch() ejecute secuencialmente (en lugar de simultáneamente) pero no continuamos en el código hasta que cada uno haya finalizado. En otras palabras, esto sería esencialmente equivalente a si no tuviéramos ninguna macro en su lugar, al igual que sleep(2) comporta de manera idéntica a @sync @async sleep(2)

@time begin
    a = cell(nworkers())
    @sync @async for (idx, pid) in enumerate(workers())
        a[idx] = remotecall_fetch(pid, sleep, 2)
    end
end
# 4.019500 seconds (4.20 k allocations: 216.964 KB)
# Task (done) @0x0000000115e52a10

Tenga en cuenta también que es posible tener operaciones más complicadas dentro del alcance de la macro @async . La documentación proporciona un ejemplo que contiene un bucle completo dentro del alcance de @async .

Recuerde que la ayuda para las macros de sincronización indica que "Espere hasta que se completen todos los usos dinámicamente incluidos de @async , @spawn , @spawnat y @parallel ". Para los fines de lo que se considera "completo", importa cómo defina las tareas dentro del alcance de las macros @sync y @async . Considere el siguiente ejemplo, que es una ligera variación en uno de los ejemplos dados anteriormente:

@time begin
    a = cell(nworkers())
    @sync for (idx, pid) in enumerate(workers())
        @async a[idx] = remotecall(pid, sleep, 2)
    end
end
## 0.172479 seconds (93.42 k allocations: 3.900 MB)

julia> a
2-element Array{Any,1}:
 RemoteRef{Channel{Any}}(2,1,3)
 RemoteRef{Channel{Any}}(3,1,4)

El ejemplo anterior tardó aproximadamente 2 segundos en ejecutarse, lo que indica que las dos tareas se ejecutaron en paralelo y que el script espera a que cada una complete la ejecución de sus funciones antes de continuar. Este ejemplo, sin embargo, tiene una evaluación de tiempo mucho menor. El motivo es que, a los fines de @sync la operación remotecall() ha "finalizado" una vez que ha enviado al trabajador el trabajo a realizar. (Tenga en cuenta que la matriz resultante, a, aquí, solo contiene tipos de objetos RemoteRef , que solo indican que algo está sucediendo con un proceso en particular que, en teoría, podría obtenerse en algún momento en el futuro). Por el contrario, la operación remotecall_fetch() solo ha "finalizado" cuando recibe el mensaje del trabajador de que su tarea está completa.

Por lo tanto, si está buscando formas de asegurarse de que ciertas operaciones con los trabajadores se hayan completado antes de continuar con su secuencia de comandos (como se explica en esta publicación ), es necesario pensar detenidamente qué se considera "completo" y cómo lo hará. medir y luego operacionalizar eso en su script.

Agregando trabajadores

Cuando inicie Julia por primera vez, de manera predeterminada, solo habrá un proceso en ejecución y estará disponible para dar trabajo. Puedes verificar esto usando:

julia> nprocs()
1

Para aprovechar el procesamiento paralelo, primero debe agregar trabajadores adicionales que luego estarán disponibles para realizar el trabajo que les asigne. Puede hacer esto dentro de su script (o desde el intérprete) usando: addprocs(n) donde n es el número de procesos que desea usar.

Alternativamente, puede agregar procesos cuando inicie Julia desde la línea de comando usando:

$ julia -p n

donde n es cuántos procesos adicionales desea agregar. Así, si empezamos con Julia

$ julia -p 2

Cuando Julia empiece obtendremos:

julia> nprocs()
3


Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow