Shrink the penalty calc for Gentoo.
[rbot] / lib / rbot / ircsocket.rb
1 #-- vim:sw=2:et
2 #++
3 #
4 # :title: IRC Socket
5 #
6 # This module implements the IRC socket interface, including IRC message
7 # penalty computation and the message queue system
8
9 require 'monitor'
10
11 class ::String
12   # Calculate the penalty which will be assigned to this message
13   # by the IRCd
14   def irc_send_penalty
15     penalty_min = 1
16     penalty_max = 99
17     # According to eggdrop, the initial penalty is
18     penalty = 1 + self.size/200
19     # on everything but UnderNET where it's
20     # penalty = 2 + self.size/120
21
22     cmd, pars = self.split($;,2)
23     debug "cmd: #{cmd}, pars: #{pars.inspect}"
24     case cmd.to_sym
25     when :KICK
26       chan, nick, msg = pars.split
27       chan = chan.split(',')
28       nick = nick.split(',')
29       penalty += nick.size
30       penalty *= chan.size
31     when :MODE
32       chan, modes, argument = pars.split
33       extra = 0
34       if modes
35         extra = 1
36         if argument
37           extra += modes.split(/\+|-/).size
38         else
39           extra += 3 * modes.split(/\+|-/).size
40         end
41       end
42       if argument
43         extra += 2 * argument.split.size
44       end
45       penalty += extra * chan.split.size
46     when :TOPIC
47       penalty += 1
48       penalty += 2 unless pars.split.size < 2
49     when :PRIVMSG, :NOTICE
50       dests = pars.split($;,2).first
51       penalty += dests.split(',').size
52     when :WHO
53       # I'm too lazy to implement this one correctly
54       penalty += 1
55     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
56       penalty += 1
57     when :INVITE, :NICK
58       penalty += 3
59     when :ISON
60       penalty += 1
61     else # Unknown messages
62       penalty += 1
63     end
64     if penalty > penalty_max
65       debug "Wow, more than #{penalty_max} secs of penalty!"
66       penalty = penalty_max
67     end
68     if penalty < penalty_min
69       debug "Wow, less than #{penalty_min} secs of penalty!"
70       penalty = penalty_min
71     end
72     debug "penalty: #{penalty}"
73     return penalty
74   end
75 end
76
77 module Irc
78
79   require 'socket'
80   require 'thread'
81
82   class QueueRing
83     # A QueueRing is implemented as an array with elements in the form
84     # [chan, [message1, message2, ...]
85     # Note that the channel +chan+ has no actual bearing with the channels
86     # to which messages will be sent
87
88     def initialize
89       @storage = Array.new
90       @last_idx = -1
91     end
92
93     def clear
94       @storage.clear
95       @last_idx = -1
96     end
97
98     def length
99       len = 0
100       @storage.each {|c|
101         len += c[1].size
102       }
103       return len
104     end
105     alias :size :length
106
107     def empty?
108       @storage.empty?
109     end
110
111     def push(mess, chan)
112       cmess = @storage.assoc(chan)
113       if cmess
114         idx = @storage.index(cmess)
115         cmess[1] << mess
116         @storage[idx] = cmess
117       else
118         @storage << [chan, [mess]]
119       end
120     end
121
122     def next
123       if empty?
124         warning "trying to access empty ring"
125         return nil
126       end
127       save_idx = @last_idx
128       @last_idx = (@last_idx + 1) % @storage.size
129       mess = @storage[@last_idx][1].first
130       @last_idx = save_idx
131       return mess
132     end
133
134     def shift
135       if empty?
136         warning "trying to access empty ring"
137         return nil
138       end
139       @last_idx = (@last_idx + 1) % @storage.size
140       mess = @storage[@last_idx][1].shift
141       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
142       return mess
143     end
144
145   end
146
147   class MessageQueue
148
149     def initialize
150       # a MessageQueue is an array of QueueRings
151       # rings have decreasing priority, so messages in ring 0
152       # are more important than messages in ring 1, and so on
153       @rings = Array.new(3) { |i|
154         if i > 0
155           QueueRing.new
156         else
157           # ring 0 is special in that if it's not empty, it will
158           # be popped. IOW, ring 0 can starve the other rings
159           # ring 0 is strictly FIFO and is therefore implemented
160           # as an array
161           Array.new
162         end
163       }
164       # the other rings are satisfied round-robin
165       @last_ring = 0
166       self.extend(MonitorMixin)
167       @non_empty = self.new_cond
168     end
169
170     def clear
171       self.synchronize do
172         @rings.each { |r| r.clear }
173         @last_ring = 0
174       end
175     end
176
177     def push(mess, chan=nil, cring=0)
178       ring = cring
179       self.synchronize do
180         if ring == 0
181           warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
182           @rings[0] << mess
183         else
184           error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
185           @rings[ring].push mess, chan
186         end
187         @non_empty.signal
188       end
189     end
190
191     def shift(tmout = nil)
192       self.synchronize do
193         @non_empty.wait(tmout) if self.empty?
194         return unsafe_shift
195       end
196     end
197
198     protected
199
200     def empty?
201       !@rings.find { |r| !r.empty? }
202     end
203
204     def length
205       @rings.inject(0) { |s, r| s + r.size }
206     end
207     alias :size :length
208
209     def unsafe_shift
210       if !@rings[0].empty?
211         return @rings[0].shift
212       end
213       (@rings.size - 1).times do
214         @last_ring = (@last_ring % (@rings.size - 1)) + 1
215         return @rings[@last_ring].shift unless @rings[@last_ring].empty?
216       end
217       warning "trying to access an empty message queue"
218       return nil
219     end
220
221   end
222
223   # wrapped TCPSocket for communication with the server.
224   # emulates a subset of TCPSocket functionality
225   class Socket
226
227     MAX_IRC_SEND_PENALTY = 10
228
229     # total number of lines sent to the irc server
230     attr_reader :lines_sent
231
232     # total number of lines received from the irc server
233     attr_reader :lines_received
234
235     # total number of bytes sent to the irc server
236     attr_reader :bytes_sent
237
238     # total number of bytes received from the irc server
239     attr_reader :bytes_received
240
241     # accumulator for the throttle
242     attr_reader :throttle_bytes
243
244     # an optional filter object. we call @filter.in(data) for
245     # all incoming data and @filter.out(data) for all outgoing data
246     attr_reader :filter
247
248     # normalized uri of the current server
249     attr_reader :server_uri
250
251     # default trivial filter class
252     class IdentityFilter
253         def in(x)
254             x
255         end
256
257         def out(x)
258             x
259         end
260     end
261
262     # set filter to identity, not to nil
263     def filter=(f)
264         @filter = f || IdentityFilter.new
265     end
266
267     # server_list:: list of servers to connect to
268     # host::   optional local host to bind to (ruby 1.7+ required)
269     # create a new Irc::Socket
270     def initialize(server_list, host, opts={})
271       @server_list = server_list.dup
272       @server_uri = nil
273       @conn_count = 0
274       @host = host
275       @sock = nil
276       @filter = IdentityFilter.new
277       @spooler = false
278       @lines_sent = 0
279       @lines_received = 0
280       if opts.kind_of?(Hash) and opts.key?(:ssl)
281         @ssl = opts[:ssl]
282       else
283         @ssl = false
284       end
285     end
286
287     def connected?
288       !@sock.nil?
289     end
290
291     # open a TCP connection to the server
292     def connect
293       if connected?
294         warning "reconnecting while connected"
295         return
296       end
297       srv_uri = @server_list[@conn_count % @server_list.size].dup
298       srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
299       @conn_count += 1
300       @server_uri = URI.parse(srv_uri)
301       @server_uri.port = 6667 if !@server_uri.port
302       debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
303
304       if(@host)
305         begin
306           sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
307         rescue ArgumentError => e
308           error "Your version of ruby does not support binding to a "
309           error "specific local address, please upgrade if you wish "
310           error "to use HOST = foo"
311           error "(this option has been disabled in order to continue)"
312           sock=TCPSocket.new(@server_uri.host, @server_uri.port)
313         end
314       else
315         sock=TCPSocket.new(@server_uri.host, @server_uri.port)
316       end
317       if(@ssl)
318         require 'openssl'
319         ssl_context = OpenSSL::SSL::SSLContext.new()
320         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
321         sock = OpenSSL::SSL::SSLSocket.new(sock, ssl_context)
322         sock.sync_close = true
323         sock.connect
324       end
325       @sock = sock
326       @last_send = Time.new
327       @flood_send = Time.new
328       @burst = 0
329       @sock.extend(MonitorMixin)
330       @sendq = MessageQueue.new
331       @qthread = Thread.new { writer_loop }
332     end
333
334     # used to send lines to the remote IRCd by skipping the queue
335     # message: IRC message to send
336     # it should only be used for stuff that *must not* be queued,
337     # i.e. the initial PASS, NICK and USER command
338     # or the final QUIT message
339     def emergency_puts(message, penalty = false)
340       @sock.synchronize do
341         # debug "In puts - got @sock"
342         puts_critical(message, penalty)
343       end
344     end
345
346     def handle_socket_error(string, e)
347       error "#{string} failed: #{e.pretty_inspect}"
348       # We assume that an error means that there are connection
349       # problems and that we should reconnect, so we
350       shutdown
351       raise SocketError.new(e.inspect)
352     end
353
354     # get the next line from the server (blocks)
355     def gets
356       if @sock.nil?
357         warning "socket get attempted while closed"
358         return nil
359       end
360       begin
361         reply = @filter.in(@sock.gets)
362         @lines_received += 1
363         reply.strip! if reply
364         debug "RECV: #{reply.inspect}"
365         return reply
366       rescue Exception => e
367         handle_socket_error(:RECV, e)
368       end
369     end
370
371     def queue(msg, chan=nil, ring=0)
372       @sendq.push msg, chan, ring
373     end
374
375     def clearq
376       @sendq.clear
377     end
378
379     # flush the TCPSocket
380     def flush
381       @sock.flush
382     end
383
384     # Wraps Kernel.select on the socket
385     def select(timeout=nil)
386       Kernel.select([@sock], nil, nil, timeout)
387     end
388
389     # shutdown the connection to the server
390     def shutdown(how=2)
391       return unless connected?
392       @qthread.kill
393       @qthread = nil
394       begin
395         @sock.close
396       rescue Exception => e
397         error "error while shutting down: #{e.pretty_inspect}"
398       end
399       @sock = nil
400       @sendq.clear
401     end
402
403     private
404
405     def writer_loop
406       loop do
407         begin
408           now = Time.now
409           flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
410           delay = [flood_delay, 0].max
411           if delay > 0
412             debug "sleep(#{delay}) # (f: #{flood_delay})"
413             sleep(delay)
414           end
415           msg = @sendq.shift
416           debug "got #{msg.inspect} from queue, sending"
417           emergency_puts(msg, true)
418         rescue Exception => e
419           error "Spooling failed: #{e.pretty_inspect}"
420           debug e.backtrace.join("\n")
421           raise e
422         end
423       end
424     end
425
426     # same as puts, but expects to be called with a lock held on @sock
427     def puts_critical(message, penalty=false)
428       # debug "in puts_critical"
429       begin
430         debug "SEND: #{message.inspect}"
431         if @sock.nil?
432           error "SEND attempted on closed socket"
433         else
434           # we use Socket#syswrite() instead of Socket#puts() because
435           # the latter is racy and can cause double message output in
436           # some circumstances
437           actual = @filter.out(message) + "\n"
438           now = Time.new
439           @sock.syswrite actual
440           @last_send = now
441           @flood_send = now if @flood_send < now
442           @flood_send += message.irc_send_penalty if penalty
443           @lines_sent += 1
444         end
445       rescue Exception => e
446         handle_socket_error(:SEND, e)
447       end
448     end
449
450   end
451
452 end