rbot-remote: allow override of function
[rbot] / lib / rbot / journal.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: rbot's persistent message queue
6 #
7 # Author:: Matthias Hecker (apoc@geekosphere.org)
8
9 require 'thread'
10 require 'securerandom'
11
12 module Irc
13 class Bot
14 module Journal
15
16 =begin rdoc
17
18   The journal is a persistent message queue for rbot, its based on a basic
19   publish/subscribe model and persists messages into backend databases
20   that can be efficiently searched for past messages.
21
22   It is a addition to the key value storage already present in rbot
23   through its registry subsystem.
24
25 =end
26
27   class InvalidJournalMessage < StandardError
28   end
29   class StorageError < StandardError
30   end
31
32   class JournalMessage
33     # a unique identification of this message
34     attr_reader :id
35
36     # describes a hierarchical queue into which this message belongs
37     attr_reader :topic
38
39     # when this message was published as a Time instance
40     attr_reader :timestamp
41
42     # contains the actual message as a Hash
43     attr_reader :payload
44
45     def initialize(message)
46       @id = message[:id]
47       @timestamp = message[:timestamp]
48       @topic = message[:topic]
49       @payload = message[:payload]
50       if @payload.class != Hash
51         raise InvalidJournalMessage.new('payload must be a hash!')
52       end
53     end
54
55     # Access payload value by key.
56     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
57       value = pkey.split('.').reduce(@payload) do |hash, key|
58         if hash.has_key?(key) or hash.has_key?(key.to_sym)
59           hash[key] || hash[key.to_sym]
60         else
61           if default == :exception
62             raise ArgumentError.new
63           else
64             default
65           end
66         end
67       end
68     end
69
70     # Access payload value by key alias for get(key, nil).
71     def [](key)
72       get(key, nil)
73     end
74
75     def ==(other)
76       (@id == other.id) rescue false
77     end
78
79     def self.create(topic, payload, opt={})
80       # cleanup payload to only contain strings
81       JournalMessage.new(
82         id: opt[:id] || SecureRandom.uuid,
83         timestamp: opt[:timestamp] || Time.now,
84         topic: topic,
85         payload: payload
86       )
87     end
88   end
89
90   module Storage
91     class AbstractStorage
92       # intializes/opens a new storage connection
93       def initialize(opts={})
94       end
95
96       # inserts a message in storage
97       def insert(message)
98       end
99
100       # creates/ensures a index exists on the payload specified by key
101       def ensure_payload_index(key)
102       end
103
104       # returns a array of message instances that match the query
105       def find(query=nil, limit=100, offset=0, &block)
106       end
107
108       # returns the number of messages that match the query
109       def count(query=nil)
110       end
111
112       # remove messages that match the query
113       def remove(query=nil)
114       end
115
116       # destroy the underlying table/collection
117       def drop
118       end
119
120       # Returns all classes from the namespace that implement this interface
121       def self.get_impl
122         ObjectSpace.each_object(Class).select { |klass| klass < self }
123       end
124     end
125
126     def self.create(name, uri)
127       log 'load journal storage adapter: ' + name
128       load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
129       cls = AbstractStorage.get_impl.first
130       cls.new(uri: uri)
131     end
132   end
133
134   # Describes a query on journal entries, it is used both to describe
135   # a subscription aswell as to query persisted messages.
136   # There two ways to declare a Query instance, using
137   # the DSL like this:
138   #
139   #   Query.define do
140   #     id 'foo'
141   #     id 'bar'
142   #     topic 'log.irc.*'
143   #     topic 'log.core'
144   #     timestamp from: Time.now, to: Time.now + 60 * 10
145   #     payload 'action': :privmsg
146   #     payload 'channel': '#rbot'
147   #     payload 'foo.bar': 'baz'
148   #   end
149   #
150   # or using a hash: (NOTE: avoid using symbols in payload)
151   #
152   #   Query.define({
153   #     id: ['foo', 'bar'],
154   #     topic: ['log.irc.*', 'log.core'],
155   #     timestamp: {
156   #       from: Time.now
157   #       to: Time.now + 60 * 10
158   #     },
159   #     payload: {
160   #       'action' => 'privmsg'
161   #       'channel' => '#rbot',
162   #       'foo.bar' => 'baz'
163   #     }
164   #   })
165   #
166   class Query
167     # array of ids to match (OR)
168     attr_reader :id
169     # array of topics to match with wildcard support (OR)
170     attr_reader :topic
171     # hash with from: timestamp and to: timestamp
172     attr_reader :timestamp
173     # hash of key values to match
174     attr_reader :payload
175
176     def initialize(query)
177       @id = query[:id] || []
178       @id = [@id] if @id.is_a? String
179       @topic = query[:topic] || []
180       @topic = [@topic] if @topic.is_a? String
181       @timestamp = {
182         from: nil, to: nil
183       }
184       if query[:timestamp] and query[:timestamp][:from]
185         @timestamp[:from] = query[:timestamp][:from]
186       end
187       if query[:timestamp] and query[:timestamp][:to]
188         @timestamp[:to] = query[:timestamp][:to]
189       end
190       @payload = query[:payload] || {}
191     end
192
193     # returns true if the given message matches the query
194     def matches?(message)
195       return false if not @id.empty? and not @id.include? message.id
196       return false if not @topic.empty? and not topic_matches? message.topic
197       if @timestamp[:from]
198         return false unless message.timestamp >= @timestamp[:from]
199       end
200       if @timestamp[:to]
201         return false unless message.timestamp <= @timestamp[:to]
202       end
203       found = false
204       @payload.each_pair do |key, value|
205         begin
206           message.get(key.to_s)
207         rescue ArgumentError
208         end
209         found = true
210       end
211       return false if not found and not @payload.empty?
212       true
213     end
214
215     def topic_matches?(_topic)
216       @topic.each do |topic|
217         if topic.include? '*'
218           match = true
219           topic.split('.').zip(_topic.split('.')).each do |a, b|
220             if a == '*'
221               if not b or b.empty?
222                 match = false
223               end
224             else
225               match = false unless a == b
226             end
227           end
228           return true if match
229         else
230           return true if topic == _topic
231         end
232       end
233       return false
234     end
235
236     # factory that constructs a query
237     class Factory
238       attr_reader :query
239       def initialize
240         @query = {
241           id: [],
242           topic: [],
243           timestamp: {
244             from: nil, to: nil
245           },
246           payload: {}
247         }
248       end
249
250       def id(*_id)
251         @query[:id] += _id
252       end
253
254       def topic(*_topic)
255           @query[:topic] += _topic
256       end
257
258       def timestamp(range)
259         @query[:timestamp] = range
260       end
261
262       def payload(query)
263         @query[:payload].merge!(query)
264       end
265     end
266
267     def self.define(query=nil, &block)
268       factory = Factory.new
269       if block_given?
270         factory.instance_eval(&block)
271         query = factory.query
272       end
273       Query.new query
274     end
275
276   end
277
278
279   class JournalBroker
280     attr_reader :storage
281     class Subscription
282       attr_reader :topic
283       attr_reader :block
284       def initialize(broker, topic, block)
285         @broker = broker
286         @topic = topic
287         @block = block
288       end
289       def cancel
290         @broker.unsubscribe(self)
291       end
292     end
293
294     def initialize(opts={})
295       # overrides the internal consumer with a block
296       @consumer = opts[:consumer]
297       # storage backend
298       @storage = opts[:storage]
299       unless @storage
300         warning 'journal broker: no storage set up, won\'t persist messages'
301       end
302       @queue = Queue.new
303       # consumer thread:
304       @thread = Thread.new do
305         while message = @queue.pop
306           begin
307             consume message
308           # pop(true) ... rescue ThreadError => e
309           rescue Exception => e
310             error 'journal broker: exception in consumer thread'
311             error $!
312           end
313         end
314       end
315       @subscriptions = []
316       # lookup-table for subscriptions by their topic
317       @topic_subs = {}
318     end
319
320     def consume(message)
321       return unless message
322       @consumer.call(message) if @consumer
323
324       # notify subscribers
325       if @topic_subs.has_key? message.topic
326         @topic_subs[message.topic].each do |s|
327           s.block.call(message)
328         end
329       end
330
331       @storage.insert(message) if @storage
332     end
333
334     def persists?
335       true if @storage
336     end
337
338     def shutdown
339       log 'journal shutdown'
340       @subscriptions.clear
341       @topic_subs.clear
342       @queue << nil
343       @thread.join
344       @thread = nil
345     end
346
347     def publish(topic, payload)
348       debug 'journal publish message in %s: %s' % [topic, payload.inspect]
349       @queue << JournalMessage::create(topic, payload)
350       nil
351     end
352
353     # Subscribe to receive messages from a topic.
354     #
355     # You can use this method to subscribe to messages that
356     # are published within a specified topic. You must provide
357     # a receiving block to receive messages one-by-one.
358     # The method returns an instance of Subscription that can
359     # be used to cancel the subscription by invoking cancel
360     # on it.
361     #
362     #   journal.subscribe('irclog') do |message|
363     #     # received irclog messages...
364     #   end
365     #
366     def subscribe(topic=nil, &block)
367       raise ArgumentError.new unless block_given?
368       s = Subscription.new(self, topic, block)
369       @subscriptions << s
370       unless @topic_subs.has_key? topic
371         @topic_subs[topic] = []
372       end
373       @topic_subs[topic] << s
374       s
375     end
376
377     def unsubscribe(s)
378       if @topic_subs.has_key? s.topic
379         @topic_subs[s.topic].delete(s)
380       end
381       @subscriptions.delete s
382     end
383
384     # Find and return persisted messages by a query.
385     #
386     # This method will either return all messages or call the provided
387     # block for each message. It will filter the messages by the
388     # provided Query instance. Limit and offset might be used to
389     # constrain the result.
390     # The query might also be a hash or proc that is passed to
391     # Query.define first.
392     #
393     # @param query [Query] 
394     # @param limit [Integer] how many items to return
395     # @param offset [Integer] relative offset in results
396     def find(query, limit=100, offset=0, &block)
397       unless query.is_a? Query
398         query = Query.define(query)
399       end
400       if block_given?
401         @storage.find(query, limit, offset, &block)
402       else
403         @storage.find(query, limit, offset)
404       end
405     end
406
407     def count(query=nil)
408       unless query.is_a? Query
409         query = Query.define(query)
410       end
411       @storage.count(query)
412     end
413
414     def remove(query=nil)
415       unless query.is_a? Query
416         query = Query.define(query)
417       end
418       @storage.remove(query)
419     end
420
421     def ensure_payload_index(key)
422       @storage.ensure_payload_index(key)
423     end
424
425   end
426
427 end # Journal
428 end # Bot
429 end # Irc
430