6 # This module implements the IRC socket interface, including IRC message
7 # penalty computation and the message queue system
12 # Calculate the penalty which will be assigned to this message
17 # According to eggdrop, the initial penalty is
18 penalty = 1 + self.size/200
19 # on everything but UnderNET where it's
20 # penalty = 2 + self.size/120
22 cmd, pars = self.split($;,2)
23 debug "cmd: #{cmd}, pars: #{pars.inspect}"
26 chan, nick, msg = pars.split
27 chan = chan.split(',')
28 nick = nick.split(',')
32 chan, modes, argument = pars.split
37 extra += modes.split(/\+|-/).size
39 extra += 3 * modes.split(/\+|-/).size
43 extra += 2 * argument.split.size
45 penalty += extra * chan.split.size
48 penalty += 2 unless pars.split.size < 2
49 when :PRIVMSG, :NOTICE
50 dests = pars.split($;,2).first
51 penalty += dests.split(',').size
53 # I'm too lazy to implement this one correctly
55 when :AWAY, :JOIN, :VERSION, :TIME, :TRACE, :WHOIS, :DNS
61 else # Unknown messages
64 if penalty > penalty_max
65 debug "Wow, more than #{penalty_max} secs of penalty!"
68 if penalty < penalty_min
69 debug "Wow, less than #{penalty_min} secs of penalty!"
72 debug "penalty: #{penalty}"
83 # A QueueRing is implemented as an array with elements in the form
84 # [chan, [message1, message2, ...]]
85 # Note that the channel +chan+ has no actual bearing on the channels
86 # to which messages will be sent
112 cmess = @storage.assoc(chan)
114 idx = @storage.index(cmess)
116 @storage[idx] = cmess
118 @storage << [chan, [mess]]
124 warning "trying to access empty ring"
128 @last_idx = (@last_idx + 1) % @storage.size
129 mess = @storage[@last_idx][1].first
136 warning "trying to access empty ring"
139 @last_idx = (@last_idx + 1) % @storage.size
140 mess = @storage[@last_idx][1].shift
141 @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
150 # a MessageQueue is an array of QueueRings
151 # rings have decreasing priority, so messages in ring 0
152 # are more important than messages in ring 1, and so on
153 @rings = Array.new(3) { |i|
157 # ring 0 is special in that if it's not empty, it will
158 # be popped. In other words, ring 0 can starve the other rings.
159 # ring 0 is strictly FIFO and is therefore implemented
164 # the other rings are satisfied round-robin
166 self.extend(MonitorMixin)
167 @non_empty = self.new_cond
172 @rings.each { |r| r.clear }
177 def push(mess, chan=nil, cring=0)
181 warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
184 error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
185 @rings[ring].push mess, chan
191 def shift(tmout = nil)
193 @non_empty.wait(tmout) if self.empty?
201 !@rings.find { |r| !r.empty? }
205 @rings.inject(0) { |s, r| s + r.size }
211 return @rings[0].shift
213 (@rings.size - 1).times do
214 @last_ring = (@last_ring % (@rings.size - 1)) + 1
215 return @rings[@last_ring].shift unless @rings[@last_ring].empty?
217 warning "trying to access an empty message queue"
223 # wrapped TCPSocket for communication with the server.
224 # emulates a subset of TCPSocket functionality
227 MAX_IRC_SEND_PENALTY = 10
229 # total number of lines sent to the irc server
230 attr_reader :lines_sent
232 # total number of lines received from the irc server
233 attr_reader :lines_received
235 # total number of bytes sent to the irc server
236 attr_reader :bytes_sent
238 # total number of bytes received from the irc server
239 attr_reader :bytes_received
241 # accumulator for the throttle
242 attr_reader :throttle_bytes
244 # an optional filter object. we call @filter.in(data) for
245 # all incoming data and @filter.out(data) for all outgoing data
248 # normalized uri of the current server
249 attr_reader :server_uri
251 # default trivial filter class
262 # set filter to identity, not to nil
264 @filter = f || IdentityFilter.new
267 # server_list:: list of servers to connect to
268 # host:: optional local host to bind to (ruby 1.7+ required)
269 # create a new Irc::Socket
270 def initialize(server_list, host, opts={})
271 @server_list = server_list.dup
276 @filter = IdentityFilter.new
280 if opts.kind_of?(Hash) and opts.key?(:ssl)
291 # open a TCP connection to the server
294 warning "reconnecting while connected"
297 srv_uri = @server_list[@conn_count % @server_list.size].dup
298 srv_uri = 'irc://' + srv_uri if !(srv_uri =~ /:\/\//)
300 @server_uri = URI.parse(srv_uri)
301 @server_uri.port = 6667 if !@server_uri.port
302 debug "connection attempt \##{@conn_count} (#{@server_uri.host}:#{@server_uri.port})"
306 sock=TCPSocket.new(@server_uri.host, @server_uri.port, @host)
307 rescue ArgumentError => e
308 error "Your version of ruby does not support binding to a "
309 error "specific local address, please upgrade if you wish "
310 error "to use HOST = foo"
311 error "(this option has been disabled in order to continue)"
312 sock=TCPSocket.new(@server_uri.host, @server_uri.port)
315 sock=TCPSocket.new(@server_uri.host, @server_uri.port)
319 ssl_context = OpenSSL::SSL::SSLContext.new()
320 ssl_context.verify_mode = OpenSSL::SSL::VERIFY_NONE
321 sock = OpenSSL::SSL::SSLSocket.new(sock, ssl_context)
322 sock.sync_close = true
326 @last_send = Time.new
327 @flood_send = Time.new
329 @sock.extend(MonitorMixin)
330 @sendq = MessageQueue.new
331 @qthread = Thread.new { writer_loop }
334 # used to send lines to the remote IRCd by skipping the queue
335 # message:: IRC message to send
336 # it should only be used for stuff that *must not* be queued,
337 # i.e. the initial PASS, NICK and USER commands
338 # or the final QUIT message
339 def emergency_puts(message, penalty = false)
341 # debug "In puts - got @sock"
342 puts_critical(message, penalty)
346 def handle_socket_error(string, e)
347 error "#{string} failed: #{e.pretty_inspect}"
348 # We assume that an error means that there are connection
349 # problems and that we should reconnect, so we
351 raise SocketError.new(e.inspect)
354 # get the next line from the server (blocks)
357 warning "socket get attempted while closed"
361 reply = @filter.in(@sock.gets)
363 reply.strip! if reply
364 debug "RECV: #{reply.inspect}"
366 rescue Exception => e
367 handle_socket_error(:RECV, e)
371 def queue(msg, chan=nil, ring=0)
372 @sendq.push msg, chan, ring
379 # flush the TCPSocket
384 # Wraps Kernel.select on the socket
385 def select(timeout=nil)
386 Kernel.select([@sock], nil, nil, timeout)
389 # shutdown the connection to the server
391 return unless connected?
396 rescue Exception => e
397 error "error while shutting down: #{e.pretty_inspect}"
409 flood_delay = @flood_send - MAX_IRC_SEND_PENALTY - now
410 delay = [flood_delay, 0].max
412 debug "sleep(#{delay}) # (f: #{flood_delay})"
416 debug "got #{msg.inspect} from queue, sending"
417 emergency_puts(msg, true)
418 rescue Exception => e
419 error "Spooling failed: #{e.pretty_inspect}"
420 debug e.backtrace.join("\n")
426 # same as puts, but expects to be called with a lock held on @sock
427 def puts_critical(message, penalty=false)
428 # debug "in puts_critical"
430 debug "SEND: #{message.inspect}"
432 error "SEND attempted on closed socket"
434 # we use Socket#syswrite() instead of Socket#puts() because
435 # the latter is racy and can cause double message output in
437 actual = @filter.out(message) + "\n"
439 @sock.syswrite actual
441 @flood_send = now if @flood_send < now
442 @flood_send += message.irc_send_penalty if penalty
445 rescue Exception => e
446 handle_socket_error(:SEND, e)