Ruby Language
Kö
Sök…
Syntax
- q = Kö.nytt
- q.push objekt
- q << objekt # samma som # push
- q.pop # => objekt
Flera arbetare en diskbänk
Vi vill samla in data skapade av flera arbetare.
Först skapar vi en kö:
sink = Queue.new
Då genererar 16 arbetare alla ett slumpmässigt antal och pressar det i sjunken:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
Och för att få data, konvertera en kö till en array:
data = [].tap { |a| a << sink.pop until sink.empty? }
En källa flera arbetare
Vi vill bearbeta data parallellt.
Låt oss fylla i källan med lite data:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Skapa sedan några arbetare för att bearbeta data:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
En källa - rörledning - en diskbänk
Vi vill bearbeta data parallellt och trycka ner dem för att arbeta av andra arbetare.
Eftersom arbetare både konsumerar och producerar data måste vi skapa två köer:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
Första vågen med arbetare läste ett objekt från first_input_source
, bearbeta objektet och skriva resultat i first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
Andra vågen av arbetare använder first_output_sink
som sin ingångskälla och läser, processen skriver sedan till en annan utgångssink:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Nu är second_output_sink
sjunken, låt oss konvertera det till en matris:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Trycka data in i en kö - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Det finns inget högt vattenmärke, köerna kan växa oändligt.
-
#push
blockerar aldrig
Dra data från en kö - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
kommer att blockeras tills det finns några data tillgängliga. -
#pop
kan användas för synkronisering.
Synkronisering - Efter en tidpunkt
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Konvertera en kö till en array
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
Eller en foder :
[].tap { |array| array < queue.pop until queue.empty? }
Slå samman två köer
- För att undvika oändligt blockering, bör läsning från köer inte ske på trådfusionen som händer på.
- För att undvika synkronisering eller oändligt vänta på en av köerna medan andra har data, bör läsning från köer inte ske på samma tråd.
Låt oss börja med att definiera och fylla två köer:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
Vi borde skapa en ny kö och skjuta data från andra trådar in i den:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Om du vet att du helt kan konsumera båda köerna (konsumtionshastigheten är högre än produktionen, kommer du inte att få slut på RAM) finns det en enklare metod:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?