Buscar..


Sintaxis

  • q = Queue.nuevo
  • objeto q.push
  • q << objeto # igual que #push
  • q.pop # => objeto

Múltiples trabajadores un fregadero

Queremos recopilar datos creados por múltiples trabajadores.

Primero creamos una cola:

sink = Queue.new

Luego 16 trabajadores, todos generando un número aleatorio y empujándolo hacia el sumidero:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

Y para obtener los datos, convierta una cola en una matriz:

data = [].tap { |a| a << sink.pop until sink.empty? }

Una fuente de trabajadores múltiples

Queremos procesar los datos en paralelo.

Vamos a llenar la fuente con algunos datos:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Luego crea algunos trabajadores para procesar los datos:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

One Source - Pipeline of Work - One Sink

Queremos procesar datos en paralelo y empujarlos hacia abajo para que otros trabajadores los procesen.

Como los trabajadores consumen y producen datos, tenemos que crear dos colas:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

La primera oleada de trabajadores lee un elemento de first_input_source , procesa el elemento y escribe los resultados en first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

La segunda ola de trabajadores usa first_output_sink como su fuente de entrada y lee, luego el proceso escribe en otro receptor de salida:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Ahora second_output_sink es el sumidero, vamos a convertirlo en una matriz:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Empujando datos en una cola - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • No hay marca de agua alta, las colas pueden crecer infinitamente.
  • #push nunca bloquea

Extraer datos de una cola - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop se bloqueará hasta que haya algunos datos disponibles.
  • #pop se puede utilizar para la sincronización.

Sincronización - Después de un punto en el tiempo

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Convertir una cola en una matriz

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

O un trazador de líneas :

[].tap { |array| array < queue.pop until queue.empty? }

Fusionando dos colas

  • Para evitar el bloqueo infinito, la lectura de las colas no debería ocurrir en la combinación de hilos en la que está ocurriendo.
  • Para evitar la sincronización o la espera infinita de una de las colas mientras que otra tiene datos, la lectura de las colas no debería ocurrir en el mismo hilo.

Empecemos definiendo y llenando dos colas:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

Deberíamos crear otra cola e introducir datos de otros subprocesos en ella:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Si sabe que puede consumir ambas colas completamente (la velocidad de consumo es mayor que la producción, no se quedará sin RAM), hay un enfoque más simple:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow