8 # A QueueRing is implemented as an array with elements in the form
9 # [chan, [message1, message2, ...]]
10 # Note that the channel +chan+ has no actual bearing on the channels
11 # to which messages will be sent
36 cmess = @storage.assoc(chan)
38 idx = @storage.index(cmess)
42 @storage << [chan, [mess]]
48 warning "trying to access empty ring"
52 @last_idx = (@last_idx + 1) % @storage.length
53 mess = @storage[@last_idx][1].first
60 warning "trying to access empty ring"
63 @last_idx = (@last_idx + 1) % @storage.length
64 mess = @storage[@last_idx][1].shift
65 @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
73 # a MessageQueue is an array of QueueRings
74 # rings have decreasing priority, so messages in ring 0
75 # are more important than messages in ring 1, and so on
76 @rings = Array.new(3) { |i|
80 # ring 0 is special in that if it's not empty, it will
81 # be popped. In other words, ring 0 can starve the other rings
82 # ring 0 is strictly FIFO and is therefore implemented
87 # the other rings are satisfied round-robin
98 def push(mess, chan=nil, cring=0)
101 warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
104 error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
105 @rings[ring].push mess, chan
111 return false unless r.empty?
126 warning "trying to access empty ring"
131 mess = @rings[0].first
133 save_ring = @last_ring
134 (@rings.length - 1).times {
135 @last_ring = (@last_ring % (@rings.length - 1)) + 1
136 if !@rings[@last_ring].empty?
137 mess = @rings[@last_ring].next
141 @last_ring = save_ring
143 error "nil message" if mess.nil?
149 warning "trying to access empty ring"
154 return @rings[0].shift
156 (@rings.length - 1).times {
157 @last_ring = (@last_ring % (@rings.length - 1)) + 1
158 if !@rings[@last_ring].empty?
159 return @rings[@last_ring].shift
162 error "nil message" if mess.nil?
168 # wrapped TCPSocket for communication with the server.
169 # emulates a subset of TCPSocket functionality
171 # total number of lines sent to the irc server
172 attr_reader :lines_sent
174 # total number of lines received from the irc server
175 attr_reader :lines_received
177 # total number of bytes sent to the irc server
178 attr_reader :bytes_sent
180 # total number of bytes received from the irc server
181 attr_reader :bytes_received
183 # accumulator for the throttle
184 attr_reader :throttle_bytes
186 # byterate components
187 attr_reader :bytes_per
188 attr_reader :seconds_per
190 # delay between lines sent
191 attr_reader :sendq_delay
194 attr_reader :sendq_burst
196 # server:: server to connect to
198 # host:: optional local host to bind to (ruby 1.7+ required)
199 # create a new IrcSocket
200 def initialize(server, port, host, sendq_delay=2, sendq_burst=4, brt="400/2")
201 @timer = Timer::Timer.new
213 @sendq_delay = sendq_delay.to_f
217 @last_send = Time.new - @sendq_delay
218 @last_throttle = Time.new
221 @sendq_burst = sendq_burst.to_i
233 if brt.match(/(\d+)\/(\d)/)
235 @seconds_per = $2.to_i
236 debug "Byterate now #{byterate}"
239 debug "Couldn't set byterate #{brt}"
248 # open a TCP connection to the server
251 warning "reconnecting while connected"
256 @sock=TCPSocket.new(@server, @port, @host)
257 rescue ArgumentError => e
258 error "Your version of ruby does not support binding to a "
259 error "specific local address, please upgrade if you wish "
260 error "to use HOST = foo"
261 error "(this option has been disabled in order to continue)"
262 @sock=TCPSocket.new(@server, @port)
265 @sock=TCPSocket.new(@server, @port)
269 @sendq = MessageQueue.new
272 def sendq_delay=(newfreq)
273 debug "changing sendq frequency to #{newfreq}"
274 @qmutex.synchronize do
275 @sendq_delay = newfreq
285 def sendq_burst=(newburst)
286 @qmutex.synchronize do
287 @sendq_burst = newburst
292 return "#{@bytes_per}/#{@seconds_per}"
295 def byterate=(newrate)
296 @qmutex.synchronize do
301 def run_throttle(more=0)
303 if @throttle_bytes > 0
304 # If we ever reach the limit, we halve the actual allowed byterate
305 # until we manage to reset the throttle.
306 if @throttle_bytes >= @bytes_per
309 delta = ((now - @last_throttle)*@throttle_div*@bytes_per/@seconds_per).floor
311 @throttle_bytes -= delta
312 @throttle_bytes = 0 if @throttle_bytes < 0
318 @throttle_bytes += more
321 # used to send lines to the remote IRCd by skipping the queue
322 # message:: IRC message to send
323 # it should only be used for stuff that *must not* be queued,
324 # i.e. the initial PASS, NICK and USER commands
325 # or the final QUIT message
326 def emergency_puts(message)
327 @qmutex.synchronize do
328 # debug "In puts - got mutex"
329 puts_critical(message)
333 # get the next line from the server (blocks)
336 warning "socket get attempted while closed"
342 reply.strip! if reply
343 debug "RECV: #{reply.inspect}"
346 warning "socket get failed: #{e.inspect}"
347 debug e.backtrace.join("\n")
352 def queue(msg, chan=nil, ring=0)
354 @qmutex.synchronize do
355 @sendq.push msg, chan, ring
359 # just send it if queueing is disabled
360 self.emergency_puts(msg)
364 # pop a message off the queue, send it
366 @qmutex.synchronize do
374 if (now >= (@last_send + @sendq_delay))
375 # reset burst counter after @sendq_delay has passed
376 debug "resetting @burst"
378 elsif (@burst >= @sendq_burst)
379 # nope. can't send anything, come back to us next tick...
380 debug "can't send yet"
384 debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
385 (@sendq_burst - @burst).times do
386 break if @sendq.empty?
388 if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
389 debug "flood protection: sending message of length #{mess.length}"
390 debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
391 puts_critical(@sendq.shift)
393 debug "flood protection: throttling message of length #{mess.length}"
394 debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
403 error "Spooling failed: #{e.inspect}"
404 error e.backtrace.join("\n")
411 @qmutex.synchronize do
417 warning "Clearing socket while disconnected"
421 # flush the TCPSocket
426 # Wraps Kernel.select on the socket
427 def select(timeout=nil)
428 Kernel.select([@sock], nil, nil, timeout)
431 # shutdown the connection to the server
433 @sock.shutdown(how) unless @sock.nil?
440 # same as puts, but expects to be called with a mutex held on @qmutex
441 def puts_critical(message)
442 # debug "in puts_critical"
444 debug "SEND: #{message.inspect}"
446 error "SEND attempted on closed socket"
448 @sock.send(message + "\n",0)
449 @last_send = Time.new
452 run_throttle(message.length + 1)
455 error "SEND failed: #{e.inspect}"