diff --git a/lib/process_shared/mach.rb b/lib/process_shared/mach.rb new file mode 100644 index 0000000..6ef87fc --- /dev/null +++ b/lib/process_shared/mach.rb @@ -0,0 +1,93 @@ +require 'set' +require 'mach' + +module ProcessShared + module Mach + include ::Mach + + # The set of ports that should be shared to forked child + # processes. + # + # FIXME: protect with (original ruby) mutex? + def self.shared_ports + @shared_ports ||= Set.new + end + + def self.after_fork_child + parent_port = Task.self.get_bootstrap_port + + # give parent permission to send to child's task port + Task.self.copy_send(parent_port) + + # create a second port and give the parent permission to send + port = Port.new + port.insert_right(:make_send) + port.copy_send(parent_port) + + # parent copies sem, mutex port permissions directly to child + # task port + + # wait for parent to send orig bootstrap port + orig_bootstrap = port.receive_right + Task.self.set_special_port(:bootstrap, orig_bootstrap) + end + + def self.after_fork_parent(port) + child_task_port = port.receive_right + shared_ports.each do |p| + p.insert_right(:copy_send, :ipc_space => child_task_port) + end + + child_port = port.receive_right + ::Mach::bootstrap_port.copy_send(child_port) + end + end +end + +module Kernel + # Override to call Process::fork. + def fork(*args, &block) + Process.fork(*args, &block) + end +end + +module Process + class << self + unless respond_to? :__mach_original_fork__ + alias_method :__mach_original_fork__, :fork + end + + # Override to first copy all shared ports (semaphores, etc.) from + # parent process to child process. + def fork + # make a port for receiving message from child + port = Mach::Port.new + port.insert_right(:make_send) + Mach::Task.self.set_bootstrap_port(port) + + if block_given? + pid = __mach_original_fork__ do + ProcessShared::Mach.after_fork_child + yield + end + + ProcessShared::Mach.after_fork_parent(port) + pid + else + if pid = __mach_original_fork__ + ProcessShared::Mach.after_fork_parent(port) + pid + else + ProcessShared::Mach.after_fork_child + nil + end + end + end + end +end + +require 'mach/time_spec' +require 'process_shared/time_spec' + +# Monkey patch to add #add_seconds! method +Mach::TimeSpec.send(:include, ProcessShared::TimeSpec) diff --git a/lib/process_shared/mach/semaphore.rb b/lib/process_shared/mach/semaphore.rb new file mode 100644 index 0000000..a0fce59 --- /dev/null +++ b/lib/process_shared/mach/semaphore.rb @@ -0,0 +1,39 @@ +require 'mach' +require 'mach/error' + +require 'process_shared/mach' + +module ProcessShared + module Mach + # Extends ::Mach::Semaphore to be compatible with ProcessShared::Semaphore + class Semaphore < ::Mach::Semaphore + include ProcessShared::Semaphore + + def initialize(value = 1) + super(:value => value) + ProcessShared::Mach.shared_ports.add self + end + + def try_wait(timeout = nil) + secs = timeout ? timeout : 0 + begin + # TODO catch and convert exceptions... + timedwait(secs) + rescue Mach::Error::OPERATION_TIMED_OUT => e + klass = secs == 0 ? Errno::EAGAIN : Errno::ETIMEDOUT + raise klass, e.message + end + end + + alias_method :post, :signal + + def value + raise Errno::ENOTSUP + end + + def close + # TODO + end + end + end +end diff --git a/spec/process_shared/semaphore_spec.rb b/spec/process_shared/semaphore_spec.rb index 3e3fa1c..3332a8e 100644 --- a/spec/process_shared/semaphore_spec.rb +++ b/spec/process_shared/semaphore_spec.rb @@ -98,14 +98,13 @@ module ProcessShared it 'returns after waiting if another processes posts' do Semaphore.open(0) do |sem| - start = Time.now.to_f - pid = fork do sleep 0.01 sem.post Kernel.exit! end + start = Time.now.to_f sem.try_wait(0.1) (Time.now.to_f - start).must be_lt(0.1)