Ruby Language
Wachtrij
Zoeken…
Syntaxis
- q = Wachtrij. nieuw
- q.push object
- q << object # hetzelfde als #push
- q.pop # => object
Meerdere werknemers Eén wastafel
We willen gegevens verzamelen die zijn gemaakt door meerdere werknemers.
Eerst maken we een wachtrij:
sink = Queue.new
Vervolgens genereren 16 werknemers allemaal een willekeurig nummer en duwen het in de gootsteen:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
En om de gegevens te krijgen, converteert u een wachtrij naar een array:
data = [].tap { |a| a << sink.pop until sink.empty? }
Eén bron Meerdere werknemers
We willen gegevens parallel verwerken.
Laten we de bron vullen met enkele gegevens:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Maak vervolgens enkele medewerkers aan om gegevens te verwerken:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
Eén bron - Werkleiding - Eén wastafel
We willen gegevens parallel verwerken en naar beneden schuiven om door andere werknemers te worden verwerkt.
Aangezien werknemers zowel gegevens consumeren als produceren, moeten we twee wachtrijen maken:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
Eerste groep werknemers leest een item uit first_input_source
, first_input_source
het item en schrijft resultaten in first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
Tweede golf van werknemers gebruikt first_output_sink
als invoerbron en leest, proces schrijft dan naar een andere output-sink:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Nu is second_output_sink
de sink, laten we het converteren naar een array:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Gegevens in een wachtrij duwen - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Er is geen hoogwaterlijn, wachtrijen kunnen oneindig groeien.
-
#push
blokkeert nooit
Gegevens uit een wachtrij halen - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
wordt geblokkeerd totdat er gegevens beschikbaar zijn. -
#pop
kan worden gebruikt voor synchronisatie.
Synchronisatie - Na een tijdstip
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Een wachtrij omzetten in een array
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
Of een voering :
[].tap { |array| array < queue.pop until queue.empty? }
Twee wachtrijen samenvoegen
- Om oneindig blokkeren te voorkomen, moet lezen uit wachtrijen niet gebeuren wanneer de thread samengaat.
- Om te voorkomen dat synchronisatie of oneindig wachten op een van de wachtrijen terwijl de andere gegevens heeft, mag lezen uit wachtrijen niet op dezelfde thread gebeuren.
Laten we beginnen met het definiëren en vullen van twee wachtrijen:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
We moeten een nieuwe wachtrij maken en gegevens van andere threads erin pushen:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Als je weet dat je beide wachtrijen volledig kunt consumeren (consumptiesnelheid is hoger dan productie, je hebt geen RAM-geheugen) is er een eenvoudiger aanpak:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?