Merge pull request #6 from ms-ati/allow-mutex-lock-from-different-thread
Fix Mutex to allow lock from different Ruby (green) thread in process
This commit is contained in:
		
						commit
						dddf0be4ef
					
				|  | @ -9,7 +9,7 @@ module ProcessShared | |||
|     end | ||||
| 
 | ||||
|     def lock | ||||
|       if locked_by == ::Process.pid | ||||
|       if locked_by == current_process_and_thread | ||||
|         @lock_count += 1 | ||||
|       else | ||||
|         super | ||||
|  | @ -17,7 +17,7 @@ module ProcessShared | |||
|     end | ||||
| 
 | ||||
|     def unlock | ||||
|       if locked_by == ::Process.pid | ||||
|       if locked_by == current_process_and_thread | ||||
|         if @lock_count > 0 | ||||
|           @lock_count -= 1 | ||||
|         else | ||||
|  |  | |||
|  | @ -4,13 +4,14 @@ require 'process_shared/process_error' | |||
| 
 | ||||
| module ProcessShared | ||||
|   # This Mutex class is implemented as a Semaphore with a second | ||||
|   # internal Semaphore used to track the locking process is tracked. | ||||
|   # internal Semaphore used to track the locking process and thread. | ||||
|   # | ||||
|   # {ProcessError} is raised if either {#unlock} is called by a | ||||
|   # process different from the locking process, or if {#lock} is | ||||
|   # called while the process already holds the lock (i.e. the mutex is | ||||
|   # not re-entrant).  This tracking is not without performance cost, | ||||
|   # of course (current implementation uses the additional {Semaphore} | ||||
|   # and {SharedMemory} segment). | ||||
|   # process + thread different from the locking process + thread, or | ||||
|   # if {#lock} is called while the process + thread already holds the | ||||
|   # lock (i.e. the mutex is not re-entrant).  This tracking is not | ||||
|   # without performance cost, of course (current implementation uses | ||||
|   # the additional {Semaphore} and {SharedMemory} segment). | ||||
|   # | ||||
|   # The API is intended to be identical to the {::Mutex} in the core | ||||
|   # Ruby library. | ||||
|  | @ -23,25 +24,25 @@ module ProcessShared | |||
| 
 | ||||
|     def initialize | ||||
|       @internal_sem = Semaphore.new | ||||
|       @locked_by = SharedMemory.new(:int) | ||||
|       @locked_by = SharedMemory.new(:uint64, 2)  # [Process ID, Thread ID] | ||||
| 
 | ||||
|       @sem = Semaphore.new | ||||
|     end | ||||
| 
 | ||||
|     # @return [Mutex] | ||||
|     def lock | ||||
|       if locked_by == ::Process.pid | ||||
|         raise ProcessError, "already locked by this process #{::Process.pid}" | ||||
|       if (p, t = current_process_and_thread) == locked_by | ||||
|         raise ProcessError, "already locked by this process #{p}, thread #{t}" | ||||
|       end | ||||
| 
 | ||||
|       @sem.wait | ||||
|       self.locked_by = ::Process.pid | ||||
|       self.locked_by = current_process_and_thread | ||||
|       self | ||||
|     end | ||||
| 
 | ||||
|     # @return [Boolean] | ||||
|     def locked? | ||||
|       locked_by > 0 | ||||
|       locked_by != UNLOCKED | ||||
|     end | ||||
| 
 | ||||
|     # Releases the lock and sleeps timeout seconds if it is given and | ||||
|  | @ -60,11 +61,11 @@ module ProcessShared | |||
|     # @return [Boolean] | ||||
|     def try_lock | ||||
|       with_internal_lock do | ||||
|         if @locked_by.get_int(0) > 0 | ||||
|         if locked? | ||||
|           false                 # was locked | ||||
|         else | ||||
|           @sem.wait             # should return immediately | ||||
|           self.locked_by = ::Process.pid | ||||
|           self.locked_by = current_process_and_thread | ||||
|           true | ||||
|         end | ||||
|       end | ||||
|  | @ -72,11 +73,11 @@ module ProcessShared | |||
| 
 | ||||
|     # @return [Mutex] | ||||
|     def unlock | ||||
|       if (p = locked_by) != ::Process.pid | ||||
|         raise ProcessError, "lock is held by #{p} not #{::Process.pid}" | ||||
|       if (p, t = locked_by) != (cp, ct = current_process_and_thread) | ||||
|         raise ProcessError, "lock is held by process #{p}, thread #{t}: not process #{cp}, thread #{ct}" | ||||
|       end | ||||
| 
 | ||||
|       self.locked_by = 0 | ||||
|       self.locked_by = UNLOCKED | ||||
|       @sem.post | ||||
|       self | ||||
|     end | ||||
|  | @ -96,20 +97,32 @@ module ProcessShared | |||
| 
 | ||||
|     protected | ||||
| 
 | ||||
|     # @return [Array<(Fixnum, Fixnum)>] | ||||
|     #   If locked, IDs of the locking process and thread, otherwise +UNLOCKED+ | ||||
|     def locked_by | ||||
|       with_internal_lock do | ||||
|         @locked_by.get_int(0) | ||||
|         @locked_by.read_array_of_uint64(2) | ||||
|       end | ||||
|     end | ||||
| 
 | ||||
|     def locked_by=(val) | ||||
|     # @param [Array<(Fixnum, Fixnum)>] ary | ||||
|     #   Set the IDs of the locking process and thread, or +UNLOCKED+ if none | ||||
|     def locked_by=(ary) | ||||
|       with_internal_lock do | ||||
|         @locked_by.put_int(0, val) | ||||
|         @locked_by.write_array_of_uint64(ary) | ||||
|       end | ||||
|     end | ||||
| 
 | ||||
|     def with_internal_lock(&block) | ||||
|       @internal_sem.synchronize &block | ||||
|     end | ||||
| 
 | ||||
|     # @return [Array<(Fixnum, Fixnum)>]  IDs of the current process and thread | ||||
|     def current_process_and_thread | ||||
|       [::Process.pid, Thread.current.object_id] | ||||
|     end | ||||
| 
 | ||||
|     # Represents the state of being unlocked | ||||
|     UNLOCKED = [0, 0].freeze | ||||
|   end | ||||
| end | ||||
|  |  | |||
|  | @ -24,10 +24,37 @@ module ProcessShared | |||
|       ::Process.wait(pid) | ||||
|     end | ||||
| 
 | ||||
|     it 'raises exception when locked twice by same process' do | ||||
|     it 'raises exception when unlocked by other thread in same process' do | ||||
|       t = Thread.new do | ||||
|         @lock.lock | ||||
|         sleep 0.2 | ||||
|         @lock.unlock | ||||
|       end | ||||
| 
 | ||||
|       sleep 0.1 | ||||
|       proc { @lock.unlock }.must_raise(ProcessError) | ||||
| 
 | ||||
|       t.join | ||||
|     end | ||||
| 
 | ||||
|     it 'raises exception when locked twice by same process and thread' do | ||||
|       @lock.lock | ||||
|       proc { @lock.lock }.must_raise(ProcessError) | ||||
|       @lock.unlock | ||||
|     end | ||||
| 
 | ||||
|     it 'does not raise when locked by different threads on same process' do | ||||
|       t = Thread.new do | ||||
|         @lock.lock | ||||
|         sleep 0.2 | ||||
|         @lock.unlock | ||||
|       end | ||||
| 
 | ||||
|       sleep 0.1 | ||||
|       @lock.synchronize { } | ||||
| 
 | ||||
|       t.join | ||||
|     end | ||||
| 
 | ||||
|   end | ||||
| end | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue