Feature #4415: Rubyのtest-allを並列化するパッチが完成しました
http://redmine.ruby-lang.org/issues/show/4415

起票者: Shota Fukumori
ステータス: Open, 優先度: Normal
カテゴリ: lib

# [ruby-dev:43222] の続きです。経緯などはそちらを参照していただければと思います。

パッチが完成したのでチケットを作成しました。

まつもとさんは[ruby-dev:43224]で「バグがとれたら入れちゃったら?」と言っていました。

一部テストについては改変していますが、それについては取りこまなくてもFailになることは無いと思います。
(workerで失敗したテストは、並列ではなく逐次で実行しなおす為。ただし若干速度に影響がでるかと思います)

では、コミットをよろしくお願いします。

--patch (git diff --no-prefix)--

diff --git lib/test/unit.rb lib/test/unit.rb
index 76e9fdd..00d7e69 100644
--- lib/test/unit.rb
+++ lib/test/unit.rb
@@ -51,6 +51,11 @@ module Test
         non_options(args, options)
         @help = orig_args.map { |s| s =~ /[\s|&<>$()]/ ? s.inspect : s }.join " "
         @options = options
+        @opts = @options = options
+        if @options[:parallel]
+          @files = args 
+          @args = orig_args
+        end
       end
 
       private
@@ -75,9 +80,35 @@ module Test
         opts.on '-n', '--name PATTERN', "Filter test names on pattern." do |a|
           options[:filter] = a
         end
+ 
+        opts.on '--jobs-status [TYPE]', "Show status of jobs every file; Disabled when --jobs isn't specified." do |type|
+          options[:job_status] = true
+          options[:job_status_type] = type.to_sym if type
+        end
+
+        opts.on '-j N', '--jobs N', "Allow run tests with N jobs at once" do |a|
+          options[:parallel] = a.to_i
+        end
+
+        opts.on '--no-retry', "Don't retry running testcase when --jobs specified" do
+          options[:no_retry] = true
+        end
+
+        opts.on '--ruby VAL', "Path to ruby; It'll have used at -j option" do |a|
+          options[:ruby] = a
+        end
       end
 
       def non_options(files, options)
+        begin
+          require "rbconfig"
+        rescue LoadError
+          warn "#{caller(1)[0]}: warning: Parallel running disabled because can't get path to ruby; run specify with --ruby argument"
+          options[:parallel] = nil
+        else
+          options[:ruby] = RbConfig.ruby
+        end
+
         true
       end
     end
@@ -175,7 +206,7 @@ module Test
             $: << d
           end
           begin
-            require path
+            require path unless options[:parallel]
             result = true
           rescue LoadError
             puts "#{f}: #{$!}"
@@ -186,32 +217,301 @@ module Test
     end
 
     class Runner < MiniTest::Unit
+      include Test::Unit::Options
+      include Test::Unit::RequireFiles
       include Test::Unit::GlobOption
       include Test::Unit::LoadPathOption
       include Test::Unit::GCStressOption
       include Test::Unit::RunCount
 
       class << self; undef autorun; end
+
+      alias orig_run_anything _run_anything
+      undef _run_anything
+
+      def _run_anything type
+        if @opts[:parallel] && @warnings
+          warn ""
+          ary = []
+          @warnings.reject! do |w|
+            r = ary.include?(w[1].message)
+            ary << w[1].message
+            r
+          end
+          @warnings.each do |w|
+            warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
+          end
+          warn ""
+        end
+        orig_run_anything(type)
+      end
+
+      @@stop_auto_run = false
       def self.autorun
         at_exit {
           Test::Unit::RunCount.run_once {
             exit(Test::Unit::Runner.new.run(ARGV) || true)
-          }
+          } unless @@stop_auto_run
         } unless @@installed_at_exit
         @@installed_at_exit = true
       end
 
