Add Ruby/FFI wrapper around POSIX shared memory and semaphore (obviating libpsem).

This commit is contained in:
Patrick Mahoney 2012-01-29 22:54:22 -06:00
parent b58a1a7cda
commit ae67dc6889
6 changed files with 429 additions and 0 deletions

View File

@ -0,0 +1,40 @@
require 'ffi'
module ProcessShared
module Posix
module Errno
extend FFI::Library
ffi_lib FFI::Library::LIBC
attach_variable :errno, :int
# Replace methods in +syms+ with error checking wrappers that
# invoke the original method and raise a {SystemCallError} with
# the current errno if the return value is an error.
#
# Errors are detected if the block returns true when called with
# the original method's return value.
def error_check(*syms, &is_err)
unless block_given?
is_err = lambda { |v| (v == -1) }
end
syms.each do |sym|
method = self.method(sym)
new_method_body = proc do |*args|
ret = method.call(*args)
if is_err.call(ret)
raise SystemCallError.new("error in #{sym}", Errno.errno)
else
ret
end
end
define_singleton_method(sym, &new_method_body)
define_method(sym, &new_method_body)
end
end
end
end
end

View File

@ -0,0 +1,78 @@
require 'ffi'
require 'process_shared/posix/errno'
require 'process_shared/posix/time_val'
module ProcessShared
module Posix
module LibC
module Helper
extend FFI::Library
# Workaround FFI dylib/bundle issue. See https://github.com/ffi/ffi/issues/42
suffix = if FFI::Platform.mac?
'bundle'
else
FFI::Platform::LIBSUFFIX
end
ffi_lib File.join(File.expand_path(File.dirname(__FILE__)),
'helper.' + suffix)
[:o_rdwr,
:o_creat,
:o_excl,
:prot_read,
:prot_write,
:prot_exec,
:prot_none,
:map_shared,
:map_private].each do |sym|
attach_variable sym, :int
end
[:sizeof_sem_t].each do |sym|
attach_variable sym, :size_t
end
end
extend FFI::Library
extend Errno
ffi_lib FFI::Library::LIBC
MAP_FAILED = FFI::Pointer.new(-1)
MAP_SHARED = Helper.map_shared
MAP_PRIVATE = Helper.map_private
PROT_READ = Helper.prot_read
PROT_WRITE = Helper.prot_write
PROT_EXEC = Helper.prot_exec
PROT_NONE = Helper.prot_none
O_RDWR = Helper.o_rdwr
O_CREAT = Helper.o_creat
O_EXCL = Helper.o_excl
def self.type_size(type)
case type
when :sem_t
Helper.sizeof_sem_t
else
FFI.type_size(type)
end
end
attach_function :mmap, [:pointer, :size_t, :int, :int, :int, :off_t], :pointer
attach_function :munmap, [:pointer, :size_t], :int
attach_function :ftruncate, [:int, :off_t], :int
attach_function :close, [:int], :int
attach_function :gettimeofday, [TimeVal, :pointer], :int
error_check(:mmap) { |v| v == MAP_FAILED }
error_check(:munmap, :ftruncate, :close, :gettimeofday)
end
end
end

View File

