Ruby Language
Очередь
Поиск…
Синтаксис
- q = Queue.new
- Объект q.push
- q << объект # тот же, что и #push
- Объект q.pop # =>
Несколько рабочих одна раковина
Мы хотим собрать данные, созданные несколькими рабочими.
Сначала мы создаем очередь:
sink = Queue.new
Затем 16 рабочих производят случайное число и подталкивают его к раковине:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
И чтобы получить данные, конвертируйте очередь в массив:
data = [].tap { |a| a << sink.pop until sink.empty? }
Один источник нескольких работников
Мы хотим обрабатывать данные параллельно.
Давайте наполним источник данными:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Затем создайте некоторых рабочих для обработки данных:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
Один источник - трубопровод работы - одна раковина
Мы хотим обрабатывать данные параллельно и подталкивать их к линии, которая будет обрабатываться другими рабочими.
Поскольку Рабочие потребляют и производят данные, мы должны создать две очереди:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
Первая волна рабочих читает элемент из first_input_source
, обрабатывает элемент и записывает результаты в first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
Вторая волна рабочих использует first_output_sink
качестве источника входного сигнала и считывает, затем обрабатывает другой выходной приемник:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Теперь second_output_sink
- это приемник, давайте преобразуем его в массив:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Нажатие данных в очередь - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Нет отметки о воде, очереди могут бесконечно расти.
-
#push
никогда не блокирует
Вытягивание данных из очереди - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
будет блокироваться до тех пор, пока не будут доступны некоторые данные. -
#pop
может использоваться для синхронизации.
Синхронизация - после точки во времени
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Преобразование очереди в массив
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
Или один лайнер :
[].tap { |array| array < queue.pop until queue.empty? }
Объединение двух очередей
- Чтобы избежать бесконечной блокировки, чтение из очередей не должно происходить при слиянии потоков.
- Чтобы избежать синхронизации или бесконечно ждать одной из очередей, в то время как другие имеют данные, чтение из очередей не должно происходить в одном потоке.
Начнем с определения и заполнения двух очередей:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
Мы должны создать еще одну очередь и перенести данные из других потоков в нее:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Если вы знаете, что можете полностью потреблять обе очереди (скорость потребления выше, чем у производства, вы не закончите ОЗУ) есть более простой подход:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?