+ added support for multiple servers to try (round-robin) for unreliable ircnets
[rbot] / lib / rbot / ircsocket.rb
1 class ::String
2   # Calculate the penalty which will be assigned to this message
3   # by the IRCd
4   def irc_send_penalty
5     # According to eggrdop, the initial penalty is
6     penalty = 1 + self.size/100
7     # on everything but UnderNET where it's
8     # penalty = 2 + self.size/120
9
10     cmd, pars = self.split($;,2)
11     debug "cmd: #{cmd}, pars: #{pars.inspect}"
12     case cmd.to_sym
13     when :KICK
14       chan, nick, msg = pars.split
15       chan = chan.split(',')
16       nick = nick.split(',')
17       penalty += nick.size
18       penalty *= chan.size
19     when :MODE
20       chan, modes, argument = pars.split
21       extra = 0
22       if modes
23         extra = 1
24         if argument
25           extra += modes.split(/\+|-/).size
26         else
27           extra += 3 * modes.split(/\+|-/).size
28         end
29       end
30       if argument
31         extra += 2 * argument.split.size
32       end
33       penalty += extra * chan.split.size
34     when :TOPIC
35       penalty += 1
36       penalty += 2 unless pars.split.size < 2
37     when :PRIVMSG, :NOTICE
38       dests = pars.split($;,2).first
39       penalty += dests.split(',').size
40     when :WHO
41       # I'm too lazy to implement this one correctly
42       penalty += 5
43     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
44       penalty += 2
45     when :INVITE, :NICK
46       penalty += 3
47     when :ISON
48       penalty += 1
49     else # Unknown messages
50       penalty += 1
51     end
52     if penalty > 99
53       debug "Wow, more than 99 secs of penalty!"
54       penalty = 99
55     end
56     if penalty < 2
57       debug "Wow, less than 2 secs of penalty!"
58       penalty = 2
59     end
60     debug "penalty: #{penalty}"
61     return penalty
62   end
63 end
64
65 module Irc
66
67   require 'socket'
68   require 'thread'
69   require 'rbot/timer'
70
71   class QueueRing
72     # A QueueRing is implemented as an array with elements in the form
73     # [chan, [message1, message2, ...]
74     # Note that the channel +chan+ has no actual bearing with the channels
75     # to which messages will be sent
76
77     def initialize
78       @storage = Array.new
79       @last_idx = -1
80     end
81
82     def clear
83       @storage.clear
84       @last_idx = -1
85     end
86
87     def length
88       len = 0
89       @storage.each {|c|
90         len += c[1].size
91       }
92       return len
93     end
94     alias :size :length
95
96     def empty?
97       @storage.empty?
98     end
99
100     def push(mess, chan)
101       cmess = @storage.assoc(chan)
102       if cmess
103         idx = @storage.index(cmess)
104         cmess[1] << mess
105         @storage[idx] = cmess
106       else
107         @storage << [chan, [mess]]
108       end
109     end
110
111     def next
112       if empty?
113         warning "trying to access empty ring"
114         return nil
115       end
116       save_idx = @last_idx
117       @last_idx = (@last_idx + 1) % @storage.size
118       mess = @storage[@last_idx][1].first
119       @last_idx = save_idx
120       return mess
121     end
122
123     def shift
124       if empty?
125         warning "trying to access empty ring"
126         return nil
127       end
128       @last_idx = (@last_idx + 1) % @storage.size
129       mess = @storage[@last_idx][1].shift
130       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
131       return mess
132     end
133
134   end
135
136   class MessageQueue
137     def initialize
138       # a MessageQueue is an array of QueueRings
139       # rings have decreasing priority, so messages in ring 0
140       # are more important than messages in ring 1, and so on
141       @rings = Array.new(3) { |i|
142         if i > 0
143           QueueRing.new
144         else
145           # ring 0 is special in that if it's not empty, it will
146           # be popped. IOW, ring 0 can starve the other rings
147           # ring 0 is strictly FIFO and is therefore implemented
148           # as an array
149           Array.new
150         end
151       }
152       # the other rings are satisfied round-robin
153       @last_ring = 0
154     end
155
156     def clear
157       @rings.each { |r|
158         r.clear
159       }
160       @last_ring = 0
161     end
162
163     def push(mess, chan=nil, cring=0)
164       ring = cring
165       if ring == 0
166         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
167         @rings[0] << mess
168       else
169         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
170         @rings[ring].push mess, chan
171       end
172     end
173
174     def empty?
175       @rings.each { |r|
176         return false unless r.empty?
177       }
178       return true
179     end
180
181     def length
182       len = 0
183       @rings.each { |r|
184         len += r.size
185       }
186       len
187     end
188     alias :size :length
189
190     def next
191       if empty?
192         warning "trying to access empty ring"
193         return nil
194       end
195       mess = nil
196       if !@rings[0].empty?
197         mess = @rings[0].first
198       else
199         save_ring = @last_ring
200         (@rings.size - 1).times {
201           @last_ring = (@last_ring % (@rings.size - 1)) + 1
202           if !@rings[@last_ring].empty?
203             mess = @rings[@last_ring].next
204             break
205           end
206         }
207         @last_ring = save_ring
208       end
209       error "nil message" if mess.nil?
210       return mess
211     end
212
213     def shift
214       if empty?
215         warning "trying to access empty ring"
216         return nil
217       end
218       mess = nil
219       if !@rings[0].empty?
220         return @rings[0].shift
221       end
222       (@rings.size - 1).times {
223         @last_ring = (@last_ring % (@rings.size - 1)) + 1
224         if !@rings[@last_ring].empty?
225           return @rings[@last_ring].shift
226         end
227       }
228       error "nil message" if mess.nil?
229       return mess
230     end
231
232   end
233
234   # wrapped TCPSocket for communication with the server.
235   # emulates a subset of TCPSocket functionality
236   class IrcSocket
237
238     MAX_IRC_SEND_PENALTY = 10
239
240     # total number of lines sent to the irc server
241     attr_reader :lines_sent
242
243     # total number of lines received from the irc server
244     attr_reader :lines_received
245
246     # total number of bytes sent to the irc server
247     attr_reader :bytes_sent
248
249     # total number of bytes received from the irc server
250     attr_reader :bytes_received
251
252     # accumulator for the throttle
253     attr_reader :throttle_bytes
254
255     # delay between lines sent
256     attr_reader :sendq_delay
257
258     # max lines to burst
259     attr_reader :sendq_burst
260
261     # an optional filter object. we call @filter.in(data) for
262     # all incoming data and @filter.out(data) for all outgoing data
263     attr_reader :filter
264
265     # normalized uri of the current server
266     attr_reader :server_uri
267
268     # default trivial filter class
269     class IdentityFilter
270         def in(x)
271             x
272         end
273
274         def out(x)
275             x
276         end
277     end
278
279     # set filter to identity, not to nil
280     def filter=(f)
281         @filter = f || IdentityFilter.new
282     end
283
284     # server_list:: list of servers to connect to
285     # host::   optional local host to bind to (ruby 1.7+ required)
286     # create a new IrcSocket
287     def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={})
288       @timer = Timer::Timer.new
289       @timer.add(0.2) do
290         spool
291       end
292       @server_list = server_list.dup
293       @server_uri = nil
294       @conn_count = 0
295       @host = host
296       @sock = nil
297       @filter = IdentityFilter.new
298       @spooler = false
299       @lines_sent = 0
300       @lines_received = 0
301       if opts.kind_of?(Hash) and opts.key?(:ssl)
302         @ssl = opts[:ssl]
303       else
304         @ssl = false
305       end
306
307       if sendq_delay
308         @sendq_delay = sendq_delay.to_f
309       else
310         @sendq_delay = 2
311       end
312       @last_send = Time.new - @sendq_delay
313       @flood_send = Time.new
314       @last_throttle = Time.new
315       @burst = 0
316       if sendq_burst
317         @sendq_burst = sendq_burst.to_i
318       else
319         @sendq_burst = 4
320       end
321     end
322
323     def connected?
324       !@sock.nil?
325     end
326
327     # open a TCP connection to the server
328     def connect
329       if connected?
330         warning "reconnecting while connected"
331         return
332       end
333       srv_uri = @server_list[@conn_count % @server_list.size].dup
334       srv_uri = 'irc://' + srv_uri if !srv_uri =~ /:\/\//
335       @conn_count += 1
336       @server_uri = URI.parse(srv_uri)
337       @server_uri.port = 6667 if !@server_uri.port
338       debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
339
340       if(@host)
341         begin
342           @sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
343         rescue ArgumentError => e
344           error "Your version of ruby does not support binding to a "
345           error "specific local address, please upgrade if you wish "
346           error "to use HOST = foo"
347           error "(this option has been disabled in order to continue)"
348           @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
349         end
350       else
351         @sock=TCPSocket.new(@server_uri.host, @server_uri.port)
352       end
353       if(@ssl)
354         require 'openssl'
355         ssl_context = OpenSSL::SSL::SSLContext.new()
356         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
357         @rawsock = @sock
358         @sock = OpenSSL::SSL::SSLSocket.new(@rawsock, ssl_context)
359         @sock.sync_close = true
360         @sock.connect
361       end
362       @qthread = false
363       @qmutex = Mutex.new
364       @sendq = MessageQueue.new
365     end
366
367     def sendq_delay=(newfreq)
368       debug "changing sendq frequency to #{newfreq}"
369       @qmutex.synchronize do
370         @sendq_delay = newfreq
371         if newfreq == 0
372           clearq
373           @timer.stop
374         else
375           @timer.start
376         end
377       end
378     end
379
380     def sendq_burst=(newburst)
381       @qmutex.synchronize do
382         @sendq_burst = newburst
383       end
384     end
385
386     # used to send lines to the remote IRCd by skipping the queue
387     # message: IRC message to send
388     # it should only be used for stuff that *must not* be queued,
389     # i.e. the initial PASS, NICK and USER command
390     # or the final QUIT message
391     def emergency_puts(message)
392       @qmutex.synchronize do
393         # debug "In puts - got mutex"
394         puts_critical(message)
395       end
396     end
397
398     def handle_socket_error(string, err)
399       error "#{string} failed: #{err.inspect}"
400       debug err.backtrace.join("\n")
401       # We assume that an error means that there are connection
402       # problems and that we should reconnect, so we
403       shutdown
404       raise SocketError.new(err.inspect)
405     end
406
407     # get the next line from the server (blocks)
408     def gets
409       if @sock.nil?
410         warning "socket get attempted while closed"
411         return nil
412       end
413       begin
414         reply = @filter.in(@sock.gets)
415         @lines_received += 1
416         reply.strip! if reply
417         debug "RECV: #{reply.inspect}"
418         return reply
419       rescue => e
420         handle_socket_error(:RECV, e)
421       end
422     end
423
424     def queue(msg, chan=nil, ring=0)
425       if @sendq_delay > 0
426         @qmutex.synchronize do
427           @sendq.push msg, chan, ring
428           @timer.start
429         end
430       else
431         # just send it if queueing is disabled
432         self.emergency_puts(msg)
433       end
434     end
435
436     # pop a message off the queue, send it
437     def spool
438       @qmutex.synchronize do
439         begin
440           debug "in spooler"
441           if @sendq.empty?
442             @timer.stop
443             return
444           end
445           now = Time.new
446           if (now >= (@last_send + @sendq_delay))
447             debug "resetting @burst"
448             @burst = 0
449           elsif (@burst > @sendq_burst)
450             # nope. can't send anything, come back to us next tick...
451             debug "can't send yet"
452             @timer.start
453             return
454           end
455           @flood_send = now if @flood_send < now
456           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.size} to send"
457           while !@sendq.empty? and @burst < @sendq_burst and @flood_send - now < MAX_IRC_SEND_PENALTY
458             debug "sending message (#{@flood_send - now} < #{MAX_IRC_SEND_PENALTY})"
459             puts_critical(@sendq.shift, true)
460           end
461           if @sendq.empty?
462             @timer.stop
463           end
464         rescue => e
465           error "Spooling failed: #{e.inspect}"
466           error e.backtrace.join("\n")
467         end
468       end
469     end
470
471     def clearq
472       if @sock
473         @qmutex.synchronize do
474           unless @sendq.empty?
475             @sendq.clear
476           end
477         end
478       else
479         warning "Clearing socket while disconnected"
480       end
481     end
482
483     # flush the TCPSocket
484     def flush
485       @sock.flush
486     end
487
488     # Wraps Kernel.select on the socket
489     def select(timeout=nil)
490       Kernel.select([@sock], nil, nil, timeout)
491     end
492
493     # shutdown the connection to the server
494     def shutdown(how=2)
495       return unless connected?
496       begin
497         @sock.close
498       rescue => err
499         error "error while shutting down: #{err.inspect}"
500         debug err.backtrace.join("\n")
501       end
502       @rawsock = nil if @ssl
503       @sock = nil
504       @burst = 0
505     end
506
507     private
508
509     # same as puts, but expects to be called with a mutex held on @qmutex
510     def puts_critical(message, penalty=false)
511       # debug "in puts_critical"
512       begin
513         debug "SEND: #{message.inspect}"
514         if @sock.nil?
515           error "SEND attempted on closed socket"
516         else
517           @sock.puts(@filter.out(message))
518           @last_send = Time.new
519           @flood_send += message.irc_send_penalty if penalty
520           @lines_sent += 1
521           @burst += 1
522         end
523       rescue => e
524         handle_socket_error(:SEND, e)
525       end
526     end
527
528   end
529
530 end