Refactor class hierarchy, implementation selection details.

This commit is contained in:
Patrick Mahoney 2012-02-07 23:06:17 -06:00
parent 64b746a98c
commit 15fe2e5057
16 changed files with 100 additions and 147 deletions

View File

@ -3,39 +3,29 @@ require 'ffi'
if RUBY_VERSION =~ /^1.8/ if RUBY_VERSION =~ /^1.8/
require 'process_shared/define_singleton_method' require 'process_shared/define_singleton_method'
module ProcessShared class Module
module PSem include ProcessShared::DefineSingletonMethod
extend DefineSingletonMethod
end
module RT
extend DefineSingletonMethod
end
module LibC
extend DefineSingletonMethod
end
end end
end end
require 'process_shared/semaphore'
require 'process_shared/binary_semaphore'
require 'process_shared/mutex'
require 'process_shared/shared_memory'
module ProcessShared module ProcessShared
case FFI::Platform::OS case FFI::Platform::OS
when 'linux' when 'linux'
require 'process_shared/posix/shared_memory' require 'process_shared/posix/shared_memory'
require 'process_shared/posix/semaphore' require 'process_shared/posix/semaphore'
SharedMemory.impl = Posix::SharedMemory SharedMemory = Posix::SharedMemory
Semaphore.impl = Posix::Semaphore Semaphore = Posix::Semaphore
when 'darwin' when 'darwin'
require 'process_shared/posix/shared_memory' require 'process_shared/posix/shared_memory'
require 'process_shared/mach/semaphore' require 'process_shared/mach/semaphore'
SharedMemory.impl = Posix::SharedMemory SharedMemory = Posix::SharedMemory
Semaphore.impl = Mach::Semaphore Semaphore = Mach::Semaphore
end end
end end
require 'process_shared/binary_semaphore'
require 'process_shared/mutex'
require 'process_shared/condition_variable'

View File

@ -1,8 +1,7 @@
require 'forwardable' require 'forwardable'
require 'process_shared' require 'process_shared'
require 'process_shared/with_self' require 'process_shared/open_with_self'
require 'process_shared/semaphore'
require 'process_shared/process_error' require 'process_shared/process_error'
module ProcessShared module ProcessShared
@ -14,14 +13,10 @@ module ProcessShared
# This is identical to a Semaphore but with extra error checking. # This is identical to a Semaphore but with extra error checking.
class BinarySemaphore class BinarySemaphore
extend Forwardable extend Forwardable
include ProcessShared::WithSelf extend ProcessShared::OpenWithSelf
def_delegators :@sem, :wait, :try_wait, :synchronize, :value, :close def_delegators :@sem, :wait, :try_wait, :synchronize, :value, :close
def self.open(value = 1, &block)
new(value).with_self(&block)
end
# Create a new semaphore with initial value +value+. After # Create a new semaphore with initial value +value+. After
# {Kernel#fork}, the semaphore will be shared across two (or more) # {Kernel#fork}, the semaphore will be shared across two (or more)
# processes. The semaphore must be closed with {#close} in each # processes. The semaphore must be closed with {#close} in each

View File

@ -1,4 +1,4 @@
require 'process_shared/semaphore' require 'process_shared'
module ProcessShared module ProcessShared
class ConditionVariable class ConditionVariable

View File

@ -2,12 +2,15 @@ require 'mach'
require 'mach/error' require 'mach/error'
require 'process_shared/mach' require 'process_shared/mach'
require 'process_shared/open_with_self'
require 'process_shared/synchronizable_semaphore'
module ProcessShared module ProcessShared
module Mach module Mach
# Extends ::Mach::Semaphore to be compatible with ProcessShared::Semaphore # Extends ::Mach::Semaphore to be compatible with ProcessShared::Semaphore
class Semaphore < ::Mach::Semaphore class Semaphore < ::Mach::Semaphore
include ProcessShared::Semaphore extend ProcessShared::OpenWithSelf
include ProcessShared::SynchronizableSemaphore
def initialize(value = 1) def initialize(value = 1)
super(:value => value) super(:value => value)

