diff --git a/lib/process_shared/monitor.rb b/lib/process_shared/monitor.rb index 51ea44a..e5f0bf5 100644 --- a/lib/process_shared/monitor.rb +++ b/lib/process_shared/monitor.rb @@ -9,7 +9,7 @@ module ProcessShared end def lock - if locked_by == ::Process.pid + if locked_by == current_process_and_thread @lock_count += 1 else super @@ -17,7 +17,7 @@ module ProcessShared end def unlock - if locked_by == ::Process.pid + if locked_by == current_process_and_thread if @lock_count > 0 @lock_count -= 1 else diff --git a/lib/process_shared/mutex.rb b/lib/process_shared/mutex.rb index 2b31d58..04efdeb 100644 --- a/lib/process_shared/mutex.rb +++ b/lib/process_shared/mutex.rb @@ -4,13 +4,14 @@ require 'process_shared/process_error' module ProcessShared # This Mutex class is implemented as a Semaphore with a second - # internal Semaphore used to track the locking process is tracked. + # internal Semaphore used to track the locking process and thread. + # # {ProcessError} is raised if either {#unlock} is called by a - # process different from the locking process, or if {#lock} is - # called while the process already holds the lock (i.e. the mutex is - # not re-entrant). This tracking is not without performance cost, - # of course (current implementation uses the additional {Semaphore} - # and {SharedMemory} segment). + # process + thread different from the locking process + thread, or + # if {#lock} is called while the process + thread already holds the + # lock (i.e. the mutex is not re-entrant). This tracking is not + # without performance cost, of course (current implementation uses + # the additional {Semaphore} and {SharedMemory} segment). # # The API is intended to be identical to the {::Mutex} in the core # Ruby library. @@ -23,25 +24,25 @@ module ProcessShared def initialize @internal_sem = Semaphore.new - @locked_by = SharedMemory.new(:int) + @locked_by = SharedMemory.new(:uint64, 2) # [Process ID, Thread ID] @sem = Semaphore.new end # @return [Mutex] def lock - if locked_by == ::Process.pid - raise ProcessError, "already locked by this process #{::Process.pid}" + if (p, t = current_process_and_thread) == locked_by + raise ProcessError, "already locked by this process #{p}, thread #{t}" end @sem.wait - self.locked_by = ::Process.pid + self.locked_by = current_process_and_thread self end # @return [Boolean] def locked? - locked_by > 0 + locked_by != UNLOCKED end # Releases the lock and sleeps timeout seconds if it is given and @@ -60,11 +61,11 @@ module ProcessShared # @return [Boolean] def try_lock with_internal_lock do - if @locked_by.get_int(0) > 0 + if locked? false # was locked else @sem.wait # should return immediately - self.locked_by = ::Process.pid + self.locked_by = current_process_and_thread true end end @@ -72,11 +73,11 @@ module ProcessShared # @return [Mutex] def unlock - if (p = locked_by) != ::Process.pid - raise ProcessError, "lock is held by #{p} not #{::Process.pid}" + if (p, t = locked_by) != (cp, ct = current_process_and_thread) + raise ProcessError, "lock is held by process #{p}, thread #{t}: not process #{cp}, thread #{ct}" end - self.locked_by = 0 + self.locked_by = UNLOCKED @sem.post self end @@ -96,20 +97,32 @@ module ProcessShared protected + # @return [Array<(Fixnum, Fixnum)>] + # If locked, IDs of the locking process and thread, otherwise +UNLOCKED+ def locked_by with_internal_lock do - @locked_by.get_int(0) + @locked_by.read_array_of_uint64(2) end end - def locked_by=(val) + # @param [Array<(Fixnum, Fixnum)>] ary + # Set the IDs of the locking process and thread, or +UNLOCKED+ if none + def locked_by=(ary) with_internal_lock do - @locked_by.put_int(0, val) + @locked_by.write_array_of_uint64(ary) end end def with_internal_lock(&block) @internal_sem.synchronize &block end + + # @return [Array<(Fixnum, Fixnum)>] IDs of the current process and thread + def current_process_and_thread + [::Process.pid, Thread.current.object_id] + end + + # Represents the state of being unlocked + UNLOCKED = [0, 0].freeze end end diff --git a/spec/process_shared/mutex_spec.rb b/spec/process_shared/mutex_spec.rb index 31e167e..9a0fd7e 100644 --- a/spec/process_shared/mutex_spec.rb +++ b/spec/process_shared/mutex_spec.rb @@ -24,10 +24,37 @@ module ProcessShared ::Process.wait(pid) end - it 'raises exception when locked twice by same process' do + it 'raises exception when unlocked by other thread in same process' do + t = Thread.new do + @lock.lock + sleep 0.2 + @lock.unlock + end + + sleep 0.1 + proc { @lock.unlock }.must_raise(ProcessError) + + t.join + end + + it 'raises exception when locked twice by same process and thread' do @lock.lock proc { @lock.lock }.must_raise(ProcessError) @lock.unlock end + + it 'does not raise when locked by different threads on same process' do + t = Thread.new do + @lock.lock + sleep 0.2 + @lock.unlock + end + + sleep 0.1 + @lock.synchronize { } + + t.join + end + end end