+      def after_worker_down(worker, e=nil, c=1)
+        return unless @opts[:parallel]
+        return if @interrupt
+        after_worker_dead worker
+        if e
+          b = e.backtrace
+          warn "#{b.shift}: #{e.message} (#{e.class})"
+          STDERR.print b.map{|s| "\tfrom #{s}"}.join("\n")
+        end
+        @need_quit = true
+        warn ""
+        warn "Some worker was crashed. It seems ruby interpreter's bug"
+        warn "or, a bug of test/unit/parallel.rb. try again without -j"
+        warn "option."
+        warn ""
+        STDERR.flush
+        exit c
+      end
+
+      def jobs_status
+        return unless @opts[:job_status]
+        puts "" unless @opts[:verbose]
+        if @opts[:job_status]
+          b = []
+          str = @workers.map { |x|
+            a = "#{x[:pid]}:#{x[:status].to_s.ljust(7)}"
+            if x[:file]
+              if @opts[:job_status_type] == :replace
+                a = "#{x[:pid]}=#{x[:file]}"
+              else
+                if a.size > x[:file].size
+                  b << x[:file].ljust(a.size)
+                else
+                  a << " "*(x[:file].size-a.size)
+                  b << x[:file]
+                end
+              end
+            else
+              b << " "*a.size
+            end
+            a
+          }.join(" ")
+          if @opts[:job_status_type] == :replace
+            @terminal_width ||= %x{stty size 2>/dev/null}.split[1].to_i.nonzero? \
+                            ||  %x{tput cols 2>/dev/null}.to_i.nonzero? \
+                            ||  80
+            @jstr_size ||= 0
+            del_jobs_status
+            STDOUT.flush
+            print str[0...@terminal_width]
+            STDOUT.flush
+            @jstr_size = str.size > @terminal_width ? @terminal_width : str.size
+          else
+            puts str
+            puts b.join(" ")
+          end
+        end
+      end
+
+      def del_jobs_status
+        return unless @opts[:job_status_type] == :replace && @jstr_size
+        print "\r"+" "*@jstr_size+"\r"
+      end
+
+      def after_worker_dead(worker)
+        return unless @opts[:parallel]
+        return if @interrupt
+        worker[:status] = :quit
+        worker[:in].close
+        worker[:out].close
+        @workers.delete(worker)
+        @dead_workers << worker
+        @ios = @workers.map{|w| w[:out] }
+      end
+
       def _run_suites suites, type
         @interrupt = nil
         result = []