View File

@ -1,6 +1,5 @@
require 'process_shared/semaphore' require 'process_shared'
require 'process_shared/with_self' require 'process_shared/open_with_self'
require 'process_shared/shared_memory'
require 'process_shared/process_error' require 'process_shared/process_error'
module ProcessShared module ProcessShared
@ -20,11 +19,7 @@ module ProcessShared
# release its {Semaphore} and {SharedMemory} resources. For now, # release its {Semaphore} and {SharedMemory} resources. For now,
# rely on the object finalizers of those objects... # rely on the object finalizers of those objects...
class Mutex class Mutex
# include WithSelf extend OpenWithSelf
# def self.open(&block)
# new.with_self(&block)
# end
def initialize def initialize
@internal_sem = Semaphore.new @internal_sem = Semaphore.new

View File

@ -1,23 +1,9 @@
require 'process_shared/with_self'
require 'process_shared/shared_memory_io' require 'process_shared/shared_memory_io'
module ProcessShared module ProcessShared
# Memory block shared across processes. # Provides reading and writing of serialized objects from a memory
module SharedMemory # buffer.
include ProcessShared::WithSelf module ObjectBuffer
class << self
attr_accessor :impl
def new(*args)
impl.new(*args)
end
def open(size, &block)
new(size).with_self(&block)
end
end
# Write the serialization of +obj+ (using Marshal.dump) to this # Write the serialization of +obj+ (using Marshal.dump) to this
# shared memory object at +offset+ (in bytes). # shared memory object at +offset+ (in bytes).
# #

View File

@ -0,0 +1,20 @@
module ProcessShared
module OpenWithSelf
# Like #new but if the optional code block is given, it will be
# passed the new object as an argument, and the object will
# automatically be closed (by invoking +close+) when the block
# terminates. In this instance, value of the block is returned.
def open(*args, &block)
obj = new(*args)
if block_given?
begin
yield obj
ensure
obj.close
end
else
obj
end
end
end
end

View File

@ -1,4 +1,5 @@
require 'process_shared/semaphore' require 'process_shared/synchronizable_semaphore'
require 'process_shared/open_with_self'
require 'process_shared/posix/errno' require 'process_shared/posix/errno'
require 'process_shared/posix/libc' require 'process_shared/posix/libc'
@ -34,8 +35,9 @@ module ProcessShared
:sem_timedwait) :sem_timedwait)
end end
extend ProcessShared::OpenWithSelf
include Foreign include Foreign
include ProcessShared::Semaphore include ProcessShared::SynchronizableSemaphore
# Make a Proc suitable for use as a finalizer that will call # Make a Proc suitable for use as a finalizer that will call
# +shm_unlink+ on +sem+. # +shm_unlink+ on +sem+.

View File

@ -2,7 +2,8 @@ require 'ffi'
require 'process_shared/posix/errno' require 'process_shared/posix/errno'
require 'process_shared/posix/libc' require 'process_shared/posix/libc'
require 'process_shared/shared_memory' require 'process_shared/object_buffer'
require 'process_shared/open_with_self'
module ProcessShared module ProcessShared
module Posix module Posix
@ -25,10 +26,10 @@ module ProcessShared
error_check :shm_open, :shm_unlink error_check :shm_open, :shm_unlink
end end
extend ProcessShared::OpenWithSelf
include SharedMemory::Foreign include SharedMemory::Foreign
include LibC include LibC
include ProcessShared::ObjectBuffer
include ProcessShared::SharedMemory
attr_reader :size, :type, :type_size, :count, :fd attr_reader :size, :type, :type_size, :count, :fd

View File

