Deepak Gole wrote:
> Hi
> 
> Pls I need a help on above problem.*
> 
> 
> Thanks
> Deepak

Here's my approach to a similar problem. It's still not as polished as I'd
like, but it may be useful.

The core is the PoolQM class (the CircularBuffer class exists to catch
a limited number of exceptions).

=begin

NAME

  class CircularBuffer

DESCRIPTION

  A lightweight but (hopefully) thread-safe version of the circular 
buffer

  Designed primarily for intentionally limited in-memory event/error 
logging.

URI



INSTALL



HISTORY

  0.1

SYNOPSIS

  cb = CircularBuffer.new(50)     # create a new CircularBuffer that holds 50 nil elements
  cb << 'fnord'                   # append an element to the buffer
  elements = cb.to_a              # return elements as an array, ordered from oldest to newest
  cb.clear                        # force all entries to nil

CAVEATS

 The CircularBuffer ignores nil elements and ignores attempts to append 
them

2DOs



By Djief

=end

require 'thread'

# A fixed-capacity ring buffer whose operations are serialized by a Mutex.
#
# Once every slot is occupied, each appended element overwrites the oldest
# one. nil marks an empty slot, so nil elements are never stored.
#
class CircularBuffer

  # max_size - the number of slots in the buffer
  #
  def initialize(max_size)
    @max_size = max_size
    @ra = Array.new(@max_size, nil)
    @head = 0               # index of the next slot to write (== oldest slot once full)
    @mutex = Mutex.new
  end

  # capacity of the buffer (the max_size given to new), not the
  # number of elements currently stored
  #
  def size
    @max_size
  end

  # set all elements to nil
  #
  # Returns self rather than the backing array, so the internal storage
  # is never handed out where it could be mutated outside the mutex.
  #
  def clear
    @mutex.synchronize { @ra.fill(nil) }
    self
  end

  # append a new element to the current 'end'
  #
  # nil elements are ignored, and a zero-capacity buffer discards
  # everything (it has no slot to write to). Returns self so that
  # appends can be chained.
  #
  def <<(element)
    return self if element.nil? || @max_size.zero?
    @mutex.synchronize do
      @ra[@head] = element
      @head = inc(@head)
    end
    self
  end

  # return the entire buffer (except nil elements)
  # as an array ordered from oldest to newest
  #
  def to_a
    # @head must be read under the lock; otherwise a concurrent << could
    # move it between the read and the copy and the order would be wrong.
    @mutex.synchronize do
      # rotating by @head puts the oldest slot first; compact drops empty slots
      @ra.rotate(@head).compact
    end
  end

  private

  # next index after 'index', wrapping around at the end of the buffer
  #
  def inc(index)
    (index + 1) % @max_size
  end

end

=begin

NAME

  class PoolQM

DESCRIPTION

  PoolQM extends an Array with MonitorMixin to create a queue with
  an associated pool of worker threads that wait to process any requests
  that are added to the queue.

  A dispatcher thread watches continuously for enqueued requests and
  signals idle worker threads (if any) to dequeue and process the
  request(s). If no idle workers exist, the request remains in the
  queue until one is available.

  During the creation of a new instance of PoolQM, the number of worker
  threads is established and the request processing block is defined:

    results = Queue.new
    NUM_OF_WORKERS = 10
    pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
      results << "Current request: #{request}"    # processing request 
here
    end

  Note that any output you expect to collect from your worker threads 
should
  be returned via some thread-safe mechanism or container (Queue is a 
good
  default).

  Enqueuing a request is all that is necessary to initiate its processing:

    pqm.enq("This is a test, this is only a test")

  If a request causes an exception within the processing block, the 
Exception
  is appended to a circular buffer whose contents can be obtained as an 
array
  with the PoolQM#exceptions method.

  If you're interested in logging exceptions, you'll have a bit more work to
  do, but replacing the CircularBuffer with a Queue that has its own worker
  to handle disk IO is probably a good bet.

  Performance-wise this approach behaves more consistently than any I've
  produced so far i.e. it's both fast and performs with repeatable 
uniformity.

  No doubt, there's still room for improvement.


URI



INSTALL



HISTORY

  0.1 - genesis
  0.2 - documentation and clean-up

SYNOPSIS

  require 'thread'

  results = Queue.new                           # thread-safe container 
for results! <<<<<<<<<< IMPORTANT

  NUM_OF_WORKERS = 10

  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    results << "Current request: #{request}"    # processing request 
here
  end

  100.times do |index|
    pqm.enq("Request number #{index}")          # enqueuing requests 
here
  end

  pqm.wait_until_idle                            # wait for all requests 
to be processed

  until results.empty? do                       # dump results
    p results.pop
  end

  pqm.exceptions.each do |exception|            # obtain exceptions 
