utils: URI fragment is sometimes found in id attribute to A tag
[rbot] / lib / rbot / ircsocket.rb
1 require 'monitor'
2
3 class ::String
4   # Calculate the penalty which will be assigned to this message
5   # by the IRCd
6   def irc_send_penalty
7     # According to eggrdop, the initial penalty is
8     penalty = 1 + self.size/100
9     # on everything but UnderNET where it's
10     # penalty = 2 + self.size/120
11
12     cmd, pars = self.split($;,2)
13     debug "cmd: #{cmd}, pars: #{pars.inspect}"
14     case cmd.to_sym
15     when :KICK
16       chan, nick, msg = pars.split
17       chan = chan.split(',')
18       nick = nick.split(',')
19       penalty += nick.size
20       penalty *= chan.size
21     when :MODE
22       chan, modes, argument = pars.split
23       extra = 0
24       if modes
25         extra = 1
26         if argument
27           extra += modes.split(/\+|-/).size
28         else
29           extra += 3 * modes.split(/\+|-/).size
30         end
31       end
32       if argument
33         extra += 2 * argument.split.size
34       end
35       penalty += extra * chan.split.size
36     when :TOPIC
37       penalty += 1
38       penalty += 2 unless pars.split.size < 2
39     when :PRIVMSG, :NOTICE
40       dests = pars.split($;,2).first
41       penalty += dests.split(',').size
42     when :WHO
43       # I'm too lazy to implement this one correctly
44       penalty += 5
45     when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
46       penalty += 2
47     when :INVITE, :NICK
48       penalty += 3
49     when :ISON
50       penalty += 1
51     else # Unknown messages
52       penalty += 1
53     end
54     if penalty > 99
55       debug "Wow, more than 99 secs of penalty!"
56       penalty = 99
57     end
58     if penalty < 2
59       debug "Wow, less than 2 secs of penalty!"
60       penalty = 2
61     end
62     debug "penalty: #{penalty}"
63     return penalty
64   end
65 end
66
67 module Irc
68
69   require 'socket'
70   require 'thread'
71
72   class QueueRing
73     # A QueueRing is implemented as an array with elements in the form
74     # [chan, [message1, message2, ...]
75     # Note that the channel +chan+ has no actual bearing with the channels
76     # to which messages will be sent
77
78     def initialize
79       @storage = Array.new
80       @last_idx = -1
81     end
82
83     def clear
84       @storage.clear
85       @last_idx = -1
86     end
87
88     def length
89       len = 0
90       @storage.each {|c|
91         len += c[1].size
92       }
93       return len
94     end
95     alias :size :length
96
97     def empty?
98       @storage.empty?
99     end
100
101     def push(mess, chan)
102       cmess = @storage.assoc(chan)
103       if cmess
104         idx = @storage.index(cmess)
105         cmess[1] << mess
106         @storage[idx] = cmess
107       else
108         @storage << [chan, [mess]]
109       end
110     end
111
112     def next
113       if empty?
114         warning "trying to access empty ring"
115         return nil
116       end
117       save_idx = @last_idx
118       @last_idx = (@last_idx + 1) % @storage.size
119       mess = @storage[@last_idx][1].first
120       @last_idx = save_idx
121       return mess
122     end
123
124     def shift
125       if empty?
126         warning "trying to access empty ring"
127         return nil
128       end
129       @last_idx = (@last_idx + 1) % @storage.size
130       mess = @storage[@last_idx][1].shift
131       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
132       return mess
133     end
134
135   end
136
137   class MessageQueue
138
139     def initialize
140       # a MessageQueue is an array of QueueRings
141       # rings have decreasing priority, so messages in ring 0
142       # are more important than messages in ring 1, and so on
143       @rings = Array.new(3) { |i|
144         if i > 0
145           QueueRing.new
146         else
147           # ring 0 is special in that if it's not empty, it will
148           # be popped. IOW, ring 0 can starve the other rings
149           # ring 0 is strictly FIFO and is therefore implemented
150           # as an array
151           Array.new
152         end
153       }
154       # the other rings are satisfied round-robin
155       @last_ring = 0
156       self.extend(MonitorMixin)
157       @non_empty = self.new_cond
158     end
159
160     def clear
161       self.synchronize do
162         @rings.each { |r| r.clear }
163         @last_ring = 0
164       end
165     end
166
167     def push(mess, chan=nil, cring=0)
168       ring = cring
169       self.synchronize do
170         if ring == 0
171           warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
172           @rings[0] << mess
173         else
174           error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
175           @rings[ring].push mess, chan
176         end
177         @non_empty.signal
178       end
179     end
180
181     def shift(tmout = nil)
182       self.synchronize do
183         @non_empty.wait(tmout) if self.empty?
184         return unsafe_shift
185       end
186     end
187
188     protected
189
190     def empty?
191       !@rings.find { |r| !r.empty? }
192     end
193
194     def length
195       @rings.inject(0) { |s, r| s + r.size }
196     end
197     alias :size :length
198
199     def unsafe_shift
200       if !@rings[0].empty?
201         return @rings[0].shift
202       end
203       (@rings.size - 1).times do
204         @last_ring = (@last_ring % (@rings.size - 1)) + 1
205         return @rings[@last_ring].shift unless @rings[@last_ring].empty?
206       end
207       warning "trying to access an empty message queue"
208       return nil
209     end
210
211   end
212
213   # wrapped TCPSocket for communication with the server.
214   # emulates a subset of TCPSocket functionality
215   class Socket
216
217     MAX_IRC_SEND_PENALTY = 10
218
219     # total number of lines sent to the irc server
220     attr_reader :lines_sent
221
222     # total number of lines received from the irc server
223     attr_reader :lines_received
224
225     # total number of bytes sent to the irc server
226     attr_reader :bytes_sent
227
228     # total number of bytes received from the irc server
229     attr_reader :bytes_received
230
231     # accumulator for the throttle
232     attr_reader :throttle_bytes
233
234     # delay between lines sent
235     attr_accessor :sendq_delay
236
237     # max lines to burst
238     attr_accessor :sendq_burst
239
240     # an optional filter object. we call @filter.in(data) for
241     # all incoming data and @filter.out(data) for all outgoing data
242     attr_reader :filter
243
244     # normalized uri of the current server
245     attr_reader :server_uri
246
247     # default trivial filter class
248     class IdentityFilter
249         def in(x)
250             x
251         end
252
253         def out(x)
254             x
255         end
256     end
257
258     # set filter to identity, not to nil
259     def filter=(f)
260         @filter = f || IdentityFilter.new
261     end
262
263     # server_list:: list of servers to connect to
264     # host::   optional local host to bind to (ruby 1.7+ required)
265     # create a new Irc::Socket
266     def initialize(server_list, host, sendq_delay=2, sendq_burst=4, opts={})
267       @server_list = server_list.dup
268       @server_uri = nil
269       @conn_count = 0
270       @host = host
271       @sock = nil
272       @filter = IdentityFilter.new
273       @spooler = false
274       @lines_sent = 0
275       @lines_received = 0
276       if opts.kind_of?(Hash) and opts.key?(:ssl)
277         @ssl = opts[:ssl]
278       else
279         @ssl = false
280       end
281
282       if sendq_delay
283         @sendq_delay = sendq_delay.to_f
284       else
285         @sendq_delay = 2
286       end
287       if sendq_burst
288         @sendq_burst = sendq_burst.to_i
289       else
290         @sendq_burst = 4
291       end
292     end
293
294     def connected?
295       !@sock.nil?
296     end
297
298     # open a TCP connection to the server
299     def connect
300       if connected?
301         warning "reconnecting while connected"
302         return
303       end
304       srv_uri = @server_list[@conn_count % @server_list.size].dup
305       srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
306       @conn_count += 1
307       @server_uri = URI.parse(srv_uri)
308       @server_uri.port = 6667 if !@server_uri.port
309       debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
310
311       if(@host)
312         begin
313           sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
314         rescue ArgumentError => e
315           error "Your version of ruby does not support binding to a "
316           error "specific local address, please upgrade if you wish "
317           error "to use HOST = foo"
318           error "(this option has been disabled in order to continue)"
319           sock=TCPSocket.new(@server_uri.host, @server_uri.port)
320         end
321       else
322         sock=TCPSocket.new(@server_uri.host, @server_uri.port)
323       end
324       if(@ssl)
325         require 'openssl'
326         ssl_context = OpenSSL::SSL::SSLContext.new()
327         ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
328         sock = OpenSSL::SSL::SSLSocket.new(sock, ssl_context)
329         sock.sync_close = true
330         sock.connect
331       end
332       @sock = sock
333       @last_send = Time.new - @sendq_delay
334       @flood_send = Time.new
335       @last_throttle = Time.new
336       @burst = 0
337       @sock.extend(MonitorMixin)
338       @sendq = MessageQueue.new
339       @qthread = Thread.new { writer_loop }
340     end
341
342     # used to send lines to the remote IRCd by skipping the queue
343     # message: IRC message to send
344     # it should only be used for stuff that *must not* be queued,
345     # i.e. the initial PASS, NICK and USER command
346     # or the final QUIT message
347     def emergency_puts(message, penalty = false)
348       @sock.synchronize do
349         # debug "In puts - got @sock"
350         puts_critical(message, penalty)
351       end
352     end
353
354     def handle_socket_error(string, e)
355       error "#{string} failed: #{e.pretty_inspect}"
356       # We assume that an error means that there are connection
357       # problems and that we should reconnect, so we
358       shutdown
359       raise SocketError.new(e.inspect)
360     end
361
362     # get the next line from the server (blocks)
363     def gets
364       if @sock.nil?
365         warning "socket get attempted while closed"
366         return nil
367       end
368       begin
369         reply = @filter.in(@sock.gets)
370         @lines_received += 1
371         reply.strip! if reply
372         debug "RECV: #{reply.inspect}"
373         return reply
374       rescue Exception => e
375         handle_socket_error(:RECV, e)
376       end
377     end
378
379     def queue(msg, chan=nil, ring=0)
380       @sendq.push msg, chan, ring
381     end
382
383     def clearq
384       @sendq.clear
385     end
386
387     # flush the TCPSocket
388     def flush
389       @sock.flush
390     end
391
392     # Wraps Kernel.select on the socket
393     def select(timeout=nil)
394       Kernel.select([@sock], nil, nil, timeout)
395     end
396
397     # shutdown the connection to the server
398     def shutdown(how=2)
399       return unless connected?
400       @qthread.kill
401       @qthread = nil
402       begin
403         @sock.close
404       rescue Exception => e
405         error "error while shutting down: #{e.pretty_inspect}"
406       end
407       @sock = nil
408       @burst = 0
409       @sendq.clear
410     end
411
412     private
413
414     def writer_loop
415       loop do
416         # we could wait for the message, then calculate the delay and sleep
417         # if necessary. however, if high-priority message is enqueued while
418         # we sleep, it won't be the first to go out when the sleep is over.
419         # thus, we have to call Time.now() twice, once to calculate the delay
420         # and once to adjust @burst / @flood_send.
421         begin
422           now = Time.now
423           if @sendq_delay > 0
424             burst_delay = 0
425             if @burst > @sendq_burst
426               burst_delay = @last_send + @sendq_delay - now
427             end
428
429             flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
430             delay = [burst_delay, flood_delay, 0].max
431             if delay > 0
432               debug "sleep(#{delay}) # (f: #{flood_delay}, b: #{burst_delay})"
433               sleep(delay) 
434             end
435           end
436           msg = @sendq.shift
437           now = Time.now
438           @flood_send = now if @flood_send < now
439           @burst = 0 if @last_send + @sendq_delay < now
440           debug "got #{msg.inspect} from queue, sending"
441           emergency_puts(msg, true)
442         rescue Exception => e
443           error "Spooling failed: #{e.pretty_inspect}"
444           debug e.backtrace.join("\n")
445           raise e
446         end
447       end
448     end
449
450     # same as puts, but expects to be called with a lock held on @sock
451     def puts_critical(message, penalty=false)
452       # debug "in puts_critical"
453       begin
454         debug "SEND: #{message.inspect}"
455         if @sock.nil?
456           error "SEND attempted on closed socket"
457         else
458           @sock.puts(@filter.out(message))
459           @last_send = Time.new
460           @flood_send += message.irc_send_penalty if penalty
461           @lines_sent += 1
462           @burst += 1
463         end
464       rescue Exception => e
465         handle_socket_error(:SEND, e)
466       end
467     end
468
469   end
470
471 end