Deepak Gole wrote: > Hi > > Pls I need a help on above problem.* > > > Thanks > Deepak Here's my approach to a similar problem. Still not as polished as I'd like, but it maybe useful. The core is the PoolQM class (the CircularBuffer class exists to catch a limited number of exceptions). =begin NAME class CircularBuffer DESCRIPTION A lightweight but (hopefully) thread-safe version of the circular buffer Designed primarily for intentionally limited in-memory event/error logging. URI INSTALL HISTORY 0.1 SYNOPSIS cb = CircularBuffer.new(50) # create a new CircularBuffer that holds 50 nil elements cb << 'fnord' # append an element to the buffer elements = cb.to_a # return elements as an array with elements ordered from oldest to newest cb.clear # force all entires to nil CAVEATS The CircularBuffer ignores nil elements and ignores attempts to append them 2DOs By Djief =end require 'thread' class CircularBuffer def initialize(max_size) @max_size = max_size @ra = Array.new(@max_size, nil) @head = 0 @mutex = Mutex.new end private def inc(index) (index +1) % @max_size end public # set all elements to nil # def clear @mutex.synchronize do @ra.collect! { |element| element = nil } end end # append a new element to the current 'end' # def <<(element) unless element.nil? @mutex.synchronize do @ra[@head]=element @head = inc(@head) end end end # return the entire buffer (except nil elements) # as an array # def to_a index = @head result = [] @mutex.synchronize do @max_size.times do result << @ra[index] unless @ra[index].nil? index = inc(index) end end result end end =begin NAME class PoolQM DESCRIPTION PoolQM extends an Array with MonitorMixin to create a queue with an associated pool of worker threads that wait process any requests that are added to the queue. A dispatcher thread watches continuously for enqueued requests and signals idle worker threads (if any) to dequeue and process the request(s). If no idle workers exist, the request remains in the queue until one is available. During the creation of a new instance of PoolQM, the number of worker threads is established and the request processing block is defined: results = Queue.new NUM_OF_WORKERS = 10 pqm = PoolQM.new(NUM_OF_WORKERS) do |request| results << "Current request: #{request}" # processing request here end Note that any output you expect to collect from your worker threads should be returned via some thread-safe mechanism or container (Queue is a good default). Enqueuing a request is all that is necessary to initiate it's processing: pqm.enq("This is a test, this is only a test") If a request causes an exception within the processing block, the Exception is appended to a circular buffer whose contents can be obtained as an array with the PoolQM#exceptions method. If you're intested in logging exceptions, you'll have a bit more work to do but replacing the CircularBuffer with a Queue that has it's own worker to handle disk IO is probably a good bet. Performance-wise this approach behaves more consistently than any I've produced so far i.e. it's both fast and performs with repeatable uniformity. No doubt, there's still room for improvement. URI INSTALL HISTORY 0.1 - genesis 0.2 - documentation and clean-up SYNOPSIS require 'thread' results = Queue.new # thread-safe container for results! <<<<<<<<<< IMPORTANT NUM_OF_WORKERS = 10 pqm = PoolQM.new(NUM_OF_WORKERS) do |request| results << "Current request: #{request}" # processing request here end 100.times do |index| pqm.enq("Request number #{index}") # enqueuing requests here end pqm.wait_until_idle # wait for all requests to be processed until results.empty? do # dump results p results.pop end pqm.exceptions.each do |exception| # obtain exceptions array and dump it p exception end CAVEATS 2DOs By Djief =end require 'monitor' class PoolQM # default size for the exceptions CircularBuffer # DEFAULT_EXCEPTION_BUFFER_SIZE = 10 # Create a new PoolQM with 'worker_count' worker threads to execute # the associated block # def initialize(worker_count = 1) raise 'block required: { |request| ... }' unless block_given? @worker_count = worker_count @request_q = [] @request_q.extend(MonitorMixin) @request_ready = @request_q.new_cond @exceptions = CircularBuffer.new(DEFAULT_EXCEPTION_BUFFER_SIZE) @worker_count.times do Thread.new do loop do request = nil @request_q.synchronize do @request_ready.wait request = @request_q.delete_at(0) end begin yield request rescue Exception => e @exceptions << e end Thread.pass end end end @dispatcher = Thread.new do loop do @request_q.synchronize do @request_ready.signal unless @request_q.empty? || @request_ready.count_waiters == 0 end Thread.pass end end end # enq the request data # def enq(request) @request_q.synchronize do @request_q << request end end # Wait until all the queued requests have been removed # from the request_q && then wait until all threads have # compeleted their processing and are idle # def wait_until_idle(wait_resolution=0.3) q_empty = false until q_empty @request_q.synchronize do q_empty = @request_q.empty? end sleep(wait_resolution) unless q_empty end all_threads_idle = false until all_threads_idle @request_q.synchronize do all_threads_idle = @request_ready.count_waiters == @worker_count end sleep(wait_resolution) unless all_threads_idle end end # create a new exceptions buffer of new_size # def exceptions_buffer_size=(new_size) @exceptions = CircularBuffer.new(new_size) end # report the size of the current exceptions buffer # def exceptions_buffer_size @exceptions.size end # return the current exceptions buffer as an ordered Array # def exceptions @exceptions.to_a end end if __FILE__ == $0 # the usual trivial example require 'thread' # >>>> thread-safe container for result <<<< # results = Queue.new NUM_OF_WORKERS = 10 pqm = PoolQM.new(NUM_OF_WORKERS) do |request| raise "Dummy Exception during #{request}" if rand(10) == 0 # simulate random exceptions results << "Current request: #{request}" # processing request here end 100.times do |index| pqm.enq("Request number #{index}") # enqueuing requests here end # wait for all requests to be processed pqm.wait_until_idle # dump results until results.empty? do p results.pop end # obtain exceptions array and dump it pqm.exceptions.each do |exception| p exception end end Regards, djief -- Posted via http://www.ruby-forum.com/.