-        suites.each {|suite|
+        if @opts[:parallel]
           begin
-            result << _run_suite(suite, type)
+            # Require needed things for parallel running
+            require 'thread'
+            require 'timeout'
+            @tasks = @files.dup # Array of filenames.
+            @need_quit = false
+            @dead_workers = []  # Array of dead workers.
+            @warnings = []
+            shutting_down = false
+            errors = []
+            failures = []
+            skips = []
+            rep = []
+
+            # Array of workers.
+            @workers = @opts[:parallel].times.map do
+              i,o = IO.pipe("ASCII-8BIT") # worker o>|i> master
+              j,k = IO.pipe("ASCII-8BIT") # worker <j|<k master
+              k.sync = true
+              pid = spawn(*@opts[:ruby].split(/ /),File.dirname(__FILE__) +
+                          "/unit/parallel.rb", *@args, out: o, in: j)
+              [o,j].each{|io| io.close }
+              {in: k, out: i, pid: pid, status: :waiting}
+            end
+
+            # Thread: watchdog
+            watchdog = Thread.new do
+              while stat = Process.wait2
+                break if @interrupt # Break when interrupt
+                w = (@workers + @dead_workers).find{|x| stat[0] == x[:pid] }.dup
+                next unless w
+                unless w[:status] == :quit
+                  # Worker down
+                  after_worker_down w, nil, stat[1].to_i
+                end
+              end
+            end
+            @workers_hash = Hash[@workers.map {|w| [w[:out],w] }] # out-IO => worker
+            @ios = @workers.map{|w| w[:out] } # Array of worker IOs
+
+            while _io = IO.select(@ios)[0]
+              break unless _io.each do |io|
+                break if @need_quit
+                a = @workers_hash[io]
+                buf = ((a[:status] == :quit) ? io.read : io.gets).chomp
+                case buf
+                when /^okay$/ # Worker will run task
+                  a[:status] = :running
+                  jobs_status
+                when /^ready$/ # Worker is ready
+                  a[:status] = :ready
+                  if @tasks.empty?
+                    break unless @workers.find{|x| x[:status] == :running }
+                  else
+                    task = @tasks.shift
+                    a[:file] = File.basename(task).gsub(/\.rb/,"")
+                    a[:real_file] = task
+                    begin
+                      a[:loadpath] ||= []
+                      a[:in].puts "loadpath #{[Marshal.dump($:-a[:loadpath])].pack("m").gsub("\n","")}"
+                      a[:loadpath] = $:.dup
+                      a[:in].puts "run #{task} #{type}"
+                      a[:status] = :prepare
+                    rescue Errno::EPIPE
+                      after_worker_down a
+                    rescue IOError
+                      raise unless ["stream closed","closed stream"].include? $!.message
+                      after_worker_down a
+                    end
+                  end
+
+                  jobs_status
+                when /^done (.+?)$/ # Worker ran a one of suites in a file
+                  r = Marshal.load($1.unpack("m")[0])
+                  # [result,result,report,$:]
+                  result << r[0..1]
+                  rep << {file: a[:real_file], report: r[2], result: r[3],
+                          testcase: r[5]}
+                  errors << [a[:real_file],r[5],r[3][0]]
+                  failures << [a[:real_file],r[5],r[3][1]]
+                  skips << [a[:real_file],r[5],r[3][2]]
+                  $:.push(*r[4]).uniq!
+                  a[:status] = :done
+                  jobs_status if @opts[:job_status_type] == :replace
+                  a[:status] = :running
+                when /^p (.+?)$/ # Worker wanna print to STDOUT
+                  del_jobs_status
+                  print $1.unpack("m")[0]
+                  jobs_status if @opts[:job_status_type] == :replace
+                when /^after (.+?)$/
+                  @warnings << Marshal.load($1.unpack("m")[0])
+                when /^bye (.+?)$/ # Worker will shutdown
+                  e = Marshal.load($1.unpack("m")[0])
+                  after_worker_down a, e
+                when /^bye$/ # Worker will shutdown
+                  if shutting_down
+                    after_worker_dead a
+                  else
+                    after_worker_down a
+                  end
+                end
+                break if @need_quit
+              end
+            end
+
+            # Retry
+            # TODO: Interrupt?
           rescue Interrupt => e
             @interrupt = e
-            break
+            return result
+          ensure
+            shutting_down = true
+
+            watchdog.kill if watchdog
+            @workers.each do |w|
+              begin
+                timeout(1) do
+                  w[:in].puts "quit"
+                end
+              rescue Errno::EPIPE
+              rescue Timeout::Error
+              end
+              [:in,:out].each do |x|
+                w[x].close
+              end
+            end
+            begin
+              timeout(0.2*@workers.size) do
+                Process.waitall
+              end
+            rescue Timeout::Error
+              @workers.each do |w|
+                begin
+                  Process.kill(:KILL,w[:pid])
+                rescue Errno::ESRCH; end
+              end
+            end
+
+            unless @need_quit
+              if @interrupt || @opts[:no_retry]
+                rep.each do |r|
+                  report.push(*r[:report])
+                end
+                @errors += errors.map(&:last).inject(:+)
+                @failures += failures.map(&:last).inject(:+)
+                @skips += skips.map(&:last).inject(:+)
+              else
+                puts ""
+                puts "Retrying..."
+                puts ""
+                @options = @opts
+                rep.each do |r|
+                  if r[:testcase] && r[:file] && !r[:report].empty?
+                    require r[:file]
+                    _run_suite(eval(r[:testcase]),type)
+                  else
+                    report.push(*r[:report])
+                    @errors += r[:result][0]
+                    @failures += r[:result][1]
+                    @skips += r[:result][1]
+                  end
+                end
+              end
+            end
+
+
           end
-        }
+        else
+          suites.each {|suite|
+            begin
+              result << _run_suite(suite, type)
+            rescue Interrupt => e
+              @interrupt = e
+              break
+            end
+          }
+        end
         result
       end
 
@@ -223,10 +523,6 @@ module Test
     end
 
     class AutoRunner
-      class Runner < Test::Unit::Runner
-        include Test::Unit::RequireFiles
-      end
-
       attr_accessor :to_run, :options
 
       def initialize(force_standalone = false, default_dir = nil, argv = ARGV)
