Add SharedMemory#write_object and #read_object; add SharedMemoryIO helper.

This commit is contained in:
Patrick Mahoney 2011-12-20 22:09:38 -06:00
parent cf6b821b17
commit cc663a8d7f
3 changed files with 376 additions and 1 deletions

View File

@ -1,6 +1,7 @@
require 'process_shared/rt' require 'process_shared/rt'
require 'process_shared/libc' require 'process_shared/libc'
require 'process_shared/with_self' require 'process_shared/with_self'
require 'process_shared/shared_memory_io'
module ProcessShared module ProcessShared
# Memory block shared across processes. # Memory block shared across processes.
@ -44,7 +45,9 @@ module ProcessShared
LibC::PROT_READ | LibC::PROT_WRITE, LibC::PROT_READ | LibC::PROT_WRITE,
LibC::MAP_SHARED, LibC::MAP_SHARED,
@fd, @fd,
0) 0).
slice(0, size) # slice to get FFI::Pointer that knows its size
# (and thus does bounds checking)
@finalize = self.class.make_finalizer(@pointer.address, @size, @fd) @finalize = self.class.make_finalizer(@pointer.address, @size, @fd)
ObjectSpace.define_finalizer(self, @finalize) ObjectSpace.define_finalizer(self, @finalize)
@ -52,9 +55,47 @@ module ProcessShared
super(@pointer) super(@pointer)
end end
# Write the serialization of +obj+ (using Marshal.dump) to this
# shared memory object at +offset+ (in bytes).
#
# Raises IndexError if there is insufficient space.
def put_object(offset, obj)
io = SharedMemoryIO.new(self)
io.seek(offset)
Marshal.dump(obj, io)
end
# Read the serialized object at +offset+ (in bytes) using
# Marshal.load.
#
# @return [Object]
def get_object(offset)
io = to_shm_io
io.seek(offset)
Marshal.load(io)
end
# Equivalent to {#put_object(0, obj)}
def write_object(obj)
Marshal.dump(obj, to_shm_io)
end
# Equivalent to {#read_object(0, obj)}
#
# @return [Object]
def read_object
Marshal.load(to_shm_io)
end
def close def close
ObjectSpace.undefine_finalizer(self) ObjectSpace.undefine_finalizer(self)
@finalize.call @finalize.call
end end
private
def to_shm_io
SharedMemoryIO.new(self)
end
end end
end end

View File

@ -0,0 +1,309 @@
module ProcessShared
# Does some bounds checking for EOF, but assumes underlying memory
# object (FFI::Pointer) will do bounds checking, in particular the
# {#_putbytes} method relies on this.
#
# Note: an unbounded FFI::Pointer may be converted into a bounded
# pointer using +ptr.slice(0, size)+.
class SharedMemoryIO
attr_accessor :pos
attr_reader :mem
def initialize(mem)
@mem = mem
@pos = 0
@closed = false # TODO: actually pay attention to this
end
def <<(*args)
raise NotImplementedError
end
def advise(*args)
# no-op
end
def autoclose=(*args)
raise NotImplementedError
end
def autoclose?
raise NotImplementedError
end
def binmode
# no-op; always in binmode
end
def binmode?
true
end
def bytes
if block_given?
until eof?
yield _getbyte
end
else
raise NotImplementedError
end
end
alias_method :each_byte, :bytes
def chars
raise NotImplementedError
end
alias_method :each_char, :chars
def close
@closed = true
end
def close_on_exec=(bool)
raise NotImplementedError
end
def close_on_exec?
raise NotImplementedError
end
def close_read
raise NotImplementedError
end
def close_write
raise NotImplementedError
end
def closed?
@closed
end
def codepoints
raise NotImplementedError
end
alias_method :each_codepoint, :codepoints
def each
raise NotImplementedError
end
alias_method :each_line, :each
alias_method :lines, :each
def eof?
pos == mem.size
end
alias_method :eof, :eof?
def external_encoding
raise NotImplementedError
end
def fcntl
raise NotImplementedError
end
def fdatasync
raise NotImplementedError
end
def fileno
raise NotImplementedError
end
alias_method :to_i, :fileno
def flush
# no-op
end
def fsync
raise NotImplementedError
end
def getbyte
return nil if eof?
_getbyte
end
def getc
raise NotImplementedError
end
def gets
raise NotImplementedError
end
def internal_encoding
raise NotImplementedError
end
def ioctl
raise NotImplementedError
end
def tty?
false
end
alias_method :isatty, :tty?
def lineno
raise NotImplementedError
end
def lineno=
raise NotImplementedError
end
def lines
raise NotImplementedError
end
def pid
raise NotImplementedError
end
alias_method :tell, :pos
def print(*args)
raise NotImplementedError
end
def printf(*args)
raise NotImplementedError
end
def putc(arg)
raise NotImplementedError
end
def puts(*args)
raise NotImplementedError
end
# FIXME: this doesn't match IO#read exactly (corner cases about
# EOF and whether length was nil or not), but it's enough for
# {Marshal::load}.
def read(length = nil, buffer = nil)
length ||= (mem.size - pos)
buffer ||= ''
actual_length = [(mem.size - pos), length].min
actual_length.times do
buffer << _getbyte
end
buffer
end
def read_nonblock(*args)
raise NotImplementedError
end
def readbyte
raise EOFError if eof?
_getbyte
end
def readchar
raise NotImplementedError
end
def readline
raise NotImplementedError
end
def readlines
raise NotImplementedError
end
def readpartial
raise NotImplementedError
end
def reopen
raise NotImplementedError
end
def rewind
pos = 0
end
def seek(amount, whence = IO::SEEK_SET)
case whence
when IO::SEEK_CUR
self.pos += amount
when IO::SEEK_END
self.pos = (mem.size + amount)
when IO::SEEK_SET
self.pos = amount
else
raise ArgumentError, "bad seek whence #{whence}"
end
end
def set_encoding
raise NotImplementedError
end
def stat
raise NotImplementedError
end
def sync
true
end
def sync=
raise NotImplementedError
end
def sysread(*args)
raise NotImplementedError
end
def sysseek(*args)
raise NotImplementedError
end
def syswrite(*args)
raise NotImplementedError
end
def to_io
raise NotImplementedError
end
def ungetbyte
raise IOError if pos == 0
pos -= 1
end
def ungetc
raise NotImplementedError
end
def write(str)
s = str.to_s
_putbytes(s)
s.size
end
def write_nonblock(str)
raise NotImplementedError
end
private
# Like {#getbyte} but does not perform eof check.
def _getbyte
b = mem.get_uchar(pos)
self.pos += 1
b
end
def _putbytes(str)
mem.put_bytes(pos, str, 0, str.size)
self.pos += str.size
end
end
end

View File

@ -32,5 +32,30 @@ module ProcessShared
mem.get_int(0).must_equal(1234567) mem.get_int(0).must_equal(1234567)
end end
describe 'Object dump/load' do
it 'writes serialized objects' do
mem = SharedMemory.new(1024)
pid = fork do
mem.write_object(['a', 'b'])
Kernel.exit!
end
::Process.wait(pid)
mem.read_object.must_equal ['a', 'b']
end
it 'raises IndexError when insufficient space' do
mem = SharedMemory.new(2)
proc { mem.write_object(['a', 'b']) }.must_raise(IndexError)
end
it 'writes with an offset' do
mem = SharedMemory.new(1024)
mem.put_object(2, 'string')
proc { mem.read_object }.must_raise(TypeError)
proc { mem.get_object(0) }.must_raise(TypeError)
mem.get_object(2).must_equal 'string'
end
end
end end
end end