Ruby Language
Warteschlange
Suche…
Syntax
- q = Warteschlange.neu
- q.push Objekt
- q << object # wie #push
- q.pop # => Objekt
Mehrere Arbeiter eine Senke
Wir möchten Daten sammeln, die von mehreren Mitarbeitern erstellt wurden.
Zuerst erstellen wir eine Warteschlange:
sink = Queue.new
Dann generieren 16 Arbeiter eine Zufallszahl und stecken sie in die Senke:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
Um die Daten abzurufen, konvertieren Sie eine Warteschlange in ein Array:
data = [].tap { |a| a << sink.pop until sink.empty? }
Eine Quelle mehrere Arbeiter
Wir wollen Daten parallel verarbeiten.
Lassen Sie uns die Quelle mit einigen Daten füllen:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Erstellen Sie dann einige Arbeiter, um Daten zu verarbeiten:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
Eine Quelle - Pipeline der Arbeit - Eine Senke
Wir möchten die Daten parallel verarbeiten und auf die Linie bringen, die von anderen Mitarbeitern verarbeitet werden soll.
Da Arbeiter sowohl Daten konsumieren als auch produzieren, müssen wir zwei Warteschlangen erstellen:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
Die erste Arbeiterwelle liest ein Element aus first_input_source
, verarbeitet das Element und schreibt Ergebnisse in first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
Die zweite Arbeiterwelle verwendet first_output_sink
als first_output_sink
und liest, der Prozess schreibt dann in eine andere Ausgabesenke:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Nun ist second_output_sink
die Senke, konvertieren wir sie in ein Array:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Daten in eine Warteschlange verschieben - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Es gibt keine hohe Wasserlinie, Schlangen können unendlich groß werden.
-
#push
blockiert nie
Daten aus einer Warteschlange ziehen - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
wird blockiert, bis Daten verfügbar sind. -
#pop
kann zur Synchronisation verwendet werden.
Synchronisation - Nach einem bestimmten Zeitpunkt
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Konvertieren einer Warteschlange in ein Array
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
Oder eine Einlage :
[].tap { |array| array < queue.pop until queue.empty? }
Zwei Warteschlangen zusammenführen
- Um ein endloses Blockieren zu vermeiden, sollte das Lesen von Warteschlangen nicht bei der Zusammenführung des Threads erfolgen.
- Um die Synchronisierung zu vermeiden oder auf eine Warteschlange zu warten, während andere Daten enthalten, sollte das Lesen aus den Warteschlangen nicht im selben Thread erfolgen.
Beginnen wir mit der Definition und Besetzung von zwei Warteschlangen:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
Wir sollten eine weitere Warteschlange erstellen und Daten aus anderen Threads in diese hineinschieben:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Wenn Sie wissen, dass Sie beide Warteschlangen vollständig verwenden können (die Verbrauchsgeschwindigkeit ist höher als bei der Produktion, wird nicht genügend RAM benötigt), gibt es einen einfacheren Ansatz:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?