Split SharedMemory into module and platform dependent bits. Make SharedArray extend platform dependent class.
This commit is contained in:
parent
db108a1913
commit
6f63c67c6e
|
@ -19,7 +19,23 @@ if RUBY_VERSION =~ /^1.8/
|
||||||
end
|
end
|
||||||
|
|
||||||
require 'process_shared/semaphore'
|
require 'process_shared/semaphore'
|
||||||
require 'process_shared/bounded_semaphore'
|
require 'process_shared/binary_semaphore'
|
||||||
require 'process_shared/mutex'
|
require 'process_shared/mutex'
|
||||||
require 'process_shared/shared_memory'
|
require 'process_shared/shared_memory'
|
||||||
|
|
||||||
|
module ProcessShared
|
||||||
|
case FFI::Platform::OS
|
||||||
|
when 'linux'
|
||||||
|
require 'process_shared/posix/shared_memory'
|
||||||
|
require 'process_shared/posix/semaphore'
|
||||||
|
|
||||||
|
SharedMemory.impl = Posix::SharedMemory
|
||||||
|
Semaphore.impl = Posix::Semaphore
|
||||||
|
when 'darwin'
|
||||||
|
require 'process_shared/posix/shared_memory'
|
||||||
|
require 'process_shared/mach/semaphore'
|
||||||
|
|
||||||
|
SharedMemory.impl = Posix::SharedMemory
|
||||||
|
Semaphore.impl = Mach::Semaphore
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
|
@ -2,8 +2,7 @@ require 'ffi'
|
||||||
|
|
||||||
require 'process_shared/posix/errno'
|
require 'process_shared/posix/errno'
|
||||||
require 'process_shared/posix/libc'
|
require 'process_shared/posix/libc'
|
||||||
require 'process_shared/shared_memory_io'
|
require 'process_shared/shared_memory'
|
||||||
require 'process_shared/with_self'
|
|
||||||
|
|
||||||
module ProcessShared
|
module ProcessShared
|
||||||
module Posix
|
module Posix
|
||||||
|
@ -29,14 +28,10 @@ module ProcessShared
|
||||||
include SharedMemory::Foreign
|
include SharedMemory::Foreign
|
||||||
include LibC
|
include LibC
|
||||||
|
|
||||||
include ProcessShared::WithSelf
|
include ProcessShared::SharedMemory
|
||||||
|
|
||||||
attr_reader :size, :type, :type_size, :count, :fd
|
attr_reader :size, :type, :type_size, :count, :fd
|
||||||
|
|
||||||
def self.open(size, &block)
|
|
||||||
new(size).with_self(&block)
|
|
||||||
end
|
|
||||||
|
|
||||||
def self.make_finalizer(addr, size, fd)
|
def self.make_finalizer(addr, size, fd)
|
||||||
proc do
|
proc do
|
||||||
pointer = FFI::Pointer.new(addr)
|
pointer = FFI::Pointer.new(addr)
|
||||||
|
@ -78,60 +73,10 @@ module ProcessShared
|
||||||
super(@pointer)
|
super(@pointer)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Write the serialization of +obj+ (using Marshal.dump) to this
|
|
||||||
# shared memory object at +offset+ (in bytes).
|
|
||||||
#
|
|
||||||
# Raises IndexError if there is insufficient space.
|
|
||||||
def put_object(offset, obj)
|
|
||||||
# FIXME: This is a workaround to an issue I'm seeing in
|
|
||||||
# 1.8.7-p352 (not tested in other 1.8's). If I used the code
|
|
||||||
# below that works in 1.9, then inside SharedMemoryIO#write, the
|
|
||||||
# passed string object is 'terminated' (garbage collected?) and
|
|
||||||
# won't respond to any methods... This less efficient since it
|
|
||||||
# involves the creation of an intermediate string, but it works
|
|
||||||
# in 1.8.7-p352.
|
|
||||||
if RUBY_VERSION =~ /^1.8/
|
|
||||||
str = Marshal.dump(obj)
|
|
||||||
return put_bytes(offset, str, 0, str.size)
|
|
||||||
end
|
|
||||||
|
|
||||||
io = to_shm_io
|
|
||||||
io.seek(offset)
|
|
||||||
Marshal.dump(obj, io)
|
|
||||||
end
|
|
||||||
|
|
||||||
# Read the serialized object at +offset+ (in bytes) using
|
|
||||||
# Marshal.load.
|
|
||||||
#
|
|
||||||
# @return [Object]
|
|
||||||
def get_object(offset)
|
|
||||||
io = to_shm_io
|
|
||||||
io.seek(offset)
|
|
||||||
Marshal.load(io)
|
|
||||||
end
|
|
||||||
|
|
||||||
# Equivalent to {#put_object(0, obj)}
|
|
||||||
def write_object(obj)
|
|
||||||
put_object(0, obj)
|
|
||||||
end
|
|
||||||
|
|
||||||
# Equivalent to {#read_object(0, obj)}
|
|
||||||
#
|
|
||||||
# @return [Object]
|
|
||||||
def read_object
|
|
||||||
Marshal.load(to_shm_io)
|
|
||||||
end
|
|
||||||
|
|
||||||
def close
|
def close
|
||||||
ObjectSpace.undefine_finalizer(self)
|
ObjectSpace.undefine_finalizer(self)
|
||||||
@finalize.call
|
@finalize.call
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
|
||||||
|
|
||||||
def to_shm_io
|
|
||||||
ProcessShared::SharedMemoryIO.new(self)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
|
require 'process_shared'
|
||||||
|
|
||||||
module ProcessShared
|
module ProcessShared
|
||||||
class SharedArray < SharedMemory
|
class SharedArray < SharedMemory.impl
|
||||||
include Enumerable
|
include Enumerable
|
||||||
|
|
||||||
# A fixed-size array in shared memory. Processes forked from this
|
# A fixed-size array in shared memory. Processes forked from this
|
||||||
|
|
|
@ -5,54 +5,19 @@ require 'process_shared/shared_memory_io'
|
||||||
|
|
||||||
module ProcessShared
|
module ProcessShared
|
||||||
# Memory block shared across processes.
|
# Memory block shared across processes.
|
||||||
class SharedMemory < FFI::Pointer
|
module SharedMemory
|
||||||
include WithSelf
|
include ProcessShared::WithSelf
|
||||||
|
|
||||||
attr_reader :size, :type, :type_size, :count, :fd
|
class << self
|
||||||
|
attr_accessor :impl
|
||||||
|
|
||||||
def self.open(size, &block)
|
def new(*args)
|
||||||
|
impl.new(*args)
|
||||||
|
end
|
||||||
|
|
||||||
|
def open(size, &block)
|
||||||
new(size).with_self(&block)
|
new(size).with_self(&block)
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.make_finalizer(addr, size, fd)
|
|
||||||
proc do
|
|
||||||
pointer = FFI::Pointer.new(addr)
|
|
||||||
LibC.munmap(pointer, size)
|
|
||||||
LibC.close(fd)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def initialize(type_or_count = 1, count = 1)
|
|
||||||
@type, @count = case type_or_count
|
|
||||||
when Symbol
|
|
||||||
[type_or_count, count]
|
|
||||||
else
|
|
||||||
[:uchar, type_or_count]
|
|
||||||
end
|
|
||||||
|
|
||||||
@type_size = FFI.type_size(@type)
|
|
||||||
@size = @type_size * @count
|
|
||||||
|
|
||||||
name = "/ps-shm#{rand(10000)}"
|
|
||||||
@fd = RT.shm_open(name,
|
|
||||||
LibC::O_CREAT | LibC::O_RDWR | LibC::O_EXCL,
|
|
||||||
0777)
|
|
||||||
RT.shm_unlink(name)
|
|
||||||
|
|
||||||
LibC.ftruncate(@fd, @size)
|
|
||||||
@pointer = LibC.mmap(nil,
|
|
||||||
@size,
|
|
||||||
LibC::PROT_READ | LibC::PROT_WRITE,
|
|
||||||
LibC::MAP_SHARED,
|
|
||||||
@fd,
|
|
||||||
0).
|
|
||||||
slice(0, size) # slice to get FFI::Pointer that knows its size
|
|
||||||
# (and thus does bounds checking)
|
|
||||||
|
|
||||||
@finalize = self.class.make_finalizer(@pointer.address, @size, @fd)
|
|
||||||
ObjectSpace.define_finalizer(self, @finalize)
|
|
||||||
|
|
||||||
super(@pointer)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Write the serialization of +obj+ (using Marshal.dump) to this
|
# Write the serialization of +obj+ (using Marshal.dump) to this
|
||||||
|
@ -99,12 +64,7 @@ module ProcessShared
|
||||||
Marshal.load(to_shm_io)
|
Marshal.load(to_shm_io)
|
||||||
end
|
end
|
||||||
|
|
||||||
def close
|
protected
|
||||||
ObjectSpace.undefine_finalizer(self)
|
|
||||||
@finalize.call
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
|
||||||
|
|
||||||
def to_shm_io
|
def to_shm_io
|
||||||
SharedMemoryIO.new(self)
|
SharedMemoryIO.new(self)
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
require 'spec_helper'
|
require 'spec_helper'
|
||||||
require 'process_shared/shared_memory'
|
require 'process_shared'
|
||||||
|
|
||||||
module ProcessShared
|
module ProcessShared
|
||||||
describe SharedMemory do
|
describe SharedMemory do
|
||||||
|
|
Loading…
Reference in New Issue