I'm a Unix admin at a research lab in the technion, and use (and 
loooove) Ruby
for all kinds of useful applications I wrote for use in the lab. One of 
them is
the Matlab Application Server, a queue/batch manager for Matlab jobs. 
Old version
was perl, text files, and one file per job - new version is Rails for 
the UI
(which is on another machine), vanilla Ruby for the daemon, and MySQL to
communicate between them.

Usually it works spectacularily, and people don't have to lock down the 
Windows
workstations overnight (or over many nights) so they finish their jobs. 
The queue
is also rather fair - 2 simultaneous jobs, 1 per each user.

However, today something strange happened. The server was under heavy 
load and
using a lot of swap, as usual when two Matlab jobs are running. Two jobs 
were
indeed running: pids A and B (A < B). However, they were both by the 
same user
(which shouldn't happen). Furthermore, Only B was a son of the daemon, 
whereas
A was a son of init. The log file said that earlier, a son C died which 
wasn't
in the SQL table of running jobs (dying children are removed from there, 
because
it means they finished running). And weirdest, the SQL table of running 
jobs
contained job D, which wasn't run (and belonged to a different user).

Very odd, and I can't seem to find the bug. So here are relevant bits of 
code,
if anyone has an idea I'd greatly appreciate it.

  (snip)

  module MatlabAS
    class Daemon
      def initialize
        load_config
        @total_procs = 0
        @user_procs = {}

        mysql_connect

        setup_signals
        clear_running_jobs
        main_loop
      end

      # snip

      protected

      # Main program loop
      def main_loop
        @continue_running = true
        while @continue_running
          mysql_connect
          begin
            handle_command # For handling manual 'kill job' commands, 
not relevant
          rescue
            $stderr.print 'Error handling command: '
            $stderr.puts $!
          end
          if @total_procs < @config['limits']['total_procs']
            queue_entry = get_queue_entry
            if queue_entry
              user_id = queue_entry[:user][:id]
              @user_procs[user_id] = 0 if @user_procs[user_id].nil?
              @user_procs[user_id] += 1
              @total_procs += 1

              delete_queue_entry queue_entry[:id]

              child_pid = fork do
                run_worker_process queue_entry[:user][:username],
                                   queue_entry[:directory]
              end
              add_running_job_entry queue_entry[:sent_at],
                                    queue_entry[:user],
                                    queue_entry[:directory],
                                    child_pid
            end
          end

          mysql_disconnect

          really_sleep @config['limits']['sleep_cycle']
        end
      end

      # Make sure that the duration is slept, even if signals were 
caught
      def really_sleep(duration=0)
        # Special case for 0-duration sleep, even though we don't use it
        if duration == 0
          sleep 0
        else
          nap_remaining = duration
          while nap_remaining > 0
            nap_remaining -= sleep nap_remaining
          end
        end
      end

      # Do cleanup-work when children die
      def handle_dying_children
        mysql_connect
        foreach_dying_child do |child_pid|
          begin
            child = get_running_job_by_pid child_pid
            MatlabAS.mail child[:user][:username], "(snipped)"
            @total_procs -= 1
            @user_procs[child[:user][:id]] -= 1
            add_historic_job_entry child[:sent_at], child[:started_at],
                                   child[:user], child[:directory]
          rescue
            $stderr.print 'Error rescuing a dying child: '
            $stderr.puts $!
          end
        end
      end

      # Loop across all currently dying children, yielding each
      # one's +pid+. Very useful for handling +SIGCHLD+.
      def foreach_dying_child
        begin
          while pid = Process.waitpid(-1,Process::WNOHANG)
            yield pid
          end
        rescue Errno::ECHILD
          # Out of dying children, no problem.
        end
      end

      # Run an actual matlab job
      def run_worker_process(user, directory)
        user_info = Etc.getpwnam user
        work_pathname = Base_path + user + directory
        setup_signals false
        Process::Sys.setgid user_info.gid
        Process::Sys.setuid user_info.uid

        # snip, handles errors for missing directory / startup file

        # config variables were simplified here, obviously they're in
        # a nice hash

        system "#@bash -c \"#@matlab < #@startup_file > #@output 2> 
#@error\""
      end

      # Set up signal handlers. Default is to set up the signal handlers
      # for a parent process, but if +parent+ is false, signals are set
      # up for a child.
      def setup_signals(parent = true)
        if parent
          Signal.trap('CHLD') { handle_dying_children }

          Signal.trap('INT') do
            puts "User interrupt. Dying gracefully..."
            self.close
          end

          Signal.trap('TERM') do
            puts "Termination requested. Dying gracefully..."
            self.close
          end

          Signal.trap('HUP') do
            puts "Configuration reload requested, reloading config."
            load_config
          end
        else
          Signal.trap('CHLD', 'DEFAULT')
          Signal.trap('INT', 'DEFAULT')
          Signal.trap('TERM', 'DEFAULT')
          Signal.trap('HUP', 'DEFAULT')
        end
      end

      def handle_command
        # Snip, kills a job if such an instruction is in the table
      end

      # Reads one queue entry from the +queue_entries+ table, in which 
the
      # user doesn't match any of the ones who have currently hit their
      # process quota. If none are available, returns +nil+.
      def get_queue_entry
        query_string = 'SELECT * FROM queue_entries '

        if users_at_quota.size > 0
          query_string += 'WHERE ' +
            users_at_quota.collect do |user_id|
              "user_id != #{user_id}"
            end.join(' AND ') + ' '
        end

        query_string += 'LIMIT 1'

        result = @mysql_connection.query query_string
        queue_entry_hash = result.fetch_hash
        result.free

        if queue_entry_hash
          return { :id => queue_entry_hash['id'],
                   :sent_at => Time.parse(queue_entry_hash['sent_at']),
                   :user => get_user_by_id(queue_entry_hash['user_id']),
                   :directory => queue_entry_hash['directory']
                 }
        else
          return nil
        end
      end

      # Returns an array of all users which are at their process quota.
      def users_at_quota
        @user_procs.reject do |user_id,procs|
          procs < @config['limits']['procs_per_user']
        end.keys
      end

      # Deletes the given queued entry from the table.
      def delete_queue_entry(id)
        @mysql_connection.query "DELETE FROM queue_entries WHERE id = 
#{id}"
      end

      # Adds information about the given historic job to the history 
table.
      def add_historic_job_entry(sent_at, started_at, user, directory)
        @mysql_connection.query <<-EOF
  INSERT INTO historic_jobs (sent_at,
                             started_at,
                             user_id,
                             directory)
         VALUES ('#{sent_at.to_mysql_time}',
                 '#{started_at.to_mysql_time}',
                 '#{user[:id]}',
                 '#{directory}')
        EOF
      end

      # Adds information about the given running-job to the running-job
      # table.
      def add_running_job_entry(sent_at, user, directory, pid)
        @mysql_connection.query <<-EOF
  INSERT INTO running_jobs (sent_at,
                            user_id,
                            directory,
                            pid)
         VALUES ('#{sent_at.to_mysql_time}',
                 '#{user[:id]}',
                 '#{directory}',
                 '#{pid}')
        EOF
      end

      # Gets a command from the +commands+ table and deletes it
      def get_command
        # snip
      end

      # Gets the running-job information for a job matching the supplied
      # +pid+, and removes it from the running-jobs table unless
      # :remove is set to false (default is to delete. Example:
      #
      #   get_running_job_by_pid 1053, :remove => false
      #
      def get_running_job_by_pid(pid, options = { :remove => true })
        result = @mysql_connection.query <<-EOF
  SELECT * FROM running_jobs WHERE pid = #{pid} LIMIT 1
        EOF
        result_hash = result.fetch_hash
        result.free

        # THIS IS THE ERROR WHICH WAS RAISED
        raise "No job with pid #{pid} in running jobs table." if 
result_hash.nil?

        if options[:remove]
          @mysql_connection.query "DELETE FROM running_jobs WHERE pid = 
#{pid}"
        end

        return { :sent_at => Time.parse(result_hash['sent_at']),
                 :started_at => Time.parse(result_hash['started_at']),
                 :user => get_user_by_id(result_hash['user_id']),
                 :directory => result_hash['directory']
               }
      end

      # Clears the running-jobs table, and any STOP commands
      def clear_running_jobs
        @mysql_connection.query "DELETE FROM running_jobs"
        @mysql_connection.query "DELETE FROM commands WHERE command = 
'stop'"
      end

      # Gets user information by the given user_id. User information
      # is in the form <tt>{ :id, :username }</tt>.
      def get_user_by_id(user_id)
        result = @mysql_connection.query <<-EOF
  SELECT username FROM users WHERE id = #{user_id} LIMIT 1
        EOF
        result_hash = result.fetch_hash
        result.free

        { :id => user_id, :username => result_hash['username'] }
      end

      # Load configuration from YAML file
      def load_config
        @config = File.open('/etc/matlab_as.yml') { |yf| YAML::load yf }
      end

      def mysql_connect
        mysql_disconnect
        mysql_config = @config['mysql']
        connection = Mysql.new mysql_config['host'],
        mysql_config['username'], mysql_config['password']
        connection.select_db mysql_config['database']

        @mysql_connection = connection
      end

      def mysql_disconnect
        @mysql_connection.close unless @mysql_connection.nil?
      end

      def MatlabAS.mail(address, message)
        # snip
      end
    end
  end

  daemon = MatlabAS::Daemon.new
  daemon.close

-- 
Posted via http://www.ruby-forum.com/.