Поиск…


Синтаксис

  • q = Queue.new
  • Объект q.push
  • q << объект # тот же, что и #push
  • Объект q.pop # =>

Несколько рабочих одна раковина

Мы хотим собрать данные, созданные несколькими рабочими.

Сначала мы создаем очередь:

sink = Queue.new

Затем 16 рабочих производят случайное число и подталкивают его к раковине:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

И чтобы получить данные, конвертируйте очередь в массив:

data = [].tap { |a| a << sink.pop until sink.empty? }

Один источник нескольких работников

Мы хотим обрабатывать данные параллельно.

Давайте наполним источник данными:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Затем создайте некоторых рабочих для обработки данных:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

Один источник - трубопровод работы - одна раковина

Мы хотим обрабатывать данные параллельно и подталкивать их к линии, которая будет обрабатываться другими рабочими.

Поскольку Рабочие потребляют и производят данные, мы должны создать две очереди:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

Первая волна рабочих читает элемент из first_input_source , обрабатывает элемент и записывает результаты в first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

Вторая волна рабочих использует first_output_sink качестве источника входного сигнала и считывает, затем обрабатывает другой выходной приемник:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Теперь second_output_sink - это приемник, давайте преобразуем его в массив:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Нажатие данных в очередь - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • Нет отметки о воде, очереди могут бесконечно расти.
  • #push никогда не блокирует

Вытягивание данных из очереди - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop будет блокироваться до тех пор, пока не будут доступны некоторые данные.
  • #pop может использоваться для синхронизации.

Синхронизация - после точки во времени

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Преобразование очереди в массив

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

Или один лайнер :

[].tap { |array| array < queue.pop until queue.empty? }

Объединение двух очередей

  • Чтобы избежать бесконечной блокировки, чтение из очередей не должно происходить при слиянии потоков.
  • Чтобы избежать синхронизации или бесконечно ждать одной из очередей, в то время как другие имеют данные, чтение из очередей не должно происходить в одном потоке.

Начнем с определения и заполнения двух очередей:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

Мы должны создать еще одну очередь и перенести данные из других потоков в нее:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Если вы знаете, что можете полностью потреблять обе очереди (скорость потребления выше, чем у производства, вы не закончите ОЗУ) есть более простой подход:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow