Minor tweaks to httputil: make last response available in @last_resp for get and...
[rbot] / lib / rbot / ircsocket.rb
1 module Irc
2
3   require 'socket'
4   require 'thread'
5   require 'rbot/timer'
6
7   class QueueRing
8     # A QueueRing is implemented as an array with elements in the form
9     # [chan, [message1, message2, ...]
10     # Note that the channel +chan+ has no actual bearing with the channels
11     # to which messages will be sent
12
13     def initialize
14       @storage = Array.new
15       @last_idx = -1
16     end
17
18     def clear
19       @storage.clear
20       @last_idx = -1
21     end
22
23     def length
24       length = 0
25       @storage.each {|c|
26         length += c[1].length 
27       }
28       return length
29     end
30
31     def empty?
32       @storage.empty?
33     end
34
35     def push(mess, chan)
36       cmess = @storage.assoc(chan)
37       if cmess
38         idx = @storage.index(cmess)
39         cmess[1] << mess
40         @storage[idx] = cmess
41       else
42         @storage << [chan, [mess]]
43       end
44     end
45
46     def next
47       if empty?
48         warning "trying to access empty ring"
49         return nil
50       end
51       save_idx = @last_idx
52       @last_idx = (@last_idx + 1) % @storage.length
53       mess = @storage[@last_idx][1].first
54       @last_idx = save_idx
55       return mess
56     end
57
58     def shift
59       if empty?
60         warning "trying to access empty ring"
61         return nil
62       end
63       @last_idx = (@last_idx + 1) % @storage.length
64       mess = @storage[@last_idx][1].shift
65       @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
66       return mess
67     end
68
69   end
70
71   class MessageQueue
72     def initialize
73       # a MessageQueue is an array of QueueRings
74       # rings have decreasing priority, so messages in ring 0
75       # are more important than messages in ring 1, and so on
76       @rings = Array.new(3) { |i|
77         if i > 0
78           QueueRing.new
79         else
80           # ring 0 is special in that if it's not empty, it will
81           # be popped. IOW, ring 0 can starve the other rings
82           # ring 0 is strictly FIFO and is therefore implemented
83           # as an array
84           Array.new
85         end
86       }
87       # the other rings are satisfied round-robin
88       @last_ring = 0
89     end
90
91     def clear
92       @rings.each { |r|
93         r.clear
94       }
95       @last_ring = 0
96     end
97
98     def push(mess, chan=nil, cring=0)
99       ring = cring
100       if ring == 0
101         warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
102         @rings[0] << mess
103       else
104         error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
105         @rings[ring].push mess, chan
106       end
107     end
108
109     def empty?
110       @rings.each { |r|
111         return false unless r.empty?
112       }
113       return true
114     end
115
116     def length
117       len = 0
118       @rings.each { |r|
119         len += r.length
120       }
121       len
122     end
123
124     def next
125       if empty?
126         warning "trying to access empty ring"
127         return nil
128       end
129       mess = nil
130       if !@rings[0].empty?
131         mess = @rings[0].first
132       else
133         save_ring = @last_ring
134         (@rings.length - 1).times {
135           @last_ring = (@last_ring % (@rings.length - 1)) + 1
136           if !@rings[@last_ring].empty?
137             mess = @rings[@last_ring].next
138             break
139           end
140         }
141         @last_ring = save_ring
142       end
143       error "nil message" if mess.nil?
144       return mess
145     end
146
147     def shift
148       if empty?
149         warning "trying to access empty ring"
150         return nil
151       end
152       mess = nil
153       if !@rings[0].empty?
154         return @rings[0].shift
155       end
156       (@rings.length - 1).times {
157         @last_ring = (@last_ring % (@rings.length - 1)) + 1
158         if !@rings[@last_ring].empty?
159           return @rings[@last_ring].shift
160         end
161       }
162       error "nil message" if mess.nil?
163       return mess
164     end
165
166   end
167
168   # wrapped TCPSocket for communication with the server.
169   # emulates a subset of TCPSocket functionality
170   class IrcSocket
171     # total number of lines sent to the irc server
172     attr_reader :lines_sent
173
174     # total number of lines received from the irc server
175     attr_reader :lines_received
176
177     # total number of bytes sent to the irc server
178     attr_reader :bytes_sent
179
180     # total number of bytes received from the irc server
181     attr_reader :bytes_received
182
183     # accumulator for the throttle
184     attr_reader :throttle_bytes
185
186     # byterate components
187     attr_reader :bytes_per
188     attr_reader :seconds_per
189
190     # delay between lines sent
191     attr_reader :sendq_delay
192
193     # max lines to burst
194     attr_reader :sendq_burst
195
196     # server:: server to connect to
197     # port::   IRCd port
198     # host::   optional local host to bind to (ruby 1.7+ required)
199     # create a new IrcSocket
200     def initialize(server, port, host, sendq_delay=2, sendq_burst=4, brt="400/2")
201       @timer = Timer::Timer.new
202       @timer.add(0.2) do
203         spool
204       end
205       @server = server.dup
206       @port = port.to_i
207       @host = host
208       @sock = nil
209       @spooler = false
210       @lines_sent = 0
211       @lines_received = 0
212       if sendq_delay
213         @sendq_delay = sendq_delay.to_f
214       else
215         @sendq_delay = 2
216       end
217       @last_send = Time.new - @sendq_delay
218       @last_throttle = Time.new
219       @burst = 0
220       if sendq_burst
221         @sendq_burst = sendq_burst.to_i
222       else
223         @sendq_burst = 4
224       end
225       @bytes_per = 400
226       @seconds_per = 2
227       @throttle_bytes = 0
228       @throttle_div = 1
229       setbyterate(brt)
230     end
231
232     def setbyterate(brt)
233       if brt.match(/(\d+)\/(\d)/)
234         @bytes_per = $1.to_i
235         @seconds_per = $2.to_i
236         debug "Byterate now #{byterate}"
237         return true
238       else
239         debug "Couldn't set byterate #{brt}"
240         return false
241       end
242     end
243
244     def connected?
245       !@sock.nil?
246     end
247
248     # open a TCP connection to the server
249     def connect
250       if connected?
251         warning "reconnecting while connected"
252         return
253       end
254       if(@host)
255         begin
256           @sock=TCPSocket.new(@server, @port, @host)
257         rescue ArgumentError => e
258           error "Your version of ruby does not support binding to a "
259           error "specific local address, please upgrade if you wish "
260           error "to use HOST = foo"
261           error "(this option has been disabled in order to continue)"
262           @sock=TCPSocket.new(@server, @port)
263         end
264       else
265         @sock=TCPSocket.new(@server, @port)
266       end
267       @qthread = false
268       @qmutex = Mutex.new
269       @sendq = MessageQueue.new
270     end
271
272     def sendq_delay=(newfreq)
273       debug "changing sendq frequency to #{newfreq}"
274       @qmutex.synchronize do
275         @sendq_delay = newfreq
276         if newfreq == 0
277           clearq
278           @timer.stop
279         else
280           @timer.start
281         end
282       end
283     end
284
285     def sendq_burst=(newburst)
286       @qmutex.synchronize do
287         @sendq_burst = newburst
288       end
289     end
290
291     def byterate
292       return "#{@bytes_per}/#{@seconds_per}"
293     end
294
295     def byterate=(newrate)
296       @qmutex.synchronize do
297         setbyterate(newrate)
298       end
299     end
300
301     def run_throttle(more=0)
302       now = Time.new
303       if @throttle_bytes > 0
304         # If we ever reach the limit, we halve the actual allowed byterate
305         # until we manage to reset the throttle.
306         if @throttle_bytes >= @bytes_per
307           @throttle_div = 0.5
308         end
309         delta = ((now - @last_throttle)*@throttle_div*@bytes_per/@seconds_per).floor
310         if delta > 0
311           @throttle_bytes -= delta
312           @throttle_bytes = 0 if @throttle_bytes < 0
313           @last_throttle = now
314         end
315       else
316         @throttle_div = 1
317       end
318       @throttle_bytes += more
319     end
320
321     # used to send lines to the remote IRCd by skipping the queue
322     # message: IRC message to send
323     # it should only be used for stuff that *must not* be queued,
324     # i.e. the initial PASS, NICK and USER command
325     # or the final QUIT message
326     def emergency_puts(message)
327       @qmutex.synchronize do
328         # debug "In puts - got mutex"
329         puts_critical(message)
330       end
331     end
332
333     # get the next line from the server (blocks)
334     def gets
335       if @sock.nil?
336         warning "socket get attempted while closed"
337         return nil
338       end
339       begin
340         reply = @sock.gets
341         @lines_received += 1
342         reply.strip! if reply
343         debug "RECV: #{reply.inspect}"
344         return reply
345       rescue => e
346         warning "socket get failed: #{e.inspect}"
347         debug e.backtrace.join("\n")
348         return nil
349       end
350     end
351
352     def queue(msg, chan=nil, ring=0)
353       if @sendq_delay > 0
354         @qmutex.synchronize do
355           @sendq.push msg, chan, ring
356           @timer.start
357         end
358       else
359         # just send it if queueing is disabled
360         self.emergency_puts(msg)
361       end
362     end
363
364     # pop a message off the queue, send it
365     def spool
366       @qmutex.synchronize do
367         begin
368           debug "in spooler"
369           if @sendq.empty?
370             @timer.stop
371             return
372           end
373           now = Time.new
374           if (now >= (@last_send + @sendq_delay))
375             # reset burst counter after @sendq_delay has passed
376             debug "resetting @burst"
377             @burst = 0
378           elsif (@burst >= @sendq_burst)
379             # nope. can't send anything, come back to us next tick...
380             debug "can't send yet"
381             @timer.start
382             return
383           end
384           debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
385           (@sendq_burst - @burst).times do
386             break if @sendq.empty?
387             mess = @sendq.next
388             if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
389               debug "flood protection: sending message of length #{mess.length}"
390               debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
391               puts_critical(@sendq.shift)
392             else
393               debug "flood protection: throttling message of length #{mess.length}"
394               debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
395               run_throttle
396               break
397             end
398           end
399           if @sendq.empty?
400             @timer.stop
401           end
402         rescue => e
403           error "Spooling failed: #{e.inspect}"
404           error e.backtrace.join("\n")
405         end
406       end
407     end
408
409     def clearq
410       if @sock
411         @qmutex.synchronize do
412           unless @sendq.empty?
413             @sendq.clear
414           end
415         end
416       else
417         warning "Clearing socket while disconnected"
418       end
419     end
420
421     # flush the TCPSocket
422     def flush
423       @sock.flush
424     end
425
426     # Wraps Kernel.select on the socket
427     def select(timeout=nil)
428       Kernel.select([@sock], nil, nil, timeout)
429     end
430
431     # shutdown the connection to the server
432     def shutdown(how=2)
433       @sock.shutdown(how) unless @sock.nil?
434       @sock = nil
435       @burst = 0
436     end
437
438     private
439
440     # same as puts, but expects to be called with a mutex held on @qmutex
441     def puts_critical(message)
442       # debug "in puts_critical"
443       begin
444         debug "SEND: #{message.inspect}"
445         if @sock.nil?
446           error "SEND attempted on closed socket"
447         else
448           @sock.send(message + "\n",0)
449           @last_send = Time.new
450           @lines_sent += 1
451           @burst += 1
452           run_throttle(message.length + 1)
453         end
454       rescue => e
455         error "SEND failed: #{e.inspect}"
456         raise
457       end
458     end
459
460   end
461
462 end