From 15fe2e50570212c7b15bb2a037b3e16c2aee42a6 Mon Sep 17 00:00:00 2001 From: Patrick Mahoney Date: Tue, 7 Feb 2012 23:06:17 -0600 Subject: [PATCH] Refactor class hierarchy, implementation selection details. --- lib/process_shared.rb | 32 +++++-------- lib/process_shared/binary_semaphore.rb | 9 +--- lib/process_shared/condition_variable.rb | 2 +- lib/process_shared/mach/semaphore.rb | 5 +- lib/process_shared/mutex.rb | 11 ++--- .../{shared_memory.rb => object_buffer.rb} | 20 ++------ lib/process_shared/open_with_self.rb | 20 ++++++++ lib/process_shared/posix/semaphore.rb | 6 ++- lib/process_shared/posix/shared_memory.rb | 7 +-- lib/process_shared/semaphore.rb | 37 -------------- lib/process_shared/shared_array.rb | 2 +- .../synchronizable_semaphore.rb | 16 +++++++ lib/process_shared/with_self.rb | 20 -------- spec/mach/semaphore_spec.rb | 48 ++++++++++--------- spec/process_shared/mutex_spec.rb | 9 ++-- spec/process_shared/semaphore_spec.rb | 3 +- 16 files changed, 100 insertions(+), 147 deletions(-) rename lib/process_shared/{shared_memory.rb => object_buffer.rb} (82%) create mode 100644 lib/process_shared/open_with_self.rb delete mode 100644 lib/process_shared/semaphore.rb create mode 100644 lib/process_shared/synchronizable_semaphore.rb delete mode 100644 lib/process_shared/with_self.rb diff --git a/lib/process_shared.rb b/lib/process_shared.rb index e93c4b3..1d625d6 100644 --- a/lib/process_shared.rb +++ b/lib/process_shared.rb @@ -3,39 +3,29 @@ require 'ffi' if RUBY_VERSION =~ /^1.8/ require 'process_shared/define_singleton_method' - module ProcessShared - module PSem - extend DefineSingletonMethod - end - - module RT - extend DefineSingletonMethod - end - - module LibC - extend DefineSingletonMethod - end + class Module + include ProcessShared::DefineSingletonMethod end end -require 'process_shared/semaphore' -require 'process_shared/binary_semaphore' -require 'process_shared/mutex' -require 'process_shared/shared_memory' - module ProcessShared case FFI::Platform::OS when 'linux' require 'process_shared/posix/shared_memory' require 'process_shared/posix/semaphore' - SharedMemory.impl = Posix::SharedMemory - Semaphore.impl = Posix::Semaphore + SharedMemory = Posix::SharedMemory + Semaphore = Posix::Semaphore when 'darwin' require 'process_shared/posix/shared_memory' require 'process_shared/mach/semaphore' - SharedMemory.impl = Posix::SharedMemory - Semaphore.impl = Mach::Semaphore + SharedMemory = Posix::SharedMemory + Semaphore = Mach::Semaphore end end + +require 'process_shared/binary_semaphore' +require 'process_shared/mutex' +require 'process_shared/condition_variable' + diff --git a/lib/process_shared/binary_semaphore.rb b/lib/process_shared/binary_semaphore.rb index 73b11ac..f716a0d 100644 --- a/lib/process_shared/binary_semaphore.rb +++ b/lib/process_shared/binary_semaphore.rb @@ -1,8 +1,7 @@ require 'forwardable' require 'process_shared' -require 'process_shared/with_self' -require 'process_shared/semaphore' +require 'process_shared/open_with_self' require 'process_shared/process_error' module ProcessShared @@ -14,14 +13,10 @@ module ProcessShared # This is identical to a Semaphore but with extra error checking. class BinarySemaphore extend Forwardable - include ProcessShared::WithSelf + extend ProcessShared::OpenWithSelf def_delegators :@sem, :wait, :try_wait, :synchronize, :value, :close - def self.open(value = 1, &block) - new(value).with_self(&block) - end - # Create a new semaphore with initial value +value+. After # {Kernel#fork}, the semaphore will be shared across two (or more) # processes. The semaphore must be closed with {#close} in each diff --git a/lib/process_shared/condition_variable.rb b/lib/process_shared/condition_variable.rb index 3079aa6..0fd42b4 100644 --- a/lib/process_shared/condition_variable.rb +++ b/lib/process_shared/condition_variable.rb @@ -1,4 +1,4 @@ -require 'process_shared/semaphore' +require 'process_shared' module ProcessShared class ConditionVariable diff --git a/lib/process_shared/mach/semaphore.rb b/lib/process_shared/mach/semaphore.rb index a0fce59..1a87488 100644 --- a/lib/process_shared/mach/semaphore.rb +++ b/lib/process_shared/mach/semaphore.rb @@ -2,12 +2,15 @@ require 'mach' require 'mach/error' require 'process_shared/mach' +require 'process_shared/open_with_self' +require 'process_shared/synchronizable_semaphore' module ProcessShared module Mach # Extends ::Mach::Semaphore to be compatible with ProcessShared::Semaphore class Semaphore < ::Mach::Semaphore - include ProcessShared::Semaphore + extend ProcessShared::OpenWithSelf + include ProcessShared::SynchronizableSemaphore def initialize(value = 1) super(:value => value) diff --git a/lib/process_shared/mutex.rb b/lib/process_shared/mutex.rb index 8505f40..6de5d12 100644 --- a/lib/process_shared/mutex.rb +++ b/lib/process_shared/mutex.rb @@ -1,6 +1,5 @@ -require 'process_shared/semaphore' -require 'process_shared/with_self' -require 'process_shared/shared_memory' +require 'process_shared' +require 'process_shared/open_with_self' require 'process_shared/process_error' module ProcessShared @@ -20,11 +19,7 @@ module ProcessShared # release its {Semaphore} and {SharedMemory} resources. For now, # rely on the object finalizers of those objects... class Mutex - # include WithSelf - - # def self.open(&block) - # new.with_self(&block) - # end + extend OpenWithSelf def initialize @internal_sem = Semaphore.new diff --git a/lib/process_shared/shared_memory.rb b/lib/process_shared/object_buffer.rb similarity index 82% rename from lib/process_shared/shared_memory.rb rename to lib/process_shared/object_buffer.rb index 192cc39..6bf72b5 100644 --- a/lib/process_shared/shared_memory.rb +++ b/lib/process_shared/object_buffer.rb @@ -1,23 +1,9 @@ -require 'process_shared/with_self' require 'process_shared/shared_memory_io' module ProcessShared - # Memory block shared across processes. - module SharedMemory - include ProcessShared::WithSelf - - class << self - attr_accessor :impl - - def new(*args) - impl.new(*args) - end - - def open(size, &block) - new(size).with_self(&block) - end - end - + # Provides reading and writing of serialized objects from a memory + # buffer. + module ObjectBuffer # Write the serialization of +obj+ (using Marshal.dump) to this # shared memory object at +offset+ (in bytes). # diff --git a/lib/process_shared/open_with_self.rb b/lib/process_shared/open_with_self.rb new file mode 100644 index 0000000..a5c5ae8 --- /dev/null +++ b/lib/process_shared/open_with_self.rb @@ -0,0 +1,20 @@ +module ProcessShared + module OpenWithSelf + # Like #new but if the optional code block is given, it will be + # passed the new object as an argument, and the object will + # automatically be closed (by invoking +close+) when the block + # terminates. In this instance, value of the block is returned. + def open(*args, &block) + obj = new(*args) + if block_given? + begin + yield obj + ensure + obj.close + end + else + obj + end + end + end +end diff --git a/lib/process_shared/posix/semaphore.rb b/lib/process_shared/posix/semaphore.rb index e72d1a6..e75028f 100644 --- a/lib/process_shared/posix/semaphore.rb +++ b/lib/process_shared/posix/semaphore.rb @@ -1,4 +1,5 @@ -require 'process_shared/semaphore' +require 'process_shared/synchronizable_semaphore' +require 'process_shared/open_with_self' require 'process_shared/posix/errno' require 'process_shared/posix/libc' @@ -34,8 +35,9 @@ module ProcessShared :sem_timedwait) end + extend ProcessShared::OpenWithSelf include Foreign - include ProcessShared::Semaphore + include ProcessShared::SynchronizableSemaphore # Make a Proc suitable for use as a finalizer that will call # +shm_unlink+ on +sem+. diff --git a/lib/process_shared/posix/shared_memory.rb b/lib/process_shared/posix/shared_memory.rb index d612eb3..351dbab 100644 --- a/lib/process_shared/posix/shared_memory.rb +++ b/lib/process_shared/posix/shared_memory.rb @@ -2,7 +2,8 @@ require 'ffi' require 'process_shared/posix/errno' require 'process_shared/posix/libc' -require 'process_shared/shared_memory' +require 'process_shared/object_buffer' +require 'process_shared/open_with_self' module ProcessShared module Posix @@ -25,10 +26,10 @@ module ProcessShared error_check :shm_open, :shm_unlink end + extend ProcessShared::OpenWithSelf include SharedMemory::Foreign include LibC - - include ProcessShared::SharedMemory + include ProcessShared::ObjectBuffer attr_reader :size, :type, :type_size, :count, :fd diff --git a/lib/process_shared/semaphore.rb b/lib/process_shared/semaphore.rb deleted file mode 100644 index b1766e3..0000000 --- a/lib/process_shared/semaphore.rb +++ /dev/null @@ -1,37 +0,0 @@ -require 'process_shared/with_self' - -module ProcessShared - module Semaphore - include ProcessShared::WithSelf - - class << self - # the implementation to use to create semaphores. impl is set - # based on the platform in 'process_shared' - attr_accessor :impl - - def new(*args) - impl.new(*args) - end - - # With no associated block, open is a synonym for - # Semaphore.new. If the optional code block is given, it will be - # passed +sem+ as an argument, and the Semaphore object will - # automatically be closed when the block terminates. In this - # instance, Semaphore.open returns the value of the block. - # - # @param [Integer] value the initial semaphore value - def open(value = 1, &block) - new(value).with_self(&block) - end - end - - def synchronize - wait - begin - yield - ensure - post - end - end - end -end diff --git a/lib/process_shared/shared_array.rb b/lib/process_shared/shared_array.rb index a4409e8..a4e65c8 100644 --- a/lib/process_shared/shared_array.rb +++ b/lib/process_shared/shared_array.rb @@ -1,7 +1,7 @@ require 'process_shared' module ProcessShared - class SharedArray < SharedMemory.impl + class SharedArray < SharedMemory include Enumerable # A fixed-size array in shared memory. Processes forked from this diff --git a/lib/process_shared/synchronizable_semaphore.rb b/lib/process_shared/synchronizable_semaphore.rb new file mode 100644 index 0000000..38529c8 --- /dev/null +++ b/lib/process_shared/synchronizable_semaphore.rb @@ -0,0 +1,16 @@ +module ProcessShared + module SynchronizableSemaphore + # Yield the block after decrementing the semaphore, ensuring that + # the semaphore is incremented. + # + # @return [Object] the value of the block + def synchronize + wait + begin + yield + ensure + post + end + end + end +end diff --git a/lib/process_shared/with_self.rb b/lib/process_shared/with_self.rb deleted file mode 100644 index 3e20b05..0000000 --- a/lib/process_shared/with_self.rb +++ /dev/null @@ -1,20 +0,0 @@ -module ProcessShared - module WithSelf - # With no associated block, return self. If the optional code - # block is given, it will be passed `self` as an argument, and the - # self object will automatically be closed (by invoking +close+ on - # +self+) when the block terminates. In this instance, value of - # the block is returned. - def with_self - if block_given? - begin - yield self - ensure - self.close - end - else - self - end - end - end -end diff --git a/spec/mach/semaphore_spec.rb b/spec/mach/semaphore_spec.rb index b1eef36..f3bd076 100644 --- a/spec/mach/semaphore_spec.rb +++ b/spec/mach/semaphore_spec.rb @@ -30,31 +30,35 @@ module Mach end it 'coordinates access to shared resource between two tasks' do - sem = Semaphore.new(:value => 0) + begin + sem = Semaphore.new(:value => 0) - port = Port.new - port.insert_right(:make_send) - Task.self.set_bootstrap_port(port) + port = Port.new + port.insert_right(:make_send) + Task.self.set_bootstrap_port(port) - child = fork do - parent_port = Task.self.get_bootstrap_port - Task.self.copy_send(parent_port) - # parent will copy send rights to sem into child task - sleep 0.5 - sem.signal - Kernel.exit! + child = fork do + parent_port = Task.self.get_bootstrap_port + Task.self.copy_send(parent_port) + # parent will copy send rights to sem into child task + sleep 0.5 + sem.signal + Kernel.exit! + end + + child_task_port = port.receive_right + + start = Time.now.to_f + sem.insert_right(:copy_send, :ipc_space => child_task_port) + sem.wait + elapsed = Time.now.to_f - start + + Process.wait child + + elapsed.must be_gt(0.4) + ensure + Task.self.set_bootstrap_port(Mach::Functions.bootstrap_port) end - - child_task_port = port.receive_right - - start = Time.now.to_f - sem.insert_right(:copy_send, :ipc_space => child_task_port) - sem.wait - elapsed = Time.now.to_f - start - - Process.wait child - - elapsed.must be_gt(0.4) end end end diff --git a/spec/process_shared/mutex_spec.rb b/spec/process_shared/mutex_spec.rb index f9a08bb..e790ab0 100644 --- a/spec/process_shared/mutex_spec.rb +++ b/spec/process_shared/mutex_spec.rb @@ -1,6 +1,5 @@ require 'spec_helper' -require 'process_shared/mutex' -require 'process_shared/shared_memory' +require 'process_shared' module ProcessShared describe Mutex do @@ -60,9 +59,9 @@ module ProcessShared pid = Kernel.fork do mutex.lock - sleep 0.2 - mutex.unlock - Kernel.exit! + # sleep 0.2 + # mutex.unlock + # Kernel.exit! end sleep 0.1 diff --git a/spec/process_shared/semaphore_spec.rb b/spec/process_shared/semaphore_spec.rb index 2cf50ec..75b9236 100644 --- a/spec/process_shared/semaphore_spec.rb +++ b/spec/process_shared/semaphore_spec.rb @@ -1,8 +1,7 @@ require 'spec_helper' require 'ffi' -require 'process_shared/semaphore' -require 'process_shared/shared_memory' +require 'process_shared' module ProcessShared describe Semaphore do