On Thu, 16 Nov 2006, Devesh Agrawal wrote:

> Hi Folks,
>
> 	I am using ruby to analyse a huge (around 60G) amount of my networking
> experiment data. Let me briefly describe my technique: I have to read
> around 40 files (of around 1.5G each) named f1, f2, ... . Each file fi
> contains traceroutes to lots of destinations at different times, i.e. a
> file is basically a list of traceroutes launched from a given src (src =
> filename) at different times. I want to get a structure like the
> following: (list of all traceroutes from *all* srcs at time 1), (list
> of all traceroutes from *all* srcs at time 2) ... and so on.
>
> 	For this I am using the following pseudocode:
>
> 	outputfile.open
> 	open all files f1..fn
> 	while (!(all files have eof))
> 		(f1..fn).each{|f|
> 			next if f.eof
> 			line = f.readline
> 			parse the line, and get a structure P out of it
> 			put P into a hashtable: H[P.time] << P
>
> 			check for eof conditions on f
>
> 			if (H has more than k keys, i.e. has it become very large?)
> 				H.keys.sort.each{|t|
> 					outputfile << Marshal.dump(H[t])
> 					H.delete(t)
> 				}
> 			end
> 		}
> 	end
> 	close all files
>
> //Btw I can't use an array instead of a hashtable H, as the P.time's
> read across all files needn't be same.
>
> This is performing miserably SLOW. I have the following questions:
<snip commentary>

use threads:


harp:~ > ls -ltarh data/big
-rw-rw-r--    1 ahoward  ahoward      251M Nov 15 16:41 data/big

harp:~ > wc -l data/big
7131456 data/big

harp:~ > head data/big
1163633749.75535 = 0.0877033759844916
1163633749.75565 = 0.913142160532852
1163633749.75569 = 0.604664544000001
1163633749.75571 = 0.233515496811041
1163633749.75574 = 0.221557802561386
1163633749.75577 = 0.241982349948893
1163633749.7558 = 0.190149667316971
1163633749.75583 = 0.0827503931446325
1163633749.75586 = 0.656736160359522
1163633749.75588 = 0.222579171509354
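
a file in this shape can be produced with something like the sketch below.
this is an assumption based only on the visible format (a float timestamp,
' = ', a random float); write_sample is a hypothetical helper name, not part
of a.rb.

```ruby
# write n lines of the form "<Time.now.to_f> = <rand>" to path.
# hypothetical helper, guessed from the look of the sample lines.
def write_sample path, n
  File.open(path, 'w') do |f|
    n.times{ f.puts "#{ Time.now.to_f } = #{ rand }" }
  end
  path
end

# e.g. write_sample 'data/big', 7_131_456
```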


using my threaded program i can process these lines at roughly the following rate:


harp:~ > ruby a.rb data/big 42 42
rate : 0.0 lines/second
rate : 14094.9956719788 lines/second
rate : 21059.6378674178 lines/second
rate : 22742.0758748341 lines/second
rate : 20527.6435560232 lines/second
rate : 16541.8249949497 lines/second
rate : 18295.1759919063 lines/second
rate : 19648.3251130277 lines/second


so, let's say about 20,000 lines per second - at that rate my 7.1 million
line, 250mb file takes about 6 minutes.   i realize yours is more complex, but
let's say you should be able to do 1gb of data on a reasonably fast machine in
something like 20-30 minutes.
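
the estimate is just arithmetic on the numbers above.  as a rough sketch
(minutes_for and scale_minutes are made-up helper names, the 20,000
lines/second figure is eyeballed from the output, and it assumes throughput
stays about constant as the data grows):

```ruby
# minutes needed to process `lines` lines at `rate` lines per second
def minutes_for lines, rate
  lines / rate.to_f / 60
end

# scale a time measured on size_mb of data to target_mb of data,
# assuming the lines/second rate does not change
def scale_minutes minutes, size_mb, target_mb
  minutes * target_mb / size_mb.to_f
end

file_minutes = minutes_for 7_131_456, 20_000           # wc -l of the 251mb file
gb_minutes   = scale_minutes file_minutes, 251, 1024   # same rate over 1gb
puts "file : %.1f minutes, 1gb : %.1f minutes" % [file_minutes, gb_minutes]
```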


btw.  map won't help (much) since you are just doing a sequential read of all
of the data anyhow...



here's the code



harp:~ > cat a.rb

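class LineProducer
   require 'thread'
   EOF = nil   # sentinel : a nil on the queue means no more lines

   def initialize path, bufsize
     @path = path
     @bufsize = Integer bufsize
     @lines = Queue.new
     @thread = new_reader
   end

   def join() @thread.join end

   # reader thread : slurp @bufsize bytes at a time, cut them into whole
   # lines, and queue each batch.  a trailing partial line is carried over
   # into the next read.
   def new_reader
     Thread.new do
       Thread.current.abort_on_exception = true
       buf = ''
       File.open(@path) do |f|
         while((buf2 = f.read @bufsize))
           buf << buf2
           lines = buf.scan %r/^.*$\n?/
           if lines.last =~ %r/$\n/
             buf = ''
           else
             buf = lines.pop
           end
           @lines.push lines
         end
       end
       @lines.push [buf] unless buf.empty?   # final line without a newline
       @lines.push EOF
     end
   end

   # consumer thread : hand each batch of lines to the block until EOF shows
   # up, then put EOF back so that every other consumer sees it too.  the
   # queue is fifo, so EOF is only reached after all batches are taken.
   def new_consumer &b
     Thread.new{
       Thread.current.abort_on_exception = true
       while((lines = @lines.pop)); b[ lines ]; end
       @lines.push EOF
     }
   end
end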

class ThreadSafeHash
   require 'monitor'   # 'sync' is no longer in the standard library as of ruby 2.7
   instance_methods.each{|m| undef_method m unless m[%r/__/]}
   def initialize
     @hash = {}
     @sync = Monitor.new
   end
   def method_missing m, *a, &b
     @sync.synchronize{ @hash.send m, *a, &b }
   end
   def respond_to? m, *a, &b
     @sync.synchronize{ @hash.respond_to? m, *a, &b }
   end
end


path = ARGV.shift or abort 'no path'
n = Integer(ARGV.shift || 42)
hn = Integer(ARGV.shift || 42)
mb = 2 ** 20

lines_producer = LineProducer.new path, mb
tsh = ThreadSafeHash.new

start = Time.now.to_f
elapsed = lambda{|start| Time.now.to_f - start.to_f}
progress = Thread.new{ loop{ puts "rate : #{ tsh.size / elapsed[start] } lines/second"; sleep 2 } }

parse = lambda{|line| line.split(%r/=/).map{|word| word.strip}}

threads = Array.new(n).map{
   lines_producer.new_consumer do |lines|
     h = {} and lines.each do |line|
       k, v = parse[ line ]
       h[k] = v
     end
     tsh.update h
   end
}


lines_producer.join
threads.map{|t| t.join}
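
a.rb only covers the read-and-parse half.  the flush step from the quoted
pseudocode might look something like the sketch below.  flush and each_bucket
are hypothetical names, and dumping the time key next to its records (rather
than the records alone) is an assumption made here so the output can be read
back without guessing which time a bucket belongs to.

```ruby
require 'stringio'

# dump every bucket of h to io in ascending time order, emptying h.
# h maps a time key to the array of records collected for that time.
def flush h, io
  h.keys.sort.each do |t|
    Marshal.dump [t, h.delete(t)], io
  end
end

# read the dumped [time, records] pairs back, one Marshal.load per bucket
def each_bucket io
  until io.eof?
    yield Marshal.load(io)
  end
end

# usage, with a StringIO standing in for the real output file
h = Hash.new{|hash, t| hash[t] = []}
h[2.0] << 'trace from f1'
h[1.0] << 'trace from f2'
out = StringIO.new
flush h, out
out.rewind
each_bucket(out){|t, records| puts "#{ t } : #{ records.size } record(s)"}
```

for a real file, open it in binary mode ('wb' / 'rb') since marshal data is
not text.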



regards.



-a
-- 
my religion is very simple.  my religion is kindness. -- the dalai lama