diff --git lib/test/unit/parallel.rb lib/test/unit/parallel.rb
new file mode 100644
index 0000000..acfdc84
--- /dev/null
+++ lib/test/unit/parallel.rb
@@ -0,0 +1,139 @@
+require 'test/unit'
+
+module Test
+  module Unit
+    class Worker < Runner
+      class << self
+        undef autorun
+      end
+      
+      alias orig_run_suite _run_suite
+      undef _run_suite
+      undef _run_suites
+
+      def _run_suites suites, type
+        suites.map do |suite|
+          result = _run_suite(suite, type)
+        end
+      end
+
+      def _run_suite(suite, type)
+        r = report.dup
+        orig_stdout = MiniTest::Unit.output
+        i,o = IO.pipe
+        MiniTest::Unit.output = o
+
+        stdout = STDOUT.dup
+
+        th = Thread.new(i.dup) do |io|
+          begin
+            while buf = (self.verbose ? io.gets : io.read(5))
+              stdout.puts "p #{[buf].pack("m").gsub("\n","")}"
+            end
+          rescue IOError
+          rescue Errno::EPIPE
+          end
+        end
+
+        e, f, s = @errors, @failures, @skips
+
+        result = orig_run_suite(suite, type)
+
+        MiniTest::Unit.output = orig_stdout
+
+        o.close
+        i.close
+
+        begin
+          th.join
+        rescue IOError
+          raise unless ["stream closed","closed stream"].include? $!.message
+        end
+
+        result << (report - r)
+        result << [@errors-e,@failures-f,@skips-s]
+        result << ($: - @old_loadpath)
+        result << suite.name
+
+        begin
+          STDOUT.puts "done #{[Marshal.dump(result)].pack("m").gsub("\n","")}"
+        rescue Errno::EPIPE; end
+        return result
+      ensure
+        MiniTest::Unit.output = orig_stdout
+        o.close if o && !o.closed?
+        i.close if i && !i.closed?
+      end
+
+      def run(args = [])
+        process_args args
+        @@stop_auto_run = true
+        @opts = @options.dup
+
+        STDOUT.sync = true
+        STDOUT.puts "ready"
+        Signal.trap(:INT,"IGNORE")
+
+
+        @old_loadpath = []
+        begin
+          stdin = STDIN.dup
+          stdout = STDOUT.dup
+          while buf = stdin.gets
+            case buf.chomp
+            when /^loadpath (.+?)$/
+              @old_loadpath = $:.dup
+              $:.push(*Marshal.load($1.unpack("m")[0].force_encoding("ASCII-8BIT"))).uniq!
+            when /^run (.+?) (.+?)$/
+              STDOUT.puts "okay"
+
+              th = Thread.new do
+                while puf = stdin.gets
+                  if puf.chomp == "quit"
+                    begin
+                      stdout.puts "bye"
+                    rescue Errno::EPIPE; end
+                    exit 
+                  end
+                end
+              end
+
+              @options = @opts.dup
+              suites = MiniTest::Unit::TestCase.test_suites
+
+              begin
+                require $1
+              rescue LoadError
+                th.kill
+                STDOUT.puts "after #{[Marshal.dump([$1, $!])].pack("m").gsub("\n","")}"
+                STDOUT.puts "ready"
+                next
+              end
+              _run_suites MiniTest::Unit::TestCase.test_suites-suites, $2.to_sym
+
+              STDIN.reopen(stdin)
+              STDOUT.reopen(stdout)
+
+              th.kill
+              STDOUT.puts "ready"
+            when /^quit$/
+              begin
+                STDOUT.puts "bye"
+              rescue Errno::EPIPE; end
+              exit
+            end
+          end
+        rescue Exception => e
+          begin
+            STDOUT.puts "bye #{[Marshal.dump(e)].pack("m").gsub("\n","")}"
+          rescue Errno::EPIPE;end
+          exit
+        ensure
+          stdin.close
+        end
+      end
+    end
+  end
+end
+
+Test::Unit::Worker.new.run(ARGV)
diff --git test/csv/test_serialization.rb test/csv/test_serialization.rb
index 0adb972..ba19b7a 100755
--- test/csv/test_serialization.rb
+++ test/csv/test_serialization.rb
@@ -131,7 +131,7 @@ class TestCSV::Serialization < TestCSV
   def test_io
     test_class_dump
 
-    data_file = File.join(File.dirname(__FILE__), "temp_test_data.csv")
+    data_file = File.join(File.dirname(__FILE__), "serialization_test_data.csv")
     CSV.dump(@names, File.open(data_file, "wb"))
 
     assert(File.exist?(data_file))
