bundle-new/lib/bundler/parallel_workers/unix_worker.rb

102 lines
3.0 KiB
Ruby

module Bundler
module ParallelWorkers
# UnixWorker is used only on platforms where fork is available. The way
# this code works is, it forks a preconfigured number of workers and then
# It starts preconfigured number of threads that write to the connected pipe.
class UnixWorker < Worker
class JobHandler < Struct.new(:pid, :io_r, :io_w)
def work(obj)
Marshal.dump obj, io_w
Marshal.load io_r
rescue IOError, Errno::EPIPE
nil
end
end
def initialize(size, job)
# Close the persistent connections for the main thread before forking
Net::HTTP::Persistent.new('bundler', :ENV).shutdown
super
end
private
# Start forked workers for downloading gems. This version of worker
# is only used on platforms where fork is available.
#
# @param size [Integer] Size of worker pool
# @param func [Proc] Job that should be executed in the worker
def prepare_workers(size, func)
@workers = size.times.map do |num|
child_read, parent_write = IO.pipe
parent_read, child_write = IO.pipe
pid = Process.fork do
begin
parent_read.close
parent_write.close
while !child_read.eof?
obj = Marshal.load child_read
Marshal.dump func.call(obj, num), child_write
end
rescue Exception => e
begin
Marshal.dump WrappedException.new(e), child_write
rescue Errno::EPIPE
nil
end
ensure
child_read.close
child_write.close
end
end
child_read.close
child_write.close
JobHandler.new pid, parent_read, parent_write
end
end
# Start the threads whose job is basically to wait for incoming messages
# on request queue and write that message to the connected pipe. Also retrieve
# messages from child worker via connected pipe and write the message to response queue
#
# @param size [Integer] Number of threads to be started
def prepare_threads(size)
@threads = size.times.map do |i|
Thread.start do
worker = @workers[i]
loop do
obj = @request_queue.deq
break if obj.equal? POISON
@response_queue.enq worker.work(obj)
end
end
end
end
# Kill the forked workers by sending SIGINT to them
def stop_workers
@workers.each do |worker|
worker.io_r.close unless worker.io_r.closed?
worker.io_w.close unless worker.io_w.closed?
begin
Process.kill :INT, worker.pid
rescue Errno::ESRCH
nil
end
end
@workers.each do |worker|
begin
Process.waitpid worker.pid
rescue Errno::ECHILD
nil
end
end
end
end
end
end