Minor messagemapper optimizations
[rbot] / lib / rbot / ircsocket.rb
1 class ::String
2   # Calculate the penalty which will be assigned to this message
3   # by the IRCd
4   def irc_send_penalty
5     # According to eggrdop, the initial penalty is
6     penalty = 1 + self.size/100
7     # on everything but UnderNET where it's
8     # penalty = 2 + self.size/120
9
10     cmd, pars = self.split($;,2)
11     debug "cmd: #{cmd}, pars: #{pars.inspect}"
12     case cmd.to_sym
13     when :KICK
14       chan, nick, msg = pars.split
15       chan = chan.split(',')
16       nick = nick.split(',')
17       penalty += nick.size
18       penalty *= chan.size
19     when :MODE
20       chan, modes, argument = pars.split
21       extra = 0
22       if modes
23         extra = 1
24         if argument
25           extra += modes.split(/\+|-/).size
26         else
27           extra += 3 * modes.split(/\+|-/).size
28         end
29       end
30       if argument
31         extra += 2 * argument.split.size
32       end
33       penalty += extra * chan.split.size
34     when :TOPIC
35       penalty += 1
36       penalty += 2 unless pars.split.size < 2
37     when :PRIVMSG, :NOTICE
38       dests = pars.split($;,2).first
39       penalty += dests.split(',').size
40     when :WHO
41       # I'm too lazy to implement this one correctly
42       penalty += 5
43     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
44       penalty += 2
45     when :INVITE, :NICK
46       penalty += 3
47     when :ISON
48       penalty += 1
49     else # Unknown messages
50       penalty += 1
51     end
52     if penalty > 99
53       debug "Wow, more than 99 secs of penalty!"
54       penalty = 99
55     end
56     if penalty < 2
57       debug "Wow, less than 2 secs of penalty!"
58       penalty = 2
59     end
60     debug "penalty: #{penalty}"
61     return penalty
62   end
63 end
64
65 module Irc
66
67   require 'socket'
68   require 'thread'
69   require 'rbot/timer'
70
71   class QueueRing
72     # A QueueRing is implemented as an array with elements in the form
73     # [chan, [message1, message2, ...]
74     # Note that the channel +chan+ has no actual bearing with the channels
75     # to which messages will be sent
76
77     def initialize
78       @storage = Array.new
79       @last_idx = -1
80     end
81
82     def clear
83       @storage.clear
84       @last_idx = -1
85     end
86
87     def length
88       len = 0
89       @storage.each {|c|
90         len += c[1].size
91       }
92       return len
93     end
94     alias :size :length
95
96     def empty?
97       @storage.empty?
98     end
99
100     def push(mess, chan)
101       cmess = @storage.assoc(chan)
102       if cmess
103         idx = @storage.index(cmess)
104         cmess[1] << mess
105         @storage[idx] = cmess
106       else
107         @storage << [chan, [mess]]
108       end
109     end
110
111     def next
112       if empty?
113         warning "trying to access empty ring"
114         return nil
115       end
116       save_idx = @last_idx
117       @last_idx = (@last_idx + 1) % @storage.size
118       mess = @storage[@last_idx][1].first
119       @last_idx = save_idx
120       return mess
121     end
122
123     def shift
124       if empty?
125         warning "trying to access empty ring"
126         return nil
127       end
128       @last_idx = (@last_idx + 1) % @storage.size
129       mess = @storage[@last_idx][1].shift
130       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
131       return mess
132     end
133
134   end
135
136   class MessageQueue
137     def initialize
138       # a MessageQueue is an array of QueueRings
139       # rings have decreasing priority, so messages in ring 0
140       # are more important than messages in ring 1, and so on
141       @rings = Array.new(3) { |i|
142         if i > 0
143           QueueRing.new
144         else
145           # ring 0 is special in that if it's not empty, it will
146           # be popped. IOW, ring 0 can starve the other rings
147           # ring 0 is strictly FIFO and is therefore implemented
148           # as an array
149           Array.new
150         end
151       }
152       # the other rings are satisfied round-robin
153       @last_ring = 0
154     end
155
156     def clear
157       @rings.each { |r|
158         r.clear
159       }
160       @last_ring = 0
161     end
162
163     def push(mess, chan=nil, cring=0)
164       ring = cring
165       if ring == 0
166         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
167         @rings[0] << mess
168       else
169         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
170         @rings[ring].push mess, chan
171       end
172     end
173
174     def empty?
175       @rings.each { |r|
176         return false unless r.empty?
177       }
178       return true
179     end
180
181     def length
182       len = 0
183       @rings.each { |r|
184         len += r.size
185       }
186       len
187     end
188     alias :size :length
189
190     def next
191       if empty?
192         warning "trying to access empty ring"
193         return nil
194       end
195       mess = nil
196       if !@rings[0].empty?
197         mess = @rings[0].first
198       else
199         save_ring = @last_ring
200         (@rings.size - 1).times {
201           @last_ring = (@last_ring % (@rings.size - 1)) + 1
202           if !@rings[@last_ring].empty?
203             mess = @rings[@last_ring].next
204             break
205           end
206         }
207         @last_ring = save_ring
208       end
209       error "nil message" if mess.nil?
210       return mess
211     end
212
213     def shift
214       if empty?
215         warning "trying to access empty ring"
216         return nil
217       end
218       mess = nil
219       if !@rings[0].empty?
220         return @rings[0].shift
221       end
222       (@rings.size - 1).times {
223         @last_ring = (@last_ring % (@rings.size - 1)) + 1
224         if !@rings[@last_ring].empty?
225           return @rings[@last_ring].shift
226         end
227       }
228       error "nil message" if mess.nil?
229       return mess
230     end
231
232   end
233
234   # wrapped TCPSocket for communication with the server.
235   # emulates a subset of TCPSocket functionality
236   class IrcSocket
237
238     MAX_IRC_SEND_PENALTY = 10
239
240     # total number of lines sent to the irc server
241     attr_reader :lines_sent
242
243     # total number of lines received from the irc server
244     attr_reader :lines_received
245
246     # total number of bytes sent to the irc server
247     attr_reader :bytes_sent
248
249     # total number of bytes received from the irc server
250     attr_reader :bytes_received
251
252     # accumulator for the throttle
253     attr_reader :throttle_bytes
254
255     # delay between lines sent
256     attr_reader :sendq_delay
257
258     # max lines to burst
259     attr_reader :sendq_burst
260
261     # server:: server to connect to
262     # port::   IRCd port
263     # host::   optional local host to bind to (ruby 1.7+ required)
264     # create a new IrcSocket
265     def initialize(server, port, host, sendq_delay=2, sendq_burst=4, opts={})
266       @timer = Timer::Timer.new
267       @timer.add(0.2) do
268         spool
269       end
270       @server = server.dup
271       @port = port.to_i
272       @host = host
273       @sock = nil
274       @spooler = false
275       @lines_sent = 0
276       @lines_received = 0
277       if opts.kind_of?(Hash) and opts.key?(:ssl)
278         @ssl = opts[:ssl]
279       else
280         @ssl = false
281       end
282
283       if sendq_delay
284         @sendq_delay = sendq_delay.to_f
285       else
286         @sendq_delay = 2
287       end
288       @last_send = Time.new - @sendq_delay
289       @flood_send = Time.new
290       @last_throttle = Time.new
291       @burst = 0
292       if sendq_burst
293         @sendq_burst = sendq_burst.to_i
294       else
295         @sendq_burst = 4
296       end
297     end
298
299     def connected?
300       !@sock.nil?
301     end
302
303     # open a TCP connection to the server
304     def connect
305       if connected?
306         warning "reconnecting while connected"
307         return
308       end
309       if(@host)
310         begin
311           @sock=TCPSocket.new(@server, @port, @host)
312         rescue ArgumentError => e
313           error "Your version of ruby does not support binding to a "
314           error "specific local address, please upgrade if you wish "
315           error "to use HOST = foo"
316           error "(this option has been disabled in order to continue)"
317           @sock=TCPSocket.new(@server, @port)
318         end
319       else
320         @sock=TCPSocket.new(@server, @port)
321       end
322       if(@ssl)
323         require 'openssl'
324         ssl_context = OpenSSL::SSL::SSLContext.new()
325         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
326         @rawsock = @sock
327         @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
328         @sock.sync_close = true
329         @sock.connect
330       end
331       @qthread = false
332       @qmutex = Mutex.new
333       @sendq = MessageQueue.new
334     end
335
336     def sendq_delay=(newfreq)
337       debug "changing sendq frequency to #{newfreq}"
338       @qmutex.synchronize do
339         @sendq_delay = newfreq
340         if newfreq == 0
341           clearq
342           @timer.stop
343         else
344           @timer.start
345         end
346       end
347     end
348
349     def sendq_burst=(newburst)
350       @qmutex.synchronize do
351         @sendq_burst = newburst
352       end
353     end
354
355     # used to send lines to the remote IRCd by skipping the queue
356     # message: IRC message to send
357     # it should only be used for stuff that *must not* be queued,
358     # i.e. the initial PASS, NICK and USER command
359     # or the final QUIT message
360     def emergency_puts(message)
361       @qmutex.synchronize do
362         # debug "In puts - got mutex"
363         puts_critical(message)
364       end
365     end
366
367     def handle_socket_error(string, err)
368       error "#{string} failed: #{err.inspect}"
369       debug err.backtrace.join("\n")
370       # We assume that an error means that there are connection
371       # problems and that we should reconnect, so we
372       shutdown
373       raise SocketError.new(err.inspect)
374     end
375
376     # get the next line from the server (blocks)
377     def gets
378       if @sock.nil?
379         warning "socket get attempted while closed"
380         return nil
381       end
382       begin
383         reply = @sock.gets
384         @lines_received += 1
385         reply.strip! if reply
386         debug "RECV: #{reply.inspect}"
387         return reply
388       rescue => e
389         handle_socket_error(:RECV, e)
390       end
391     end
392
393     def queue(msg, chan=nil, ring=0)
394       if @sendq_delay > 0
395         @qmutex.synchronize do
396           @sendq.push msg, chan, ring
397           @timer.start
398         end
399       else
400         # just send it if queueing is disabled
401         self.emergency_puts(msg)
402       end
403     end
404
405     # pop a message off the queue, send it
406     def spool
407       @qmutex.synchronize do
408         begin
409           debug "in spooler"
410           if @sendq.empty?
411             @timer.stop
412             return
413           end
414           now = Time.new
415           if (now >= (@last_send + @sendq_delay))
416             debug "resetting @burst"
417             @burst = 0
418           elsif (@burst > @sendq_burst)
419             # nope. can't send anything, come back to us next tick...
420             debug "can't send yet"
421             @timer.start
422             return
423           end
424           @flood_send = now if @flood_send < now
425           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send"
426           while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
427             debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
428             puts_critical(@sendq.shift, true)
429           end
430           if @sendq.empty?
431             @timer.stop
432           end
433         rescue => e
434           error "Spooling failed: #{e.inspect}"
435           error e.backtrace.join("\n")
436         end
437       end
438     end
439
440     def clearq
441       if @sock
442         @qmutex.synchronize do
443           unless @sendq.empty?
444             @sendq.clear
445           end
446         end
447       else
448         warning "Clearing socket while disconnected"
449       end
450     end
451
452     # flush the TCPSocket
453     def flush
454       @sock.flush
455     end
456
457     # Wraps Kernel.select on the socket
458     def select(timeout=nil)
459       Kernel.select([@sock], nil, nil, timeout)
460     end
461
462     # shutdown the connection to the server
463     def shutdown(how=2)
464       return unless connected?
465       begin
466         @sock.close
467       rescue => err
468         error "error while shutting down: #{err.inspect}"
469         debug err.backtrace.join("\n")
470       end
471       @rawsock = nil if @ssl
472       @sock = nil
473       @burst = 0
474     end
475
476     private
477
478     # same as puts, but expects to be called with a mutex held on @qmutex
479     def puts_critical(message, penalty=false)
480       # debug "in puts_critical"
481       begin
482         debug "SEND: #{message.inspect}"
483         if @sock.nil?
484           error "SEND attempted on closed socket"
485         else
486           @sock.puts message
487           @last_send = Time.new
488           @flood_send += message.irc_send_penalty if penalty
489           @lines_sent += 1
490           @burst += 1
491         end
492       rescue => e
493         handle_socket_error(:SEND, e)
494       end
495     end
496
497   end
498
499 end