array and dump it
    p exception
  end

CAVEATS



2DOs



By Djief

=end


require 'monitor'

# A request queue served by a fixed pool of worker threads.
#
# Requests are kept in an Array extended with MonitorMixin. Each worker
# sleeps on a condition variable until a request is available, removes it
# from the queue and hands it to the block given to PoolQM.new. Exceptions
# raised by the block are collected in a CircularBuffer.
#
# Workers are woken directly by #enq, so no polling dispatcher thread is
# needed, and idleness is tracked with an explicit busy counter instead of
# MonitorMixin::ConditionVariable#count_waiters, which is not available in
# current Ruby versions.
#
class PoolQM

  # default size for the exceptions CircularBuffer
  #
  DEFAULT_EXCEPTION_BUFFER_SIZE = 10

  # Create a new PoolQM with 'worker_count' worker threads to execute
  # the associated block
  #
  # Raises RuntimeError when no block is given.
  #
  def initialize(worker_count = 1, &block)
    raise 'block required: { |request| ... }' unless block
    @worker_count = worker_count
    @request_q = []
    @request_q.extend(MonitorMixin)
    @request_ready = @request_q.new_cond   # signalled when a request is enqueued
    @all_idle = @request_q.new_cond        # broadcast when the pool runs dry
    @busy_workers = 0                      # workers currently processing a request
    @exceptions_buffer_size = DEFAULT_EXCEPTION_BUFFER_SIZE
    @exceptions = CircularBuffer.new(@exceptions_buffer_size)
    # all state above must exist before the first worker starts running
    @workers = @worker_count.times.map do
      Thread.new { work_loop(block) }
    end
  end

  # enq the request data
  #
  # Wakes one waiting worker (if any). Returns self.
  #
  def enq(request)
    @request_q.synchronize do
      @request_q << request
      @request_ready.signal
    end
    self
  end

  # Wait until all the queued requests have been removed
  # from the request_q && all workers have completed their
  # processing and are idle
  #
  # wait_resolution - maximum number of seconds between re-checks; the
  #                   method normally returns as soon as the last worker
  #                   finishes, the timeout is only a safety net
  #
  def wait_until_idle(wait_resolution=0.3)
    @request_q.synchronize do
      until @request_q.empty? && @busy_workers.zero?
        @all_idle.wait(wait_resolution)
      end
    end
    nil
  end

  # create a new exceptions buffer of new_size
  # (previously collected exceptions are discarded)
  #
  def exceptions_buffer_size=(new_size)
    @exceptions_buffer_size = new_size
    @exceptions = CircularBuffer.new(new_size)
  end

  # report the size of the current exceptions buffer
  #
  def exceptions_buffer_size
    @exceptions_buffer_size
  end

  # return the current exceptions buffer as an ordered Array
  #
  def exceptions
    @exceptions.to_a
  end

  private

  # body of every worker thread: process requests forever
  #
  def work_loop(handler)
    loop do
      request = next_request
      begin
        handler.call(request)
      rescue Exception => e
        # Deliberately broad: a worker must survive anything the handler
        # raises, otherwise the pool would silently shrink.
        @exceptions << e
      ensure
        request_done
      end
    end
  end

  # block until a request is available, then dequeue it and mark the
  # calling worker as busy
  #
  def next_request
    @request_q.synchronize do
      # re-check the queue after every wake-up so that a worker never
      # dequeues from an empty queue
      @request_ready.wait_while { @request_q.empty? }
      @busy_workers += 1
      @request_q.shift
    end
  end

  # mark the calling worker as idle again and wake up wait_until_idle
  # callers once nothing is queued or in flight
  #
  def request_done
    @request_q.synchronize do
      @busy_workers -= 1
      @all_idle.broadcast if @busy_workers.zero? && @request_q.empty?
    end
  end

end

if __FILE__ == $0

  # The usual trivial example: push 100 requests through a pool of
  # 10 workers, letting some of them fail at random, then print the
  # results followed by the collected exceptions.

  require 'thread'

  # >>>> thread-safe container for result <<<<
  #
  results = Queue.new

  NUM_OF_WORKERS = 10

  pqm = PoolQM.new(NUM_OF_WORKERS) do |request|
    # simulate random exceptions
    raise "Dummy Exception during #{request}" if rand(10) == 0
    # processing request here
    results << "Current request: #{request}"
  end

  # enqueuing requests here
  100.times do |index|
    pqm.enq("Request number #{index}")
  end

  # wait for all requests to be processed
  pqm.wait_until_idle

  # dump results
  until results.empty? do
    p results.pop
  end

  # obtain exceptions array and dump it
  pqm.exceptions.each do |exception|
    p exception
  end

end


Regards,

djief
-- 
Posted via http://www.ruby-forum.com/.