This is my event loop, though I haven't had the chance to really use it
yet. I'm posting it here because some people asked me to a month ago. I
have since fixed a few bugs, and here it is.
It defines several classes:
Action
ActionQueue
TimerQueue
StreamMap
MainLoop
And it has useful inline documentation. Please tell me what you think of
it.
----------------8<----------------cut-here----------------8<----------------
# $Id: MainLoop.rb,v 1.1 2001/05/27 07:51:58 matju Exp $
=begin
an Action consists of a Receiver and a Message (a selector, its
parameters, and an optional block).
calling the Action sends the Message to the Receiver.
=end
class Action
attr_accessor :receiver
attr_accessor :selector
attr_accessor :params
attr_accessor :proc
def initialize(receiver,selector,*params,&proc)
self.receiver = receiver
self.selector = selector
self.params = params
self.proc = proc
end
def call
receiver.send(selector,*params,&proc)
end
end
=begin
an ActionQueue is an array of Actions that have to be called in the order
they are received. you can #add an Action at the end of the queue, or you
can #consume the oldest Action of the Queue.
=end
class ActionQueue
def initialize
@queue = []
end
def add(x)
@queue << x
end
def consume
@queue.shift.call
end
def empty?
@queue.empty?
end
end
=begin
a TimerQueue is a timeline of Actions that must be triggered only after a
certain point in time.
=end
class TimerQueue
TimerEntry = Struct.new(:time,:action)
def initialize
@queue = []
end
# schedule an action or a block for execution at a specified time
# a_time is a Time object.
# a_delay (see #after) is a Numeric object, in seconds.
# an_action is an Action object; it may be nil if a block is given.
# a_proc is a block; it is used only when an_action is nil.
def at_time_call(a_time, an_action=nil, &a_proc)
an_action ||= Action.new(a_proc,:call)
entry = TimerEntry.new(a_time, an_action)
i = 0
@queue.each {|e| e.time < entry.time or break; i+=1 }
@queue[i,0] = [entry]
end
# schedule an action or a block for execution at a certain delay
# after the current time
def after(a_delay,an_action=nil,&proc)
at_time_call(Time.now+a_delay,an_action,&proc)
end
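# placeholder delay (in seconds) reported when no timer is pending
def delay_no_next; 42 end
# seconds until the earliest timer is due; 0 if it is already due
def delay_until_next
return delay_no_next if @queue.length == 0
delay = @queue[0].time - Time.now
delay < 0 ? 0 : delay
end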
def consume
@queue.shift.action.call if delay_until_next == 0
end
end
=begin
a Unix Stream is what is commonly known as a file handle even though it
may be referring to a non-file like a pipe or socket.
a Stream Observer is an object that is to be notified of certain events of
a file handle, usually incoming data.
a StreamMap is an object that keeps track of the state of various Unix
Streams and what are their associated observers.
add_stream(stream,observer,type_mask)
remove_stream(stream)
adding an already added stream automatically removes the previous registration.
stream is an IO object.
type_mask is a set of the following flags:
StreamMap::READ
StreamMap::WRITE
StreamMap::EXCEPT
observer is an Object that responds to some of the following (depending on the
event-type mask):
#ready_to_read(stream)
#ready_to_write(stream)
#stream_exception(stream)
=end
class StreamMap
StreamEntry = Struct.new(:stream,:stream_observer,:type_mask)
READ = 4
WRITE = 2
EXCEPT = 1
def initialize
@streams = {}
end
def add_stream(stream,observer,type_mask)
@streams[stream.to_s] =
StreamEntry.new(stream,observer,type_mask)
end
def remove_stream(stream)
@streams.delete(stream.to_s)
end
def streams_by_mask(mask)
@streams.values.
find_all {|a| (a.type_mask & mask) != 0 }. # 0 is true in Ruby
map {|a| a.stream }
end
# like IO.select, but using this object's lists, and
# returning always a three element array.
def select(time=nil)
IO.select(
streams_by_mask(READ),
streams_by_mask(WRITE),
streams_by_mask(EXCEPT),
time) || [[]]*3
end
def make_actions(time=nil)
lists = select(time)
selectors = [
:ready_to_read,
:ready_to_write,
:stream_exception]
actions = []
(0..2).each {|i|
lists[i].each{|s|
observer = @streams[s.to_s].stream_observer
actions << Action.new(observer,selectors[i],s)
}
}
actions
end
end
class MainLoop
attr_reader :actions
attr_reader :timers
attr_reader :streams
def initialize
@actions = ActionQueue.new
@timers = TimerQueue.new
@streams = StreamMap.new
end
def one
while @timers.delay_until_next == 0
@timers.consume
end
while not @actions.empty?
@actions.consume
end
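# wait up to 0.1 s for stream events and queue one Action per event
@streams.make_actions(0.1).each {|a| @actions.add(a) }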
end
def loop
while true; one; end
end
end
#todo:
# expired timers should fill the ActionQueue instead.
# stream events should not generate actions...?
----------------8<----------------cut-here----------------8<----------------
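Here is a minimal usage sketch of the timer part, not taken from MainLoop.rb itself. To keep it runnable on its own it restates Action and TimerQueue in condensed form, assuming that Action#call dispatches with Object#send and that delay_until_next returns the remaining delay. The `log` array and the symbols pushed onto it are made up for the demonstration.

```ruby
# Condensed restatements of Action and TimerQueue so this snippet runs
# alone; they are an illustration, not a replacement for the listing.
class Action
  def initialize(receiver, selector, *params, &proc)
    @receiver, @selector, @params, @proc = receiver, selector, params, proc
  end

  def call
    # Object#send delivers the selector and parameters to the receiver.
    @receiver.send(@selector, *@params, &@proc)
  end
end

class TimerQueue
  TimerEntry = Struct.new(:time, :action)

  def initialize
    @queue = []
  end

  def at_time_call(a_time, an_action = nil, &a_proc)
    an_action ||= Action.new(a_proc, :call)
    entry = TimerEntry.new(a_time, an_action)
    # Insert before the first entry that is not earlier: the queue stays sorted.
    i = @queue.index { |e| e.time >= entry.time } || @queue.length
    @queue.insert(i, entry)
  end

  def after(a_delay, an_action = nil, &a_proc)
    at_time_call(Time.now + a_delay, an_action, &a_proc)
  end

  def delay_until_next
    return 42 if @queue.empty?   # same placeholder as delay_no_next
    delay = @queue[0].time - Time.now
    delay < 0 ? 0 : delay
  end

  def consume
    @queue.shift.action.call if delay_until_next == 0
  end
end

log = []                         # hypothetical sink for this demo
timers = TimerQueue.new
timers.after(0.02) { log << :second }               # block form
timers.after(0.01) { log << :first }
timers.after(0.03, Action.new(log, :push, :third))  # explicit Action form

# Poll until every timer has fired, the way MainLoop#one drains due timers.
until log.length == 3
  timers.consume while timers.delay_until_next == 0
  sleep 0.005
end
p log   # prints [:first, :second, :third]
```

The timers fire in time order whatever order they were scheduled in, because at_time_call keeps the queue sorted and consume only ever takes the head.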
matju