diff --git a/ext/libpsem/psem_posix.c b/ext/libpsem/psem_posix.c index 740145e..42def6f 100644 --- a/ext/libpsem/psem_posix.c +++ b/ext/libpsem/psem_posix.c @@ -68,7 +68,6 @@ psem_free(psem_t *psem) { error_new((err), E_SOURCE_SYSTEM, errno); \ return ERROR; \ } \ - return OK; \ } while (0) #define errcheck(expr, err) errcheck_val((expr), -1, (err)) @@ -79,52 +78,77 @@ psem_open(psem_t *psem, const char *name, unsigned int value, error_t **err) errcheck_val(psem->sem = sem_open(name, O_CREAT | O_EXCL, 0600, value), SEM_FAILED, err); + return OK; } int psem_close(psem_t *psem, error_t **err) { errcheck(sem_close(psem->sem), err); + return OK; } int psem_unlink(const char *name, error_t **err) { errcheck(sem_unlink(name), err); + return OK; } int psem_post(psem_t *psem, error_t **err) { errcheck(sem_post(psem->sem), err); + return OK; } int psem_wait(psem_t *psem, error_t **err) { errcheck(sem_wait(psem->sem), err); + return OK; } int psem_trywait(psem_t *psem, error_t **err) { errcheck(sem_trywait(psem->sem), err); + return OK; } +#define NS_PER_S (1000 * 1000 * 1000) +#define US_PER_NS (1000) + int psem_timedwait(psem_t *psem, float timeout_s, error_t **err) { + struct timeval now; struct timespec abs_timeout; - abs_timeout.tv_sec = floorf(timeout_s); - abs_timeout.tv_nsec = - floorf((timeout_s - abs_timeout.tv_sec) * (1000 * 1000 * 1000)); + errcheck(gettimeofday(&now, NULL), err); + abs_timeout.tv_sec = now.tv_sec; + abs_timeout.tv_nsec = now.tv_usec * US_PER_NS; + + /* Fun with rounding: careful adding reltive timeout to abs time */ + { + time_t sec; /* relative timeout */ + long nsec; + + sec = floorf(timeout_s); + nsec = floorf((timeout_s - floorf(timeout_s)) * NS_PER_S); + + abs_timeout.tv_sec += sec; + abs_timeout.tv_nsec += nsec; + } errcheck(sem_timedwait(psem->sem, &abs_timeout), err); + return OK; } int psem_getvalue(psem_t *psem, int *sval, error_t **err) { errcheck(sem_getvalue(psem->sem, sval), err); + return OK; } + diff --git a/lib/process_shared/abstract_semaphore.rb b/lib/process_shared/abstract_semaphore.rb index 20e3077..8dd4ea6 100644 --- a/lib/process_shared/abstract_semaphore.rb +++ b/lib/process_shared/abstract_semaphore.rb @@ -36,6 +36,15 @@ module ProcessShared # private_class_method :new + def synchronize + wait + begin + yield + ensure + post + end + end + protected attr_reader :sem, :err diff --git a/lib/process_shared/psem.rb b/lib/process_shared/psem.rb index bc42e19..662ef55 100644 --- a/lib/process_shared/psem.rb +++ b/lib/process_shared/psem.rb @@ -90,7 +90,7 @@ module ProcessShared attach_function :psem_post, [:pointer, :pointer], :int attach_function :psem_wait, [:pointer, :pointer], :int attach_function :psem_trywait, [:pointer, :pointer], :int - attach_function :psem_timedwait, [:pointer, :pointer, :pointer], :int + attach_function :psem_timedwait, [:pointer, :float, :pointer], :int attach_function :psem_getvalue, [:pointer, :pointer, :pointer], :int psem_error_check(:psem_open, :psem_close, :psem_unlink, :psem_post, @@ -104,7 +104,7 @@ module ProcessShared attach_function :bsem_post, [:pointer, :pointer], :int attach_function :bsem_wait, [:pointer, :pointer], :int attach_function :bsem_trywait, [:pointer, :pointer], :int - attach_function :bsem_timedwait, [:pointer, :pointer, :pointer], :int + attach_function :bsem_timedwait, [:pointer, :float, :pointer], :int attach_function :bsem_getvalue, [:pointer, :pointer, :pointer], :int psem_error_check(:bsem_open, :bsem_close, :bsem_unlink, :bsem_post, diff --git a/lib/process_shared/semaphore.rb b/lib/process_shared/semaphore.rb index 0ed712c..f24335b 100644 --- a/lib/process_shared/semaphore.rb +++ b/lib/process_shared/semaphore.rb @@ -38,6 +38,24 @@ module ProcessShared psem_wait(sem, err) end + # Decrement the value of the semaphore if it can be done + # immediately (i.e. if it was non-zero). Otherwise, wait up to + # +timeout+ seconds until another process increments via {#post}. + # + # @param timeout [Numeric] the maximum seconds to wait, or nil to not wait + # + # @return If +timeout+ is nil and the semaphore cannot be + # decremented immediately, raise Errno::EAGAIN. If +timeout+ + # passed before the semaphore could be decremented, raise + # Errno::ETIMEDOUT. + def try_wait(timeout = nil) + if timeout + psem_timedwait(sem, timeout, err) + else + psem_trywait(sem, err) + end + end + # Increment the value of the semaphore. If other processes are # waiting on this semaphore, one will be woken. def post diff --git a/spec/process_shared/semaphore_spec.rb b/spec/process_shared/semaphore_spec.rb index 8201d1e..0e3acc4 100644 --- a/spec/process_shared/semaphore_spec.rb +++ b/spec/process_shared/semaphore_spec.rb @@ -72,5 +72,29 @@ module ProcessShared end end end + + describe '#try_wait' do + it 'returns immediately with non-zero semaphore' do + Semaphore.open(1) do |sem| + start = Time.now.to_f + sem.try_wait + (Time.now.to_f - start).must be_lt(0.01) + end + end + + it 'raises EAGAIN with zero semaphore' do + Semaphore.open(0) do |sem| + proc { sem.try_wait }.must_raise(Errno::EAGAIN) + end + end + + it 'raises ETIMEDOUT after timeout expires' do + Semaphore.open(0) do |sem| + start = Time.now.to_f + proc { sem.try_wait(0.1) }.must_raise(Errno::ETIMEDOUT) + (Time.now.to_f - start).must be_gte(0.1) + end + end + end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 5a24791..98ea4ef 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -17,15 +17,16 @@ class RangeMatcher end def matches?(subject) + @subject = subject subject.send(@operator, @limit) end def failure_message_for_should - "expected #{operator} #{@limit}" + "expected #{@operator} #{@limit}, not #{@subject}" end def failure_message_for_should_not - "expected not #{operator} #{@limit}" + "expected not #{@operator} #{@limit}, not #{@subject}" end end @@ -33,6 +34,14 @@ def be_lt(value) RangeMatcher.new('<', value) end +def be_gt(value) + RangeMatcher.new('>', value) +end + def be_lte(value) RangeMatcher.new('<=', value) end + +def be_gte(value) + RangeMatcher.new('>=', value) +end