Ruby Language
Wachtrij
Zoeken…
Syntaxis
- q = Wachtrij. nieuw
- q.push object
- q << object # hetzelfde als #push
- q.pop # => object
Meerdere werknemers Eén wastafel
We willen gegevens verzamelen die zijn gemaakt door meerdere werknemers.
Eerst maken we een wachtrij:
sink = Queue.new
Vervolgens genereren 16 werknemers allemaal een willekeurig nummer en duwen het in de gootsteen:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
En om de gegevens te krijgen, converteert u een wachtrij naar een array:
data = [].tap { |a| a << sink.pop until sink.empty? }
Eén bron Meerdere werknemers
We willen gegevens parallel verwerken.
Laten we de bron vullen met enkele gegevens:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Maak vervolgens enkele medewerkers aan om gegevens te verwerken:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
Eén bron - Werkleiding - Eén wastafel
We willen gegevens parallel verwerken en naar beneden schuiven om door andere werknemers te worden verwerkt.
Aangezien werknemers zowel gegevens consumeren als produceren, moeten we twee wachtrijen maken:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
Eerste groep werknemers leest een item uit first_input_source , first_input_source het item en schrijft resultaten in first_output_sink :
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
Tweede golf van werknemers gebruikt first_output_sink als invoerbron en leest, proces schrijft dan naar een andere output-sink:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Nu is second_output_sink de sink, laten we het converteren naar een array:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Gegevens in een wachtrij duwen - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Er is geen hoogwaterlijn, wachtrijen kunnen oneindig groeien.
-
#pushblokkeert nooit
Gegevens uit een wachtrij halen - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#popwordt geblokkeerd totdat er gegevens beschikbaar zijn. -
#popkan worden gebruikt voor synchronisatie.
Synchronisatie - Na een tijdstip
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Een wachtrij omzetten in een array
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
Of een voering :
[].tap { |array| array < queue.pop until queue.empty? }
Twee wachtrijen samenvoegen
- Om oneindig blokkeren te voorkomen, moet lezen uit wachtrijen niet gebeuren wanneer de thread samengaat.
- Om te voorkomen dat synchronisatie of oneindig wachten op een van de wachtrijen terwijl de andere gegevens heeft, mag lezen uit wachtrijen niet op dezelfde thread gebeuren.
Laten we beginnen met het definiëren en vullen van twee wachtrijen:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
We moeten een nieuwe wachtrij maken en gegevens van andere threads erin pushen:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Als je weet dat je beide wachtrijen volledig kunt consumeren (consumptiesnelheid is hoger dan productie, je hebt geen RAM-geheugen) is er een eenvoudiger aanpak:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?