journal: start with core botmodule, api changes
[rbot] / lib / rbot / journal.rb
1 # encoding: UTF-8
2 #-- vim:sw=2:et
3 #++
4 #
5 # :title: rbot's persistent message queue
6 #
7 # Author:: Matthias Hecker (apoc@geekosphere.org)
8
9 require 'thread'
10 require 'securerandom'
11
12 module Irc
13 class Bot
14 module Journal
15
16 =begin rdoc
17
18   The journal is a persistent message queue for rbot, its based on a basic
19   publish/subscribe model and persists messages into backend databases
20   that can be efficiently searched for past messages.
21
22   It is a addition to the key value storage already present in rbot
23   through its registry subsystem.
24
25 =end
26
27   class InvalidJournalMessage < StandardError
28   end
29   class StorageError < StandardError
30   end
31
32   class JournalMessage
33     # a unique identification of this message
34     attr_reader :id
35
36     # describes a hierarchical queue into which this message belongs
37     attr_reader :topic
38
39     # when this message was published as a Time instance
40     attr_reader :timestamp
41
42     # contains the actual message as a Hash
43     attr_reader :payload
44
45     def initialize(message)
46       @id = message[:id]
47       @timestamp = message[:timestamp]
48       @topic = message[:topic]
49       @payload = message[:payload]
50       if @payload.class != Hash
51         raise InvalidJournalMessage.new('payload must be a hash!')
52       end
53     end
54
55     # Access payload value by key.
56     def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
57       value = pkey.split('.').reduce(@payload) do |hash, key|
58         if hash.has_key?(key) or hash.has_key?(key.to_sym)
59           hash[key] || hash[key.to_sym]
60         else
61           if default == :exception
62             raise ArgumentError.new
63           else
64             default
65           end
66         end
67       end
68     end
69
70     # Access payload value by key alias for get(key, nil).
71     def [](key)
72       get(key, nil)
73     end
74
75     def ==(other)
76       (@id == other.id) rescue false
77     end
78
79     def self.create(topic, payload, opt={})
80       JournalMessage.new(
81         id: opt[:id] || SecureRandom.uuid,
82         timestamp: opt[:timestamp] || Time.now,
83         topic: topic,
84         payload: payload
85       )
86     end
87   end
88
89   module Storage
90     class AbstractStorage
91       # intializes/opens a new storage connection
92       def initialize(opts={})
93       end
94
95       # inserts a message in storage
96       def insert(message)
97       end
98
99       # creates/ensures a index exists on the payload specified by key
100       def ensure_index(key)
101       end
102
103       # returns a array of message instances that match the query
104       def find(query=nil, limit=100, offset=0, &block)
105       end
106
107       # returns the number of messages that match the query
108       def count(query=nil)
109       end
110
111       # remove messages that match the query
112       def remove(query=nil)
113       end
114
115       # destroy the underlying table/collection
116       def drop
117       end
118
119       # Returns all classes from the namespace that implement this interface
120       def self.get_impl
121         ObjectSpace.each_object(Class).select { |klass| klass < self }
122       end
123     end
124
125     def self.create(name, uri)
126       log 'load journal storage adapter: ' + name
127       load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
128       cls = AbstractStorage.get_impl.first
129       cls.new(uri: uri)
130     end
131   end
132
133   # Describes a query on journal entries, it is used both to describe
134   # a subscription aswell as to query persisted messages.
135   # There two ways to declare a Query instance, using
136   # the DSL like this:
137   #
138   #   Query.define do
139   #     id 'foo'
140   #     id 'bar'
141   #     topic 'log.irc.*'
142   #     topic 'log.core'
143   #     timestamp from: Time.now, to: Time.now + 60 * 10
144   #     payload 'action': :privmsg
145   #     payload 'channel': '#rbot'
146   #     payload 'foo.bar': 'baz'
147   #   end
148   #
149   # or using a hash: (NOTE: avoid using symbols in payload)
150   #
151   #   Query.define({
152   #     id: ['foo', 'bar'],
153   #     topic: ['log.irc.*', 'log.core'],
154   #     timestamp: {
155   #       from: Time.now
156   #       to: Time.now + 60 * 10
157   #     },
158   #     payload: {
159   #       'action' => 'privmsg'
160   #       'channel' => '#rbot',
161   #       'foo.bar' => 'baz'
162   #     }
163   #   })
164   #
165   class Query
166     # array of ids to match (OR)
167     attr_reader :id
168     # array of topics to match with wildcard support (OR)
169     attr_reader :topic
170     # hash with from: timestamp and to: timestamp
171     attr_reader :timestamp
172     # hash of key values to match
173     attr_reader :payload
174
175     def initialize(query)
176       @id = query[:id] || []
177       @id = [@id] if @id.is_a? String
178       @topic = query[:topic] || []
179       @topic = [@topic] if @topic.is_a? String
180       @timestamp = {
181         from: nil, to: nil
182       }
183       if query[:timestamp] and query[:timestamp][:from]
184         @timestamp[:from] = query[:timestamp][:from]
185       end
186       if query[:timestamp] and query[:timestamp][:to]
187         @timestamp[:to] = query[:timestamp][:to]
188       end
189       @payload = query[:payload] || {}
190     end
191
192     # returns true if the given message matches the query
193     def matches?(message)
194       return false if not @id.empty? and not @id.include? message.id
195       return false if not @topic.empty? and not topic_matches? message.topic
196       if @timestamp[:from]
197         return false unless message.timestamp >= @timestamp[:from]
198       end
199       if @timestamp[:to]
200         return false unless message.timestamp <= @timestamp[:to]
201       end
202       found = false
203       @payload.each_pair do |key, value|
204         begin
205           message.get(key.to_s)
206         rescue ArgumentError
207         end
208         found = true
209       end
210       return false if not found and not @payload.empty?
211       true
212     end
213
214     def topic_matches?(_topic)
215       @topic.each do |topic|
216         if topic.include? '*'
217           match = true
218           topic.split('.').zip(_topic.split('.')).each do |a, b|
219             if a == '*'
220               if not b or b.empty?
221                 match = false
222               end
223             else
224               match = false unless a == b
225             end
226           end
227           return true if match
228         else
229           return true if topic == _topic
230         end
231       end
232       return false
233     end
234
235     # factory that constructs a query
236     class Factory
237       attr_reader :query
238       def initialize
239         @query = {
240           id: [],
241           topic: [],
242           timestamp: {
243             from: nil, to: nil
244           },
245           payload: {}
246         }
247       end
248
249       def id(*_id)
250         @query[:id] += _id
251       end
252
253       def topic(*_topic)
254           @query[:topic] += _topic
255       end
256
257       def timestamp(range)
258         @query[:timestamp] = range
259       end
260
261       def payload(query)
262         @query[:payload].merge!(query)
263       end
264     end
265
266     def self.define(query=nil, &block)
267       factory = Factory.new
268       if block_given?
269         factory.instance_eval(&block)
270         query = factory.query
271       end
272       Query.new query
273     end
274
275   end
276
277
278   class JournalBroker
279     class Subscription
280       attr_reader :topic
281       attr_reader :block
282       def initialize(broker, topic, block)
283         @broker = broker
284         @topic = topic
285         @block = block
286       end
287       def cancel
288         @broker.unsubscribe(self)
289       end
290     end
291
292     def initialize(opts={})
293       # overrides the internal consumer with a block
294       @consumer = opts[:consumer]
295       # storage backend
296       @storage = opts[:storage]
297       unless @storage
298         warning 'journal broker: no storage set up, won\'t persist messages'
299       end
300       @queue = Queue.new
301       # consumer thread:
302       @thread = Thread.new do
303         while message = @queue.pop
304           begin
305             consume message
306           # pop(true) ... rescue ThreadError => e
307           rescue Exception => e
308             error 'journal broker: exception in consumer thread'
309             error $!
310           end
311         end
312       end
313       @subscriptions = []
314       # lookup-table for subscriptions by their topic
315       @topic_subs = {}
316     end
317
318     def consume(message)
319       return unless message
320       @consumer.call(message) if @consumer
321
322       # notify subscribers
323       if @topic_subs.has_key? message.topic
324         @topic_subs[message.topic].each do |s|
325           s.block.call(message)
326         end
327       end
328
329       @storage.insert(message) if @storage
330     end
331
332     def persists?
333       true if @storage
334     end
335
336     def shutdown
337       log 'journal shutdown'
338       @subscriptions.clear
339       @topic_subs.clear
340       @queue << nil
341       @thread.join
342       @thread = nil
343     end
344
345     def publish(topic, payload)
346       @queue << JournalMessage::create(topic, payload)
347     end
348
349     # Subscribe to receive messages from a topic.
350     #
351     # You can use this method to subscribe to messages that
352     # are published within a specified topic. You must provide
353     # a receiving block to receive messages one-by-one.
354     # The method returns an instance of Subscription that can
355     # be used to cancel the subscription by invoking cancel
356     # on it.
357     #
358     #   journal.subscribe('irclog') do |message|
359     #     # received irclog messages...
360     #   end
361     #
362     def subscribe(topic=nil, &block)
363       raise ArgumentError.new unless block_given?
364       s = Subscription.new(self, topic, block)
365       @subscriptions << s
366       unless @topic_subs.has_key? topic
367         @topic_subs[topic] = []
368       end
369       @topic_subs[topic] << s
370       s
371     end
372
373     def unsubscribe(s)
374       if @topic_subs.has_key? s.topic
375         @topic_subs[s.topic].delete(s)
376       end
377       @subscriptions.delete s
378     end
379
380     # Find and return persisted messages by a query.
381     #
382     # This method will either return all messages or call the provided
383     # block for each message. It will filter the messages by the
384     # provided Query instance. Limit and offset might be used to
385     # constrain the result.
386     # The query might also be a hash or proc that is passed to
387     # Query.define first.
388     #
389     # @param query [Query] 
390     # @param limit [Integer] how many items to return
391     # @param offset [Integer] relative offset in results
392     def find(query, limit=100, offset=0, &block)
393       unless query.is_a? Query
394         query = Query.define(query)
395       end
396       if block_given?
397         @storage.find(query, limit, offset, &block)
398       else
399         @storage.find(query, limit, offset)
400       end
401     end
402
403     def count(query=nil)
404       @storage.count(query)
405     end
406
407     def remove(query=nil)
408       @storage.remove(query)
409     end
410
411   end
412
413 end # Journal
414 end # Bot
415 end # Irc
416