From d65979cc2e49bd4fde5fc97b787e56aaf82ce2eb Mon Sep 17 00:00:00 2001 From: Patrick Mahoney Date: Sat, 17 Dec 2011 11:05:39 -0600 Subject: [PATCH] Initial attempt at ConditionVariable implementation. --- lib/process_shared/condition_variable.rb | 38 +++++++++-- .../process_shared/condition_variable_spec.rb | 67 +++++++++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 spec/process_shared/condition_variable_spec.rb diff --git a/lib/process_shared/condition_variable.rb b/lib/process_shared/condition_variable.rb index afca674..4348d1d 100644 --- a/lib/process_shared/condition_variable.rb +++ b/lib/process_shared/condition_variable.rb @@ -1,14 +1,16 @@ require 'process_shared/semaphore' module ProcessShared - # TODO: implement this class ConditionVariable def initialize - @sem = Semaphore.new + @internal = Semaphore.new(1) + @waiting = SharedMemory.new(:int) + @waiting.write_int(0) + @sem = Semaphore.new(0) end def broadcast - @sem.post + waiting.times { @sem.post } end def signal @@ -18,10 +20,38 @@ module ProcessShared def wait(mutex, timeout = nil) mutex.unlock begin - @sem.wait + inc_waiting + if timeout + begin + @sem.try_wait(timeout) + rescue Errno::EAGAIN, Errno::ETIMEDOUT + # success! + end + else + @sem.wait + end + dec_waiting ensure mutex.lock end end + + private + + def waiting + @internal.synchronize do + @waiting.read_int + end + end + + def inc_waiting(val = 1) + @internal.synchronize do + @waiting.write_int(@waiting.read_int + val) + end + end + + def dec_waiting + inc_waiting(-1) + end end end diff --git a/spec/process_shared/condition_variable_spec.rb b/spec/process_shared/condition_variable_spec.rb new file mode 100644 index 0000000..7ffcaba --- /dev/null +++ b/spec/process_shared/condition_variable_spec.rb @@ -0,0 +1,67 @@ +require 'spec_helper' + +require 'process_shared/condition_variable' + +module ProcessShared + describe ConditionVariable do + it 'runs the example of Ruby Stdlib ConditionVariable' do + mutex = Mutex.new + resource = ConditionVariable.new + + a = fork { + mutex.synchronize { + resource.wait(mutex) + } + Kernel.exit! + } + + b = fork { + mutex.synchronize { + resource.signal + } + Kernel.exit! + } + + ::Process.wait(a) + ::Process.wait(b) + end + + it 'broadcasts to multiple processes' do + mutex = Mutex.new + cond = ConditionVariable.new + mem = SharedMemory.new(:int) + mem.write_int(0) + + pids = [] + 10.times do + pids << fork do + mutex.synchronize { + cond.wait(mutex) + mem.write_int(mem.read_int + 1) + } + Kernel.exit! + end + end + + sleep 0.2 # hopefully they are all waiting... + mutex.synchronize { + cond.broadcast + } + + pids.each { |p| ::Process.wait(p) } + + mem.read_int.must_equal(10) + end + + it 'stops waiting after timeout' do + mutex = Mutex.new + cond = ConditionVariable.new + + mutex.synchronize { + start = Time.now.to_f + cond.wait(mutex, 0.1) + (Time.now.to_f - start).must be_gte(0.1) + } + end + end +end