Fix Mutex to allow lock from different Ruby (green) thread in process

This commit is contained in:
Marc Siegel 2013-12-27 10:46:50 -05:00
parent 3e4fa34694
commit aeadb74ef6
3 changed files with 62 additions and 22 deletions

View File

@ -9,7 +9,7 @@ module ProcessShared
end end
def lock def lock
if locked_by == ::Process.pid if locked_by == current_process_and_thread
@lock_count += 1 @lock_count += 1
else else
super super
@ -17,7 +17,7 @@ module ProcessShared
end end
def unlock def unlock
if locked_by == ::Process.pid if locked_by == current_process_and_thread
if @lock_count > 0 if @lock_count > 0
@lock_count -= 1 @lock_count -= 1
else else

View File

@ -4,13 +4,14 @@ require 'process_shared/process_error'
module ProcessShared module ProcessShared
# This Mutex class is implemented as a Semaphore with a second # This Mutex class is implemented as a Semaphore with a second
# internal Semaphore used to track the locking process is tracked. # internal Semaphore used to track the locking process and thread.
#
# {ProcessError} is raised if either {#unlock} is called by a # {ProcessError} is raised if either {#unlock} is called by a
# process different from the locking process, or if {#lock} is # process + thread different from the locking process + thread, or
# called while the process already holds the lock (i.e. the mutex is # if {#lock} is called while the process + thread already holds the
# not re-entrant). This tracking is not without performance cost, # lock (i.e. the mutex is not re-entrant). This tracking is not
# of course (current implementation uses the additional {Semaphore} # without performance cost, of course (current implementation uses
# and {SharedMemory} segment). # the additional {Semaphore} and {SharedMemory} segment).
# #
# The API is intended to be identical to the {::Mutex} in the core # The API is intended to be identical to the {::Mutex} in the core
# Ruby library. # Ruby library.
@ -23,25 +24,25 @@ module ProcessShared
def initialize def initialize
@internal_sem = Semaphore.new @internal_sem = Semaphore.new
@locked_by = SharedMemory.new(:int) @locked_by = SharedMemory.new(:uint64, 2) # [Process ID, Thread ID]
@sem = Semaphore.new @sem = Semaphore.new
end end
# @return [Mutex] # @return [Mutex]
def lock def lock
if locked_by == ::Process.pid if (p, t = current_process_and_thread) == locked_by
raise ProcessError, "already locked by this process #{::Process.pid}" raise ProcessError, "already locked by this process #{p}, thread #{t}"
end end
@sem.wait @sem.wait
self.locked_by = ::Process.pid self.locked_by = current_process_and_thread
self self
end end
# @return [Boolean] # @return [Boolean]
def locked? def locked?
locked_by > 0 locked_by != UNLOCKED
end end
# Releases the lock and sleeps timeout seconds if it is given and # Releases the lock and sleeps timeout seconds if it is given and
@ -60,11 +61,11 @@ module ProcessShared
# @return [Boolean] # @return [Boolean]
def try_lock def try_lock
with_internal_lock do with_internal_lock do
if @locked_by.get_int(0) > 0 if locked?
false # was locked false # was locked
else else
@sem.wait # should return immediately @sem.wait # should return immediately
self.locked_by = ::Process.pid self.locked_by = current_process_and_thread
true true
end end
end end
@ -72,11 +73,11 @@ module ProcessShared
# @return [Mutex] # @return [Mutex]
def unlock def unlock
if (p = locked_by) != ::Process.pid if (p, t = locked_by) != (cp, ct = current_process_and_thread)
raise ProcessError, "lock is held by #{p} not #{::Process.pid}" raise ProcessError, "lock is held by process #{p}, thread #{t}: not process #{cp}, thread #{ct}"
end end
self.locked_by = 0 self.locked_by = UNLOCKED
@sem.post @sem.post
self self
end end
@ -96,20 +97,32 @@ module ProcessShared
protected protected
# @return [Array<(Fixnum, Fixnum)>]
# If locked, IDs of the locking process and thread, otherwise +UNLOCKED+
def locked_by def locked_by
with_internal_lock do with_internal_lock do
@locked_by.get_int(0) @locked_by.read_array_of_uint64(2)
end end
end end
def locked_by=(val) # @param [Array<(Fixnum, Fixnum)>] ary
# Set the IDs of the locking process and thread, or +UNLOCKED+ if none
def locked_by=(ary)
with_internal_lock do with_internal_lock do
@locked_by.put_int(0, val) @locked_by.write_array_of_uint64(ary)
end end
end end
def with_internal_lock(&block) def with_internal_lock(&block)
@internal_sem.synchronize &block @internal_sem.synchronize &block
end end
# @return [Array<(Fixnum, Fixnum)>] IDs of the current process and thread
def current_process_and_thread
[::Process.pid, Thread.current.object_id]
end
# Represents the state of being unlocked
UNLOCKED = [0, 0].freeze
end end
end end

View File

@ -24,10 +24,37 @@ module ProcessShared
::Process.wait(pid) ::Process.wait(pid)
end end
it 'raises exception when locked twice by same process' do it 'raises exception when unlocked by other thread in same process' do
t = Thread.new do
@lock.lock
sleep 0.2
@lock.unlock
end
sleep 0.1
proc { @lock.unlock }.must_raise(ProcessError)
t.join
end
it 'raises exception when locked twice by same process and thread' do
@lock.lock @lock.lock
proc { @lock.lock }.must_raise(ProcessError) proc { @lock.lock }.must_raise(ProcessError)
@lock.unlock @lock.unlock
end end
it 'does not raise when locked by different threads on same process' do
t = Thread.new do
@lock.lock
sleep 0.2
@lock.unlock
end
sleep 0.1
@lock.synchronize { }
t.join
end
end end
end end