Ruby Language
Queue
Recherche…
Syntaxe
- q = Queue.new
- q.push objet
- q << objet # identique à #push
- q.pop # => objet
Plusieurs travailleurs un seul évier
Nous voulons collecter des données créées par plusieurs travailleurs.
Nous créons d'abord une file d'attente:
sink = Queue.new
Ensuite, 16 travailleurs ont tous généré un nombre aléatoire et l'ont poussé dans l'évier:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
Et pour obtenir les données, convertissez une file d'attente en un tableau:
data = [].tap { |a| a << sink.pop until sink.empty? }
One Source Multiple Workers
Nous voulons traiter les données en parallèle.
Remplissons la source avec quelques données:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Ensuite, créez des travailleurs pour traiter les données:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
Une source - Pipeline of Work - Un évier
Nous souhaitons traiter les données en parallèle et les acheminer sur la ligne à traiter par les autres travailleurs.
Puisque les travailleurs consomment et produisent des données, nous devons créer deux files d'attente:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
La première vague de travailleurs lit un élément de first_input_source
, traite l'élément et écrit les résultats dans first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
La deuxième vague de travailleurs utilise first_output_sink
comme source d’entrée et lit, puis écrit dans un autre first_output_sink
de sortie:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Maintenant second_output_sink
est le second_output_sink
, convertissons-le en un tableau:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Pousser des données dans une file d'attente - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Il n'y a pas de marée haute, les files d'attente peuvent croître à l'infini.
-
#push
ne bloque jamais
Extraction de données d'une file d'attente - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
bloquera jusqu'à ce que certaines données soient disponibles. -
#pop
peut être utilisé pour la synchronisation.
Synchronisation - Après un point dans le temps
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Conversion d'une file d'attente en un tableau
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
Ou un seul paquebot :
[].tap { |array| array < queue.pop until queue.empty? }
Fusion de deux files d'attente
- Pour éviter le blocage à l'infini, la lecture des files d'attente ne devrait pas avoir lieu lorsque la fusion de threads se produit.
- Pour éviter la synchronisation ou l'attente infinie d'une des files d'attente alors que d'autres ont des données, la lecture des files d'attente ne devrait pas avoir lieu sur le même thread.
Commençons par définir et alimenter deux files d'attente:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
Nous devrions créer une autre file d'attente et y insérer des données provenant d'autres threads:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Si vous savez que vous pouvez consommer complètement les deux files d'attente (la vitesse de consommation est supérieure à la production, vous ne manquerez pas de RAM), l'approche est plus simple:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?