Ricerca…


Sintassi

  • q = Queue.new
  • q.push oggetto
  • q << oggetto # uguale a #push
  • q.pop # => oggetto

Lavoratori multipli Lavandino singolo

Vogliamo raccogliere dati creati da più lavoratori.

Per prima cosa creiamo una coda:

sink = Queue.new

Quindi 16 lavoratori generano tutti un numero casuale e lo spingono nel lavandino:

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

E per ottenere i dati, converti una coda in una matrice:

data = [].tap { |a| a << sink.pop until sink.empty? }

Una fonte più lavoratori

Vogliamo elaborare i dati in parallelo.

Compila la fonte con alcuni dati:

source = Queue.new
data = (1..100)
data.each { |e| source << e }

Quindi crea alcuni lavoratori per elaborare i dati:

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

One Source - Pipeline of Work - One Sink

Vogliamo elaborare i dati in parallelo e spostarli lungo la linea per essere elaborati da altri lavoratori.

Poiché i lavoratori consumano e producono dati, dobbiamo creare due code:

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

La prima ondata di lavoratori legge un elemento da first_input_source , elabora l'elemento e scrive i risultati in first_output_sink :

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

La seconda ondata di worker usa first_output_sink come sorgente di input e legge, processa quindi scrive su un altro sink di output:

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

Ora second_output_sink è il sink, convertiamolo in array:

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

Inserimento di dati in una coda: #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • Non vi è alcun limite massimo di acqua, le code possono crescere all'infinito.
  • #push non blocca mai

Tirare i dati da una coda - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop bloccherà fino a quando non ci saranno dati disponibili.
  • #pop può essere utilizzato per la sincronizzazione.

Sincronizzazione: dopo un punto nel tempo

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

Convertire una coda in una matrice

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

O una sola nave :

[].tap { |array| array < queue.pop until queue.empty? }

Unione di due code

  • Per evitare il blocco all'infinito, la lettura dalle code non dovrebbe avvenire durante l'unione dei thread.
  • Per evitare la sincronizzazione o l'attesa infinita di una coda mentre altri hanno dati, la lettura dalle code non dovrebbe avvenire sullo stesso thread.

Iniziamo definendo e compilando due code:

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

Dovremmo creare un'altra coda e spingere i dati da altri thread al suo interno:

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

Se sai che puoi consumare completamente entrambe le code (la velocità di consumo è superiore alla produzione, non esaurirai la RAM) c'è un approccio più semplice:

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow