Add Semaphore#try_wait. Fix libpsem to convert relative timeout to absolute timeout.

This commit is contained in:
Patrick Mahoney 2011-12-17 09:58:10 -06:00
parent f65ae434e4
commit 7230e9f2c9
6 changed files with 92 additions and 8 deletions

View File

@ -68,7 +68,6 @@ psem_free(psem_t *psem) {
error_new((err), E_SOURCE_SYSTEM, errno); \ error_new((err), E_SOURCE_SYSTEM, errno); \
return ERROR; \ return ERROR; \
} \ } \
return OK; \
} while (0) } while (0)
#define errcheck(expr, err) errcheck_val((expr), -1, (err)) #define errcheck(expr, err) errcheck_val((expr), -1, (err))
@ -79,52 +78,77 @@ psem_open(psem_t *psem, const char *name, unsigned int value, error_t **err)
errcheck_val(psem->sem = sem_open(name, O_CREAT | O_EXCL, 0600, value), errcheck_val(psem->sem = sem_open(name, O_CREAT | O_EXCL, 0600, value),
SEM_FAILED, SEM_FAILED,
err); err);
return OK;
} }
int int
psem_close(psem_t *psem, error_t **err) psem_close(psem_t *psem, error_t **err)
{ {
errcheck(sem_close(psem->sem), err); errcheck(sem_close(psem->sem), err);
return OK;
} }
int int
psem_unlink(const char *name, error_t **err) psem_unlink(const char *name, error_t **err)
{ {
errcheck(sem_unlink(name), err); errcheck(sem_unlink(name), err);
return OK;
} }
int int
psem_post(psem_t *psem, error_t **err) psem_post(psem_t *psem, error_t **err)
{ {
errcheck(sem_post(psem->sem), err); errcheck(sem_post(psem->sem), err);
return OK;
} }
int int
psem_wait(psem_t *psem, error_t **err) psem_wait(psem_t *psem, error_t **err)
{ {
errcheck(sem_wait(psem->sem), err); errcheck(sem_wait(psem->sem), err);
return OK;
} }
int int
psem_trywait(psem_t *psem, error_t **err) psem_trywait(psem_t *psem, error_t **err)
{ {
errcheck(sem_trywait(psem->sem), err); errcheck(sem_trywait(psem->sem), err);
return OK;
} }
#define NS_PER_S (1000 * 1000 * 1000)
#define US_PER_NS (1000)
int int
psem_timedwait(psem_t *psem, float timeout_s, error_t **err) psem_timedwait(psem_t *psem, float timeout_s, error_t **err)
{ {
struct timeval now;
struct timespec abs_timeout; struct timespec abs_timeout;
abs_timeout.tv_sec = floorf(timeout_s); errcheck(gettimeofday(&now, NULL), err);
abs_timeout.tv_nsec = abs_timeout.tv_sec = now.tv_sec;
floorf((timeout_s - abs_timeout.tv_sec) * (1000 * 1000 * 1000)); abs_timeout.tv_nsec = now.tv_usec * US_PER_NS;
/* Fun with rounding: careful adding reltive timeout to abs time */
{
time_t sec; /* relative timeout */
long nsec;
sec = floorf(timeout_s);
nsec = floorf((timeout_s - floorf(timeout_s)) * NS_PER_S);
abs_timeout.tv_sec += sec;
abs_timeout.tv_nsec += nsec;
}
errcheck(sem_timedwait(psem->sem, &abs_timeout), err); errcheck(sem_timedwait(psem->sem, &abs_timeout), err);
return OK;
} }
int int
psem_getvalue(psem_t *psem, int *sval, error_t **err) psem_getvalue(psem_t *psem, int *sval, error_t **err)
{ {
errcheck(sem_getvalue(psem->sem, sval), err); errcheck(sem_getvalue(psem->sem, sval), err);
return OK;
} }

View File

@ -36,6 +36,15 @@ module ProcessShared
# private_class_method :new # private_class_method :new
def synchronize
wait
begin
yield
ensure
post
end
end
protected protected
attr_reader :sem, :err attr_reader :sem, :err

View File

@ -90,7 +90,7 @@ module ProcessShared
attach_function :psem_post, [:pointer, :pointer], :int attach_function :psem_post, [:pointer, :pointer], :int
attach_function :psem_wait, [:pointer, :pointer], :int attach_function :psem_wait, [:pointer, :pointer], :int
attach_function :psem_trywait, [:pointer, :pointer], :int attach_function :psem_trywait, [:pointer, :pointer], :int
attach_function :psem_timedwait, [:pointer, :pointer, :pointer], :int attach_function :psem_timedwait, [:pointer, :float, :pointer], :int
attach_function :psem_getvalue, [:pointer, :pointer, :pointer], :int attach_function :psem_getvalue, [:pointer, :pointer, :pointer], :int
psem_error_check(:psem_open, :psem_close, :psem_unlink, :psem_post, psem_error_check(:psem_open, :psem_close, :psem_unlink, :psem_post,
@ -104,7 +104,7 @@ module ProcessShared
attach_function :bsem_post, [:pointer, :pointer], :int attach_function :bsem_post, [:pointer, :pointer], :int
attach_function :bsem_wait, [:pointer, :pointer], :int attach_function :bsem_wait, [:pointer, :pointer], :int
attach_function :bsem_trywait, [:pointer, :pointer], :int attach_function :bsem_trywait, [:pointer, :pointer], :int
attach_function :bsem_timedwait, [:pointer, :pointer, :pointer], :int attach_function :bsem_timedwait, [:pointer, :float, :pointer], :int
attach_function :bsem_getvalue, [:pointer, :pointer, :pointer], :int attach_function :bsem_getvalue, [:pointer, :pointer, :pointer], :int
psem_error_check(:bsem_open, :bsem_close, :bsem_unlink, :bsem_post, psem_error_check(:bsem_open, :bsem_close, :bsem_unlink, :bsem_post,

View File

@ -38,6 +38,24 @@ module ProcessShared
psem_wait(sem, err) psem_wait(sem, err)
end end
# Decrement the value of the semaphore if it can be done
# immediately (i.e. if it was non-zero). Otherwise, wait up to
# +timeout+ seconds until another process increments via {#post}.
#
# @param timeout [Numeric] the maximum seconds to wait, or nil to not wait
#
# @return If +timeout+ is nil and the semaphore cannot be
# decremented immediately, raise Errno::EAGAIN. If +timeout+
# passed before the semaphore could be decremented, raise
# Errno::ETIMEDOUT.
def try_wait(timeout = nil)
if timeout
psem_timedwait(sem, timeout, err)
else
psem_trywait(sem, err)
end
end
# Increment the value of the semaphore. If other processes are # Increment the value of the semaphore. If other processes are
# waiting on this semaphore, one will be woken. # waiting on this semaphore, one will be woken.
def post def post

View File

@ -72,5 +72,29 @@ module ProcessShared
end end
end end
end end
describe '#try_wait' do
it 'returns immediately with non-zero semaphore' do
Semaphore.open(1) do |sem|
start = Time.now.to_f
sem.try_wait
(Time.now.to_f - start).must be_lt(0.01)
end
end
it 'raises EAGAIN with zero semaphore' do
Semaphore.open(0) do |sem|
proc { sem.try_wait }.must_raise(Errno::EAGAIN)
end
end
it 'raises ETIMEDOUT after timeout expires' do
Semaphore.open(0) do |sem|
start = Time.now.to_f
proc { sem.try_wait(0.1) }.must_raise(Errno::ETIMEDOUT)
(Time.now.to_f - start).must be_gte(0.1)
end
end
end
end end
end end

View File

@ -17,15 +17,16 @@ class RangeMatcher
end end
def matches?(subject) def matches?(subject)
@subject = subject
subject.send(@operator, @limit) subject.send(@operator, @limit)
end end
def failure_message_for_should def failure_message_for_should
"expected #{operator} #{@limit}" "expected #{@operator} #{@limit}, not #{@subject}"
end end
def failure_message_for_should_not def failure_message_for_should_not
"expected not #{operator} #{@limit}" "expected not #{@operator} #{@limit}, not #{@subject}"
end end
end end
@ -33,6 +34,14 @@ def be_lt(value)
RangeMatcher.new('<', value) RangeMatcher.new('<', value)
end end
def be_gt(value)
RangeMatcher.new('>', value)
end
def be_lte(value) def be_lte(value)
RangeMatcher.new('<=', value) RangeMatcher.new('<=', value)
end end
def be_gte(value)
RangeMatcher.new('>=', value)
end