Suche…


Syntax

  • q = Warteschlange.neu
  • q.push Objekt
  • q << object # wie #push
  • q.pop # => Objekt

Mehrere Arbeiter eine Senke

Wir möchten Daten sammeln, die von mehreren Mitarbeitern erstellt wurden.

Zuerst erstellen wir eine Warteschlange:

sink = Queue.new

Dann generieren 16 Arbeiter eine Zufallszahl und stecken sie in die Senke:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

Um die Daten abzurufen, konvertieren Sie eine Warteschlange in ein Array:

data = [].tap { |a| a << sink.pop until sink.empty? }

Eine Quelle mehrere Arbeiter

Wir wollen Daten parallel verarbeiten.

Lassen Sie uns die Quelle mit einigen Daten füllen:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Erstellen Sie dann einige Arbeiter, um Daten zu verarbeiten:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

Eine Quelle - Pipeline der Arbeit - Eine Senke

Wir möchten die Daten parallel verarbeiten und auf die Linie bringen, die von anderen Mitarbeitern verarbeitet werden soll.

Da Arbeiter sowohl Daten konsumieren als auch produzieren, müssen wir zwei Warteschlangen erstellen:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

Die erste Arbeiterwelle liest ein Element aus first_input_source , verarbeitet das Element und schreibt Ergebnisse in first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

Die zweite Arbeiterwelle verwendet first_output_sink als first_output_sink und liest, der Prozess schreibt dann in eine andere Ausgabesenke:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Nun ist second_output_sink die Senke, konvertieren wir sie in ein Array:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Daten in eine Warteschlange verschieben - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • Es gibt keine hohe Wasserlinie, Schlangen können unendlich groß werden.
  • #push blockiert nie

Daten aus einer Warteschlange ziehen - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop wird blockiert, bis Daten verfügbar sind.
  • #pop kann zur Synchronisation verwendet werden.

Synchronisation - Nach einem bestimmten Zeitpunkt

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Konvertieren einer Warteschlange in ein Array

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

Oder eine Einlage :

[].tap { |array| array < queue.pop until queue.empty? }

Zwei Warteschlangen zusammenführen

  • Um ein endloses Blockieren zu vermeiden, sollte das Lesen von Warteschlangen nicht bei der Zusammenführung des Threads erfolgen.
  • Um die Synchronisierung zu vermeiden oder auf eine Warteschlange zu warten, während andere Daten enthalten, sollte das Lesen aus den Warteschlangen nicht im selben Thread erfolgen.

Beginnen wir mit der Definition und Besetzung von zwei Warteschlangen:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

Wir sollten eine weitere Warteschlange erstellen und Daten aus anderen Threads in diese hineinschieben:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Wenn Sie wissen, dass Sie beide Warteschlangen vollständig verwenden können (die Verbrauchsgeschwindigkeit ist höher als bei der Produktion, wird nicht genügend RAM benötigt), gibt es einen einfacheren Ansatz:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow