Zoeken…


Syntaxis

  • q = Wachtrij. nieuw
  • q.push object
  • q << object # hetzelfde als #push
  • q.pop # => object

Meerdere werknemers Eén wastafel

We willen gegevens verzamelen die zijn gemaakt door meerdere werknemers.

Eerst maken we een wachtrij:

sink = Queue.new

Vervolgens genereren 16 werknemers allemaal een willekeurig nummer en duwen het in de gootsteen:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

En om de gegevens te krijgen, converteert u een wachtrij naar een array:

data = [].tap { |a| a << sink.pop until sink.empty? }

Eén bron Meerdere werknemers

We willen gegevens parallel verwerken.

Laten we de bron vullen met enkele gegevens:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Maak vervolgens enkele medewerkers aan om gegevens te verwerken:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

Eén bron - Werkleiding - Eén wastafel

We willen gegevens parallel verwerken en naar beneden schuiven om door andere werknemers te worden verwerkt.

Aangezien werknemers zowel gegevens consumeren als produceren, moeten we twee wachtrijen maken:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

Eerste groep werknemers leest een item uit first_input_source , first_input_source het item en schrijft resultaten in first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

Tweede golf van werknemers gebruikt first_output_sink als invoerbron en leest, proces schrijft dan naar een andere output-sink:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Nu is second_output_sink de sink, laten we het converteren naar een array:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Gegevens in een wachtrij duwen - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • Er is geen hoogwaterlijn, wachtrijen kunnen oneindig groeien.
  • #push blokkeert nooit

Gegevens uit een wachtrij halen - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop wordt geblokkeerd totdat er gegevens beschikbaar zijn.
  • #pop kan worden gebruikt voor synchronisatie.

Synchronisatie - Na een tijdstip

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Een wachtrij omzetten in een array

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

Of een voering :

[].tap { |array| array < queue.pop until queue.empty? }

Twee wachtrijen samenvoegen

  • Om oneindig blokkeren te voorkomen, moet lezen uit wachtrijen niet gebeuren wanneer de thread samengaat.
  • Om te voorkomen dat synchronisatie of oneindig wachten op een van de wachtrijen terwijl de andere gegevens heeft, mag lezen uit wachtrijen niet op dezelfde thread gebeuren.

Laten we beginnen met het definiëren en vullen van twee wachtrijen:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

We moeten een nieuwe wachtrij maken en gegevens van andere threads erin pushen:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Als je weet dat je beide wachtrijen volledig kunt consumeren (consumptiesnelheid is hoger dan productie, je hebt geen RAM-geheugen) is er een eenvoudiger aanpak:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow