Sök…


Syntax

  • q = Kö.nytt
  • q.push objekt
  • q << objekt # samma som # push
  • q.pop # => objekt

Flera arbetare en diskbänk

Vi vill samla in data skapade av flera arbetare.

Först skapar vi en kö:

sink = Queue.new

Då genererar 16 arbetare alla ett slumpmässigt antal och pressar det i sjunken:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

Och för att få data, konvertera en kö till en array:

data = [].tap { |a| a << sink.pop until sink.empty? }

En källa flera arbetare

Vi vill bearbeta data parallellt.

Låt oss fylla i källan med lite data:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Skapa sedan några arbetare för att bearbeta data:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

En källa - rörledning - en diskbänk

Vi vill bearbeta data parallellt och trycka ner dem för att arbeta av andra arbetare.

Eftersom arbetare både konsumerar och producerar data måste vi skapa två köer:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

Första vågen med arbetare läste ett objekt från first_input_source , bearbeta objektet och skriva resultat i first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

Andra vågen av arbetare använder first_output_sink som sin ingångskälla och läser, processen skriver sedan till en annan utgångssink:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Nu är second_output_sink sjunken, låt oss konvertera det till en matris:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Trycka data in i en kö - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • Det finns inget högt vattenmärke, köerna kan växa oändligt.
  • #push blockerar aldrig

Dra data från en kö - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop kommer att blockeras tills det finns några data tillgängliga.
  • #pop kan användas för synkronisering.

Synkronisering - Efter en tidpunkt

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Konvertera en kö till en array

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

Eller en foder :

[].tap { |array| array < queue.pop until queue.empty? }

Slå samman två köer

  • För att undvika oändligt blockering, bör läsning från köer inte ske på trådfusionen som händer på.
  • För att undvika synkronisering eller oändligt vänta på en av köerna medan andra har data, bör läsning från köer inte ske på samma tråd.

Låt oss börja med att definiera och fylla två köer:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

Vi borde skapa en ny kö och skjuta data från andra trådar in i den:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Om du vet att du helt kan konsumera båda köerna (konsumtionshastigheten är högre än produktionen, kommer du inte att få slut på RAM) finns det en enklare metod:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow