Merge branch 'cuse' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi/fuse
[linux-2.6] / fs / fscache / operation.c
1 /* FS-Cache worker operation management routines
2  *
3  * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  *
11  * See Documentation/filesystems/caching/operations.txt
12  */
13
14 #define FSCACHE_DEBUG_LEVEL OPERATION
15 #include <linux/module.h>
16 #include "internal.h"
17
18 atomic_t fscache_op_debug_id;
19 EXPORT_SYMBOL(fscache_op_debug_id);
20
21 /**
22  * fscache_enqueue_operation - Enqueue an operation for processing
23  * @op: The operation to enqueue
24  *
25  * Enqueue an operation for processing by the FS-Cache thread pool.
26  *
27  * This will get its own ref on the object.
28  */
29 void fscache_enqueue_operation(struct fscache_operation *op)
30 {
31         _enter("{OBJ%x OP%x,%u}",
32                op->object->debug_id, op->debug_id, atomic_read(&op->usage));
33
34         ASSERT(op->processor != NULL);
35         ASSERTCMP(op->object->state, >=, FSCACHE_OBJECT_AVAILABLE);
36         ASSERTCMP(atomic_read(&op->usage), >, 0);
37
38         if (list_empty(&op->pend_link)) {
39                 switch (op->flags & FSCACHE_OP_TYPE) {
40                 case FSCACHE_OP_FAST:
41                         _debug("queue fast");
42                         atomic_inc(&op->usage);
43                         if (!schedule_work(&op->fast_work))
44                                 fscache_put_operation(op);
45                         break;
46                 case FSCACHE_OP_SLOW:
47                         _debug("queue slow");
48                         slow_work_enqueue(&op->slow_work);
49                         break;
50                 case FSCACHE_OP_MYTHREAD:
51                         _debug("queue for caller's attention");
52                         break;
53                 default:
54                         printk(KERN_ERR "FS-Cache: Unexpected op type %lx",
55                                op->flags);
56                         BUG();
57                         break;
58                 }
59                 fscache_stat(&fscache_n_op_enqueue);
60         }
61 }
62 EXPORT_SYMBOL(fscache_enqueue_operation);
63
64 /*
65  * start an op running
66  */
67 static void fscache_run_op(struct fscache_object *object,
68                            struct fscache_operation *op)
69 {
70         object->n_in_progress++;
71         if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
72                 wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
73         if (op->processor)
74                 fscache_enqueue_operation(op);
75         fscache_stat(&fscache_n_op_run);
76 }
77
78 /*
79  * submit an exclusive operation for an object
80  * - other ops are excluded from running simultaneously with this one
81  * - this gets any extra refs it needs on an op
82  */
83 int fscache_submit_exclusive_op(struct fscache_object *object,
84                                 struct fscache_operation *op)
85 {
86         int ret;
87
88         _enter("{OBJ%x OP%x},", object->debug_id, op->debug_id);
89
90         spin_lock(&object->lock);
91         ASSERTCMP(object->n_ops, >=, object->n_in_progress);
92         ASSERTCMP(object->n_ops, >=, object->n_exclusive);
93
94         ret = -ENOBUFS;
95         if (fscache_object_is_active(object)) {
96                 op->object = object;
97                 object->n_ops++;
98                 object->n_exclusive++;  /* reads and writes must wait */
99
100                 if (object->n_ops > 0) {
101                         atomic_inc(&op->usage);
102                         list_add_tail(&op->pend_link, &object->pending_ops);
103                         fscache_stat(&fscache_n_op_pend);
104                 } else if (!list_empty(&object->pending_ops)) {
105                         atomic_inc(&op->usage);
106                         list_add_tail(&op->pend_link, &object->pending_ops);
107                         fscache_stat(&fscache_n_op_pend);
108                         fscache_start_operations(object);
109                 } else {
110                         ASSERTCMP(object->n_in_progress, ==, 0);
111                         fscache_run_op(object, op);
112                 }
113
114                 /* need to issue a new write op after this */
115                 clear_bit(FSCACHE_OBJECT_PENDING_WRITE, &object->flags);
116                 ret = 0;
117         } else if (object->state == FSCACHE_OBJECT_CREATING) {
118                 op->object = object;
119                 object->n_ops++;
120                 object->n_exclusive++;  /* reads and writes must wait */
121                 atomic_inc(&op->usage);
122                 list_add_tail(&op->pend_link, &object->pending_ops);
123                 fscache_stat(&fscache_n_op_pend);
124                 ret = 0;
125         } else {
126                 /* not allowed to submit ops in any other state */
127                 BUG();
128         }
129
130         spin_unlock(&object->lock);
131         return ret;
132 }
133
134 /*
135  * report an unexpected submission
136  */
137 static void fscache_report_unexpected_submission(struct fscache_object *object,
138                                                  struct fscache_operation *op,
139                                                  unsigned long ostate)
140 {
141         static bool once_only;
142         struct fscache_operation *p;
143         unsigned n;
144
145         if (once_only)
146                 return;
147         once_only = true;
148
149         kdebug("unexpected submission OP%x [OBJ%x %s]",
150                op->debug_id, object->debug_id,
151                fscache_object_states[object->state]);
152         kdebug("objstate=%s [%s]",
153                fscache_object_states[object->state],
154                fscache_object_states[ostate]);
155         kdebug("objflags=%lx", object->flags);
156         kdebug("objevent=%lx [%lx]", object->events, object->event_mask);
157         kdebug("ops=%u inp=%u exc=%u",
158                object->n_ops, object->n_in_progress, object->n_exclusive);
159
160         if (!list_empty(&object->pending_ops)) {
161                 n = 0;
162                 list_for_each_entry(p, &object->pending_ops, pend_link) {
163                         ASSERTCMP(p->object, ==, object);
164                         kdebug("%p %p", op->processor, op->release);
165                         n++;
166                 }
167
168                 kdebug("n=%u", n);
169         }
170
171         dump_stack();
172 }
173
174 /*
175  * submit an operation for an object
176  * - objects may be submitted only in the following states:
177  *   - during object creation (write ops may be submitted)
178  *   - whilst the object is active
179  *   - after an I/O error incurred in one of the two above states (op rejected)
180  * - this gets any extra refs it needs on an op
181  */
182 int fscache_submit_op(struct fscache_object *object,
183                       struct fscache_operation *op)
184 {
185         unsigned long ostate;
186         int ret;
187
188         _enter("{OBJ%x OP%x},{%u}",
189                object->debug_id, op->debug_id, atomic_read(&op->usage));
190
191         ASSERTCMP(atomic_read(&op->usage), >, 0);
192
193         spin_lock(&object->lock);
194         ASSERTCMP(object->n_ops, >=, object->n_in_progress);
195         ASSERTCMP(object->n_ops, >=, object->n_exclusive);
196
197         ostate = object->state;
198         smp_rmb();
199
200         if (fscache_object_is_active(object)) {
201                 op->object = object;
202                 object->n_ops++;
203
204                 if (object->n_exclusive > 0) {
205                         atomic_inc(&op->usage);
206                         list_add_tail(&op->pend_link, &object->pending_ops);
207                         fscache_stat(&fscache_n_op_pend);
208                 } else if (!list_empty(&object->pending_ops)) {
209                         atomic_inc(&op->usage);
210                         list_add_tail(&op->pend_link, &object->pending_ops);
211                         fscache_stat(&fscache_n_op_pend);
212                         fscache_start_operations(object);
213                 } else {
214                         ASSERTCMP(object->n_exclusive, ==, 0);
215                         fscache_run_op(object, op);
216                 }
217                 ret = 0;
218         } else if (object->state == FSCACHE_OBJECT_CREATING) {
219                 op->object = object;
220                 object->n_ops++;
221                 atomic_inc(&op->usage);
222                 list_add_tail(&op->pend_link, &object->pending_ops);
223                 fscache_stat(&fscache_n_op_pend);
224                 ret = 0;
225         } else if (!test_bit(FSCACHE_IOERROR, &object->cache->flags)) {
226                 fscache_report_unexpected_submission(object, op, ostate);
227                 ASSERT(!fscache_object_is_active(object));
228                 ret = -ENOBUFS;
229         } else {
230                 ret = -ENOBUFS;
231         }
232
233         spin_unlock(&object->lock);
234         return ret;
235 }
236
237 /*
238  * queue an object for withdrawal on error, aborting all following asynchronous
239  * operations
240  */
241 void fscache_abort_object(struct fscache_object *object)
242 {
243         _enter("{OBJ%x}", object->debug_id);
244
245         fscache_raise_event(object, FSCACHE_OBJECT_EV_ERROR);
246 }
247
248 /*
249  * jump start the operation processing on an object
250  * - caller must hold object->lock
251  */
252 void fscache_start_operations(struct fscache_object *object)
253 {
254         struct fscache_operation *op;
255         bool stop = false;
256
257         while (!list_empty(&object->pending_ops) && !stop) {
258                 op = list_entry(object->pending_ops.next,
259                                 struct fscache_operation, pend_link);
260
261                 if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
262                         if (object->n_in_progress > 0)
263                                 break;
264                         stop = true;
265                 }
266                 list_del_init(&op->pend_link);
267                 object->n_in_progress++;
268
269                 if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
270                         wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
271                 if (op->processor)
272                         fscache_enqueue_operation(op);
273
274                 /* the pending queue was holding a ref on the object */
275                 fscache_put_operation(op);
276         }
277
278         ASSERTCMP(object->n_in_progress, <=, object->n_ops);
279
280         _debug("woke %d ops on OBJ%x",
281                object->n_in_progress, object->debug_id);
282 }
283
284 /*
285  * release an operation
286  * - queues pending ops if this is the last in-progress op
287  */
288 void fscache_put_operation(struct fscache_operation *op)
289 {
290         struct fscache_object *object;
291         struct fscache_cache *cache;
292
293         _enter("{OBJ%x OP%x,%d}",
294                op->object->debug_id, op->debug_id, atomic_read(&op->usage));
295
296         ASSERTCMP(atomic_read(&op->usage), >, 0);
297
298         if (!atomic_dec_and_test(&op->usage))
299                 return;
300
301         _debug("PUT OP");
302         if (test_and_set_bit(FSCACHE_OP_DEAD, &op->flags))
303                 BUG();
304
305         fscache_stat(&fscache_n_op_release);
306
307         if (op->release) {
308                 op->release(op);
309                 op->release = NULL;
310         }
311
312         object = op->object;
313
314         /* now... we may get called with the object spinlock held, so we
315          * complete the cleanup here only if we can immediately acquire the
316          * lock, and defer it otherwise */
317         if (!spin_trylock(&object->lock)) {
318                 _debug("defer put");
319                 fscache_stat(&fscache_n_op_deferred_release);
320
321                 cache = object->cache;
322                 spin_lock(&cache->op_gc_list_lock);
323                 list_add_tail(&op->pend_link, &cache->op_gc_list);
324                 spin_unlock(&cache->op_gc_list_lock);
325                 schedule_work(&cache->op_gc);
326                 _leave(" [defer]");
327                 return;
328         }
329
330         if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
331                 ASSERTCMP(object->n_exclusive, >, 0);
332                 object->n_exclusive--;
333         }
334
335         ASSERTCMP(object->n_in_progress, >, 0);
336         object->n_in_progress--;
337         if (object->n_in_progress == 0)
338                 fscache_start_operations(object);
339
340         ASSERTCMP(object->n_ops, >, 0);
341         object->n_ops--;
342         if (object->n_ops == 0)
343                 fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);
344
345         spin_unlock(&object->lock);
346
347         kfree(op);
348         _leave(" [done]");
349 }
350 EXPORT_SYMBOL(fscache_put_operation);
351
352 /*
353  * garbage collect operations that have had their release deferred
354  */
355 void fscache_operation_gc(struct work_struct *work)
356 {
357         struct fscache_operation *op;
358         struct fscache_object *object;
359         struct fscache_cache *cache =
360                 container_of(work, struct fscache_cache, op_gc);
361         int count = 0;
362
363         _enter("");
364
365         do {
366                 spin_lock(&cache->op_gc_list_lock);
367                 if (list_empty(&cache->op_gc_list)) {
368                         spin_unlock(&cache->op_gc_list_lock);
369                         break;
370                 }
371
372                 op = list_entry(cache->op_gc_list.next,
373                                 struct fscache_operation, pend_link);
374                 list_del(&op->pend_link);
375                 spin_unlock(&cache->op_gc_list_lock);
376
377                 object = op->object;
378
379                 _debug("GC DEFERRED REL OBJ%x OP%x",
380                        object->debug_id, op->debug_id);
381                 fscache_stat(&fscache_n_op_gc);
382
383                 ASSERTCMP(atomic_read(&op->usage), ==, 0);
384
385                 spin_lock(&object->lock);
386                 if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
387                         ASSERTCMP(object->n_exclusive, >, 0);
388                         object->n_exclusive--;
389                 }
390
391                 ASSERTCMP(object->n_in_progress, >, 0);
392                 object->n_in_progress--;
393                 if (object->n_in_progress == 0)
394                         fscache_start_operations(object);
395
396                 ASSERTCMP(object->n_ops, >, 0);
397                 object->n_ops--;
398                 if (object->n_ops == 0)
399                         fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);
400
401                 spin_unlock(&object->lock);
402
403         } while (count++ < 20);
404
405         if (!list_empty(&cache->op_gc_list))
406                 schedule_work(&cache->op_gc);
407
408         _leave("");
409 }
410
411 /*
412  * allow the slow work item processor to get a ref on an operation
413  */
414 static int fscache_op_get_ref(struct slow_work *work)
415 {
416         struct fscache_operation *op =
417                 container_of(work, struct fscache_operation, slow_work);
418
419         atomic_inc(&op->usage);
420         return 0;
421 }
422
423 /*
424  * allow the slow work item processor to discard a ref on an operation
425  */
426 static void fscache_op_put_ref(struct slow_work *work)
427 {
428         struct fscache_operation *op =
429                 container_of(work, struct fscache_operation, slow_work);
430
431         fscache_put_operation(op);
432 }
433
434 /*
435  * execute an operation using the slow thread pool to provide processing context
436  * - the caller holds a ref to this object, so we don't need to hold one
437  */
438 static void fscache_op_execute(struct slow_work *work)
439 {
440         struct fscache_operation *op =
441                 container_of(work, struct fscache_operation, slow_work);
442         unsigned long start;
443
444         _enter("{OBJ%x OP%x,%d}",
445                op->object->debug_id, op->debug_id, atomic_read(&op->usage));
446
447         ASSERT(op->processor != NULL);
448         start = jiffies;
449         op->processor(op);
450         fscache_hist(fscache_ops_histogram, start);
451
452         _leave("");
453 }
454
455 const struct slow_work_ops fscache_op_slow_work_ops = {
456         .get_ref        = fscache_op_get_ref,
457         .put_ref        = fscache_op_put_ref,
458         .execute        = fscache_op_execute,
459 };