diff --git test/net/http/test_https.rb test/net/http/test_https.rb
index 983ba7f..12684f6 100644
--- test/net/http/test_https.rb
+++ test/net/http/test_https.rb
@@ -24,7 +24,7 @@ class TestNetHTTPS < Test::Unit::TestCase
 
   CONFIG = {
     'host' => '127.0.0.1',
-    'port' => 10082, # different from test_http.rb
+    'port' => 10082,
     'proxy_host' => nil,
     'proxy_port' => nil,
     'ssl_enable' => true,
diff --git test/rake/test_file_task.rb test/rake/test_file_task.rb
index 1b0c0a5..0232ac9 100644
--- test/rake/test_file_task.rb
+++ test/rake/test_file_task.rb
@@ -29,7 +29,9 @@ class Rake::TestFileTask < Test::Unit::TestCase
   end
 
   def test_file_times_new_depends_on_old
-    create_timed_files(OLDFILE, NEWFILE)
+    until File.exist?(OLDFILE) && File.exist?(NEWFILE)
+      create_timed_files(OLDFILE, NEWFILE)
+    end
 
     t1 = Rake.application.intern(FileTask, NEWFILE).enhance([OLDFILE])
     t2 = Rake.application.intern(FileTask, OLDFILE)
@@ -38,7 +40,9 @@ class Rake::TestFileTask < Test::Unit::TestCase
   end
 
   def test_file_times_old_depends_on_new
-    create_timed_files(OLDFILE, NEWFILE)
+    until File.exist?(OLDFILE) && File.exist?(NEWFILE)
+      create_timed_files(OLDFILE, NEWFILE)
+    end
 
     t1 = Rake.application.intern(FileTask,OLDFILE).enhance([NEWFILE])
     t2 = Rake.application.intern(FileTask, NEWFILE)
@@ -93,46 +97,46 @@ class Rake::TestDirectoryTask < Test::Unit::TestCase
   include Rake
 
   def setup
-    rm_rf "testdata", :verbose=>false
+    rm_rf "testdata2", :verbose=>false
   end
 
   def teardown
-    rm_rf "testdata", :verbose=>false
+    rm_rf "testdata2", :verbose=>false
   end
 
   def test_directory
     desc "DESC"
-    directory "testdata/a/b/c"
-    assert_equal FileCreationTask, Task["testdata"].class
-    assert_equal FileCreationTask, Task["testdata/a"].class
-    assert_equal FileCreationTask, Task["testdata/a/b/c"].class
-    assert_nil             Task["testdata"].comment
-    assert_equal "DESC",   Task["testdata/a/b/c"].comment
-    assert_nil             Task["testdata/a/b"].comment
+    directory "testdata2/a/b/c"
+    assert_equal FileCreationTask, Task["testdata2"].class
+    assert_equal FileCreationTask, Task["testdata2/a"].class
+    assert_equal FileCreationTask, Task["testdata2/a/b/c"].class
+    assert_nil             Task["testdata2"].comment
+    assert_equal "DESC",   Task["testdata2/a/b/c"].comment
+    assert_nil             Task["testdata2/a/b"].comment
     verbose(false) {
-      Task['testdata/a/b'].invoke
+      Task['testdata2/a/b'].invoke
     }
-    assert File.exist?("testdata/a/b")
-    assert ! File.exist?("testdata/a/b/c")
+    assert File.exist?("testdata2/a/b")
+    assert ! File.exist?("testdata2/a/b/c")
   end
 
   if Rake::Win32.windows?
     def test_directory_win32
       desc "WIN32 DESC"
-      FileUtils.mkdir_p("testdata")
-      Dir.chdir("testdata") do
-        directory 'c:/testdata/a/b/c'
-        assert_equal FileCreationTask, Task['c:/testdata'].class
-        assert_equal FileCreationTask, Task['c:/testdata/a'].class
-        assert_equal FileCreationTask, Task['c:/testdata/a/b/c'].class
-        assert_nil             Task['c:/testdata'].comment
-        assert_equal "WIN32 DESC",   Task['c:/testdata/a/b/c'].comment
-        assert_nil             Task['c:/testdata/a/b'].comment
+      FileUtils.mkdir_p("testdata2")
+      Dir.chdir("testdata2") do
+        directory 'c:/testdata2/a/b/c'
+        assert_equal FileCreationTask, Task['c:/testdata2'].class
+        assert_equal FileCreationTask, Task['c:/testdata2/a'].class
+        assert_equal FileCreationTask, Task['c:/testdata2/a/b/c'].class
+        assert_nil             Task['c:/testdata2'].comment
+        assert_equal "WIN32 DESC",   Task['c:/testdata2/a/b/c'].comment
+        assert_nil             Task['c:/testdata2/a/b'].comment
         verbose(false) {
-          Task['c:/testdata/a/b'].invoke
+          Task['c:/testdata2/a/b'].invoke
         }
-        assert File.exist?('c:/testdata/a/b')
-        assert ! File.exist?('c:/testdata/a/b/c')
+        assert File.exist?('c:/testdata2/a/b')
+        assert ! File.exist?('c:/testdata2/a/b/c')
       end
     end
   end


----------------------------------------
http://redmine.ruby-lang.org