@ -0,0 +1,154 @@
require 'process_shared/posix/errno'
require 'process_shared/posix/libc'
require 'process_shared/posix/time_val'
require 'process_shared/posix/time_spec'
require 'process_shared/with_self'
module ProcessShared
module Posix
class Semaphore
module Foreign
extend FFI::Library
extend Errno
ffi_lib 'rt' # 'pthread'
typedef :pointer, :sem_p
attach_function :sem_open, [:string, :int], :sem_p
attach_function :sem_close, [:sem_p], :int
attach_function :sem_unlink, [:string], :int
attach_function :sem_init, [:sem_p, :int, :uint], :int
attach_function :sem_destroy, [:sem_p], :int
attach_function :sem_getvalue, [:sem_p, :pointer], :int
attach_function :sem_post, [:sem_p], :int
attach_function :sem_wait, [:sem_p], :int
attach_function :sem_trywait, [:sem_p], :int
attach_function :sem_timedwait, [:sem_p, TimeSpec], :int
error_check(:sem_close, :sem_unlink, :sem_init, :sem_destroy,
:sem_getvalue, :sem_post, :sem_wait, :sem_trywait,
:sem_timedwait)
end
include Foreign
include ProcessShared::WithSelf
# Make a Proc suitable for use as a finalizer that will call
# +shm_unlink+ on +sem+.
#
# @return [Proc] a finalizer
def self.make_finalizer(sem)
proc { LibC.shm_unlink(sem) }
end
# With no associated block, open is a synonym for
# Semaphore.new. If the optional code block is given, it will be
# passed +sem+ as an argument, and the Semaphore object will
# automatically be closed when the block terminates. In this
# instance, Semaphore.open returns the value of the block.
#
# @param [Integer] value the initial semaphore value
def self.open(value = 1, &block)
new(value).with_self(&block)
end
# Create a new semaphore with initial value +value+. After
# Kernel#fork, the semaphore will be shared across two (or more)
# processes. The semaphore must be closed with #close in each
# process that no longer needs the semaphore.
#
# (An object finalizer is registered that will close the semaphore
# to avoid memory leaks, but this should be considered a last
# resort).
#
# @param [Integer] value the initial semaphore value
def initialize(value)
@sem = SharedMemory.new(LibC.type_size(:sem_t))
sem_init(@sem, 1, value)
ObjectSpace.define_finalizer(self, self.class.make_finalizer(@sem))
end
# Get the current value of the semaphore. Raises {Errno::NOTSUP} on
# platforms that don't support this (e.g. Mac OS X).
#
# @return [Integer] the current value of the semaphore.
def value
int = FFI::MemoryPointer.new(:int)
sem_getvalue(@sem, int)
int.read_int
end
# Increment the value of the semaphore. If other processes are
# waiting on this semaphore, one will be woken.
def post
sem_post(@sem)
end
# Decrement the value of the semaphore. If the value is zero,
# wait until another process increments via {#post}.
def wait
sem_wait(@sem)
end
NS_PER_S = 1e9
US_PER_NS = 1000
TV_NSEC_MAX = (NS_PER_S - 1)
# Decrement the value of the semaphore if it can be done
# immediately (i.e. if it was non-zero). Otherwise, wait up to
# +timeout+ seconds until another process increments via {#post}.
#
# @param timeout [Numeric] the maximum seconds to wait, or nil to not wait
#
# @return If +timeout+ is nil and the semaphore cannot be
# decremented immediately, raise Errno::EAGAIN. If +timeout+
# passed before the semaphore could be decremented, raise
# Errno::ETIMEDOUT.
def try_wait(timeout = nil)
if timeout
now = TimeVal.new
abs_timeout = TimeSpec.new
LibC.gettimeofday(now, nil)
abs_timeout[:tv_sec] = now[:tv_sec];
abs_timeout[:tv_nsec] = now[:tv_usec] * US_PER_NS
# add timeout in seconds to abs_timeout; careful with rounding
sec = timeout.floor
nsec = ((timeout - sec) * NS_PER_S).floor
abs_timeout[:tv_sec] += sec
abs_timeout[:tv_nsec] += nsec
while abs_timeout[:tv_nsec] > TV_NSEC_MAX
abs_timeout[:tv_sec] += 1
abs_timeout[:tv_nsec] -= NS_PER_S
end
sem_timedwait(@sem, abs_timeout)
else
sem_trywait(@sem)
end
end
# Close the shared memory block holding the semaphore.
#
# FIXME: May leak the semaphore memory on some platforms,
# according to the Linux man page for sem_destroy(3). (Should not
# be destroyed as it may be in use by other processes.)
def close
# sem_destroy(@sem)
# Not entirely sure what to do here. sem_destroy() goes with
# sem_init() (unnamed semaphroe), but other processes cannot use
# a destroyed semaphore.
@sem.close
@sem = nil
ObjectSpace.undefine_finalizer(self)
end
end
end
end