@ -1,37 +0,0 @@
require 'process_shared/with_self'
module ProcessShared
module Semaphore
include ProcessShared::WithSelf
class << self
# the implementation to use to create semaphores. impl is set
# based on the platform in 'process_shared'
attr_accessor :impl
def new(*args)
impl.new(*args)
end
# With no associated block, open is a synonym for
# Semaphore.new. If the optional code block is given, it will be
# passed +sem+ as an argument, and the Semaphore object will
# automatically be closed when the block terminates. In this
# instance, Semaphore.open returns the value of the block.
#
# @param [Integer] value the initial semaphore value
def open(value = 1, &block)
new(value).with_self(&block)
end
end
def synchronize
wait
begin
yield
ensure
post
end
end
end
end

View File

@ -1,7 +1,7 @@
require 'process_shared' require 'process_shared'
module ProcessShared module ProcessShared
class SharedArray < SharedMemory.impl class SharedArray < SharedMemory
include Enumerable include Enumerable
# A fixed-size array in shared memory. Processes forked from this # A fixed-size array in shared memory. Processes forked from this

View File

@ -0,0 +1,16 @@
module ProcessShared
module SynchronizableSemaphore
# Yield the block after decrementing the semaphore, ensuring that
# the semaphore is incremented.
#
# @return [Object] the value of the block
def synchronize
wait
begin
yield
ensure
post
end
end
end
end

View File

@ -1,20 +0,0 @@
module ProcessShared
module WithSelf
# With no associated block, return self. If the optional code
# block is given, it will be passed `self` as an argument, and the
# self object will automatically be closed (by invoking +close+ on
# +self+) when the block terminates. In this instance, value of
# the block is returned.
def with_self
if block_given?
begin
yield self
ensure
self.close
end
else
self
end
end
end
end

View File

@ -30,31 +30,35 @@ module Mach
end end
it 'coordinates access to shared resource between two tasks' do it 'coordinates access to shared resource between two tasks' do
sem = Semaphore.new(:value => 0) begin
sem = Semaphore.new(:value => 0)
port = Port.new port = Port.new
port.insert_right(:make_send) port.insert_right(:make_send)
Task.self.set_bootstrap_port(port) Task.self.set_bootstrap_port(port)
child = fork do child = fork do
parent_port = Task.self.get_bootstrap_port parent_port = Task.self.get_bootstrap_port
Task.self.copy_send(parent_port) Task.self.copy_send(parent_port)
# parent will copy send rights to sem into child task # parent will copy send rights to sem into child task
sleep 0.5 sleep 0.5
sem.signal sem.signal
Kernel.exit! Kernel.exit!
end
child_task_port = port.receive_right
start = Time.now.to_f
sem.insert_right(:copy_send, :ipc_space => child_task_port)
sem.wait
elapsed = Time.now.to_f - start
Process.wait child
elapsed.must be_gt(0.4)
ensure
Task.self.set_bootstrap_port(Mach::Functions.bootstrap_port)
end end
child_task_port = port.receive_right
start = Time.now.to_f
sem.insert_right(:copy_send, :ipc_space => child_task_port)
sem.wait
elapsed = Time.now.to_f - start
Process.wait child
elapsed.must be_gt(0.4)
end end
end end
end end

View File

@ -1,6 +1,5 @@
require 'spec_helper' require 'spec_helper'
require 'process_shared/mutex' require 'process_shared'
require 'process_shared/shared_memory'
module ProcessShared module ProcessShared
describe Mutex do describe Mutex do
@ -60,9 +59,9 @@ module ProcessShared
pid = Kernel.fork do pid = Kernel.fork do
mutex.lock mutex.lock
sleep 0.2 # sleep 0.2
mutex.unlock # mutex.unlock
Kernel.exit! # Kernel.exit!
end end
sleep 0.1 sleep 0.1

View File

@ -1,8 +1,7 @@
require 'spec_helper' require 'spec_helper'
require 'ffi' require 'ffi'
require 'process_shared/semaphore' require 'process_shared'
require 'process_shared/shared_memory'
module ProcessShared module ProcessShared
describe Semaphore do describe Semaphore do