Ruby Language
Kolejka
Szukaj…
Składnia
- q = Kolejka. nowa
- q.push obiekt
- q << obiekt # taki sam jak #push
- q.pop # => obiekt
Wielu pracowników Jeden zlew
Chcemy gromadzić dane tworzone przez wielu pracowników.
Najpierw tworzymy kolejkę:
sink = Queue.new
Następnie 16 pracowników generuje losową liczbę i wpycha ją do zlewu:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
Aby uzyskać dane, przekonwertuj kolejkę na tablicę:
data = [].tap { |a| a << sink.pop until sink.empty? }
Jedno źródło, wielu pracowników
Chcemy przetwarzać dane równolegle.
Wypełnijmy źródło danymi:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Następnie utwórz pracowników do przetwarzania danych:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
Jedno źródło - potok pracy - jeden zlew
Chcemy przetwarzać dane równolegle i przesuwać je w dół, aby mogły być przetwarzane przez innych pracowników.
Ponieważ pracownicy zarówno zużywają, jak i wytwarzają dane, musimy utworzyć dwie kolejki:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
Pierwsza fala pracowników odczytuje element ze first_input_source
, przetwarza element i zapisuje wyniki w first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
Druga fala pracowników używa first_output_sink
jako źródła wejściowego i czyta, a następnie zapisuje do innego ujścia wyjściowego:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Teraz second_output_sink
jest zlewem, przekonwertujmy go na tablicę:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Przekazywanie danych do kolejki - #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Nie ma znaku wysokiej wody, kolejki mogą rosnąć nieskończenie.
-
#push
nigdy nie blokuje
Pobieranie danych z kolejki - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
będzie blokować, dopóki nie będzie dostępnych danych. -
#pop
może być używany do synchronizacji.
Synchronizacja - po pewnym momencie
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Konwertowanie kolejki na tablicę
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
Lub jedna wkładka :
[].tap { |array| array < queue.pop until queue.empty? }
Scalanie dwóch kolejek
- Aby uniknąć nieskończonego blokowania, czytanie z kolejek nie powinno odbywać się podczas scalania wątków.
- Aby uniknąć synchronizacji lub nieskończonego oczekiwania na jedną z kolejek, podczas gdy druga ma dane, czytanie z kolejek nie powinno odbywać się w tym samym wątku.
Zacznijmy od zdefiniowania i zapełnienia dwóch kolejek:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
Powinniśmy utworzyć kolejkę i wepchnąć do niej dane z innych wątków:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Jeśli wiesz, że możesz całkowicie wykorzystać obie kolejki (prędkość konsumpcji jest wyższa niż produkcja, nie zabraknie pamięci RAM), istnieje prostsze podejście:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?