diff --git a/lib/process_shared/posix/errno.rb b/lib/process_shared/posix/errno.rb new file mode 100644 index 0000000..4ed25fc --- /dev/null +++ b/lib/process_shared/posix/errno.rb @@ -0,0 +1,40 @@ +require 'ffi' + +module ProcessShared + module Posix + module Errno + extend FFI::Library + + ffi_lib FFI::Library::LIBC + + attach_variable :errno, :int + + # Replace methods in +syms+ with error checking wrappers that + # invoke the original method and raise a {SystemCallError} with + # the current errno if the return value is an error. + # + # Errors are detected if the block returns true when called with + # the original method's return value. + def error_check(*syms, &is_err) + unless block_given? + is_err = lambda { |v| (v == -1) } + end + + syms.each do |sym| + method = self.method(sym) + new_method_body = proc do |*args| + ret = method.call(*args) + if is_err.call(ret) + raise SystemCallError.new("error in #{sym}", Errno.errno) + else + ret + end + end + + define_singleton_method(sym, &new_method_body) + define_method(sym, &new_method_body) + end + end + end + end +end diff --git a/lib/process_shared/posix/libc.rb b/lib/process_shared/posix/libc.rb new file mode 100644 index 0000000..4c97cad --- /dev/null +++ b/lib/process_shared/posix/libc.rb @@ -0,0 +1,78 @@ +require 'ffi' + +require 'process_shared/posix/errno' +require 'process_shared/posix/time_val' + +module ProcessShared + module Posix + module LibC + module Helper + extend FFI::Library + + # Workaround FFI dylib/bundle issue. See https://github.com/ffi/ffi/issues/42 + suffix = if FFI::Platform.mac? + 'bundle' + else + FFI::Platform::LIBSUFFIX + end + + ffi_lib File.join(File.expand_path(File.dirname(__FILE__)), + 'helper.' + suffix) + + [:o_rdwr, + :o_creat, + :o_excl, + + :prot_read, + :prot_write, + :prot_exec, + :prot_none, + + :map_shared, + :map_private].each do |sym| + attach_variable sym, :int + end + + [:sizeof_sem_t].each do |sym| + attach_variable sym, :size_t + end + end + + extend FFI::Library + extend Errno + + ffi_lib FFI::Library::LIBC + + MAP_FAILED = FFI::Pointer.new(-1) + MAP_SHARED = Helper.map_shared + MAP_PRIVATE = Helper.map_private + + PROT_READ = Helper.prot_read + PROT_WRITE = Helper.prot_write + PROT_EXEC = Helper.prot_exec + PROT_NONE = Helper.prot_none + + O_RDWR = Helper.o_rdwr + O_CREAT = Helper.o_creat + O_EXCL = Helper.o_excl + + def self.type_size(type) + case type + when :sem_t + Helper.sizeof_sem_t + else + FFI.type_size(type) + end + end + + attach_function :mmap, [:pointer, :size_t, :int, :int, :int, :off_t], :pointer + attach_function :munmap, [:pointer, :size_t], :int + attach_function :ftruncate, [:int, :off_t], :int + attach_function :close, [:int], :int + attach_function :gettimeofday, [TimeVal, :pointer], :int + + error_check(:mmap) { |v| v == MAP_FAILED } + error_check(:munmap, :ftruncate, :close, :gettimeofday) + end + end +end diff --git a/lib/process_shared/posix/semaphore.rb b/lib/process_shared/posix/semaphore.rb new file mode 100644 index 0000000..eeea196 --- /dev/null +++ b/lib/process_shared/posix/semaphore.rb @@ -0,0 +1,154 @@ +require 'process_shared/posix/errno' +require 'process_shared/posix/libc' +require 'process_shared/posix/time_val' +require 'process_shared/posix/time_spec' +require 'process_shared/with_self' + +module ProcessShared + module Posix + class Semaphore + module Foreign + extend FFI::Library + extend Errno + + ffi_lib 'rt' # 'pthread' + + typedef :pointer, :sem_p + + attach_function :sem_open, [:string, :int], :sem_p + attach_function :sem_close, [:sem_p], :int + attach_function :sem_unlink, [:string], :int + + attach_function :sem_init, [:sem_p, :int, :uint], :int + attach_function :sem_destroy, [:sem_p], :int + + attach_function :sem_getvalue, [:sem_p, :pointer], :int + attach_function :sem_post, [:sem_p], :int + attach_function :sem_wait, [:sem_p], :int + attach_function :sem_trywait, [:sem_p], :int + attach_function :sem_timedwait, [:sem_p, TimeSpec], :int + + error_check(:sem_close, :sem_unlink, :sem_init, :sem_destroy, + :sem_getvalue, :sem_post, :sem_wait, :sem_trywait, + :sem_timedwait) + end + + include Foreign + include ProcessShared::WithSelf + + # Make a Proc suitable for use as a finalizer that will call + # +shm_unlink+ on +sem+. + # + # @return [Proc] a finalizer + def self.make_finalizer(sem) + proc { LibC.shm_unlink(sem) } + end + + # With no associated block, open is a synonym for + # Semaphore.new. If the optional code block is given, it will be + # passed +sem+ as an argument, and the Semaphore object will + # automatically be closed when the block terminates. In this + # instance, Semaphore.open returns the value of the block. + # + # @param [Integer] value the initial semaphore value + def self.open(value = 1, &block) + new(value).with_self(&block) + end + + # Create a new semaphore with initial value +value+. After + # Kernel#fork, the semaphore will be shared across two (or more) + # processes. The semaphore must be closed with #close in each + # process that no longer needs the semaphore. + # + # (An object finalizer is registered that will close the semaphore + # to avoid memory leaks, but this should be considered a last + # resort). + # + # @param [Integer] value the initial semaphore value + def initialize(value) + @sem = SharedMemory.new(LibC.type_size(:sem_t)) + sem_init(@sem, 1, value) + ObjectSpace.define_finalizer(self, self.class.make_finalizer(@sem)) + end + + # Get the current value of the semaphore. Raises {Errno::NOTSUP} on + # platforms that don't support this (e.g. Mac OS X). + # + # @return [Integer] the current value of the semaphore. + def value + int = FFI::MemoryPointer.new(:int) + sem_getvalue(@sem, int) + int.read_int + end + + # Increment the value of the semaphore. If other processes are + # waiting on this semaphore, one will be woken. + def post + sem_post(@sem) + end + + # Decrement the value of the semaphore. If the value is zero, + # wait until another process increments via {#post}. + def wait + sem_wait(@sem) + end + + NS_PER_S = 1e9 + US_PER_NS = 1000 + TV_NSEC_MAX = (NS_PER_S - 1) + + # Decrement the value of the semaphore if it can be done + # immediately (i.e. if it was non-zero). Otherwise, wait up to + # +timeout+ seconds until another process increments via {#post}. + # + # @param timeout [Numeric] the maximum seconds to wait, or nil to not wait + # + # @return If +timeout+ is nil and the semaphore cannot be + # decremented immediately, raise Errno::EAGAIN. If +timeout+ + # passed before the semaphore could be decremented, raise + # Errno::ETIMEDOUT. + def try_wait(timeout = nil) + if timeout + now = TimeVal.new + abs_timeout = TimeSpec.new + + LibC.gettimeofday(now, nil) + + abs_timeout[:tv_sec] = now[:tv_sec]; + abs_timeout[:tv_nsec] = now[:tv_usec] * US_PER_NS + + # add timeout in seconds to abs_timeout; careful with rounding + sec = timeout.floor + nsec = ((timeout - sec) * NS_PER_S).floor + + abs_timeout[:tv_sec] += sec + abs_timeout[:tv_nsec] += nsec + while abs_timeout[:tv_nsec] > TV_NSEC_MAX + abs_timeout[:tv_sec] += 1 + abs_timeout[:tv_nsec] -= NS_PER_S + end + + sem_timedwait(@sem, abs_timeout) + else + sem_trywait(@sem) + end + end + + # Close the shared memory block holding the semaphore. + # + # FIXME: May leak the semaphore memory on some platforms, + # according to the Linux man page for sem_destroy(3). (Should not + # be destroyed as it may be in use by other processes.) + def close + # sem_destroy(@sem) + + # Not entirely sure what to do here. sem_destroy() goes with + # sem_init() (unnamed semaphroe), but other processes cannot use + # a destroyed semaphore. + @sem.close + @sem = nil + ObjectSpace.undefine_finalizer(self) + end + end + end +end diff --git a/lib/process_shared/posix/shared_memory.rb b/lib/process_shared/posix/shared_memory.rb new file mode 100644 index 0000000..83a9914 --- /dev/null +++ b/lib/process_shared/posix/shared_memory.rb @@ -0,0 +1,137 @@ +require 'ffi' + +require 'process_shared/posix/errno' +require 'process_shared/posix/libc' +require 'process_shared/shared_memory_io' +require 'process_shared/with_self' + +module ProcessShared + module Posix + # Memory block shared across processes. + class SharedMemory < FFI::Pointer + module Foreign + extend FFI::Library + extend Errno + + # FIXME: mac and linux OK, but what about everything else? + if FFI::Platform.mac? + ffi_lib 'c' + else + ffi_lib 'rt' + end + + attach_function :shm_open, [:string, :int, :mode_t], :int + attach_function :shm_unlink, [:string], :int + + error_check :shm_open, :shm_unlink + end + + include SharedMemory::Foreign + include LibC + + include ProcessShared::WithSelf + + attr_reader :size, :type, :type_size, :count, :fd + + def self.open(size, &block) + new(size).with_self(&block) + end + + def self.make_finalizer(addr, size, fd) + proc do + pointer = FFI::Pointer.new(addr) + LibC.munmap(pointer, size) + LibC.close(fd) + end + end + + def initialize(type_or_count = 1, count = 1) + @type, @count = case type_or_count + when Symbol + [type_or_count, count] + else + [:uchar, type_or_count] + end + + @type_size = FFI.type_size(@type) + @size = @type_size * @count + + name = "/ps-shm#{rand(10000)}" + @fd = shm_open(name, + O_CREAT | O_RDWR | O_EXCL, + 0777) + shm_unlink(name) + + ftruncate(@fd, @size) + @pointer = mmap(nil, + @size, + LibC::PROT_READ | LibC::PROT_WRITE, + LibC::MAP_SHARED, + @fd, + 0). + slice(0, size) # slice to get FFI::Pointer that knows its size + # (and thus does bounds checking) + + @finalize = self.class.make_finalizer(@pointer.address, @size, @fd) + ObjectSpace.define_finalizer(self, @finalize) + + super(@pointer) + end + + # Write the serialization of +obj+ (using Marshal.dump) to this + # shared memory object at +offset+ (in bytes). + # + # Raises IndexError if there is insufficient space. + def put_object(offset, obj) + # FIXME: This is a workaround to an issue I'm seeing in + # 1.8.7-p352 (not tested in other 1.8's). If I used the code + # below that works in 1.9, then inside SharedMemoryIO#write, the + # passed string object is 'terminated' (garbage collected?) and + # won't respond to any methods... This less efficient since it + # involves the creation of an intermediate string, but it works + # in 1.8.7-p352. + if RUBY_VERSION =~ /^1.8/ + str = Marshal.dump(obj) + return put_bytes(offset, str, 0, str.size) + end + + io = to_shm_io + io.seek(offset) + Marshal.dump(obj, io) + end + + # Read the serialized object at +offset+ (in bytes) using + # Marshal.load. + # + # @return [Object] + def get_object(offset) + io = to_shm_io + io.seek(offset) + Marshal.load(io) + end + + # Equivalent to {#put_object(0, obj)} + def write_object(obj) + put_object(0, obj) + end + + # Equivalent to {#read_object(0, obj)} + # + # @return [Object] + def read_object + Marshal.load(to_shm_io) + end + + def close + ObjectSpace.undefine_finalizer(self) + @finalize.call + end + + private + + def to_shm_io + ProcessShared::SharedMemoryIO.new(self) + end + end + end +end diff --git a/lib/process_shared/posix/time_spec.rb b/lib/process_shared/posix/time_spec.rb new file mode 100644 index 0000000..36e36c6 --- /dev/null +++ b/lib/process_shared/posix/time_spec.rb @@ -0,0 +1,10 @@ +require 'ffi' + +module ProcessShared + module Posix + class TimeSpec < FFI::Struct + layout(:tv_sec, :time_t, + :tv_nsec, :long) + end + end +end diff --git a/lib/process_shared/posix/time_val.rb b/lib/process_shared/posix/time_val.rb new file mode 100644 index 0000000..edac9df --- /dev/null +++ b/lib/process_shared/posix/time_val.rb @@ -0,0 +1,10 @@ +require 'ffi' + +module ProcessShared + module Posix + class TimeVal < FFI::Struct + layout(:tv_sec, :time_t, + :tv_usec, :suseconds_t) + end + end +end