View File

@ -0,0 +1,137 @@
require 'ffi'
require 'process_shared/posix/errno'
require 'process_shared/posix/libc'
require 'process_shared/shared_memory_io'
require 'process_shared/with_self'
module ProcessShared
module Posix
# Memory block shared across processes.
class SharedMemory < FFI::Pointer
module Foreign
extend FFI::Library
extend Errno
# FIXME: mac and linux OK, but what about everything else?
if FFI::Platform.mac?
ffi_lib 'c'
else
ffi_lib 'rt'
end
attach_function :shm_open, [:string, :int, :mode_t], :int
attach_function :shm_unlink, [:string], :int
error_check :shm_open, :shm_unlink
end
include SharedMemory::Foreign
include LibC
include ProcessShared::WithSelf
attr_reader :size, :type, :type_size, :count, :fd
def self.open(size, &block)
new(size).with_self(&block)
end
def self.make_finalizer(addr, size, fd)
proc do
pointer = FFI::Pointer.new(addr)
LibC.munmap(pointer, size)
LibC.close(fd)
end
end
def initialize(type_or_count = 1, count = 1)
@type, @count = case type_or_count
when Symbol
[type_or_count, count]
else
[:uchar, type_or_count]
end
@type_size = FFI.type_size(@type)
@size = @type_size * @count
name = "/ps-shm#{rand(10000)}"
@fd = shm_open(name,
O_CREAT | O_RDWR | O_EXCL,
0777)
shm_unlink(name)
ftruncate(@fd, @size)
@pointer = mmap(nil,
@size,
LibC::PROT_READ | LibC::PROT_WRITE,
LibC::MAP_SHARED,
@fd,
0).
slice(0, size) # slice to get FFI::Pointer that knows its size
# (and thus does bounds checking)
@finalize = self.class.make_finalizer(@pointer.address, @size, @fd)
ObjectSpace.define_finalizer(self, @finalize)
super(@pointer)
end
# Write the serialization of +obj+ (using Marshal.dump) to this
# shared memory object at +offset+ (in bytes).
#
# Raises IndexError if there is insufficient space.
def put_object(offset, obj)
# FIXME: This is a workaround to an issue I'm seeing in
# 1.8.7-p352 (not tested in other 1.8's). If I used the code
# below that works in 1.9, then inside SharedMemoryIO#write, the
# passed string object is 'terminated' (garbage collected?) and
# won't respond to any methods... This less efficient since it
# involves the creation of an intermediate string, but it works
# in 1.8.7-p352.
if RUBY_VERSION =~ /^1.8/
str = Marshal.dump(obj)
return put_bytes(offset, str, 0, str.size)
end
io = to_shm_io
io.seek(offset)
Marshal.dump(obj, io)
end
# Read the serialized object at +offset+ (in bytes) using
# Marshal.load.
#
# @return [Object]
def get_object(offset)
io = to_shm_io
io.seek(offset)
Marshal.load(io)
end
# Equivalent to {#put_object(0, obj)}
def write_object(obj)
put_object(0, obj)
end
# Equivalent to {#read_object(0, obj)}
#
# @return [Object]
def read_object
Marshal.load(to_shm_io)
end
def close
ObjectSpace.undefine_finalizer(self)
@finalize.call
end
private
def to_shm_io
ProcessShared::SharedMemoryIO.new(self)
end
end
end
end

View File

@ -0,0 +1,10 @@
require 'ffi'
module ProcessShared
module Posix
class TimeSpec < FFI::Struct
layout(:tv_sec, :time_t,
:tv_nsec, :long)
end
end
end

View File

@ -0,0 +1,10 @@
require 'ffi'
module ProcessShared
module Posix
class TimeVal < FFI::Struct
layout(:tv_sec, :time_t,
:tv_usec, :suseconds_t)
end
end
end