Ruby Language
Cola
Buscar..
Sintaxis
- q = Queue.nuevo
- objeto q.push
- q << objeto # igual que #push
- q.pop # => objeto
Múltiples trabajadores un fregadero
Queremos recopilar datos creados por múltiples trabajadores.
Primero creamos una cola:
sink = Queue.new
Luego 16 trabajadores, todos generando un número aleatorio y empujándolo hacia el sumidero:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
Y para obtener los datos, convierta una cola en una matriz:
data = [].tap { |a| a << sink.pop until sink.empty? }
Una fuente de trabajadores múltiples
Queremos procesar los datos en paralelo.
Vamos a llenar la fuente con algunos datos:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Luego crea algunos trabajadores para procesar los datos:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
One Source - Pipeline of Work - One Sink
Queremos procesar datos en paralelo y empujarlos hacia abajo para que otros trabajadores los procesen.
Como los trabajadores consumen y producen datos, tenemos que crear dos colas:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
La primera oleada de trabajadores lee un elemento de first_input_source
, procesa el elemento y escribe los resultados en first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
La segunda ola de trabajadores usa first_output_sink
como su fuente de entrada y lee, luego el proceso escribe en otro receptor de salida:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Ahora second_output_sink
es el sumidero, vamos a convertirlo en una matriz:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Empujando datos en una cola - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- No hay marca de agua alta, las colas pueden crecer infinitamente.
-
#push
nunca bloquea
Extraer datos de una cola - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
se bloqueará hasta que haya algunos datos disponibles. -
#pop
se puede utilizar para la sincronización.
Sincronización - Después de un punto en el tiempo
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Convertir una cola en una matriz
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
O un trazador de líneas :
[].tap { |array| array < queue.pop until queue.empty? }
Fusionando dos colas
- Para evitar el bloqueo infinito, la lectura de las colas no debería ocurrir en la combinación de hilos en la que está ocurriendo.
- Para evitar la sincronización o la espera infinita de una de las colas mientras que otra tiene datos, la lectura de las colas no debería ocurrir en el mismo hilo.
Empecemos definiendo y llenando dos colas:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
Deberíamos crear otra cola e introducir datos de otros subprocesos en ella:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Si sabe que puede consumir ambas colas completamente (la velocidad de consumo es mayor que la producción, no se quedará sin RAM), hay un enfoque más simple:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?