From cf45ab989133b7ca83d728d9c3f1458f8cd35a7e Mon Sep 17 00:00:00 2001 From: Patrick Mahoney Date: Sun, 11 Dec 2011 21:39:55 -0600 Subject: [PATCH] Initial commit. --- .gitignore | 4 + COPYING | 19 ++ ChangeLog | 0 Gemfile | 2 + README.rdoc | 69 +++++++ Rakefile | 24 +++ ext/libpsem/bsem.c | 188 ++++++++++++++++++ ext/libpsem/bsem.h | 32 +++ ext/libpsem/constants.c | 22 ++ ext/libpsem/constants.h | 18 ++ ext/libpsem/extconf.rb | 36 ++++ ext/libpsem/mempcpy.c | 7 + ext/libpsem/mempcpy.h | 13 ++ ext/libpsem/mutex.c | 15 ++ ext/libpsem/mutex.h | 14 ++ ext/libpsem/psem.c | 15 ++ ext/libpsem/psem.h | 43 ++++ ext/libpsem/psem_error.c | 46 +++++ ext/libpsem/psem_error.h | 11 + ext/libpsem/psem_posix.c | 130 ++++++++++++ ext/libpsem/psem_posix.h | 10 + lib/process_shared.rb | 6 + lib/process_shared/abstract_semaphore.rb | 50 +++++ lib/process_shared/bounded_semaphore.rb | 43 ++++ lib/process_shared/condition_variable.rb | 27 +++ lib/process_shared/libc.rb | 36 ++++ lib/process_shared/mutex.rb | 103 ++++++++++ lib/process_shared/posix_call.rb | 29 +++ lib/process_shared/process_error.rb | 3 + lib/process_shared/psem.rb | 109 ++++++++++ lib/process_shared/rt.rb | 21 ++ lib/process_shared/semaphore.rb | 60 ++++++ lib/process_shared/shared_memory.rb | 45 +++++ lib/process_shared/with_self.rb | 20 ++ process_shared.gemspec | 20 ++ spec/process_shared/bounded_semaphore_spec.rb | 48 +++++ spec/process_shared/libc_spec.rb | 9 + spec/process_shared/mutex_spec.rb | 74 +++++++ spec/process_shared/psem_spec.rb | 136 +++++++++++++ spec/process_shared/semaphore_spec.rb | 76 +++++++ spec/process_shared/shared_memory_spec.rb | 36 ++++ spec/spec_helper.rb | 35 ++++ 42 files changed, 1704 insertions(+) create mode 100644 .gitignore create mode 100644 COPYING create mode 100644 ChangeLog create mode 100644 Gemfile create mode 100644 README.rdoc create mode 100644 Rakefile create mode 100644 ext/libpsem/bsem.c create mode 100644 ext/libpsem/bsem.h create mode 100644 ext/libpsem/constants.c create mode 100644 ext/libpsem/constants.h create mode 100644 ext/libpsem/extconf.rb create mode 100644 ext/libpsem/mempcpy.c create mode 100644 ext/libpsem/mempcpy.h create mode 100644 ext/libpsem/mutex.c create mode 100644 ext/libpsem/mutex.h create mode 100644 ext/libpsem/psem.c create mode 100644 ext/libpsem/psem.h create mode 100644 ext/libpsem/psem_error.c create mode 100644 ext/libpsem/psem_error.h create mode 100644 ext/libpsem/psem_posix.c create mode 100644 ext/libpsem/psem_posix.h create mode 100644 lib/process_shared.rb create mode 100644 lib/process_shared/abstract_semaphore.rb create mode 100644 lib/process_shared/bounded_semaphore.rb create mode 100644 lib/process_shared/condition_variable.rb create mode 100644 lib/process_shared/libc.rb create mode 100644 lib/process_shared/mutex.rb create mode 100644 lib/process_shared/posix_call.rb create mode 100644 lib/process_shared/process_error.rb create mode 100644 lib/process_shared/psem.rb create mode 100644 lib/process_shared/rt.rb create mode 100644 lib/process_shared/semaphore.rb create mode 100644 lib/process_shared/shared_memory.rb create mode 100644 lib/process_shared/with_self.rb create mode 100644 process_shared.gemspec create mode 100644 spec/process_shared/bounded_semaphore_spec.rb create mode 100644 spec/process_shared/libc_spec.rb create mode 100644 spec/process_shared/mutex_spec.rb create mode 100644 spec/process_shared/psem_spec.rb create mode 100644 spec/process_shared/semaphore_spec.rb create mode 100644 spec/process_shared/shared_memory_spec.rb create mode 100644 spec/spec_helper.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d50d4f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*~ +pkg +tmp +Gemfile.lock \ No newline at end of file diff --git a/COPYING b/COPYING new file mode 100644 index 0000000..6521312 --- /dev/null +++ b/COPYING @@ -0,0 +1,19 @@ +Copyright (c) 2011 Patrick Mahoney + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 0000000..e69de29 diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..817f62a --- /dev/null +++ b/Gemfile @@ -0,0 +1,2 @@ +source 'http://rubygems.org' +gemspec diff --git a/README.rdoc b/README.rdoc new file mode 100644 index 0000000..925ca6c --- /dev/null +++ b/README.rdoc @@ -0,0 +1,69 @@ +== Description + +Concurrency primitives that may be used in a cross-process way to +coordinate share memory between processes. + +A small C library (libpsem) is compiled to support portable access to +semaphores. This library is then accessed using FFI to implement Ruby +classes ProcessShared::Semaphore, ProcessShared::BoundedSemaphore, +ProcessShared::Mutex, and ProcessShared::SharedMemory. + +This is an incomplete work in progress. + +== License + +MIT + +== Install +Install the gem with: + + gem install process_shared + +== Usage + + require 'process_shared' + + mutex = ProcessShared::Mutex.new + mem = ProcessShared::SharedMemory.new(:int) # extends FFI::Pointer + mem.put_int(0, 0) + + pid1 = fork do + puts "in process 1 (#{Process.pid})" + 10.times do + sleep 0.01 + mutex.synchronize do + value = mem.get_int(0) + sleep 0.01 + puts "process 1 (#{Process.pid}) incrementing" + mem.put_int(0, value + 1) + end + end + end + + pid2 = fork do + puts "in process 2 (#{Process.pid})" + 10.times do + sleep 0.01 + mutex.synchronize do + value = mem.get_int(0) + sleep 0.01 + puts "process 2 (#{Process.pid}) decrementing" + mem.put_int(0, value - 1) + end + end + end + + Process.wait(pid1) + Process.wait(pid2) + + puts "value should be zero: #{mem.get_int(0)}" + +== Todo + +* Implement ConditionVariable +* Implement optional override of core Thread/Mutex classes +* Extend libpsem to win32? (See Python's processing library) +* Break out tests that use PSem.getvalue() (which isn't supported on Mac OS X) + so that the test suite will pass +* Add finalizer to Mutex? (finalizer on Semaphore objects may be enough) or a method to + explicitly close and release resources? \ No newline at end of file diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..911ce76 --- /dev/null +++ b/Rakefile @@ -0,0 +1,24 @@ +require 'rake/extensiontask' +require 'rake/testtask' +require 'rubygems/package_task' + +def gemspec + @gemspec ||= eval(File.read('process_shared.gemspec'), binding, 'process_shared.gemspec') +end + +Rake::ExtensionTask.new('libpsem') do |ext| + ext.lib_dir = 'lib/process_shared' +end + +desc 'Run the tests' +task :default => [:test] + +Rake::TestTask.new do |t| + t.pattern = 'spec/**/*_spec.rb' + t.libs.push 'spec' +end + +Gem::PackageTask.new(gemspec) do |p| + p.need_tar = true + p.gem_spec = gemspec +end diff --git a/ext/libpsem/bsem.c b/ext/libpsem/bsem.c new file mode 100644 index 0000000..63e69bb --- /dev/null +++ b/ext/libpsem/bsem.c @@ -0,0 +1,188 @@ +/* + * Extensions atop psem. Recursive mutex, bounded semaphore. + */ + +#include /* malloc, free */ + +#include "mempcpy.h" /* includes string.h */ +#include "psem.h" +#include "psem_error.h" +#include "bsem.h" + +#define MAX_NAME 128 /* This is much less the POSIX max + name. Users of this library must + not use longer names. */ + +static const char bsem_lock_suffix[] = "-bsem-lock"; + +#define MAX_LOCK_NAME (MAX_NAME + strlen(bsem_lock_suffix) + 1) + +/** + * Assumes dest has sufficient space to hold "[MAX_NAME]-bsem-lock". + */ +static int +make_lockname(char *dest, const char *name, error_t **err) +{ + int namelen; + + namelen = strlen(name); + if (namelen > MAX_NAME) { + error_new(err, E_SOURCE_PSEM, E_NAME_TOO_LONG); + return ERROR; + } + + *((char *) mempcpy(mempcpy(dest, name, namelen), + bsem_lock_suffix, + strlen(bsem_lock_suffix))) = '\0'; +} + +size_t sizeof_bsem_t = sizeof (bsem_t); + +bsem_t * +bsem_alloc(void) { + return malloc(sizeof(bsem_t)); +} + +void +bsem_free(bsem_t *bsem) { + free(bsem); +} + +#define call_or_return(exp) \ + do { if ((exp) == ERROR) { return ERROR; } } while (0) + +#define bsem_lock_or_return(bsem, err) call_or_return(bsem_lock((bsem), (err))) + +#define bsem_unlock_or_return(bsem, err) call_or_return(bsem_unlock((bsem), (err))) + +int +bsem_open(bsem_t *bsem, const char *name, unsigned int maxvalue, unsigned int value, error_t **err) +{ + char lockname[MAX_LOCK_NAME]; + + call_or_return(psem_open(&bsem->psem, name, value, err)); + call_or_return(make_lockname(lockname, name, err)); + call_or_return(psem_open(&bsem->lock, lockname, 1, err)); + + bsem->maxvalue = maxvalue; + + return OK; +} + +static int +bsem_lock(bsem_t *bsem, error_t **err) +{ + call_or_return(psem_wait(&bsem->lock, err)); + return OK; +} + +static int +bsem_unlock(bsem_t *bsem, error_t **err) +{ + call_or_return(psem_post(&bsem->lock, err)); + return OK; +} + +int +bsem_close(bsem_t *bsem, error_t **err) +{ + bsem_lock_or_return(bsem, err); + + if (psem_close(&bsem->psem, err) == ERROR) { + bsem_unlock(bsem, NULL); + return ERROR; + } + + bsem_unlock_or_return(bsem, err); + + call_or_return(psem_close(&bsem->lock, err)); + return OK; +} + +int +bsem_unlink(const char *name, error_t **err) +{ + char lockname[MAX_LOCK_NAME]; + + call_or_return(psem_unlink(name, err)); + call_or_return(make_lockname(lockname, name, err)); + call_or_return(psem_unlink(lockname, err)); + return OK; +} + +int +bsem_post(bsem_t *bsem, error_t **err) +{ + int sval; + + bsem_lock_or_return(bsem, err); + + /* FIXME: maxvalue is broken on some systems... (cygwin? mac?) */ + if (psem_getvalue(&bsem->psem, &sval, err) == ERROR) { + bsem_unlock(bsem, err); + return ERROR; + } + + if (sval >= bsem->maxvalue) { + /* ignored silently */ + bsem_unlock(bsem, err); + return OK; + } + + if (psem_post(&bsem->psem, err) == ERROR) { + bsem_unlock(bsem, err); + return ERROR; + } + + bsem_unlock_or_return(bsem, err); + return OK; +} + +int +bsem_wait(bsem_t *bsem, error_t **err) +{ + call_or_return(psem_wait(&bsem->psem, err)); + return OK; +} + +int +bsem_trywait(bsem_t *bsem, error_t **err) +{ + bsem_lock_or_return(bsem, err); + + if (psem_trywait(&bsem->psem, err) == ERROR) { + bsem_unlock(bsem, NULL); + return ERROR; + } + + bsem_unlock_or_return(bsem, err); + return OK; +} + +int +bsem_timedwait(bsem_t *bsem, float timeout_s, error_t **err) +{ + bsem_lock_or_return(bsem, err); + + if (psem_timedwait(&bsem->psem, timeout_s, err) == ERROR) { + bsem_unlock(bsem, NULL); + return ERROR; + } + + bsem_unlock_or_return(bsem, err); + return OK; +} + +int +bsem_getvalue(bsem_t *bsem, int *sval, error_t **err) +{ + bsem_lock_or_return(bsem, err); + + if (psem_getvalue(&bsem->psem, sval, err) == ERROR) { + bsem_unlock(bsem, NULL); + return ERROR; + } + + bsem_unlock_or_return(bsem, err); + return OK; +} diff --git a/ext/libpsem/bsem.h b/ext/libpsem/bsem.h new file mode 100644 index 0000000..97db5ca --- /dev/null +++ b/ext/libpsem/bsem.h @@ -0,0 +1,32 @@ +#ifndef __BSEM_H__ +#define __BSEM_H__ + +#include "psem.h" +#include "psem_error.h" + +struct bsem { + psem_t psem; + psem_t lock; + int maxvalue; +}; + +typedef struct bsem bsem_t; + +extern size_t sizeof_bsem_t; + +bsem_t * bsem_alloc(); +void bsem_free(bsem_t *bsem); + +int bsem_open(bsem_t *, const char *, unsigned int, unsigned int, error_t **); +int bsem_close(bsem_t *, error_t **); +int bsem_unlink(const char *, error_t **); + +int bsem_post(bsem_t *, error_t **); +int bsem_wait(bsem_t *, error_t **); +int bsem_trywait(bsem_t *, error_t **); +int bsem_timedwait(bsem_t *, float, error_t **); + +int bsem_getvalue(bsem_t *, int *, error_t **); + +#endif /* __BSEM_H__ */ + diff --git a/ext/libpsem/constants.c b/ext/libpsem/constants.c new file mode 100644 index 0000000..13c496b --- /dev/null +++ b/ext/libpsem/constants.c @@ -0,0 +1,22 @@ +/** + * Define and extern various constants defined as macros. + */ + +#include /* PROT_*, MAP_* */ +#include /* O_* */ + +#include "constants.h" + +int o_rdwr = O_RDWR; +int o_creat = O_CREAT; +int o_excl = O_EXCL; + +int prot_read = PROT_READ; +int prot_write = PROT_WRITE; +int prot_exec = PROT_EXEC; +int prot_none = PROT_NONE; + +void * map_failed = MAP_FAILED; + +int map_shared = MAP_SHARED; +int map_private = MAP_PRIVATE; diff --git a/ext/libpsem/constants.h b/ext/libpsem/constants.h new file mode 100644 index 0000000..c0a1023 --- /dev/null +++ b/ext/libpsem/constants.h @@ -0,0 +1,18 @@ +#ifndef __CONSTANTS_H__ +#define __CONSTANTS_H__ + +extern int o_rdwr; +extern int o_creat; +extern int o_excl; + +extern int prot_read; +extern int prot_write; +extern int prot_exec; +extern int prot_none; + +extern void * map_failed; + +extern int map_shared; +extern int map_private; + +#endif diff --git a/ext/libpsem/extconf.rb b/ext/libpsem/extconf.rb new file mode 100644 index 0000000..1ec1407 --- /dev/null +++ b/ext/libpsem/extconf.rb @@ -0,0 +1,36 @@ +require 'mkmf' + +$objs = [] + +# posix semaphores +if have_func('sem_open', 'semaphore.h') + have_func('floorf', 'math.h') or abort("Missing required floorf() in math.h") + have_library('m', 'floorf') + + unless have_func('mempcpy', 'string.h') + $objs << 'mempcpy.o' + end + + have_library('rt', 'sem_open') +end + +c_sources = ['psem.c', 'psem_error.c', 'psem_posix.c', 'bsem.c', 'constants.c'] +$objs += ['psem.o', 'psem_error.o', 'bsem.o', 'constants.o'] + +depend_rules <<-END +psem.c: psem.h psem_posix.c +psem_error.c: psem_error.h + +bsem.h: psem.h psem_error.h +bsem.c: psem.h psem_error.h bsem.h + +constants.c: constants.h +mempcpy.c: mempcpy.h + +#{$objs.map { |o| "#{o}: #{o.chomp(".o")}.c" }.join("\n")} + +libpsem.o: #{$objs.join(' ')} +END + + +create_makefile('libpsem') diff --git a/ext/libpsem/mempcpy.c b/ext/libpsem/mempcpy.c new file mode 100644 index 0000000..9a66d50 --- /dev/null +++ b/ext/libpsem/mempcpy.c @@ -0,0 +1,7 @@ +#include "mempcpy.h" + +void * +mempcpy(void *dest, const void *src, size_t n) +{ + return (char *) memcpy(dest, src, n) + n; +} diff --git a/ext/libpsem/mempcpy.h b/ext/libpsem/mempcpy.h new file mode 100644 index 0000000..3aaeddb --- /dev/null +++ b/ext/libpsem/mempcpy.h @@ -0,0 +1,13 @@ +#ifndef __MEMPCPY_H__ +#define __MEMPCPY_H__ + +#ifdef HAVE_MEMPCPY +#define __USE_GNU +#else +#include +void *mempcpy(void *, const void *, size_t); +#endif + +#include + +#endif /* __MEMPCPY_H__ */ diff --git a/ext/libpsem/mutex.c b/ext/libpsem/mutex.c new file mode 100644 index 0000000..0b137d6 --- /dev/null +++ b/ext/libpsem/mutex.c @@ -0,0 +1,15 @@ +#include /* malloc, free */ + +#include "mutex.h" + +size_t sizeof_mutex_t = sizeof (mutex_t); + +mutex_t * +mutex_alloc(void) { + return malloc(sizeof(mutex_t)); +} + +void +mutex_free(mutex_t * mutex) { + free(mutex); +} diff --git a/ext/libpsem/mutex.h b/ext/libpsem/mutex.h new file mode 100644 index 0000000..fe0b93e --- /dev/null +++ b/ext/libpsem/mutex.h @@ -0,0 +1,14 @@ +#ifndef __MUTEX_H__ +#define __MUTEX_H__ + +#include "bsem.h" + +struct mutex { + bsem_t *bsem; +}; + +typedef struct mutex mutex_t; + +extern size_t sizeof_mutex_t; + +#endif /* __MUTEX_H__ */ diff --git a/ext/libpsem/psem.c b/ext/libpsem/psem.c new file mode 100644 index 0000000..c77075d --- /dev/null +++ b/ext/libpsem/psem.c @@ -0,0 +1,15 @@ +#include "psem.h" + +int OK = 0; +int ERROR = -1; + +int E_SOURCE_SYSTEM = 1; +int E_SOURCE_PSEM = 2; + +int E_NAME_TOO_LONG = 1; + +#ifdef HAVE_SEM_OPEN +#include "psem_posix.c" +#endif + +size_t sizeof_psem_t = sizeof (psem_t); diff --git a/ext/libpsem/psem.h b/ext/libpsem/psem.h new file mode 100644 index 0000000..c95ead7 --- /dev/null +++ b/ext/libpsem/psem.h @@ -0,0 +1,43 @@ +#ifndef __PSEM_H__ +#define __PSEM_H__ + +/** + * Portable semaphore interface focusing on cross-process use. + */ + +#ifdef HAVE_SEM_OPEN +#include "psem_posix.h" +#endif + +#include "psem_error.h" + +typedef struct psem psem_t; + +extern size_t sizeof_psem_t; + +extern int OK; +extern int ERROR; + +extern int E_SOURCE_SYSTEM; +extern int E_SOURCE_PSEM; + +extern int E_NAME_TOO_LONG; + +int psem_errno(); + +psem_t * psem_alloc(); +void psem_free(psem_t *); + +int psem_open(psem_t *, const char *, unsigned int, error_t **); +int psem_close(psem_t *, error_t **); +int psem_unlink(const char *, error_t **); + +int psem_post(psem_t *, error_t **); + +int psem_wait(psem_t *, error_t **); +int psem_trywait(psem_t *, error_t **); +int psem_timedwait(psem_t *, float, error_t **); + +int psem_getvalue(psem_t *, int *, error_t **); + +#endif /* __PSEM_H__ */ diff --git a/ext/libpsem/psem_error.c b/ext/libpsem/psem_error.c new file mode 100644 index 0000000..4ea7cd7 --- /dev/null +++ b/ext/libpsem/psem_error.c @@ -0,0 +1,46 @@ +/** + * Similar to GError from GLib. + */ + +#include /* malloc, free */ + +#include "psem_error.h" + +struct error { + int error_source; + int error_number; +}; + +error_t * +error_alloc() +{ + return (error_t *) malloc(sizeof (error_t)); +} + +void +error_free(error_t *err) +{ + free(err); +} + +void +error_set(error_t *err, int source, int value) +{ + err->error_source = source; + err->error_number = value; +} + +void +error_new(error_t **err, int source, int value) +{ + if (err != NULL) { + if (*err == NULL) { + *err = error_alloc(); + error_set(*err, source, value); + } else { + /* tried to create a new error atop an existing error... */ + } + } else { + /* error is being ignored by caller */ + } +} diff --git a/ext/libpsem/psem_error.h b/ext/libpsem/psem_error.h new file mode 100644 index 0000000..ac760a0 --- /dev/null +++ b/ext/libpsem/psem_error.h @@ -0,0 +1,11 @@ +#ifndef __PSEM_ERROR_H__ +#define __PSEM_ERROR_H__ + +typedef struct error error_t; + +error_t * error_alloc(); +void error_free(error_t *); + +void error_set(error_t *, int, int); + +#endif /* __PSEM_ERROR_H__ */ diff --git a/ext/libpsem/psem_posix.c b/ext/libpsem/psem_posix.c new file mode 100644 index 0000000..740145e --- /dev/null +++ b/ext/libpsem/psem_posix.c @@ -0,0 +1,130 @@ +/* + * A type which wraps a semaphore + * + * semaphore.c + * + * Copyright (c) 2006-2008, R Oudkerk + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials provided + * with the distribution. + * + * 3. Neither the name of author nor the names of any contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A + * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * Modifications Copyright (c) 2011, Patrick Mahoney + */ + +#include +#include /* For O_* constants */ +#include /* For mode constants */ +#include +#include /* malloc, free */ +#include /* floorf */ +#include /* timespec */ + +#include "psem.h" +#include "psem_posix.h" + +psem_t * +psem_alloc(void) { + return (psem_t *) malloc(sizeof(psem_t)); +} + +void +psem_free(psem_t *psem) { + free(psem); +} + +#define errcheck_val(expr, errval, err) \ + do { \ + if ((expr) == (errval)) { \ + error_new((err), E_SOURCE_SYSTEM, errno); \ + return ERROR; \ + } \ + return OK; \ + } while (0) + +#define errcheck(expr, err) errcheck_val((expr), -1, (err)) + +int +psem_open(psem_t *psem, const char *name, unsigned int value, error_t **err) +{ + errcheck_val(psem->sem = sem_open(name, O_CREAT | O_EXCL, 0600, value), + SEM_FAILED, + err); +} + +int +psem_close(psem_t *psem, error_t **err) +{ + errcheck(sem_close(psem->sem), err); +} + +int +psem_unlink(const char *name, error_t **err) +{ + errcheck(sem_unlink(name), err); +} + +int +psem_post(psem_t *psem, error_t **err) +{ + errcheck(sem_post(psem->sem), err); +} + +int +psem_wait(psem_t *psem, error_t **err) +{ + errcheck(sem_wait(psem->sem), err); +} + +int +psem_trywait(psem_t *psem, error_t **err) +{ + errcheck(sem_trywait(psem->sem), err); +} + +int +psem_timedwait(psem_t *psem, float timeout_s, error_t **err) +{ + struct timespec abs_timeout; + + abs_timeout.tv_sec = floorf(timeout_s); + abs_timeout.tv_nsec = + floorf((timeout_s - abs_timeout.tv_sec) * (1000 * 1000 * 1000)); + + errcheck(sem_timedwait(psem->sem, &abs_timeout), err); +} + +int +psem_getvalue(psem_t *psem, int *sval, error_t **err) +{ + errcheck(sem_getvalue(psem->sem, sval), err); +} diff --git a/ext/libpsem/psem_posix.h b/ext/libpsem/psem_posix.h new file mode 100644 index 0000000..a52599a --- /dev/null +++ b/ext/libpsem/psem_posix.h @@ -0,0 +1,10 @@ +#ifndef __PSEM_POSIX_H__ +#define __PSEM_POSIX_H__ + +#include + +struct psem { + sem_t *sem; +}; + +#endif /* __PSEM_POSIX_H__ */ diff --git a/lib/process_shared.rb b/lib/process_shared.rb new file mode 100644 index 0000000..717d0ee --- /dev/null +++ b/lib/process_shared.rb @@ -0,0 +1,6 @@ +require 'ffi' + +require 'process_shared/semaphore' +require 'process_shared/bounded_semaphore' +require 'process_shared/mutex' +require 'process_shared/shared_memory' diff --git a/lib/process_shared/abstract_semaphore.rb b/lib/process_shared/abstract_semaphore.rb new file mode 100644 index 0000000..31662c0 --- /dev/null +++ b/lib/process_shared/abstract_semaphore.rb @@ -0,0 +1,50 @@ +require 'process_shared/psem' +require 'process_shared/with_self' + +module ProcessShared + class AbstractSemaphore + include WithSelf + protected + include ProcessShared::PSem + public + + # Generate a name for a semaphore. + def self.gen_name(middle, name = nil) + if name + name + else + @count ||= 0 + @count += 1 + "ps-#{middle}-#{Process.pid}-#{@count}" + end + end + + def self.make_finalizer(name) + proc { ProcessShared::PSem.psem_unlink(name, nil) } + end + + # private_class_method :new + + protected + + attr_reader :sem, :err + + def init(size, middle, name, &block) + @sem = FFI::MemoryPointer.new(size) + @err = FFI::MemoryPointer.new(:pointer) + psem_name = AbstractSemaphore.gen_name(middle, name) + block.call(psem_name) + + if name + # name explicitly given. Don't unlink because we might want to share it with another process. + # Instead, register a finalizer to unlink. + ObjectSpace.define_finalizer(self, self.class.make_finalizer(name)) + else + # On Linux, removes the entry in /dev/shm and prevents other + # processes from opening this semaphore unless they inherit it + # as forked children. + psem_unlink(psem_name, err) unless name + end + end + end +end diff --git a/lib/process_shared/bounded_semaphore.rb b/lib/process_shared/bounded_semaphore.rb new file mode 100644 index 0000000..812e33a --- /dev/null +++ b/lib/process_shared/bounded_semaphore.rb @@ -0,0 +1,43 @@ +require 'process_shared/psem' +require 'process_shared/semaphore' + +module ProcessShared + class BoundedSemaphore < Semaphore + # With no associated block, open is a synonym for + # Semaphore.new. If the optional code block is given, it will be + # passed `sem` as an argument, and the Semaphore object will + # automatically be closed when the block terminates. In this + # instance, Semaphore.open returns the value of the block. + # + # @param [Integer] value the initial semaphore value + # @param [String] name not currently supported + def self.open(maxvalue, value = 1, name = nil, &block) + new(maxvalue, value, name).with_self(&block) + end + + # Create a new semaphore with initial value `value`. After + # Kernel#fork, the semaphore will be shared across two (or more) + # processes. The semaphore must be closed with #close in each + # process that no longer needs the semaphore. + # + # (An object finalizer is registered that will close the semaphore + # to avoid memory leaks, but this should be considered a last + # resort). + # + # @param [Integer] value the initial semaphore value + # @param [String] name not currently supported + def initialize(maxvalue, value = 1, name = nil) + init(PSem.sizeof_bsem_t, 'bsem', name) do |sem_name| + bsem_open(sem, sem_name, maxvalue, value, err) + end + end + + protected + + alias_method :psem_unlink, :bsem_unlink + alias_method :psem_close, :bsem_close + alias_method :psem_wait, :bsem_wait + alias_method :psem_post, :bsem_post + alias_method :psem_getvalue, :bsem_getvalue + end +end diff --git a/lib/process_shared/condition_variable.rb b/lib/process_shared/condition_variable.rb new file mode 100644 index 0000000..afca674 --- /dev/null +++ b/lib/process_shared/condition_variable.rb @@ -0,0 +1,27 @@ +require 'process_shared/semaphore' + +module ProcessShared + # TODO: implement this + class ConditionVariable + def initialize + @sem = Semaphore.new + end + + def broadcast + @sem.post + end + + def signal + @sem.post + end + + def wait(mutex, timeout = nil) + mutex.unlock + begin + @sem.wait + ensure + mutex.lock + end + end + end +end diff --git a/lib/process_shared/libc.rb b/lib/process_shared/libc.rb new file mode 100644 index 0000000..0a49ef2 --- /dev/null +++ b/lib/process_shared/libc.rb @@ -0,0 +1,36 @@ +require 'ffi' + +require 'process_shared/posix_call' +require 'process_shared/psem' + +module ProcessShared + module LibC + extend FFI::Library + extend PosixCall + + ffi_lib FFI::Library::LIBC + + MAP_FAILED = FFI::Pointer.new(-1) + MAP_SHARED = PSem.map_shared + MAP_PRIVATE = PSem.map_private + + PROT_READ = PSem.prot_read + PROT_WRITE = PSem.prot_write + PROT_EXEC = PSem.prot_exec + PROT_NONE = PSem.prot_none + + O_RDWR = PSem.o_rdwr + O_CREAT = PSem.o_creat + O_EXCL = PSem.o_excl + + attach_variable :errno, :int + + attach_function :mmap, [:pointer, :size_t, :int, :int, :int, :off_t], :pointer + attach_function :munmap, [:pointer, :size_t], :int + attach_function :ftruncate, [:int, :off_t], :int + attach_function :close, [:int], :int + + error_check(:mmap) { |v| v == MAP_FAILED } + error_check(:munmap, :ftruncate, :close) + end +end diff --git a/lib/process_shared/mutex.rb b/lib/process_shared/mutex.rb new file mode 100644 index 0000000..496ed97 --- /dev/null +++ b/lib/process_shared/mutex.rb @@ -0,0 +1,103 @@ +require 'process_shared/bounded_semaphore' +require 'process_shared/with_self' +require 'process_shared/shared_memory' +require 'process_shared/process_error' + +module ProcessShared + class Mutex + include WithSelf + + def self.open(&block) + new.with_self(&block) + end + + def initialize + @internal_sem = BoundedSemaphore.new(1) + @locked_by = SharedMemory.new(:int) + + @sem = BoundedSemaphore.new(1) + end + + # @return [Mutex] + def lock + @sem.wait + self.locked_by = ::Process.pid + self + end + + # @return [Boolean] + def locked? + locked_by > 0 + end + + # Releases the lock and sleeps timeout seconds if it is given and + # non-nil or forever. + # + # @return [Numeric] + def sleep(timeout = nil) + unlock + begin + timeout ? sleep(timeout) : sleep + ensure + lock + end + end + + # @return [Boolean] + def try_lock + with_internal_lock do + if @locked_by.get_int(0) > 0 + false # was locked + else + @sem.wait + self.locked_by = ::Process.pid + true + end + end + end + + # @return [Mutex] + def unlock + if (p = locked_by) != ::Process.pid + raise ProcessError, "lock is held by #{p} not #{::Process.pid}" + end + + @sem.post + self + end + + # Acquire the lock, yield the block, then ensure the lock is + # unlocked. + def synchronize + lock + begin + yield + ensure + unlock + end + end + + private + + def locked_by + with_internal_lock do + @locked_by.get_int(0) + end + end + + def locked_by=(val) + with_internal_lock do + @locked_by.put_int(0, val) + end + end + + def with_internal_lock + @internal_sem.wait + begin + yield + ensure + @internal_sem.post + end + end + end +end diff --git a/lib/process_shared/posix_call.rb b/lib/process_shared/posix_call.rb new file mode 100644 index 0000000..5d1bc34 --- /dev/null +++ b/lib/process_shared/posix_call.rb @@ -0,0 +1,29 @@ +# require 'process_shared/libc' - circular dependency here... + +module ProcessShared + module PosixCall + # Replace methods in `syms` with error checking wrappers that + # invoke the original method and raise a SystemCallError with the + # current errno if the return value is an error. + # + # Errors are detected if the block returns true when called with + # the original method's return value. + def error_check(*syms, &is_err) + unless block_given? + is_err = lambda { |v| (v == -1) } + end + + syms.each do |sym| + method = self.method(sym) + define_singleton_method(sym) do |*args| + ret = method.call(*args) + if is_err.call(ret) + raise SystemCallError.new("error in #{sym}", LibC.errno) + else + ret + end + end + end + end + end +end diff --git a/lib/process_shared/process_error.rb b/lib/process_shared/process_error.rb new file mode 100644 index 0000000..acba381 --- /dev/null +++ b/lib/process_shared/process_error.rb @@ -0,0 +1,3 @@ +module ProcessShared + class ProcessError < Exception; end +end diff --git a/lib/process_shared/psem.rb b/lib/process_shared/psem.rb new file mode 100644 index 0000000..b425db2 --- /dev/null +++ b/lib/process_shared/psem.rb @@ -0,0 +1,109 @@ +require 'ffi' + +module ProcessShared + module PSem + class Error < FFI::Struct + layout(:source, :int, + :errno, :int) + end + + extend FFI::Library + + # Workaround FFI dylib/bundle issue. See https://github.com/ffi/ffi/issues/42 + suffix = if FFI::Platform.mac? + 'bundle' + else + FFI::Platform::LIBSUFFIX + end + + ffi_lib File.join(File.expand_path(File.dirname(__FILE__)), + 'libpsem.' + suffix) + + class << self + # Replace methods in `syms` with error checking wrappers that + # invoke the original psem method and raise an appropriate + # error. + def psem_error_check(*syms) + syms.each do |sym| + method = self.method(sym) + + block = lambda do |*args| + if method.call(*args) < 0 + errp = args[-1] + unless errp.nil? + begin + err = Error.new(errp.get_pointer(0)) + if err[:source] == PSem.e_source_system + raise SystemCallError.new("error in #{sym}", err[:errno]) + else + raise "error in #{sym}: #{err.get_integer(1)}" + end + ensure + psem_error_free(err) + end + end + end + end + + define_method(sym, &block) + define_singleton_method(sym, &block) + end + end + end + + # Generic constants + + int_consts = [:o_rdwr, + :o_creat, + :o_excl, + + :prot_read, + :prot_write, + :prot_exec, + :prot_none, + + :map_shared, + :map_private] + int_consts.each { |sym| attach_variable sym, :int } + + # Other constants, functions + + attach_function :psem_error_free, :error_free, [:pointer], :void + + attach_variable :e_source_system, :E_SOURCE_SYSTEM, :int + attach_variable :e_source_psem, :E_SOURCE_PSEM, :int + + attach_variable :e_name_too_long, :E_NAME_TOO_LONG, :int + + attach_variable :sizeof_psem_t, :size_t + attach_variable :sizeof_bsem_t, :size_t + + # PSem functions + + attach_function :psem_open, [:pointer, :string, :uint, :pointer], :int + attach_function :psem_close, [:pointer, :pointer], :int + attach_function :psem_unlink, [:string, :pointer], :int + attach_function :psem_post, [:pointer, :pointer], :int + attach_function :psem_wait, [:pointer, :pointer], :int + attach_function :psem_trywait, [:pointer, :pointer], :int + attach_function :psem_timedwait, [:pointer, :pointer, :pointer], :int + attach_function :psem_getvalue, [:pointer, :pointer, :pointer], :int + + psem_error_check(:psem_open, :psem_close, :psem_unlink, :psem_post, + :psem_wait, :psem_trywait, :psem_timedwait, :psem_getvalue) + + # BSem functions + + attach_function :bsem_open, [:pointer, :string, :uint, :uint, :pointer], :int + attach_function :bsem_close, [:pointer, :pointer], :int + attach_function :bsem_unlink, [:string, :pointer], :int + attach_function :bsem_post, [:pointer, :pointer], :int + attach_function :bsem_wait, [:pointer, :pointer], :int + attach_function :bsem_trywait, [:pointer, :pointer], :int + attach_function :bsem_timedwait, [:pointer, :pointer, :pointer], :int + attach_function :bsem_getvalue, [:pointer, :pointer, :pointer], :int + + psem_error_check(:bsem_open, :bsem_close, :bsem_unlink, :bsem_post, + :bsem_wait, :bsem_trywait, :bsem_timedwait, :bsem_getvalue) + end +end diff --git a/lib/process_shared/rt.rb b/lib/process_shared/rt.rb new file mode 100644 index 0000000..672d386 --- /dev/null +++ b/lib/process_shared/rt.rb @@ -0,0 +1,21 @@ +require 'process_shared/posix_call' +require 'process_shared/psem' + +module ProcessShared + module RT + extend FFI::Library + extend PosixCall + + # FIXME: mac and linux OK, but what about everything else? + if FFI::Platform.mac? + ffi_lib 'c' + else + ffi_lib 'rt' + end + + attach_function :shm_open, [:string, :int, :mode_t], :int + attach_function :shm_unlink, [:string], :int + + error_check :shm_open, :shm_unlink + end +end diff --git a/lib/process_shared/semaphore.rb b/lib/process_shared/semaphore.rb new file mode 100644 index 0000000..7c07371 --- /dev/null +++ b/lib/process_shared/semaphore.rb @@ -0,0 +1,60 @@ +require 'process_shared/psem' +require 'process_shared/abstract_semaphore' + +module ProcessShared + class Semaphore < AbstractSemaphore + # With no associated block, open is a synonym for + # Semaphore.new. If the optional code block is given, it will be + # passed `sem` as an argument, and the Semaphore object will + # automatically be closed when the block terminates. In this + # instance, Semaphore.open returns the value of the block. + # + # @param [Integer] value the initial semaphore value + # @param [String] name not currently supported + def self.open(value = 1, name = nil, &block) + new(value, name).with_self(&block) + end + + # Create a new semaphore with initial value `value`. After + # Kernel#fork, the semaphore will be shared across two (or more) + # processes. The semaphore must be closed with #close in each + # process that no longer needs the semaphore. + # + # (An object finalizer is registered that will close the semaphore + # to avoid memory leaks, but this should be considered a last + # resort). + # + # @param [Integer] value the initial semaphore value + # @param [String] name not currently supported + def initialize(value = 1, name = nil) + init(PSem.sizeof_psem_t, 'psem', name) do |sem_name| + psem_open(sem, sem_name, value, err) + end + end + + # Decrement the value of the semaphore. If the value is zero, + # wait until another process increments via #post. + def wait + psem_wait(sem, err) + end + + # Increment the value of the semaphore. If other processes are + # waiting on this semaphore, one will be woken. + def post + psem_post(sem, err) + end + + # Get the current value of the semaphore. + # + # @return [Integer] the current value of the semaphore. + def value + int = FFI::MemoryPointer.new(:int) + psem_getvalue(sem, int, err) + int.get_int(0) + end + + def close + psem_close(sem, err) + end + end +end diff --git a/lib/process_shared/shared_memory.rb b/lib/process_shared/shared_memory.rb new file mode 100644 index 0000000..4e187d2 --- /dev/null +++ b/lib/process_shared/shared_memory.rb @@ -0,0 +1,45 @@ +require 'process_shared/rt' +require 'process_shared/libc' +require 'process_shared/with_self' + +module ProcessShared + # Memory block shared across processes. TODO: finalizer that closes... + class SharedMemory < FFI::Pointer + include WithSelf + + attr_reader :size, :fd + + def self.open(size, &block) + new(size).with_self(&block) + end + + def initialize(size) + @size = case size + when Symbol + FFI.type_size(size) + else + size + end + + name = "/ps-shm#{rand(10000)}" + @fd = RT.shm_open(name, + LibC::O_CREAT | LibC::O_RDWR | LibC::O_EXCL, + 0777) + RT.shm_unlink(name) + + LibC.ftruncate(@fd, @size) + @pointer = LibC.mmap(nil, + @size, + LibC::PROT_READ | LibC::PROT_WRITE, + LibC::MAP_SHARED, + @fd, + 0) + super(@pointer) + end + + def close + LibC.munmap(@pointer, @size) + LibC.close(@fd) + end + end +end diff --git a/lib/process_shared/with_self.rb b/lib/process_shared/with_self.rb new file mode 100644 index 0000000..80d2a85 --- /dev/null +++ b/lib/process_shared/with_self.rb @@ -0,0 +1,20 @@ +module ProcessShared + module WithSelf + # With no associated block, return self. If the optional code + # block is given, it will be passed `self` as an argument, and the + # self object will automatically be closed (by invoking `close` on + # `self`) when the block terminates. In this instance, value of + # the block is returned. + def with_self + if block_given? + begin + yield self + ensure + self.close + end + else + self + end + end + end +end diff --git a/process_shared.gemspec b/process_shared.gemspec new file mode 100644 index 0000000..656b9c5 --- /dev/null +++ b/process_shared.gemspec @@ -0,0 +1,20 @@ +Gem::Specification.new do |s| + s.name = 'process_shared' + s.version = '0.0.1' + s.platform = Gem::Platform::RUBY + s.has_rdoc = true + s.extra_rdoc_files = ["README.rdoc", "ChangeLog", "COPYING"] + s.summary = 'process-shared synchronization primitives' + s.description = 'FFI wrapper around portable semaphore library with mutex and condition vars built on top.' + s.author = 'Patrick Mahoney' + s.email = 'pat@polycrystal.org' + s.homepage = '' + s.files = Dir['lib/**/*.rb', 'lib/**/libpsem*', 'ext/**/*.{c,h,rb}', 'spec/**/*.rb'] + s.extensions = FileList["ext/**/extconf.rb"] + + s.add_dependency('ffi', '~> 1.0') + + s.add_development_dependency('rake-compiler') + s.add_development_dependency('minitest') + s.add_development_dependency('minitest-matchers') +end diff --git a/spec/process_shared/bounded_semaphore_spec.rb b/spec/process_shared/bounded_semaphore_spec.rb new file mode 100644 index 0000000..f996f49 --- /dev/null +++ b/spec/process_shared/bounded_semaphore_spec.rb @@ -0,0 +1,48 @@ +require 'spec_helper' +require 'process_shared/bounded_semaphore' + +module ProcessShared + describe BoundedSemaphore do + it 'never rises above its max value' do + max = 10 + BoundedSemaphore.open(max) do |sem| + pids = [] + 10.times do |i| + pids << fork do + 100.times do + if rand(3) == 0 + sem.wait + else + sem.post + end + end + + exit i + end + end + + 100.times do + sem.value.must be_lte(max) + end + + pids.each { |pid| Process.wait(pid) } + end + end + + describe '#post and #wait' do + it 'increments and decrements the value' do + Semaphore.open(0) do |sem| + 10.times do |i| + sem.post + sem.value.must_equal(i + 1) + end + + 10.times do |i| + sem.wait + sem.value.must_equal(10 - i - 1) + end + end + end + end + end +end diff --git a/spec/process_shared/libc_spec.rb b/spec/process_shared/libc_spec.rb new file mode 100644 index 0000000..18ef8c9 --- /dev/null +++ b/spec/process_shared/libc_spec.rb @@ -0,0 +1,9 @@ +require 'process_shared/libc' + +module ProcessShared + describe LibC do + it 'throws exceptions with invalid args' do + proc { LibC.mmap nil,2,0,0,1,0 }.must_raise(Errno::EINVAL) + end + end +end diff --git a/spec/process_shared/mutex_spec.rb b/spec/process_shared/mutex_spec.rb new file mode 100644 index 0000000..84869b5 --- /dev/null +++ b/spec/process_shared/mutex_spec.rb @@ -0,0 +1,74 @@ +require 'spec_helper' +require 'process_shared/mutex' +require 'process_shared/shared_memory' + +module ProcessShared + describe Mutex do + it 'protects access to a shared variable' do + mutex = Mutex.new + mem = SharedMemory.new(:char) + mem.put_char(0, 0) + + pids = [] + 10.times do |i| + inc = (-1) ** i # half the procs increment; half decrement + pids << fork do + 10.times do + mutex.lock + begin + mem.put_char(0, mem.get_char(0) + inc) + sleep 0.001 + ensure + mutex.unlock + end + end + Kernel.exit! + end + end + + pids.each { |pid| ::Process.wait(pid) } + + mem.get_char(0).must_equal(0) + end + + it 'protects access to a shared variable with synchronize' do + mutex = Mutex.new + mem = SharedMemory.new(:char) + mem.put_char(0, 0) + + pids = [] + 10.times do |i| + inc = (-1) ** i # half the procs increment; half decrement + pids << fork do + 10.times do + mutex.synchronize do + mem.put_char(0, mem.get_char(0) + inc) + sleep 0.001 + end + end + Kernel.exit! + end + end + + pids.each { |pid| ::Process.wait(pid) } + + mem.get_char(0).must_equal(0) + end + + it 'raises exception when unlocked by other process' do + mutex = Mutex.new + + pid = Kernel.fork do + mutex.lock + sleep 0.2 + mutex.unlock + Kernel.exit! + end + + sleep 0.1 + proc { mutex.unlock }.must_raise(ProcessError) + + ::Process.wait(pid) + end + end +end diff --git a/spec/process_shared/psem_spec.rb b/spec/process_shared/psem_spec.rb new file mode 100644 index 0000000..ad05d28 --- /dev/null +++ b/spec/process_shared/psem_spec.rb @@ -0,0 +1,136 @@ +require 'spec_helper' +require 'process_shared/psem' + +module ProcessShared + describe PSem do + before do + extend PSem + end + + before(:each) do + @err = FFI::MemoryPointer.new(:pointer) + end + + describe '.psem_open' do + it 'opens a psem' do + psem = FFI::MemoryPointer.new(PSem.sizeof_psem_t) + psem_open(psem, "psem-test", 1, @err) + psem_unlink("psem-test", @err) + end + + it 'raises excpetion if name alredy exists' do + psem1 = FFI::MemoryPointer.new(PSem.sizeof_psem_t) + psem2 = FFI::MemoryPointer.new(PSem.sizeof_psem_t) + psem_open(psem1, "psem-test", 1, @err) + proc { psem_open(psem2, "psem-test", 1, @err) }.must_raise(Errno::EEXIST) + + psem_unlink("psem-test", @err) + psem_open(psem2, "psem-test", 1, @err) + psem_unlink("psem-test", @err) + end + end + + describe '.psem_wait' do + before(:each) do + @psem = FFI::MemoryPointer.new(PSem.sizeof_psem_t) + psem_open(@psem, 'psem-test', 1, @err) + psem_unlink('psem-test', @err) + + @int = FFI::MemoryPointer.new(:int) + end + + after(:each) do + #psem_close(@psem, @err) + end + + def value + psem_getvalue(@psem, @int, @err) + @int.get_int(0) + end + + it 'decrements psem value' do + value.must_equal 1 + psem_wait(@psem, @err) + value.must_equal(0) + end + + it 'waits until another process posts' do + psem_wait(@psem, @err) + + # child exits with ~ time spent waiting + child = fork do + start = Time.now + psem_wait(@psem, @err) + exit (Time.now - start).ceil + end + + t = 1.5 + sleep t + psem_post(@psem, @err) + _pid, status = Process.wait2(child) + status.exitstatus.must_equal 2 + end + end + + describe '.bsem_open' do + it 'opens a bsem' do + bsem = FFI::MemoryPointer.new(PSem.sizeof_bsem_t) + bsem_open(bsem, "bsem-test", 1, 1, @err) + bsem_unlink("bsem-test", @err) + end + + it 'raises excpetion if name alredy exists' do + bsem1 = FFI::MemoryPointer.new(PSem.sizeof_bsem_t) + bsem2 = FFI::MemoryPointer.new(PSem.sizeof_bsem_t) + bsem_open(bsem1, "bsem-test", 1, 1, @err) + proc { bsem_open(bsem2, "bsem-test", 1, 1, @err) }.must_raise(Errno::EEXIST) + + bsem_unlink("bsem-test", @err) + bsem_open(bsem2, "bsem-test", 1, 1, @err) + bsem_unlink("bsem-test", @err) + end + end + + describe '.bsem_wait' do + before(:each) do + @bsem = FFI::MemoryPointer.new(PSem.sizeof_bsem_t) + bsem_open(@bsem, 'bsem-test', 1, 1, @err) + bsem_unlink('bsem-test', @err) + + @int = FFI::MemoryPointer.new(:int) + end + + after do + #bsem_close(@bsem, @err) + end + + def value + bsem_getvalue(@bsem, @int, @err) + @int.get_int(0) + end + + it 'decrements bsem value' do + value.must_equal 1 + bsem_wait(@bsem, @err) + value.must_equal 0 + end + + it 'waits until another process posts' do + bsem_wait(@bsem, @err) + + # child exits with ~ time spent waiting + child = fork do + start = Time.now + bsem_wait(@bsem, @err) + exit (Time.now - start).ceil + end + + t = 1.5 + sleep t + bsem_post(@bsem, @err) + _pid, status = Process.wait2(child) + status.exitstatus.must_equal 2 + end + end + end +end diff --git a/spec/process_shared/semaphore_spec.rb b/spec/process_shared/semaphore_spec.rb new file mode 100644 index 0000000..8201d1e --- /dev/null +++ b/spec/process_shared/semaphore_spec.rb @@ -0,0 +1,76 @@ +require 'spec_helper' + +require 'ffi' +require 'process_shared/semaphore' +require 'process_shared/shared_memory' + +module ProcessShared + describe Semaphore do + it 'coordinates access to shared object' do + nprocs = 4 # number of processes + nincrs = 1000 # each process increments nincrs times + + do_increments = lambda do |mem, sem| + nincrs.times do + sem.wait + begin + val = mem.get_int(0) + # ensure other procs have a chance to interfere + sleep 0.001 if rand(100) == 0 + mem.put_int(0, val + 1) + rescue => e + "#{Process.pid} die'ing because #{e}" + ensure + sem.post + end + end + end + + # Make sure it fails with no synchronization + no_sem = Object.new + class << no_sem + def wait; end + def post; end + end + SharedMemory.open(FFI.type_size(:int)) do |mem| + pids = [] + nprocs.times do + pids << fork { do_increments.call(mem, no_sem); exit } + end + + pids.each { |p| Process.wait(p) } + # puts "mem is #{mem.get_int(0)}" + mem.get_int(0).must be_lt(nprocs * nincrs) + end + + # Now try with synchronization + SharedMemory.open(FFI.type_size(:int)) do |mem| + pids = [] + Semaphore.open do |sem| + nprocs.times do + pids << fork { do_increments.call(mem, sem); exit } + end + end + + pids.each { |p| Process.wait(p) } + mem.get_int(0).must_equal(nprocs * nincrs) + end + end + + describe '#post and #wait' do + it 'increments and decrements the value' do + Semaphore.open(0) do |sem| + 10.times do |i| + sem.post + sem.value.must_equal(i + 1) + end + + 10.times do |i| + sem.wait + sem.value.must_equal(10 - i - 1) + end + end + end + end + end +end diff --git a/spec/process_shared/shared_memory_spec.rb b/spec/process_shared/shared_memory_spec.rb new file mode 100644 index 0000000..d635157 --- /dev/null +++ b/spec/process_shared/shared_memory_spec.rb @@ -0,0 +1,36 @@ +require 'spec_helper' +require 'process_shared/shared_memory' + +module ProcessShared + describe SharedMemory do + it 'shares memory across processes' do + mem = SharedMemory.new(1) + mem.put_char(0, 0) + mem.get_char(0).must_equal(0) + + pid = fork do + mem.put_char(0, 123) + Kernel.exit! + end + + ::Process.wait(pid) + + mem.get_char(0).must_equal(123) + end + + it 'initializes with type symbol' do + mem = SharedMemory.new(:int) + mem.put_int(0, 0) + mem.get_int(0).must_equal(0) + + pid = fork do + mem.put_int(0, 1234567) + Kernel.exit! + end + + ::Process.wait(pid) + + mem.get_int(0).must_equal(1234567) + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..b415a70 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,35 @@ +gem 'minitest' +require 'minitest/spec' +require 'minitest/autorun' +require 'minitest/matchers' + +class RangeMatcher + def initialize(operator, limit) + @operator = operator.to_sym + @limit = limit + end + + def description + "be #{operator} #{@limit}" + end + + def matches?(subject) + subject.send(@operator, @limit) + end + + def failure_message_for_should + "expected #{operator} #{@limit}" + end + + def failure_message_for_should_not + "expected not #{operator} #{@limit}" + end +end + +def be_lt(value) + RangeMatcher.new('<', value) +end + +def be_lte(value) + RangeMatcher.new('<=', value) +end