Szukaj…


Składnia

  • q = Kolejka. nowa
  • q.push obiekt
  • q << obiekt # taki sam jak #push
  • q.pop # => obiekt

Wielu pracowników Jeden zlew

Chcemy gromadzić dane tworzone przez wielu pracowników.

Najpierw tworzymy kolejkę:

sink = Queue.new

Następnie 16 pracowników generuje losową liczbę i wpycha ją do zlewu:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

Aby uzyskać dane, przekonwertuj kolejkę na tablicę:

data = [].tap { |a| a << sink.pop until sink.empty? }

Jedno źródło, wielu pracowników

Chcemy przetwarzać dane równolegle.

Wypełnijmy źródło danymi:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Następnie utwórz pracowników do przetwarzania danych:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

Jedno źródło - potok pracy - jeden zlew

Chcemy przetwarzać dane równolegle i przesuwać je w dół, aby mogły być przetwarzane przez innych pracowników.

Ponieważ pracownicy zarówno zużywają, jak i wytwarzają dane, musimy utworzyć dwie kolejki:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

Pierwsza fala pracowników odczytuje element ze first_input_source , przetwarza element i zapisuje wyniki w first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

Druga fala pracowników używa first_output_sink jako źródła wejściowego i czyta, a następnie zapisuje do innego ujścia wyjściowego:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Teraz second_output_sink jest zlewem, przekonwertujmy go na tablicę:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Przekazywanie danych do kolejki - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • Nie ma znaku wysokiej wody, kolejki mogą rosnąć nieskończenie.
  • #push nigdy nie blokuje

Pobieranie danych z kolejki - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop będzie blokować, dopóki nie będzie dostępnych danych.
  • #pop może być używany do synchronizacji.

Synchronizacja - po pewnym momencie

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Konwertowanie kolejki na tablicę

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

Lub jedna wkładka :

[].tap { |array| array < queue.pop until queue.empty? }

Scalanie dwóch kolejek

  • Aby uniknąć nieskończonego blokowania, czytanie z kolejek nie powinno odbywać się podczas scalania wątków.
  • Aby uniknąć synchronizacji lub nieskończonego oczekiwania na jedną z kolejek, podczas gdy druga ma dane, czytanie z kolejek nie powinno odbywać się w tym samym wątku.

Zacznijmy od zdefiniowania i zapełnienia dwóch kolejek:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

Powinniśmy utworzyć kolejkę i wepchnąć do niej dane z innych wątków:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Jeśli wiesz, że możesz całkowicie wykorzystać obie kolejki (prędkość konsumpcji jest wyższa niż produkcja, nie zabraknie pamięci RAM), istnieje prostsze podejście:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow