Ruby Language
Coda
Ricerca…
Sintassi
- q = Queue.new
- q.push oggetto
- q << oggetto # uguale a #push
- q.pop # => oggetto
Lavoratori multipli Lavandino singolo
Vogliamo raccogliere dati creati da più lavoratori.
Per prima cosa creiamo una coda:
sink = Queue.new
Quindi 16 lavoratori generano tutti un numero casuale e lo spingono nel lavandino:
(1..16).to_a.map do
Thread.new do
sink << rand(1..100)
end
end.map(&:join)
E per ottenere i dati, converti una coda in una matrice:
data = [].tap { |a| a << sink.pop until sink.empty? }
Una fonte più lavoratori
Vogliamo elaborare i dati in parallelo.
Compila la fonte con alcuni dati:
source = Queue.new
data = (1..100)
data.each { |e| source << e }
Quindi crea alcuni lavoratori per elaborare i dati:
(1..16).to_a.map do
Thread.new do
until source.empty?
item = source.pop
sleep 0.5
puts "Processed: #{item}"
end
end
end.map(&:join)
One Source - Pipeline of Work - One Sink
Vogliamo elaborare i dati in parallelo e spostarli lungo la linea per essere elaborati da altri lavoratori.
Poiché i lavoratori consumano e producono dati, dobbiamo creare due code:
first_input_source = Queue.new
first_output_sink = Queue.new
100.times { |i| first_input_source << i }
La prima ondata di lavoratori legge un elemento da first_input_source
, elabora l'elemento e scrive i risultati in first_output_sink
:
(1..16).to_a.map do
Thread.new do
loop do
item = first_input_source.pop
first_output_source << item ** 2
first_output_source << item ** 3
end
end
end
La seconda ondata di worker usa first_output_sink
come sorgente di input e legge, processa quindi scrive su un altro sink di output:
second_input_source = first_output_sink
second_output_sink = Queue.new
(1..32).to_a.map do
Thread.new do
loop do
item = second_input_source.pop
second_output_sink << item * 2
second_output_sink << item * 3
end
end
end
Ora second_output_sink
è il sink, convertiamolo in array:
sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }
Inserimento di dati in una coda: #push
q = Queue.new
q << "any object including another queue"
# or
q.push :data
- Non vi è alcun limite massimo di acqua, le code possono crescere all'infinito.
-
#push
non blocca mai
Tirare i dati da una coda - #pop
q = Queue.new
q << :data
q.pop #=> :data
-
#pop
bloccherà fino a quando non ci saranno dati disponibili. -
#pop
può essere utilizzato per la sincronizzazione.
Sincronizzazione: dopo un punto nel tempo
syncer = Queue.new
a = Thread.new do
syncer.pop
puts "this happens at end"
end
b = Thread.new do
puts "this happens first"
STDOUT.flush
syncer << :ok
end
[a, b].map(&:join)
Convertire una coda in una matrice
q = Queue.new
q << 1
q << 2
a = Array.new
a << q.pop until q.empty?
O una sola nave :
[].tap { |array| array < queue.pop until queue.empty? }
Unione di due code
- Per evitare il blocco all'infinito, la lettura dalle code non dovrebbe avvenire durante l'unione dei thread.
- Per evitare la sincronizzazione o l'attesa infinita di una coda mentre altri hanno dati, la lettura dalle code non dovrebbe avvenire sullo stesso thread.
Iniziamo definendo e compilando due code:
q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }
Dovremmo creare un'altra coda e spingere i dati da altri thread al suo interno:
merged = Queue.new
[q1, q2].map do |q|
Thread.new do
loop do
merged << q.pop
end
end
end
Se sai che puoi consumare completamente entrambe le code (la velocità di consumo è superiore alla produzione, non esaurirai la RAM) c'è un approccio più semplice:
merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?