サーチ…


構文

  • q = Queue.new
  • q.pushオブジェクト
  • q <<オブジェクト#pushと同じ
  • q.pop#=>オブジェクト

複数の労働者のシンク

我々は、複数の労働者によって作成されたデータを収集したい。

まず、キューを作成します。

sink = Queue.new

その後、16人の労働者がすべて乱数を生成し、それをシンクに押し込む:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

データを取得するには、キューを配列に変換します。

data = [].tap { |a| a << sink.pop until sink.empty? }

1つのソース複数の労働者

データを並行して処理する必要があります。

sourceにいくつかのデータを設定しましょう:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

次に、データを処理するためにいくつかのワーカーを作成します。

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

1つのソース - 作業のパイプライン - 1つのシンク

私たちは、データを並行して処理し、他の作業者が処理する行にプッシュダウンする必要があります。

作業者はデータを消費して生成するので、2つのキューを作成する必要があります。

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

労働者の最初の波がfirst_input_sourceから項目を読み取り、その項目を処理し、結果をfirst_output_sink書き込みます。

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

労働者の第二の波は、入力ソースとしてfirst_output_sinkを使用し、処理してから別の出力シンクに書き込みます。

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

second_output_sinkはシンクです。それを配列に変換しましょう:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

データをキューにプッシュする - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • 最高水準点はなく、キューは無限に成長することができます。
  • #push never blocks

キューからデータを引き出す - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #popは利用可能なデータがあるまでブロックします。
  • #popは同期に使用できます。

同期 - ある時点後

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

キューを配列に変換する

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

または1つのライナー

[].tap { |array| array < queue.pop until queue.empty? }

2つのキューをマージする

  • 無限にブロックされないようにするには、スレッドのマージが実行されているときにキューからの読み取りは行われません。
  • 同期を回避したり、他のキューがデータを保持している間に待ち行列を無期限に待機するには、キューからの読み取りを同じスレッドで行うべきではありません。

まず、2つのキューを定義して入力します。

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

別のキューを作成し、他のスレッドからのデータをプッシュする必要があります。

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

両方のキューを完全に消費できることがわかっている場合(消費速度は本番よりも高く、RAMが不足することはありません)、より簡単な方法があります。

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow