Recherche…


Syntaxe

  • q = Queue.new
  • q.push objet
  • q << objet # identique à #push
  • q.pop # => objet

Plusieurs travailleurs un seul évier

Nous voulons collecter des données créées par plusieurs travailleurs.

Nous créons d'abord une file d'attente:

sink = Queue.new

Ensuite, 16 travailleurs ont tous généré un nombre aléatoire et l'ont poussé dans l'évier:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

Et pour obtenir les données, convertissez une file d'attente en un tableau:

data = [].tap { |a| a << sink.pop until sink.empty? }

One Source Multiple Workers

Nous voulons traiter les données en parallèle.

Remplissons la source avec quelques données:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Ensuite, créez des travailleurs pour traiter les données:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

Une source - Pipeline of Work - Un évier

Nous souhaitons traiter les données en parallèle et les acheminer sur la ligne à traiter par les autres travailleurs.

Puisque les travailleurs consomment et produisent des données, nous devons créer deux files d'attente:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

La première vague de travailleurs lit un élément de first_input_source , traite l'élément et écrit les résultats dans first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

La deuxième vague de travailleurs utilise first_output_sink comme source d’entrée et lit, puis écrit dans un autre first_output_sink de sortie:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Maintenant second_output_sink est le second_output_sink , convertissons-le en un tableau:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Pousser des données dans une file d'attente - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • Il n'y a pas de marée haute, les files d'attente peuvent croître à l'infini.
  • #push ne bloque jamais

Extraction de données d'une file d'attente - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop bloquera jusqu'à ce que certaines données soient disponibles.
  • #pop peut être utilisé pour la synchronisation.

Synchronisation - Après un point dans le temps

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Conversion d'une file d'attente en un tableau

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

Ou un seul paquebot :

[].tap { |array| array < queue.pop until queue.empty? }

Fusion de deux files d'attente

  • Pour éviter le blocage à l'infini, la lecture des files d'attente ne devrait pas avoir lieu lorsque la fusion de threads se produit.
  • Pour éviter la synchronisation ou l'attente infinie d'une des files d'attente alors que d'autres ont des données, la lecture des files d'attente ne devrait pas avoir lieu sur le même thread.

Commençons par définir et alimenter deux files d'attente:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

Nous devrions créer une autre file d'attente et y insérer des données provenant d'autres threads:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Si vous savez que vous pouvez consommer complètement les deux files d'attente (la vitesse de consommation est supérieure à la production, vous ne manquerez pas de RAM), l'approche est plus simple:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow