5 # :title: rbot's persistent message queue
7 # Author:: Matthias Hecker (apoc@geekosphere.org)
10 require 'securerandom'
18 The journal is a persistent message queue for rbot, its based on a basic
19 publish/subscribe model and persists messages into backend databases
20 that can be efficiently searched for past messages.
22 It is a addition to the key value storage already present in rbot
23 through its registry subsystem.
27 class InvalidJournalMessage < StandardError
29 class StorageError < StandardError
33 # a unique identification of this message
36 # describes a hierarchical queue into which this message belongs
39 # when this message was published as a Time instance
40 attr_reader :timestamp
42 # contains the actual message as a Hash
45 def initialize(message)
47 @timestamp = message[:timestamp]
48 @topic = message[:topic]
49 @payload = message[:payload]
50 if @payload.class != Hash
51 raise InvalidJournalMessage.new('payload must be a hash!')
55 # Access payload value by key.
56 def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
57 value = pkey.split('.').reduce(@payload) do |hash, key|
58 if hash.has_key?(key) or hash.has_key?(key.to_sym)
59 hash[key] || hash[key.to_sym]
61 if default == :exception
62 raise ArgumentError.new
70 # Access payload value by key alias for get(key, nil).
76 (@id == other.id) rescue false
79 def self.create(topic, payload, opt={})
81 id: opt[:id] || SecureRandom.uuid,
82 timestamp: opt[:timestamp] || Time.now,
91 # intializes/opens a new storage connection
92 def initialize(opts={})
95 # inserts a message in storage
99 # creates/ensures a index exists on the payload specified by key
100 def ensure_index(key)
103 # returns a array of message instances that match the query
104 def find(query=nil, limit=100, offset=0, &block)
107 # returns the number of messages that match the query
111 # remove messages that match the query
112 def remove(query=nil)
115 # destroy the underlying table/collection
119 # Returns all classes from the namespace that implement this interface
121 ObjectSpace.each_object(Class).select { |klass| klass < self }
125 def self.create(name, uri)
126 log 'load journal storage adapter: ' + name
127 load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
128 cls = AbstractStorage.get_impl.first
133 # Describes a query on journal entries, it is used both to describe
134 # a subscription aswell as to query persisted messages.
135 # There two ways to declare a Query instance, using
143 # timestamp from: Time.now, to: Time.now + 60 * 10
144 # payload 'action': :privmsg
145 # payload 'channel': '#rbot'
146 # payload 'foo.bar': 'baz'
149 # or using a hash: (NOTE: avoid using symbols in payload)
152 # id: ['foo', 'bar'],
153 # topic: ['log.irc.*', 'log.core'],
156 # to: Time.now + 60 * 10
159 # 'action' => 'privmsg'
160 # 'channel' => '#rbot',
166 # array of ids to match (OR)
168 # array of topics to match with wildcard support (OR)
170 # hash with from: timestamp and to: timestamp
171 attr_reader :timestamp
172 # hash of key values to match
175 def initialize(query)
176 @id = query[:id] || []
177 @id = [@id] if @id.is_a? String
178 @topic = query[:topic] || []
179 @topic = [@topic] if @topic.is_a? String
183 if query[:timestamp] and query[:timestamp][:from]
184 @timestamp[:from] = query[:timestamp][:from]
186 if query[:timestamp] and query[:timestamp][:to]
187 @timestamp[:to] = query[:timestamp][:to]
189 @payload = query[:payload] || {}
192 # returns true if the given message matches the query
193 def matches?(message)
194 return false if not @id.empty? and not @id.include? message.id
195 return false if not @topic.empty? and not topic_matches? message.topic
197 return false unless message.timestamp >= @timestamp[:from]
200 return false unless message.timestamp <= @timestamp[:to]
203 @payload.each_pair do |key, value|
205 message.get(key.to_s)
210 return false if not found and not @payload.empty?
214 def topic_matches?(_topic)
215 @topic.each do |topic|
216 if topic.include? '*'
218 topic.split('.').zip(_topic.split('.')).each do |a, b|
224 match = false unless a == b
229 return true if topic == _topic
235 # factory that constructs a query
254 @query[:topic] += _topic
258 @query[:timestamp] = range
262 @query[:payload].merge!(query)
266 def self.define(query=nil, &block)
267 factory = Factory.new
269 factory.instance_eval(&block)
270 query = factory.query
282 def initialize(broker, topic, block)
288 @broker.unsubscribe(self)
292 def initialize(opts={})
293 # overrides the internal consumer with a block
294 @consumer = opts[:consumer]
296 @storage = opts[:storage]
298 warning 'journal broker: no storage set up, won\'t persist messages'
302 @thread = Thread.new do
303 while message = @queue.pop
306 # pop(true) ... rescue ThreadError => e
307 rescue Exception => e
308 error 'journal broker: exception in consumer thread'
314 # lookup-table for subscriptions by their topic
319 return unless message
320 @consumer.call(message) if @consumer
323 if @topic_subs.has_key? message.topic
324 @topic_subs[message.topic].each do |s|
325 s.block.call(message)
329 @storage.insert(message) if @storage
337 log 'journal shutdown'
345 def publish(topic, payload)
346 @queue << JournalMessage::create(topic, payload)
349 # Subscribe to receive messages from a topic.
351 # You can use this method to subscribe to messages that
352 # are published within a specified topic. You must provide
353 # a receiving block to receive messages one-by-one.
354 # The method returns an instance of Subscription that can
355 # be used to cancel the subscription by invoking cancel
358 # journal.subscribe('irclog') do |message|
359 # # received irclog messages...
362 def subscribe(topic=nil, &block)
363 raise ArgumentError.new unless block_given?
364 s = Subscription.new(self, topic, block)
366 unless @topic_subs.has_key? topic
367 @topic_subs[topic] = []
369 @topic_subs[topic] << s
374 if @topic_subs.has_key? s.topic
375 @topic_subs[s.topic].delete(s)
377 @subscriptions.delete s
380 # Find and return persisted messages by a query.
382 # This method will either return all messages or call the provided
383 # block for each message. It will filter the messages by the
384 # provided Query instance. Limit and offset might be used to
385 # constrain the result.
386 # The query might also be a hash or proc that is passed to
387 # Query.define first.
389 # @param query [Query]
390 # @param limit [Integer] how many items to return
391 # @param offset [Integer] relative offset in results
392 def find(query, limit=100, offset=0, &block)
393 unless query.is_a? Query
394 query = Query.define(query)
397 @storage.find(query, limit, offset, &block)
399 @storage.find(query, limit, offset)
404 @storage.count(query)
407 def remove(query=nil)
408 @storage.remove(query)