On 01/17/2014 12:19 AM, Martin Hansen wrote:
> This is a cross post from SO:
> http://stackoverflow.com/questions/21173136/how-to-resolve-pipe-dead-lock-in-ruby
>
> I am trying to emulate UNIX command line pipes in a Ruby-only solution
> that uses multiple cores. Eventually, the records piped from command to
> command will be Ruby objects marshaled using msgpack. Unfortunately, the
> code hangs after the first dump command. I am really trying to figure
> out what causes this deadlock and how to resolve it.
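
A hang like that is usually a sign that a reader never sees EOF because some
process still holds a copy of the pipe's write end open. A pipe only reports
EOF once every write end is closed, and after a fork both parent and child
hold one. A minimal sketch of that failure mode (not your code, just an
illustration with a single IO.pipe):

reader, writer = IO.pipe

fork do
  writer.puts "hello"
  writer.close   # the child's copy
end

writer.close     # without this, reader.read below blocks forever,
                 # because the parent still holds a write end open

puts reader.read # returns once all write ends are closed
Process.waitall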

This seems to work without the parallel gem (sorry, I'm not familiar with it):

#!/usr/bin/env ruby

require 'msgpack'

class Pipe
   def initialize
     @commands = []
   end

   def add(command, options = {})
     @commands << Command.new(command, options)

     self
   end

   def run
     writers = {}

     # Connect each pair of consecutive commands with a pipe, wrapping the
     # write end in a MessagePack packer and the read end in an unpacker.
     @commands.each_cons(2) do |c_in, c_out|
       reader, writer = IO.pipe

       c_out.input = MessagePack::Unpacker.new(reader)
       c_in.output = MessagePack::Packer.new(writer)
       writers[c_in] = writer
     end

     @commands.each do |command|
       fork do
         command.run
       end

       # Close the parent's copy of the write end as soon as the child has
       # been forked. If the parent kept it open, the downstream reader
       # would never see EOF and the pipeline would hang.
       writers[command].close if writers[command]
     end

     Process.waitall
   end

   class Command
     attr_accessor :input, :output

     def initialize(command, options)
       @command = command
       @options = options
       @input   = nil
       @output  = nil
     end

     # Dispatch to the method named after this command (:cat or :dump).
     def run
       send @command
     end

     # Forward records from the upstream command (if any), then emit the
     # lines of the file given via the :input option.
     def cat
       @input.each { |record| @output.write(record).flush } if @input && @output

       File.open(@options[:input]) do |ios|
         ios.each { |record| @output.write(record).flush } if @output
       end
     end

     # Print each incoming record and pass it on downstream (if any).
     def dump
       @input.each do |record|
         puts record
         @output.write(record).flush if @output
       end
     end
   end
end

p = Pipe.new
p.add(:cat, input: "foo.tab").add(:dump).add(:cat, input: "table.txt").add(:dump)
p.run
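
Since you said the records will eventually be Ruby objects rather than text
lines, the same Packer/Unpacker wiring carries those as well; write accepts
any msgpack-serializable object. A small single-pipe sketch of the round trip
(hypothetical, not part of the Pipe class; note that symbol keys come back as
strings unless you configure the unpacker otherwise, so I'm using string keys):

require 'msgpack'

reader, writer = IO.pipe

fork do
  reader.close
  packer = MessagePack::Packer.new(writer)
  packer.write("seq" => 1, "name" => "foo").flush
  packer.write("seq" => 2, "name" => "bar").flush
  writer.close
end

writer.close

MessagePack::Unpacker.new(reader).each { |record| p record }
Process.waitall

In the Pipe class above that just means cat/dump handing hashes (or whatever
your record objects serialize to) to @output.write instead of raw lines.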