수색…


통사론

  • q = Queue.new
  • q.push 개체
  • q << 개체 #push와 동일합니다.
  • q.pop # => 객체

다중 근로자 한 방울

우리는 여러 명의 근로자가 작성한 데이터를 수집하려고합니다.

먼저 대기열을 만듭니다.

sink = Queue.new

그런 다음 16 명 모두가 임의의 숫자를 생성하고 싱크대로 밀어 넣습니다.

(1..16).to_a.map do
  Thread.new do
    sink << rand(1..100)
  end
end.map(&:join)

그리고 데이터를 얻으려면 대기열을 배열로 변환하십시오.

data = [].tap { |a| a << sink.pop until sink.empty? }

단일 출처 복수 노동자

우리는 병렬로 데이터를 처리하려고합니다.

소스를 일부 데이터로 채 웁니다.

source = Queue.new
data = (1..100)
data.each { |e| source << e }

그런 다음 데이터를 처리 할 직원을 만듭니다.

(1..16).to_a.map do
  Thread.new do
    until source.empty?
      item = source.pop
      sleep 0.5
      puts "Processed: #{item}"
    end
  end
end.map(&:join)

하나의 소스 - 작업의 파이프 라인 - 하나의 싱크대

우리는 병렬로 데이터를 처리하고 다른 작업자가 처리 할 라인 아래로 밀기를 원합니다.

Workers는 데이터를 소비하고 생성하므로 두 개의 대기열을 만들어야합니다.

first_input_source = Queue.new
first_output_sink  = Queue.new
100.times { |i| first_input_source << i }

근로자의 첫 번째 물결은 first_input_source 에서 항목을 읽고 항목을 처리 한 다음 first_output_sink 결과를 first_output_sink .

(1..16).to_a.map do
  Thread.new do
    loop do
      item = first_input_source.pop
      first_output_source << item ** 2
      first_output_source << item ** 3
    end
  end
end

근로자의 두 번째 물결은 입력 소스로 first_output_sink 를 사용하고 프로세스를 읽고 다른 출력 싱크에 씁니다.

second_input_source = first_output_sink
second_output_sink  = Queue.new

(1..32).to_a.map do
  Thread.new do
    loop do
      item = second_input_source.pop
      second_output_sink << item * 2
      second_output_sink << item * 3
    end
  end
end

이제 second_output_sink 가 싱크입니다. 배열로 변환 해 봅시다 :

sleep 5 # workaround in place of synchronization
sink = second_output_sink
[].tap { |a| a << sink.pop until sink.empty? }

대기열에 데이터 밀어 넣기 - #push

q = Queue.new
q << "any object including another queue"
# or
q.push :data
  • 높은 워터 마크가 없으며 대기열이 무한히 커질 수 있습니다.
  • #push never 블록

대기열에서 데이터 가져 오기 - #pop

q = Queue.new
q << :data
q.pop #=> :data
  • #pop 은 사용할 수있는 데이터가있을 때까지 차단됩니다.
  • #pop 은 동기화에 사용될 수 있습니다.

동기화 - 특정 시점 이후

syncer = Queue.new

a = Thread.new do
  syncer.pop
  puts "this happens at end"
end

b = Thread.new do
  puts "this happens first"
  STDOUT.flush
  syncer << :ok
end

[a, b].map(&:join)

큐를 배열로 변환

q = Queue.new
q << 1
q << 2

a = Array.new
a << q.pop until q.empty?

아니면 하나의 라이너 :

[].tap { |array| array < queue.pop until queue.empty? }

두 개의 대기열 병합

  • 무한 차단을 피하려면 스레드 병합이 발생하는 동안 대기열에서 읽기가 발생하지 않아야합니다.
  • 다른 사용자가 데이터를 가지고있는 동안 동기화를 피하거나 큐 중 하나를 무한히 기다리려면 대기열에서 읽기가 동일한 스레드에서 일어나지 않아야합니다.

두 개의 대기열을 정의하고 채우는 것으로 시작합시다.

q1 = Queue.new
q2 = Queue.new
(1..100).each { |e| q1 << e }
(101..200).each { |e| q2 << e }

다른 큐를 생성하고 다른 스레드의 데이터를 그곳으로 밀어 넣어야합니다.

merged = Queue.new

[q1, q2].map do |q|
  Thread.new do
    loop do
      merged << q.pop
    end
  end
end

두 대기열을 완전히 소비 할 수 있다는 것을 알고 있으면 (소비 속도가 생산보다 높으면 RAM이 부족하지는 않습니다) 더 간단한 방법이 있습니다.

merged = Queue.new
merged << q1.pop until q1.empty?
merged << q2.pop until q2.empty?


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow