Initial commit.

This commit is contained in:
Patrick Mahoney 2011-12-11 21:39:55 -06:00
commit cf45ab9891
42 changed files with 1704 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
*~
pkg
tmp
Gemfile.lock

19
COPYING Normal file
View File

@ -0,0 +1,19 @@
Copyright (c) 2011 Patrick Mahoney
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

0
ChangeLog Normal file
View File

2
Gemfile Normal file
View File

@ -0,0 +1,2 @@
source 'http://rubygems.org'
gemspec

69
README.rdoc Normal file
View File

@ -0,0 +1,69 @@
== Description
Concurrency primitives that may be used in a cross-process way to
coordinate share memory between processes.
A small C library (libpsem) is compiled to support portable access to
semaphores. This library is then accessed using FFI to implement Ruby
classes ProcessShared::Semaphore, ProcessShared::BoundedSemaphore,
ProcessShared::Mutex, and ProcessShared::SharedMemory.
This is an incomplete work in progress.
== License
MIT
== Install
Install the gem with:
gem install process_shared
== Usage
require 'process_shared'
mutex = ProcessShared::Mutex.new
mem = ProcessShared::SharedMemory.new(:int) # extends FFI::Pointer
mem.put_int(0, 0)
pid1 = fork do
puts "in process 1 (#{Process.pid})"
10.times do
sleep 0.01
mutex.synchronize do
value = mem.get_int(0)
sleep 0.01
puts "process 1 (#{Process.pid}) incrementing"
mem.put_int(0, value + 1)
end
end
end
pid2 = fork do
puts "in process 2 (#{Process.pid})"
10.times do
sleep 0.01
mutex.synchronize do
value = mem.get_int(0)
sleep 0.01
puts "process 2 (#{Process.pid}) decrementing"
mem.put_int(0, value - 1)
end
end
end
Process.wait(pid1)
Process.wait(pid2)
puts "value should be zero: #{mem.get_int(0)}"
== Todo
* Implement ConditionVariable
* Implement optional override of core Thread/Mutex classes
* Extend libpsem to win32? (See Python's processing library)
* Break out tests that use PSem.getvalue() (which isn't supported on Mac OS X)
so that the test suite will pass
* Add finalizer to Mutex? (finalizer on Semaphore objects may be enough) or a method to
explicitly close and release resources?

24
Rakefile Normal file
View File

@ -0,0 +1,24 @@
require 'rake/extensiontask'
require 'rake/testtask'
require 'rubygems/package_task'
def gemspec
@gemspec ||= eval(File.read('process_shared.gemspec'), binding, 'process_shared.gemspec')
end
Rake::ExtensionTask.new('libpsem') do |ext|
ext.lib_dir = 'lib/process_shared'
end
desc 'Run the tests'
task :default => [:test]
Rake::TestTask.new do |t|
t.pattern = 'spec/**/*_spec.rb'
t.libs.push 'spec'
end
Gem::PackageTask.new(gemspec) do |p|
p.need_tar = true
p.gem_spec = gemspec
end

188
ext/libpsem/bsem.c Normal file
View File

@ -0,0 +1,188 @@
/*
* Extensions atop psem. Recursive mutex, bounded semaphore.
*/
#include <stdlib.h> /* malloc, free */
#include "mempcpy.h" /* includes string.h */
#include "psem.h"
#include "psem_error.h"
#include "bsem.h"
#define MAX_NAME 128 /* This is much less the POSIX max
name. Users of this library must
not use longer names. */
static const char bsem_lock_suffix[] = "-bsem-lock";
#define MAX_LOCK_NAME (MAX_NAME + strlen(bsem_lock_suffix) + 1)
/**
* Assumes dest has sufficient space to hold "[MAX_NAME]-bsem-lock".
*/
static int
make_lockname(char *dest, const char *name, error_t **err)
{
int namelen;
namelen = strlen(name);
if (namelen > MAX_NAME) {
error_new(err, E_SOURCE_PSEM, E_NAME_TOO_LONG);
return ERROR;
}
*((char *) mempcpy(mempcpy(dest, name, namelen),
bsem_lock_suffix,
strlen(bsem_lock_suffix))) = '\0';
}
size_t sizeof_bsem_t = sizeof (bsem_t);
bsem_t *
bsem_alloc(void) {
return malloc(sizeof(bsem_t));
}
void
bsem_free(bsem_t *bsem) {
free(bsem);
}
#define call_or_return(exp) \
do { if ((exp) == ERROR) { return ERROR; } } while (0)
#define bsem_lock_or_return(bsem, err) call_or_return(bsem_lock((bsem), (err)))
#define bsem_unlock_or_return(bsem, err) call_or_return(bsem_unlock((bsem), (err)))
int
bsem_open(bsem_t *bsem, const char *name, unsigned int maxvalue, unsigned int value, error_t **err)
{
char lockname[MAX_LOCK_NAME];
call_or_return(psem_open(&bsem->psem, name, value, err));
call_or_return(make_lockname(lockname, name, err));
call_or_return(psem_open(&bsem->lock, lockname, 1, err));
bsem->maxvalue = maxvalue;
return OK;
}
static int
bsem_lock(bsem_t *bsem, error_t **err)
{
call_or_return(psem_wait(&bsem->lock, err));
return OK;
}
static int
bsem_unlock(bsem_t *bsem, error_t **err)
{
call_or_return(psem_post(&bsem->lock, err));
return OK;
}
int
bsem_close(bsem_t *bsem, error_t **err)
{
bsem_lock_or_return(bsem, err);
if (psem_close(&bsem->psem, err) == ERROR) {
bsem_unlock(bsem, NULL);
return ERROR;
}
bsem_unlock_or_return(bsem, err);
call_or_return(psem_close(&bsem->lock, err));
return OK;
}
int
bsem_unlink(const char *name, error_t **err)
{
char lockname[MAX_LOCK_NAME];
call_or_return(psem_unlink(name, err));
call_or_return(make_lockname(lockname, name, err));
call_or_return(psem_unlink(lockname, err));
return OK;
}
int
bsem_post(bsem_t *bsem, error_t **err)
{
int sval;
bsem_lock_or_return(bsem, err);
/* FIXME: maxvalue is broken on some systems... (cygwin? mac?) */
if (psem_getvalue(&bsem->psem, &sval, err) == ERROR) {
bsem_unlock(bsem, err);
return ERROR;
}
if (sval >= bsem->maxvalue) {
/* ignored silently */
bsem_unlock(bsem, err);
return OK;
}
if (psem_post(&bsem->psem, err) == ERROR) {
bsem_unlock(bsem, err);
return ERROR;
}
bsem_unlock_or_return(bsem, err);
return OK;
}
int
bsem_wait(bsem_t *bsem, error_t **err)
{
call_or_return(psem_wait(&bsem->psem, err));
return OK;
}
int
bsem_trywait(bsem_t *bsem, error_t **err)
{
bsem_lock_or_return(bsem, err);
if (psem_trywait(&bsem->psem, err) == ERROR) {
bsem_unlock(bsem, NULL);
return ERROR;
}
bsem_unlock_or_return(bsem, err);
return OK;
}
int
bsem_timedwait(bsem_t *bsem, float timeout_s, error_t **err)
{
bsem_lock_or_return(bsem, err);
if (psem_timedwait(&bsem->psem, timeout_s, err) == ERROR) {
bsem_unlock(bsem, NULL);
return ERROR;
}
bsem_unlock_or_return(bsem, err);
return OK;
}
int
bsem_getvalue(bsem_t *bsem, int *sval, error_t **err)
{
bsem_lock_or_return(bsem, err);
if (psem_getvalue(&bsem->psem, sval, err) == ERROR) {
bsem_unlock(bsem, NULL);
return ERROR;
}
bsem_unlock_or_return(bsem, err);
return OK;
}

32
ext/libpsem/bsem.h Normal file
View File

@ -0,0 +1,32 @@
#ifndef __BSEM_H__
#define __BSEM_H__
#include "psem.h"
#include "psem_error.h"
struct bsem {
psem_t psem;
psem_t lock;
int maxvalue;
};
typedef struct bsem bsem_t;
extern size_t sizeof_bsem_t;
bsem_t * bsem_alloc();
void bsem_free(bsem_t *bsem);
int bsem_open(bsem_t *, const char *, unsigned int, unsigned int, error_t **);
int bsem_close(bsem_t *, error_t **);
int bsem_unlink(const char *, error_t **);
int bsem_post(bsem_t *, error_t **);
int bsem_wait(bsem_t *, error_t **);
int bsem_trywait(bsem_t *, error_t **);
int bsem_timedwait(bsem_t *, float, error_t **);
int bsem_getvalue(bsem_t *, int *, error_t **);
#endif /* __BSEM_H__ */

22
ext/libpsem/constants.c Normal file
View File

@ -0,0 +1,22 @@
/**
* Define and extern various constants defined as macros.
*/
#include <sys/mman.h> /* PROT_*, MAP_* */
#include <fcntl.h> /* O_* */
#include "constants.h"
int o_rdwr = O_RDWR;
int o_creat = O_CREAT;
int o_excl = O_EXCL;
int prot_read = PROT_READ;
int prot_write = PROT_WRITE;
int prot_exec = PROT_EXEC;
int prot_none = PROT_NONE;
void * map_failed = MAP_FAILED;
int map_shared = MAP_SHARED;
int map_private = MAP_PRIVATE;

18
ext/libpsem/constants.h Normal file
View File

@ -0,0 +1,18 @@
#ifndef __CONSTANTS_H__
#define __CONSTANTS_H__
extern int o_rdwr;
extern int o_creat;
extern int o_excl;
extern int prot_read;
extern int prot_write;
extern int prot_exec;
extern int prot_none;
extern void * map_failed;
extern int map_shared;
extern int map_private;
#endif

36
ext/libpsem/extconf.rb Normal file
View File

@ -0,0 +1,36 @@
require 'mkmf'
$objs = []
# posix semaphores
if have_func('sem_open', 'semaphore.h')
have_func('floorf', 'math.h') or abort("Missing required floorf() in math.h")
have_library('m', 'floorf')
unless have_func('mempcpy', 'string.h')
$objs << 'mempcpy.o'
end
have_library('rt', 'sem_open')
end
c_sources = ['psem.c', 'psem_error.c', 'psem_posix.c', 'bsem.c', 'constants.c']
$objs += ['psem.o', 'psem_error.o', 'bsem.o', 'constants.o']
depend_rules <<-END
psem.c: psem.h psem_posix.c
psem_error.c: psem_error.h
bsem.h: psem.h psem_error.h
bsem.c: psem.h psem_error.h bsem.h
constants.c: constants.h
mempcpy.c: mempcpy.h
#{$objs.map { |o| "#{o}: #{o.chomp(".o")}.c" }.join("\n")}
libpsem.o: #{$objs.join(' ')}
END
create_makefile('libpsem')

7
ext/libpsem/mempcpy.c Normal file
View File

@ -0,0 +1,7 @@
#include "mempcpy.h"
void *
mempcpy(void *dest, const void *src, size_t n)
{
return (char *) memcpy(dest, src, n) + n;
}

13
ext/libpsem/mempcpy.h Normal file
View File

@ -0,0 +1,13 @@
#ifndef __MEMPCPY_H__
#define __MEMPCPY_H__
#ifdef HAVE_MEMPCPY
#define __USE_GNU
#else
#include <stdlib.h>
void *mempcpy(void *, const void *, size_t);
#endif
#include <string.h>
#endif /* __MEMPCPY_H__ */

15
ext/libpsem/mutex.c Normal file
View File

@ -0,0 +1,15 @@
#include <stdlib.h> /* malloc, free */
#include "mutex.h"
size_t sizeof_mutex_t = sizeof (mutex_t);
mutex_t *
mutex_alloc(void) {
return malloc(sizeof(mutex_t));
}
void
mutex_free(mutex_t * mutex) {
free(mutex);
}

14
ext/libpsem/mutex.h Normal file
View File

@ -0,0 +1,14 @@
#ifndef __MUTEX_H__
#define __MUTEX_H__
#include "bsem.h"
struct mutex {
bsem_t *bsem;
};
typedef struct mutex mutex_t;
extern size_t sizeof_mutex_t;
#endif /* __MUTEX_H__ */

15
ext/libpsem/psem.c Normal file
View File

@ -0,0 +1,15 @@
#include "psem.h"
int OK = 0;
int ERROR = -1;
int E_SOURCE_SYSTEM = 1;
int E_SOURCE_PSEM = 2;
int E_NAME_TOO_LONG = 1;
#ifdef HAVE_SEM_OPEN
#include "psem_posix.c"
#endif
size_t sizeof_psem_t = sizeof (psem_t);

43
ext/libpsem/psem.h Normal file
View File

@ -0,0 +1,43 @@
#ifndef __PSEM_H__
#define __PSEM_H__
/**
* Portable semaphore interface focusing on cross-process use.
*/
#ifdef HAVE_SEM_OPEN
#include "psem_posix.h"
#endif
#include "psem_error.h"
typedef struct psem psem_t;
extern size_t sizeof_psem_t;
extern int OK;
extern int ERROR;
extern int E_SOURCE_SYSTEM;
extern int E_SOURCE_PSEM;
extern int E_NAME_TOO_LONG;
int psem_errno();
psem_t * psem_alloc();
void psem_free(psem_t *);
int psem_open(psem_t *, const char *, unsigned int, error_t **);
int psem_close(psem_t *, error_t **);
int psem_unlink(const char *, error_t **);
int psem_post(psem_t *, error_t **);
int psem_wait(psem_t *, error_t **);
int psem_trywait(psem_t *, error_t **);
int psem_timedwait(psem_t *, float, error_t **);
int psem_getvalue(psem_t *, int *, error_t **);
#endif /* __PSEM_H__ */

46
ext/libpsem/psem_error.c Normal file
View File

@ -0,0 +1,46 @@
/**
* Similar to GError from GLib.
*/
#include <stdlib.h> /* malloc, free */
#include "psem_error.h"
struct error {
int error_source;
int error_number;
};
error_t *
error_alloc()
{
return (error_t *) malloc(sizeof (error_t));
}
void
error_free(error_t *err)
{
free(err);
}
void
error_set(error_t *err, int source, int value)
{
err->error_source = source;
err->error_number = value;
}
void
error_new(error_t **err, int source, int value)
{
if (err != NULL) {
if (*err == NULL) {
*err = error_alloc();
error_set(*err, source, value);
} else {
/* tried to create a new error atop an existing error... */
}
} else {
/* error is being ignored by caller */
}
}

11
ext/libpsem/psem_error.h Normal file
View File

@ -0,0 +1,11 @@
#ifndef __PSEM_ERROR_H__
#define __PSEM_ERROR_H__
typedef struct error error_t;
error_t * error_alloc();
void error_free(error_t *);
void error_set(error_t *, int, int);
#endif /* __PSEM_ERROR_H__ */

130
ext/libpsem/psem_posix.c Normal file
View File

@ -0,0 +1,130 @@
/*
* A type which wraps a semaphore
*
* semaphore.c
*
* Copyright (c) 2006-2008, R Oudkerk
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* 3. Neither the name of author nor the names of any contributors
* may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR
* OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/*
* Modifications Copyright (c) 2011, Patrick Mahoney
*/
#include <errno.h>
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <semaphore.h>
#include <stdlib.h> /* malloc, free */
#include <math.h> /* floorf */
#include <time.h> /* timespec */
#include "psem.h"
#include "psem_posix.h"
psem_t *
psem_alloc(void) {
return (psem_t *) malloc(sizeof(psem_t));
}
void
psem_free(psem_t *psem) {
free(psem);
}
#define errcheck_val(expr, errval, err) \
do { \
if ((expr) == (errval)) { \
error_new((err), E_SOURCE_SYSTEM, errno); \
return ERROR; \
} \
return OK; \
} while (0)
#define errcheck(expr, err) errcheck_val((expr), -1, (err))
int
psem_open(psem_t *psem, const char *name, unsigned int value, error_t **err)
{
errcheck_val(psem->sem = sem_open(name, O_CREAT | O_EXCL, 0600, value),
SEM_FAILED,
err);
}
int
psem_close(psem_t *psem, error_t **err)
{
errcheck(sem_close(psem->sem), err);
}
int
psem_unlink(const char *name, error_t **err)
{
errcheck(sem_unlink(name), err);
}
int
psem_post(psem_t *psem, error_t **err)
{
errcheck(sem_post(psem->sem), err);
}
int
psem_wait(psem_t *psem, error_t **err)
{
errcheck(sem_wait(psem->sem), err);
}
int
psem_trywait(psem_t *psem, error_t **err)
{
errcheck(sem_trywait(psem->sem), err);
}
int
psem_timedwait(psem_t *psem, float timeout_s, error_t **err)
{
struct timespec abs_timeout;
abs_timeout.tv_sec = floorf(timeout_s);
abs_timeout.tv_nsec =
floorf((timeout_s - abs_timeout.tv_sec) * (1000 * 1000 * 1000));
errcheck(sem_timedwait(psem->sem, &abs_timeout), err);
}
int
psem_getvalue(psem_t *psem, int *sval, error_t **err)
{
errcheck(sem_getvalue(psem->sem, sval), err);
}

10
ext/libpsem/psem_posix.h Normal file
View File

@ -0,0 +1,10 @@
#ifndef __PSEM_POSIX_H__
#define __PSEM_POSIX_H__
#include <semaphore.h>
struct psem {
sem_t *sem;
};
#endif /* __PSEM_POSIX_H__ */

6
lib/process_shared.rb Normal file
View File

@ -0,0 +1,6 @@
require 'ffi'
require 'process_shared/semaphore'
require 'process_shared/bounded_semaphore'
require 'process_shared/mutex'
require 'process_shared/shared_memory'

View File

@ -0,0 +1,50 @@
require 'process_shared/psem'
require 'process_shared/with_self'
module ProcessShared
class AbstractSemaphore
include WithSelf
protected
include ProcessShared::PSem
public
# Generate a name for a semaphore.
def self.gen_name(middle, name = nil)
if name
name
else
@count ||= 0
@count += 1
"ps-#{middle}-#{Process.pid}-#{@count}"
end
end
def self.make_finalizer(name)
proc { ProcessShared::PSem.psem_unlink(name, nil) }
end
# private_class_method :new
protected
attr_reader :sem, :err
def init(size, middle, name, &block)
@sem = FFI::MemoryPointer.new(size)
@err = FFI::MemoryPointer.new(:pointer)
psem_name = AbstractSemaphore.gen_name(middle, name)
block.call(psem_name)
if name
# name explicitly given. Don't unlink because we might want to share it with another process.
# Instead, register a finalizer to unlink.
ObjectSpace.define_finalizer(self, self.class.make_finalizer(name))
else
# On Linux, removes the entry in /dev/shm and prevents other
# processes from opening this semaphore unless they inherit it
# as forked children.
psem_unlink(psem_name, err) unless name
end
end
end
end

View File

@ -0,0 +1,43 @@
require 'process_shared/psem'
require 'process_shared/semaphore'
module ProcessShared
class BoundedSemaphore < Semaphore
# With no associated block, open is a synonym for
# Semaphore.new. If the optional code block is given, it will be
# passed `sem` as an argument, and the Semaphore object will
# automatically be closed when the block terminates. In this
# instance, Semaphore.open returns the value of the block.
#
# @param [Integer] value the initial semaphore value
# @param [String] name not currently supported
def self.open(maxvalue, value = 1, name = nil, &block)
new(maxvalue, value, name).with_self(&block)
end
# Create a new semaphore with initial value `value`. After
# Kernel#fork, the semaphore will be shared across two (or more)
# processes. The semaphore must be closed with #close in each
# process that no longer needs the semaphore.
#
# (An object finalizer is registered that will close the semaphore
# to avoid memory leaks, but this should be considered a last
# resort).
#
# @param [Integer] value the initial semaphore value
# @param [String] name not currently supported
def initialize(maxvalue, value = 1, name = nil)
init(PSem.sizeof_bsem_t, 'bsem', name) do |sem_name|
bsem_open(sem, sem_name, maxvalue, value, err)
end
end
protected
alias_method :psem_unlink, :bsem_unlink
alias_method :psem_close, :bsem_close
alias_method :psem_wait, :bsem_wait
alias_method :psem_post, :bsem_post
alias_method :psem_getvalue, :bsem_getvalue
end
end

View File

@ -0,0 +1,27 @@
require 'process_shared/semaphore'
module ProcessShared
# TODO: implement this
class ConditionVariable
def initialize
@sem = Semaphore.new
end
def broadcast
@sem.post
end
def signal
@sem.post
end
def wait(mutex, timeout = nil)
mutex.unlock
begin
@sem.wait
ensure
mutex.lock
end
end
end
end

View File

@ -0,0 +1,36 @@
require 'ffi'
require 'process_shared/posix_call'
require 'process_shared/psem'
module ProcessShared
module LibC
extend FFI::Library
extend PosixCall
ffi_lib FFI::Library::LIBC
MAP_FAILED = FFI::Pointer.new(-1)
MAP_SHARED = PSem.map_shared
MAP_PRIVATE = PSem.map_private
PROT_READ = PSem.prot_read
PROT_WRITE = PSem.prot_write
PROT_EXEC = PSem.prot_exec
PROT_NONE = PSem.prot_none
O_RDWR = PSem.o_rdwr
O_CREAT = PSem.o_creat
O_EXCL = PSem.o_excl
attach_variable :errno, :int
attach_function :mmap, [:pointer, :size_t, :int, :int, :int, :off_t], :pointer
attach_function :munmap, [:pointer, :size_t], :int
attach_function :ftruncate, [:int, :off_t], :int
attach_function :close, [:int], :int
error_check(:mmap) { |v| v == MAP_FAILED }
error_check(:munmap, :ftruncate, :close)
end
end

103
lib/process_shared/mutex.rb Normal file
View File

@ -0,0 +1,103 @@
require 'process_shared/bounded_semaphore'
require 'process_shared/with_self'
require 'process_shared/shared_memory'
require 'process_shared/process_error'
module ProcessShared
class Mutex
include WithSelf
def self.open(&block)
new.with_self(&block)
end
def initialize
@internal_sem = BoundedSemaphore.new(1)
@locked_by = SharedMemory.new(:int)
@sem = BoundedSemaphore.new(1)
end
# @return [Mutex]
def lock
@sem.wait
self.locked_by = ::Process.pid
self
end
# @return [Boolean]
def locked?
locked_by > 0
end
# Releases the lock and sleeps timeout seconds if it is given and
# non-nil or forever.
#
# @return [Numeric]
def sleep(timeout = nil)
unlock
begin
timeout ? sleep(timeout) : sleep
ensure
lock
end
end
# @return [Boolean]
def try_lock
with_internal_lock do
if @locked_by.get_int(0) > 0
false # was locked
else
@sem.wait
self.locked_by = ::Process.pid
true
end
end
end
# @return [Mutex]
def unlock
if (p = locked_by) != ::Process.pid
raise ProcessError, "lock is held by #{p} not #{::Process.pid}"
end
@sem.post
self
end
# Acquire the lock, yield the block, then ensure the lock is
# unlocked.
def synchronize
lock
begin
yield
ensure
unlock
end
end
private
def locked_by
with_internal_lock do
@locked_by.get_int(0)
end
end
def locked_by=(val)
with_internal_lock do
@locked_by.put_int(0, val)
end
end
def with_internal_lock
@internal_sem.wait
begin
yield
ensure
@internal_sem.post
end
end
end
end

View File

@ -0,0 +1,29 @@
# require 'process_shared/libc' - circular dependency here...
module ProcessShared
module PosixCall
# Replace methods in `syms` with error checking wrappers that
# invoke the original method and raise a SystemCallError with the
# current errno if the return value is an error.
#
# Errors are detected if the block returns true when called with
# the original method's return value.
def error_check(*syms, &is_err)
unless block_given?
is_err = lambda { |v| (v == -1) }
end
syms.each do |sym|
method = self.method(sym)
define_singleton_method(sym) do |*args|
ret = method.call(*args)
if is_err.call(ret)
raise SystemCallError.new("error in #{sym}", LibC.errno)
else
ret
end
end
end
end
end
end

View File

@ -0,0 +1,3 @@
module ProcessShared
class ProcessError < Exception; end
end

109
lib/process_shared/psem.rb Normal file
View File

@ -0,0 +1,109 @@
require 'ffi'
module ProcessShared
module PSem
class Error < FFI::Struct
layout(:source, :int,
:errno, :int)
end
extend FFI::Library
# Workaround FFI dylib/bundle issue. See https://github.com/ffi/ffi/issues/42
suffix = if FFI::Platform.mac?
'bundle'
else
FFI::Platform::LIBSUFFIX
end
ffi_lib File.join(File.expand_path(File.dirname(__FILE__)),
'libpsem.' + suffix)
class << self
# Replace methods in `syms` with error checking wrappers that
# invoke the original psem method and raise an appropriate
# error.
def psem_error_check(*syms)
syms.each do |sym|
method = self.method(sym)
block = lambda do |*args|
if method.call(*args) < 0
errp = args[-1]
unless errp.nil?
begin
err = Error.new(errp.get_pointer(0))
if err[:source] == PSem.e_source_system
raise SystemCallError.new("error in #{sym}", err[:errno])
else
raise "error in #{sym}: #{err.get_integer(1)}"
end
ensure
psem_error_free(err)
end
end
end
end
define_method(sym, &block)
define_singleton_method(sym, &block)
end
end
end
# Generic constants
int_consts = [:o_rdwr,
:o_creat,
:o_excl,
:prot_read,
:prot_write,
:prot_exec,
:prot_none,
:map_shared,
:map_private]
int_consts.each { |sym| attach_variable sym, :int }
# Other constants, functions
attach_function :psem_error_free, :error_free, [:pointer], :void
attach_variable :e_source_system, :E_SOURCE_SYSTEM, :int
attach_variable :e_source_psem, :E_SOURCE_PSEM, :int
attach_variable :e_name_too_long, :E_NAME_TOO_LONG, :int
attach_variable :sizeof_psem_t, :size_t
attach_variable :sizeof_bsem_t, :size_t
# PSem functions
attach_function :psem_open, [:pointer, :string, :uint, :pointer], :int
attach_function :psem_close, [:pointer, :pointer], :int
attach_function :psem_unlink, [:string, :pointer], :int
attach_function :psem_post, [:pointer, :pointer], :int
attach_function :psem_wait, [:pointer, :pointer], :int
attach_function :psem_trywait, [:pointer, :pointer], :int
attach_function :psem_timedwait, [:pointer, :pointer, :pointer], :int
attach_function :psem_getvalue, [:pointer, :pointer, :pointer], :int
psem_error_check(:psem_open, :psem_close, :psem_unlink, :psem_post,
:psem_wait, :psem_trywait, :psem_timedwait, :psem_getvalue)
# BSem functions
attach_function :bsem_open, [:pointer, :string, :uint, :uint, :pointer], :int
attach_function :bsem_close, [:pointer, :pointer], :int
attach_function :bsem_unlink, [:string, :pointer], :int
attach_function :bsem_post, [:pointer, :pointer], :int
attach_function :bsem_wait, [:pointer, :pointer], :int
attach_function :bsem_trywait, [:pointer, :pointer], :int
attach_function :bsem_timedwait, [:pointer, :pointer, :pointer], :int
attach_function :bsem_getvalue, [:pointer, :pointer, :pointer], :int
psem_error_check(:bsem_open, :bsem_close, :bsem_unlink, :bsem_post,
:bsem_wait, :bsem_trywait, :bsem_timedwait, :bsem_getvalue)
end
end

21
lib/process_shared/rt.rb Normal file
View File

@ -0,0 +1,21 @@
require 'process_shared/posix_call'
require 'process_shared/psem'
module ProcessShared
module RT
extend FFI::Library
extend PosixCall
# FIXME: mac and linux OK, but what about everything else?
if FFI::Platform.mac?
ffi_lib 'c'
else
ffi_lib 'rt'
end
attach_function :shm_open, [:string, :int, :mode_t], :int
attach_function :shm_unlink, [:string], :int
error_check :shm_open, :shm_unlink
end
end

View File

@ -0,0 +1,60 @@
require 'process_shared/psem'
require 'process_shared/abstract_semaphore'
module ProcessShared
class Semaphore < AbstractSemaphore
# With no associated block, open is a synonym for
# Semaphore.new. If the optional code block is given, it will be
# passed `sem` as an argument, and the Semaphore object will
# automatically be closed when the block terminates. In this
# instance, Semaphore.open returns the value of the block.
#
# @param [Integer] value the initial semaphore value
# @param [String] name not currently supported
def self.open(value = 1, name = nil, &block)
new(value, name).with_self(&block)
end
# Create a new semaphore with initial value `value`. After
# Kernel#fork, the semaphore will be shared across two (or more)
# processes. The semaphore must be closed with #close in each
# process that no longer needs the semaphore.
#
# (An object finalizer is registered that will close the semaphore
# to avoid memory leaks, but this should be considered a last
# resort).
#
# @param [Integer] value the initial semaphore value
# @param [String] name not currently supported
def initialize(value = 1, name = nil)
init(PSem.sizeof_psem_t, 'psem', name) do |sem_name|
psem_open(sem, sem_name, value, err)
end
end
# Decrement the value of the semaphore. If the value is zero,
# wait until another process increments via #post.
def wait
psem_wait(sem, err)
end
# Increment the value of the semaphore. If other processes are
# waiting on this semaphore, one will be woken.
def post
psem_post(sem, err)
end
# Get the current value of the semaphore.
#
# @return [Integer] the current value of the semaphore.
def value
int = FFI::MemoryPointer.new(:int)
psem_getvalue(sem, int, err)
int.get_int(0)
end
def close
psem_close(sem, err)
end
end
end

View File

@ -0,0 +1,45 @@
require 'process_shared/rt'
require 'process_shared/libc'
require 'process_shared/with_self'
module ProcessShared
# Memory block shared across processes. TODO: finalizer that closes...
class SharedMemory < FFI::Pointer
include WithSelf
attr_reader :size, :fd
def self.open(size, &block)
new(size).with_self(&block)
end
def initialize(size)
@size = case size
when Symbol
FFI.type_size(size)
else
size
end
name = "/ps-shm#{rand(10000)}"
@fd = RT.shm_open(name,
LibC::O_CREAT | LibC::O_RDWR | LibC::O_EXCL,
0777)
RT.shm_unlink(name)
LibC.ftruncate(@fd, @size)
@pointer = LibC.mmap(nil,
@size,
LibC::PROT_READ | LibC::PROT_WRITE,
LibC::MAP_SHARED,
@fd,
0)
super(@pointer)
end
def close
LibC.munmap(@pointer, @size)
LibC.close(@fd)
end
end
end

View File

@ -0,0 +1,20 @@
module ProcessShared
module WithSelf
# With no associated block, return self. If the optional code
# block is given, it will be passed `self` as an argument, and the
# self object will automatically be closed (by invoking `close` on
# `self`) when the block terminates. In this instance, value of
# the block is returned.
def with_self
if block_given?
begin
yield self
ensure
self.close
end
else
self
end
end
end
end

20
process_shared.gemspec Normal file
View File

@ -0,0 +1,20 @@
Gem::Specification.new do |s|
s.name = 'process_shared'
s.version = '0.0.1'
s.platform = Gem::Platform::RUBY
s.has_rdoc = true
s.extra_rdoc_files = ["README.rdoc", "ChangeLog", "COPYING"]
s.summary = 'process-shared synchronization primitives'
s.description = 'FFI wrapper around portable semaphore library with mutex and condition vars built on top.'
s.author = 'Patrick Mahoney'
s.email = 'pat@polycrystal.org'
s.homepage = ''
s.files = Dir['lib/**/*.rb', 'lib/**/libpsem*', 'ext/**/*.{c,h,rb}', 'spec/**/*.rb']
s.extensions = FileList["ext/**/extconf.rb"]
s.add_dependency('ffi', '~> 1.0')
s.add_development_dependency('rake-compiler')
s.add_development_dependency('minitest')
s.add_development_dependency('minitest-matchers')
end

View File

@ -0,0 +1,48 @@
require 'spec_helper'
require 'process_shared/bounded_semaphore'
module ProcessShared
describe BoundedSemaphore do
it 'never rises above its max value' do
max = 10
BoundedSemaphore.open(max) do |sem|
pids = []
10.times do |i|
pids << fork do
100.times do
if rand(3) == 0
sem.wait
else
sem.post
end
end
exit i
end
end
100.times do
sem.value.must be_lte(max)
end
pids.each { |pid| Process.wait(pid) }
end
end
describe '#post and #wait' do
it 'increments and decrements the value' do
Semaphore.open(0) do |sem|
10.times do |i|
sem.post
sem.value.must_equal(i + 1)
end
10.times do |i|
sem.wait
sem.value.must_equal(10 - i - 1)
end
end
end
end
end
end

View File

@ -0,0 +1,9 @@
require 'process_shared/libc'
module ProcessShared
describe LibC do
it 'throws exceptions with invalid args' do
proc { LibC.mmap nil,2,0,0,1,0 }.must_raise(Errno::EINVAL)
end
end
end

View File

@ -0,0 +1,74 @@
require 'spec_helper'
require 'process_shared/mutex'
require 'process_shared/shared_memory'
module ProcessShared
describe Mutex do
it 'protects access to a shared variable' do
mutex = Mutex.new
mem = SharedMemory.new(:char)
mem.put_char(0, 0)
pids = []
10.times do |i|
inc = (-1) ** i # half the procs increment; half decrement
pids << fork do
10.times do
mutex.lock
begin
mem.put_char(0, mem.get_char(0) + inc)
sleep 0.001
ensure
mutex.unlock
end
end
Kernel.exit!
end
end
pids.each { |pid| ::Process.wait(pid) }
mem.get_char(0).must_equal(0)
end
it 'protects access to a shared variable with synchronize' do
mutex = Mutex.new
mem = SharedMemory.new(:char)
mem.put_char(0, 0)
pids = []
10.times do |i|
inc = (-1) ** i # half the procs increment; half decrement
pids << fork do
10.times do
mutex.synchronize do
mem.put_char(0, mem.get_char(0) + inc)
sleep 0.001
end
end
Kernel.exit!
end
end
pids.each { |pid| ::Process.wait(pid) }
mem.get_char(0).must_equal(0)
end
it 'raises exception when unlocked by other process' do
mutex = Mutex.new
pid = Kernel.fork do
mutex.lock
sleep 0.2
mutex.unlock
Kernel.exit!
end
sleep 0.1
proc { mutex.unlock }.must_raise(ProcessError)
::Process.wait(pid)
end
end
end

View File

@ -0,0 +1,136 @@
require 'spec_helper'
require 'process_shared/psem'
module ProcessShared
describe PSem do
before do
extend PSem
end
before(:each) do
@err = FFI::MemoryPointer.new(:pointer)
end
describe '.psem_open' do
it 'opens a psem' do
psem = FFI::MemoryPointer.new(PSem.sizeof_psem_t)
psem_open(psem, "psem-test", 1, @err)
psem_unlink("psem-test", @err)
end
it 'raises excpetion if name alredy exists' do
psem1 = FFI::MemoryPointer.new(PSem.sizeof_psem_t)
psem2 = FFI::MemoryPointer.new(PSem.sizeof_psem_t)
psem_open(psem1, "psem-test", 1, @err)
proc { psem_open(psem2, "psem-test", 1, @err) }.must_raise(Errno::EEXIST)
psem_unlink("psem-test", @err)
psem_open(psem2, "psem-test", 1, @err)
psem_unlink("psem-test", @err)
end
end
describe '.psem_wait' do
before(:each) do
@psem = FFI::MemoryPointer.new(PSem.sizeof_psem_t)
psem_open(@psem, 'psem-test', 1, @err)
psem_unlink('psem-test', @err)
@int = FFI::MemoryPointer.new(:int)
end
after(:each) do
#psem_close(@psem, @err)
end
def value
psem_getvalue(@psem, @int, @err)
@int.get_int(0)
end
it 'decrements psem value' do
value.must_equal 1
psem_wait(@psem, @err)
value.must_equal(0)
end
it 'waits until another process posts' do
psem_wait(@psem, @err)
# child exits with ~ time spent waiting
child = fork do
start = Time.now
psem_wait(@psem, @err)
exit (Time.now - start).ceil
end
t = 1.5
sleep t
psem_post(@psem, @err)
_pid, status = Process.wait2(child)
status.exitstatus.must_equal 2
end
end
describe '.bsem_open' do
it 'opens a bsem' do
bsem = FFI::MemoryPointer.new(PSem.sizeof_bsem_t)
bsem_open(bsem, "bsem-test", 1, 1, @err)
bsem_unlink("bsem-test", @err)
end
it 'raises excpetion if name alredy exists' do
bsem1 = FFI::MemoryPointer.new(PSem.sizeof_bsem_t)
bsem2 = FFI::MemoryPointer.new(PSem.sizeof_bsem_t)
bsem_open(bsem1, "bsem-test", 1, 1, @err)
proc { bsem_open(bsem2, "bsem-test", 1, 1, @err) }.must_raise(Errno::EEXIST)
bsem_unlink("bsem-test", @err)
bsem_open(bsem2, "bsem-test", 1, 1, @err)
bsem_unlink("bsem-test", @err)
end
end
describe '.bsem_wait' do
before(:each) do
@bsem = FFI::MemoryPointer.new(PSem.sizeof_bsem_t)
bsem_open(@bsem, 'bsem-test', 1, 1, @err)
bsem_unlink('bsem-test', @err)
@int = FFI::MemoryPointer.new(:int)
end
after do
#bsem_close(@bsem, @err)
end
def value
bsem_getvalue(@bsem, @int, @err)
@int.get_int(0)
end
it 'decrements bsem value' do
value.must_equal 1
bsem_wait(@bsem, @err)
value.must_equal 0
end
it 'waits until another process posts' do
bsem_wait(@bsem, @err)
# child exits with ~ time spent waiting
child = fork do
start = Time.now
bsem_wait(@bsem, @err)
exit (Time.now - start).ceil
end
t = 1.5
sleep t
bsem_post(@bsem, @err)
_pid, status = Process.wait2(child)
status.exitstatus.must_equal 2
end
end
end
end

View File

@ -0,0 +1,76 @@
require 'spec_helper'
require 'ffi'
require 'process_shared/semaphore'
require 'process_shared/shared_memory'
module ProcessShared
describe Semaphore do
it 'coordinates access to shared object' do
nprocs = 4 # number of processes
nincrs = 1000 # each process increments nincrs times
do_increments = lambda do |mem, sem|
nincrs.times do
sem.wait
begin
val = mem.get_int(0)
# ensure other procs have a chance to interfere
sleep 0.001 if rand(100) == 0
mem.put_int(0, val + 1)
rescue => e
"#{Process.pid} die'ing because #{e}"
ensure
sem.post
end
end
end
# Make sure it fails with no synchronization
no_sem = Object.new
class << no_sem
def wait; end
def post; end
end
SharedMemory.open(FFI.type_size(:int)) do |mem|
pids = []
nprocs.times do
pids << fork { do_increments.call(mem, no_sem); exit }
end
pids.each { |p| Process.wait(p) }
# puts "mem is #{mem.get_int(0)}"
mem.get_int(0).must be_lt(nprocs * nincrs)
end
# Now try with synchronization
SharedMemory.open(FFI.type_size(:int)) do |mem|
pids = []
Semaphore.open do |sem|
nprocs.times do
pids << fork { do_increments.call(mem, sem); exit }
end
end
pids.each { |p| Process.wait(p) }
mem.get_int(0).must_equal(nprocs * nincrs)
end
end
describe '#post and #wait' do
it 'increments and decrements the value' do
Semaphore.open(0) do |sem|
10.times do |i|
sem.post
sem.value.must_equal(i + 1)
end
10.times do |i|
sem.wait
sem.value.must_equal(10 - i - 1)
end
end
end
end
end
end

View File

@ -0,0 +1,36 @@
require 'spec_helper'
require 'process_shared/shared_memory'
module ProcessShared
describe SharedMemory do
it 'shares memory across processes' do
mem = SharedMemory.new(1)
mem.put_char(0, 0)
mem.get_char(0).must_equal(0)
pid = fork do
mem.put_char(0, 123)
Kernel.exit!
end
::Process.wait(pid)
mem.get_char(0).must_equal(123)
end
it 'initializes with type symbol' do
mem = SharedMemory.new(:int)
mem.put_int(0, 0)
mem.get_int(0).must_equal(0)
pid = fork do
mem.put_int(0, 1234567)
Kernel.exit!
end
::Process.wait(pid)
mem.get_int(0).must_equal(1234567)
end
end
end

35
spec/spec_helper.rb Normal file
View File

@ -0,0 +1,35 @@
gem 'minitest'
require 'minitest/spec'
require 'minitest/autorun'
require 'minitest/matchers'
class RangeMatcher
def initialize(operator, limit)
@operator = operator.to_sym
@limit = limit
end
def description
"be #{operator} #{@limit}"
end
def matches?(subject)
subject.send(@operator, @limit)
end
def failure_message_for_should
"expected #{operator} #{@limit}"
end
def failure_message_for_should_not
"expected not #{operator} #{@limit}"
end
end
def be_lt(value)
RangeMatcher.new('<', value)
end
def be_lte(value)
RangeMatcher.new('<=', value)
end