kernel/trace/ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h>        /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
17 #include <linux/fs.h>
18
19 #include "trace.h"
20
21 /* Global flag to disable all recording to ring buffers */
22 static int ring_buffers_off __read_mostly;
23
24 /**
25  * tracing_on - enable all tracing buffers
26  *
27  * This function enables all tracing buffers that may have been
28  * disabled with tracing_off.
29  */
30 void tracing_on(void)
31 {
32         ring_buffers_off = 0;
33 }
34
35 /**
36  * tracing_off - turn off all tracing buffers
37  *
38  * This function stops all tracing buffers from recording data.
39  * It does not disable any overhead the tracers themselves may
40  * be causing. This function simply causes all recording to
41  * the ring buffers to fail.
42  */
43 void tracing_off(void)
44 {
45         ring_buffers_off = 1;
46 }
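
/*
 * Illustrative sketch (not part of this file): a debug or crash hook
 * might stop all recording when it detects a problem, so the buffers
 * keep their current contents for post-mortem inspection, and re-enable
 * recording later with tracing_on().  "something_went_wrong" is a
 * placeholder condition, not a real symbol.
 *
 *	if (something_went_wrong)
 *		tracing_off();
 */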
47
50 /* Up this if you want to test the TIME_EXTENTS and normalization */
51 #define DEBUG_SHIFT 0
52
53 /* FIXME!!! */
54 u64 ring_buffer_time_stamp(int cpu)
55 {
56         u64 time;
57
58         preempt_disable_notrace();
59         /* shift to debug/test normalization and TIME_EXTENTS */
60         time = sched_clock() << DEBUG_SHIFT;
61         preempt_enable_notrace();
62
63         return time;
64 }
65
66 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
67 {
68         /* Just stupid testing the normalize function and deltas */
69         *ts >>= DEBUG_SHIFT;
70 }
71
72 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
73 #define RB_ALIGNMENT_SHIFT      2
74 #define RB_ALIGNMENT            (1 << RB_ALIGNMENT_SHIFT)
75 #define RB_MAX_SMALL_DATA       28
76
77 enum {
78         RB_LEN_TIME_EXTEND = 8,
79         RB_LEN_TIME_STAMP = 16,
80 };
81
82 /* inline for ring buffer fast paths */
83 static inline unsigned
84 rb_event_length(struct ring_buffer_event *event)
85 {
86         unsigned length;
87
88         switch (event->type) {
89         case RINGBUF_TYPE_PADDING:
90                 /* undefined */
91                 return -1;
92
93         case RINGBUF_TYPE_TIME_EXTEND:
94                 return RB_LEN_TIME_EXTEND;
95
96         case RINGBUF_TYPE_TIME_STAMP:
97                 return RB_LEN_TIME_STAMP;
98
99         case RINGBUF_TYPE_DATA:
100                 if (event->len)
101                         length = event->len << RB_ALIGNMENT_SHIFT;
102                 else
103                         length = event->array[0];
104                 return length + RB_EVNT_HDR_SIZE;
105         default:
106                 BUG();
107         }
108         /* not hit */
109         return 0;
110 }
111
112 /**
113  * ring_buffer_event_length - return the length of the event
114  * @event: the event to get the length of
115  */
116 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
117 {
118         return rb_event_length(event);
119 }
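
/*
 * Worked example of the length encoding (illustration only, derived from
 * rb_update_event() below and rb_event_length() above): a small data
 * event stores its header-less length in 4-byte units in the len field,
 * so a 12-byte body gives len = (12 + RB_ALIGNMENT - 1) >>
 * RB_ALIGNMENT_SHIFT = 3, and rb_event_length() returns
 * (3 << RB_ALIGNMENT_SHIFT) + RB_EVNT_HDR_SIZE.  Bodies larger than
 * RB_MAX_SMALL_DATA (28 bytes) set len = 0 and put the byte length in
 * array[0] instead, so rb_event_length() returns array[0] +
 * RB_EVNT_HDR_SIZE.
 */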
120
121 /* inline for ring buffer fast paths */
122 static inline void *
123 rb_event_data(struct ring_buffer_event *event)
124 {
125         BUG_ON(event->type != RINGBUF_TYPE_DATA);
126         /* If length is in len field, then array[0] has the data */
127         if (event->len)
128                 return (void *)&event->array[0];
129         /* Otherwise length is in array[0] and array[1] has the data */
130         return (void *)&event->array[1];
131 }
132
133 /**
134  * ring_buffer_event_data - return the data of the event
135  * @event: the event to get the data from
136  */
137 void *ring_buffer_event_data(struct ring_buffer_event *event)
138 {
139         return rb_event_data(event);
140 }
141
142 #define for_each_buffer_cpu(buffer, cpu)                \
143         for_each_cpu_mask(cpu, buffer->cpumask)
144
145 #define TS_SHIFT        27
146 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
147 #define TS_DELTA_TEST   (~TS_MASK)
148
149 /*
150  * This hack is stolen from mm/slob.c.
151  * We can store per-page timing information in the page frame of the page.
152  * Thanks to Peter Zijlstra for suggesting this idea.
153  */
154 struct buffer_page {
155         u64              time_stamp;    /* page time stamp */
156         local_t          write;         /* index for next write */
157         local_t          commit;        /* write committed index */
158         unsigned         read;          /* index for next read */
159         struct list_head list;          /* list of free pages */
160         void *page;                     /* Actual data page */
161 };
162
163 /*
164  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
165  * this issue out.
166  */
167 static inline void free_buffer_page(struct buffer_page *bpage)
168 {
169         if (bpage->page)
170                 free_page((unsigned long)bpage->page);
171         kfree(bpage);
172 }
173
174 /*
175  * We need to fit the time_stamp delta into 27 bits.
176  */
177 static inline int test_time_stamp(u64 delta)
178 {
179         if (delta & TS_DELTA_TEST)
180                 return 1;
181         return 0;
182 }
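
/*
 * Worked numbers (illustration only): TS_DELTA_TEST is ~TS_MASK, so
 * test_time_stamp() fires once delta >= 2^27 = 134,217,728 clock units.
 * Assuming ring_buffer_time_stamp() counts nanoseconds (it currently
 * wraps sched_clock()), that is roughly 134 ms between events before a
 * TIME_EXTEND event must be inserted to carry the larger delta.
 */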
183
184 #define BUF_PAGE_SIZE PAGE_SIZE
185
186 /*
187  * head_page == tail_page && head == tail then buffer is empty.
188  */
189 struct ring_buffer_per_cpu {
190         int                             cpu;
191         struct ring_buffer              *buffer;
192         spinlock_t                      reader_lock; /* serialize readers */
193         raw_spinlock_t                  lock;
194         struct lock_class_key           lock_key;
195         struct list_head                pages;
196         struct buffer_page              *head_page;     /* read from head */
197         struct buffer_page              *tail_page;     /* write to tail */
198         struct buffer_page              *commit_page;   /* committed pages */
199         struct buffer_page              *reader_page;
200         unsigned long                   overrun;
201         unsigned long                   entries;
202         u64                             write_stamp;
203         u64                             read_stamp;
204         atomic_t                        record_disabled;
205 };
206
207 struct ring_buffer {
208         unsigned long                   size;
209         unsigned                        pages;
210         unsigned                        flags;
211         int                             cpus;
212         cpumask_t                       cpumask;
213         atomic_t                        record_disabled;
214
215         struct mutex                    mutex;
216
217         struct ring_buffer_per_cpu      **buffers;
218 };
219
220 struct ring_buffer_iter {
221         struct ring_buffer_per_cpu      *cpu_buffer;
222         unsigned long                   head;
223         struct buffer_page              *head_page;
224         u64                             read_stamp;
225 };
226
227 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
228 #define RB_WARN_ON(buffer, cond)                                \
229         ({                                                      \
230                 int _____ret = unlikely(cond);                  \
231                 if (_____ret) {                                 \
232                         atomic_inc(&buffer->record_disabled);   \
233                         WARN_ON(1);                             \
234                 }                                               \
235                 _____ret;                                       \
236         })
237
238 /**
239  * rb_check_pages - integrity check of buffer pages
240  * @cpu_buffer: CPU buffer with pages to test
241  *
242  * As a safety measure we check to make sure the data pages have not
243  * been corrupted.
244  */
245 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
246 {
247         struct list_head *head = &cpu_buffer->pages;
248         struct buffer_page *page, *tmp;
249
250         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
251                 return -1;
252         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
253                 return -1;
254
255         list_for_each_entry_safe(page, tmp, head, list) {
256                 if (RB_WARN_ON(cpu_buffer,
257                                page->list.next->prev != &page->list))
258                         return -1;
259                 if (RB_WARN_ON(cpu_buffer,
260                                page->list.prev->next != &page->list))
261                         return -1;
262         }
263
264         return 0;
265 }
266
267 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
268                              unsigned nr_pages)
269 {
270         struct list_head *head = &cpu_buffer->pages;
271         struct buffer_page *page, *tmp;
272         unsigned long addr;
273         LIST_HEAD(pages);
274         unsigned i;
275
276         for (i = 0; i < nr_pages; i++) {
277                 page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
278                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
279                 if (!page)
280                         goto free_pages;
281                 list_add(&page->list, &pages);
282
283                 addr = __get_free_page(GFP_KERNEL);
284                 if (!addr)
285                         goto free_pages;
286                 page->page = (void *)addr;
287         }
288
289         list_splice(&pages, head);
290
291         rb_check_pages(cpu_buffer);
292
293         return 0;
294
295  free_pages:
296         list_for_each_entry_safe(page, tmp, &pages, list) {
297                 list_del_init(&page->list);
298                 free_buffer_page(page);
299         }
300         return -ENOMEM;
301 }
302
303 static struct ring_buffer_per_cpu *
304 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
305 {
306         struct ring_buffer_per_cpu *cpu_buffer;
307         struct buffer_page *page;
308         unsigned long addr;
309         int ret;
310
311         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
312                                   GFP_KERNEL, cpu_to_node(cpu));
313         if (!cpu_buffer)
314                 return NULL;
315
316         cpu_buffer->cpu = cpu;
317         cpu_buffer->buffer = buffer;
318         spin_lock_init(&cpu_buffer->reader_lock);
319         cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
320         INIT_LIST_HEAD(&cpu_buffer->pages);
321
322         page = kzalloc_node(ALIGN(sizeof(*page), cache_line_size()),
323                             GFP_KERNEL, cpu_to_node(cpu));
324         if (!page)
325                 goto fail_free_buffer;
326
327         cpu_buffer->reader_page = page;
328         addr = __get_free_page(GFP_KERNEL);
329         if (!addr)
330                 goto fail_free_reader;
331         page->page = (void *)addr;
332
333         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
334
335         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
336         if (ret < 0)
337                 goto fail_free_reader;
338
339         cpu_buffer->head_page
340                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
341         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
342
343         return cpu_buffer;
344
345  fail_free_reader:
346         free_buffer_page(cpu_buffer->reader_page);
347
348  fail_free_buffer:
349         kfree(cpu_buffer);
350         return NULL;
351 }
352
353 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
354 {
355         struct list_head *head = &cpu_buffer->pages;
356         struct buffer_page *page, *tmp;
357
358         list_del_init(&cpu_buffer->reader_page->list);
359         free_buffer_page(cpu_buffer->reader_page);
360
361         list_for_each_entry_safe(page, tmp, head, list) {
362                 list_del_init(&page->list);
363                 free_buffer_page(page);
364         }
365         kfree(cpu_buffer);
366 }
367
368 /*
369  * Causes compile errors if the struct buffer_page gets bigger
370  * than the struct page.
371  */
372 extern int ring_buffer_page_too_big(void);
373
374 /**
375  * ring_buffer_alloc - allocate a new ring_buffer
376  * @size: the size in bytes that is needed.
377  * @flags: attributes to set for the ring buffer.
378  *
379  * Currently the only flag that is available is the RB_FL_OVERWRITE
380  * flag. This flag means that the buffer will overwrite old data
381  * when the buffer wraps. If this flag is not set, the buffer will
382  * drop data when the tail hits the head.
383  */
384 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
385 {
386         struct ring_buffer *buffer;
387         int bsize;
388         int cpu;
389
390         /* Paranoid! Optimizes out when all is well */
391         if (sizeof(struct buffer_page) > sizeof(struct page))
392                 ring_buffer_page_too_big();
393
394
395         /* keep it in its own cache line */
396         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
397                          GFP_KERNEL);
398         if (!buffer)
399                 return NULL;
400
401         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
402         buffer->flags = flags;
403
404         /* need at least two pages */
405         if (buffer->pages == 1)
406                 buffer->pages++;
407
408         buffer->cpumask = cpu_possible_map;
409         buffer->cpus = nr_cpu_ids;
410
411         bsize = sizeof(void *) * nr_cpu_ids;
412         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
413                                   GFP_KERNEL);
414         if (!buffer->buffers)
415                 goto fail_free_buffer;
416
417         for_each_buffer_cpu(buffer, cpu) {
418                 buffer->buffers[cpu] =
419                         rb_allocate_cpu_buffer(buffer, cpu);
420                 if (!buffer->buffers[cpu])
421                         goto fail_free_buffers;
422         }
423
424         mutex_init(&buffer->mutex);
425
426         return buffer;
427
428  fail_free_buffers:
429         for_each_buffer_cpu(buffer, cpu) {
430                 if (buffer->buffers[cpu])
431                         rb_free_cpu_buffer(buffer->buffers[cpu]);
432         }
433         kfree(buffer->buffers);
434
435  fail_free_buffer:
436         kfree(buffer);
437         return NULL;
438 }
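
/*
 * Usage sketch (illustrative only; "my_buffer" is an example name):
 * allocate a one-megabyte buffer that overwrites old events when full,
 * and release it again with ring_buffer_free().
 *
 *	struct ring_buffer *my_buffer;
 *
 *	my_buffer = ring_buffer_alloc(1024 * 1024, RB_FL_OVERWRITE);
 *	if (!my_buffer)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(my_buffer);
 */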
439
440 /**
441  * ring_buffer_free - free a ring buffer.
442  * @buffer: the buffer to free.
443  */
444 void
445 ring_buffer_free(struct ring_buffer *buffer)
446 {
447         int cpu;
448
449         for_each_buffer_cpu(buffer, cpu)
450                 rb_free_cpu_buffer(buffer->buffers[cpu]);
451
452         kfree(buffer);
453 }
454
455 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
456
457 static void
458 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
459 {
460         struct buffer_page *page;
461         struct list_head *p;
462         unsigned i;
463
464         atomic_inc(&cpu_buffer->record_disabled);
465         synchronize_sched();
466
467         for (i = 0; i < nr_pages; i++) {
468                 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
469                         return;
470                 p = cpu_buffer->pages.next;
471                 page = list_entry(p, struct buffer_page, list);
472                 list_del_init(&page->list);
473                 free_buffer_page(page);
474         }
475         if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
476                 return;
477
478         rb_reset_cpu(cpu_buffer);
479
480         rb_check_pages(cpu_buffer);
481
482         atomic_dec(&cpu_buffer->record_disabled);
483
484 }
485
486 static void
487 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
488                 struct list_head *pages, unsigned nr_pages)
489 {
490         struct buffer_page *page;
491         struct list_head *p;
492         unsigned i;
493
494         atomic_inc(&cpu_buffer->record_disabled);
495         synchronize_sched();
496
497         for (i = 0; i < nr_pages; i++) {
498                 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
499                         return;
500                 p = pages->next;
501                 page = list_entry(p, struct buffer_page, list);
502                 list_del_init(&page->list);
503                 list_add_tail(&page->list, &cpu_buffer->pages);
504         }
505         rb_reset_cpu(cpu_buffer);
506
507         rb_check_pages(cpu_buffer);
508
509         atomic_dec(&cpu_buffer->record_disabled);
510 }
511
512 /**
513  * ring_buffer_resize - resize the ring buffer
514  * @buffer: the buffer to resize.
515  * @size: the new size.
516  *
517  * The tracer is responsible for making sure that the buffer is
518  * not being used while changing the size.
519  * Note: We may be able to change the above requirement by using
520  *  RCU synchronizations.
521  *
522  * Minimum size is 2 * BUF_PAGE_SIZE.
523  *
524  * Returns -1 on failure.
525  */
526 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
527 {
528         struct ring_buffer_per_cpu *cpu_buffer;
529         unsigned nr_pages, rm_pages, new_pages;
530         struct buffer_page *page, *tmp;
531         unsigned long buffer_size;
532         unsigned long addr;
533         LIST_HEAD(pages);
534         int i, cpu;
535
536         /*
537          * Always succeed at resizing a non-existent buffer:
538          */
539         if (!buffer)
540                 return size;
541
542         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
543         size *= BUF_PAGE_SIZE;
544         buffer_size = buffer->pages * BUF_PAGE_SIZE;
545
546         /* we need a minimum of two pages */
547         if (size < BUF_PAGE_SIZE * 2)
548                 size = BUF_PAGE_SIZE * 2;
549
550         if (size == buffer_size)
551                 return size;
552
553         mutex_lock(&buffer->mutex);
554
555         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
556
557         if (size < buffer_size) {
558
559                 /* easy case, just free pages */
560                 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages)) {
561                         mutex_unlock(&buffer->mutex);
562                         return -1;
563                 }
564
565                 rm_pages = buffer->pages - nr_pages;
566
567                 for_each_buffer_cpu(buffer, cpu) {
568                         cpu_buffer = buffer->buffers[cpu];
569                         rb_remove_pages(cpu_buffer, rm_pages);
570                 }
571                 goto out;
572         }
573
574         /*
575          * This is a bit more difficult. We only want to add pages
576          * when we can allocate enough for all CPUs. We do this
577          * by allocating all the pages and storing them on a local
578          * linked list. If we succeed in our allocation, then we
579          * add these pages to the cpu_buffers. Otherwise we just free
580          * them all and return -ENOMEM;
581          */
582         if (RB_WARN_ON(buffer, nr_pages <= buffer->pages)) {
583                 mutex_unlock(&buffer->mutex);
584                 return -1;
585         }
586
587         new_pages = nr_pages - buffer->pages;
588
589         for_each_buffer_cpu(buffer, cpu) {
590                 for (i = 0; i < new_pages; i++) {
591                         page = kzalloc_node(ALIGN(sizeof(*page),
592                                                   cache_line_size()),
593                                             GFP_KERNEL, cpu_to_node(cpu));
594                         if (!page)
595                                 goto free_pages;
596                         list_add(&page->list, &pages);
597                         addr = __get_free_page(GFP_KERNEL);
598                         if (!addr)
599                                 goto free_pages;
600                         page->page = (void *)addr;
601                 }
602         }
603
604         for_each_buffer_cpu(buffer, cpu) {
605                 cpu_buffer = buffer->buffers[cpu];
606                 rb_insert_pages(cpu_buffer, &pages, new_pages);
607         }
608
609         if (RB_WARN_ON(buffer, !list_empty(&pages))) {
610                 mutex_unlock(&buffer->mutex);
611                 return -1;
612         }
613
614  out:
615         buffer->pages = nr_pages;
616         mutex_unlock(&buffer->mutex);
617
618         return size;
619
620  free_pages:
621         list_for_each_entry_safe(page, tmp, &pages, list) {
622                 list_del_init(&page->list);
623                 free_buffer_page(page);
624         }
625         return -ENOMEM;
626 }
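
/*
 * Usage sketch (illustrative only, continuing the "my_buffer" example
 * above): grow the buffer to two megabytes.  The return value is the
 * page-rounded size actually set, or -1 / -ENOMEM on failure as the
 * code above shows.
 *
 *	int ret;
 *
 *	ret = ring_buffer_resize(my_buffer, 2 * 1024 * 1024);
 *	if (ret < 0)
 *		return ret;
 */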
627
628 static inline int rb_null_event(struct ring_buffer_event *event)
629 {
630         return event->type == RINGBUF_TYPE_PADDING;
631 }
632
633 static inline void *__rb_page_index(struct buffer_page *page, unsigned index)
634 {
635         return page->page + index;
636 }
637
638 static inline struct ring_buffer_event *
639 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
640 {
641         return __rb_page_index(cpu_buffer->reader_page,
642                                cpu_buffer->reader_page->read);
643 }
644
645 static inline struct ring_buffer_event *
646 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
647 {
648         return __rb_page_index(cpu_buffer->head_page,
649                                cpu_buffer->head_page->read);
650 }
651
652 static inline struct ring_buffer_event *
653 rb_iter_head_event(struct ring_buffer_iter *iter)
654 {
655         return __rb_page_index(iter->head_page, iter->head);
656 }
657
658 static inline unsigned rb_page_write(struct buffer_page *bpage)
659 {
660         return local_read(&bpage->write);
661 }
662
663 static inline unsigned rb_page_commit(struct buffer_page *bpage)
664 {
665         return local_read(&bpage->commit);
666 }
667
668 /* Size is determined by what has been committed */
669 static inline unsigned rb_page_size(struct buffer_page *bpage)
670 {
671         return rb_page_commit(bpage);
672 }
673
674 static inline unsigned
675 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
676 {
677         return rb_page_commit(cpu_buffer->commit_page);
678 }
679
680 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
681 {
682         return rb_page_commit(cpu_buffer->head_page);
683 }
684
685 /*
686  * When the tail hits the head and the buffer is in overwrite mode,
687  * the head jumps to the next page and all content on the previous
688  * page is discarded. But before doing so, we update the overrun
689  * variable of the buffer.
690  */
691 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
692 {
693         struct ring_buffer_event *event;
694         unsigned long head;
695
696         for (head = 0; head < rb_head_size(cpu_buffer);
697              head += rb_event_length(event)) {
698
699                 event = __rb_page_index(cpu_buffer->head_page, head);
700                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
701                         return;
702                 /* Only count data entries */
703                 if (event->type != RINGBUF_TYPE_DATA)
704                         continue;
705                 cpu_buffer->overrun++;
706                 cpu_buffer->entries--;
707         }
708 }
709
710 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
711                                struct buffer_page **page)
712 {
713         struct list_head *p = (*page)->list.next;
714
715         if (p == &cpu_buffer->pages)
716                 p = p->next;
717
718         *page = list_entry(p, struct buffer_page, list);
719 }
720
721 static inline unsigned
722 rb_event_index(struct ring_buffer_event *event)
723 {
724         unsigned long addr = (unsigned long)event;
725
726         return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
727 }
728
729 static inline int
730 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
731              struct ring_buffer_event *event)
732 {
733         unsigned long addr = (unsigned long)event;
734         unsigned long index;
735
736         index = rb_event_index(event);
737         addr &= PAGE_MASK;
738
739         return cpu_buffer->commit_page->page == (void *)addr &&
740                 rb_commit_index(cpu_buffer) == index;
741 }
742
743 static inline void
744 rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
745                     struct ring_buffer_event *event)
746 {
747         unsigned long addr = (unsigned long)event;
748         unsigned long index;
749
750         index = rb_event_index(event);
751         addr &= PAGE_MASK;
752
753         while (cpu_buffer->commit_page->page != (void *)addr) {
754                 if (RB_WARN_ON(cpu_buffer,
755                           cpu_buffer->commit_page == cpu_buffer->tail_page))
756                         return;
757                 cpu_buffer->commit_page->commit =
758                         cpu_buffer->commit_page->write;
759                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
760                 cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
761         }
762
763         /* Now set the commit to the event's index */
764         local_set(&cpu_buffer->commit_page->commit, index);
765 }
766
767 static inline void
768 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
769 {
770         /*
771          * We only race with interrupts and NMIs on this CPU.
772          * If we own the commit event, then we can commit
773          * all others that interrupted us, since the interruptions
774          * are in stack format (they finish before they come
775          * back to us). This allows us to do a simple loop to
776          * assign the commit to the tail.
777          */
778         while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
779                 cpu_buffer->commit_page->commit =
780                         cpu_buffer->commit_page->write;
781                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
782                 cpu_buffer->write_stamp = cpu_buffer->commit_page->time_stamp;
783                 /* add barrier to keep gcc from optimizing too much */
784                 barrier();
785         }
786         while (rb_commit_index(cpu_buffer) !=
787                rb_page_write(cpu_buffer->commit_page)) {
788                 cpu_buffer->commit_page->commit =
789                         cpu_buffer->commit_page->write;
790                 barrier();
791         }
792 }
793
794 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
795 {
796         cpu_buffer->read_stamp = cpu_buffer->reader_page->time_stamp;
797         cpu_buffer->reader_page->read = 0;
798 }
799
800 static inline void rb_inc_iter(struct ring_buffer_iter *iter)
801 {
802         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
803
804         /*
805          * The iterator could be on the reader page (it starts there).
806          * But the head could have moved, since the reader was
807          * found. Check for this case and assign the iterator
808          * to the head page instead of next.
809          */
810         if (iter->head_page == cpu_buffer->reader_page)
811                 iter->head_page = cpu_buffer->head_page;
812         else
813                 rb_inc_page(cpu_buffer, &iter->head_page);
814
815         iter->read_stamp = iter->head_page->time_stamp;
816         iter->head = 0;
817 }
818
819 /**
820  * rb_update_event - update event type and data
821  * @event: the event to update
822  * @type: the type of event
823  * @length: the size of the event field in the ring buffer
824  *
825  * Update the type and data fields of the event. The length
826  * is the actual size that is written to the ring buffer,
827  * and with this, we can determine what to place into the
828  * data field.
829  */
830 static inline void
831 rb_update_event(struct ring_buffer_event *event,
832                          unsigned type, unsigned length)
833 {
834         event->type = type;
835
836         switch (type) {
837
838         case RINGBUF_TYPE_PADDING:
839                 break;
840
841         case RINGBUF_TYPE_TIME_EXTEND:
842                 event->len =
843                         (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
844                         >> RB_ALIGNMENT_SHIFT;
845                 break;
846
847         case RINGBUF_TYPE_TIME_STAMP:
848                 event->len =
849                         (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
850                         >> RB_ALIGNMENT_SHIFT;
851                 break;
852
853         case RINGBUF_TYPE_DATA:
854                 length -= RB_EVNT_HDR_SIZE;
855                 if (length > RB_MAX_SMALL_DATA) {
856                         event->len = 0;
857                         event->array[0] = length;
858                 } else
859                         event->len =
860                                 (length + (RB_ALIGNMENT-1))
861                                 >> RB_ALIGNMENT_SHIFT;
862                 break;
863         default:
864                 BUG();
865         }
866 }
867
868 static inline unsigned rb_calculate_event_length(unsigned length)
869 {
870         struct ring_buffer_event event; /* Used only for sizeof array */
871
872         /* zero length can cause confusion */
873         if (!length)
874                 length = 1;
875
876         if (length > RB_MAX_SMALL_DATA)
877                 length += sizeof(event.array[0]);
878
879         length += RB_EVNT_HDR_SIZE;
880         length = ALIGN(length, RB_ALIGNMENT);
881
882         return length;
883 }
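
/*
 * Worked example (illustration only): a requested body of 30 bytes is
 * larger than RB_MAX_SMALL_DATA, so sizeof(event.array[0]) is added for
 * the separate length word, then RB_EVNT_HDR_SIZE for the header, and
 * the total is rounded up to a multiple of RB_ALIGNMENT (4).  A 10-byte
 * body skips the extra length word and becomes 10 + RB_EVNT_HDR_SIZE,
 * rounded up to the alignment.
 */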
884
885 static struct ring_buffer_event *
886 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
887                   unsigned type, unsigned long length, u64 *ts)
888 {
889         struct buffer_page *tail_page, *head_page, *reader_page;
890         unsigned long tail, write;
891         struct ring_buffer *buffer = cpu_buffer->buffer;
892         struct ring_buffer_event *event;
893         unsigned long flags;
894
895         tail_page = cpu_buffer->tail_page;
896         write = local_add_return(length, &tail_page->write);
897         tail = write - length;
898
899         /* See if we shot past the end of this buffer page */
900         if (write > BUF_PAGE_SIZE) {
901                 struct buffer_page *next_page = tail_page;
902
903                 local_irq_save(flags);
904                 __raw_spin_lock(&cpu_buffer->lock);
905
906                 rb_inc_page(cpu_buffer, &next_page);
907
908                 head_page = cpu_buffer->head_page;
909                 reader_page = cpu_buffer->reader_page;
910
911                 /* we grabbed the lock before incrementing */
912                 if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
913                         goto out_unlock;
914
915                 /*
916                  * If for some reason, we had an interrupt storm that made
917                  * it all the way around the buffer, bail, and warn
918                  * about it.
919                  */
920                 if (unlikely(next_page == cpu_buffer->commit_page)) {
921                         WARN_ON_ONCE(1);
922                         goto out_unlock;
923                 }
924
925                 if (next_page == head_page) {
926                         if (!(buffer->flags & RB_FL_OVERWRITE)) {
927                                 /* reset write */
928                                 if (tail <= BUF_PAGE_SIZE)
929                                         local_set(&tail_page->write, tail);
930                                 goto out_unlock;
931                         }
932
933                         /* tail_page has not moved yet? */
934                         if (tail_page == cpu_buffer->tail_page) {
935                                 /* count overflows */
936                                 rb_update_overflow(cpu_buffer);
937
938                                 rb_inc_page(cpu_buffer, &head_page);
939                                 cpu_buffer->head_page = head_page;
940                                 cpu_buffer->head_page->read = 0;
941                         }
942                 }
943
944                 /*
945                  * If the tail page is still the same as what we think
946                  * it is, then it is up to us to update the tail
947                  * pointer.
948                  */
949                 if (tail_page == cpu_buffer->tail_page) {
950                         local_set(&next_page->write, 0);
951                         local_set(&next_page->commit, 0);
952                         cpu_buffer->tail_page = next_page;
953
954                         /* reread the time stamp */
955                         *ts = ring_buffer_time_stamp(cpu_buffer->cpu);
956                         cpu_buffer->tail_page->time_stamp = *ts;
957                 }
958
959                 /*
960                  * The actual tail page has moved forward.
961                  */
962                 if (tail < BUF_PAGE_SIZE) {
963                         /* Mark the rest of the page with padding */
964                         event = __rb_page_index(tail_page, tail);
965                         event->type = RINGBUF_TYPE_PADDING;
966                 }
967
968                 if (tail <= BUF_PAGE_SIZE)
969                         /* Set the write back to the previous setting */
970                         local_set(&tail_page->write, tail);
971
972                 /*
973                  * If this was a commit entry that failed,
974                  * increment that too
975                  */
976                 if (tail_page == cpu_buffer->commit_page &&
977                     tail == rb_commit_index(cpu_buffer)) {
978                         rb_set_commit_to_write(cpu_buffer);
979                 }
980
981                 __raw_spin_unlock(&cpu_buffer->lock);
982                 local_irq_restore(flags);
983
984                 /* fail and let the caller try again */
985                 return ERR_PTR(-EAGAIN);
986         }
987
988         /* We reserved something on the buffer */
989
990         if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
991                 return NULL;
992
993         event = __rb_page_index(tail_page, tail);
994         rb_update_event(event, type, length);
995
996         /*
997          * If this is a commit and the tail is zero, then update
998          * this page's time stamp.
999          */
1000         if (!tail && rb_is_commit(cpu_buffer, event))
1001                 cpu_buffer->commit_page->time_stamp = *ts;
1002
1003         return event;
1004
1005  out_unlock:
1006         __raw_spin_unlock(&cpu_buffer->lock);
1007         local_irq_restore(flags);
1008         return NULL;
1009 }
1010
1011 static int
1012 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1013                   u64 *ts, u64 *delta)
1014 {
1015         struct ring_buffer_event *event;
1016         static int once;
1017         int ret;
1018
1019         if (unlikely(*delta > (1ULL << 59) && !once++)) {
1020                 printk(KERN_WARNING "Delta way too big! %llu"
1021                        " ts=%llu write stamp = %llu\n",
1022                        (unsigned long long)*delta,
1023                        (unsigned long long)*ts,
1024                        (unsigned long long)cpu_buffer->write_stamp);
1025                 WARN_ON(1);
1026         }
1027
1028         /*
1029          * The delta is too big; we need to add a
1030          * new timestamp.
1031          */
1032         event = __rb_reserve_next(cpu_buffer,
1033                                   RINGBUF_TYPE_TIME_EXTEND,
1034                                   RB_LEN_TIME_EXTEND,
1035                                   ts);
1036         if (!event)
1037                 return -EBUSY;
1038
1039         if (PTR_ERR(event) == -EAGAIN)
1040                 return -EAGAIN;
1041
1042         /* Only a committed time event can update the write stamp */
1043         if (rb_is_commit(cpu_buffer, event)) {
1044                 /*
1045                  * If this is the first on the page, then we need to
1046                  * update the page itself, and just put in a zero.
1047                  */
1048                 if (rb_event_index(event)) {
1049                         event->time_delta = *delta & TS_MASK;
1050                         event->array[0] = *delta >> TS_SHIFT;
1051                 } else {
1052                         cpu_buffer->commit_page->time_stamp = *ts;
1053                         event->time_delta = 0;
1054                         event->array[0] = 0;
1055                 }
1056                 cpu_buffer->write_stamp = *ts;
1057                 /* let the caller know this was the commit */
1058                 ret = 1;
1059         } else {
1060                 /* Darn, this is just wasted space */
1061                 event->time_delta = 0;
1062                 event->array[0] = 0;
1063                 ret = 0;
1064         }
1065
1066         *delta = 0;
1067
1068         return ret;
1069 }
1070
1071 static struct ring_buffer_event *
1072 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1073                       unsigned type, unsigned long length)
1074 {
1075         struct ring_buffer_event *event;
1076         u64 ts, delta;
1077         int commit = 0;
1078         int nr_loops = 0;
1079
1080  again:
1081         /*
1082          * We allow for interrupts to reenter here and do a trace.
1083          * If one does, it will cause this original code to loop
1084          * back here. Even with heavy interrupts happening, this
1085          * should only happen a few times in a row. If this happens
1086          * 1000 times in a row, there must be either an interrupt
1087          * storm or we have something buggy.
1088          * Bail!
1089          */
1090         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1091                 return NULL;
1092
1093         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
1094
1095         /*
1096          * Only the first commit can update the timestamp.
1097          * Yes there is a race here. If an interrupt comes in
1098          * just after the conditional and it traces too, then it
1099          * will also check the deltas. More than one timestamp may
1100          * also be made. But only the entry that did the actual
1101          * commit will be something other than zero.
1102          */
1103         if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
1104             rb_page_write(cpu_buffer->tail_page) ==
1105             rb_commit_index(cpu_buffer)) {
1106
1107                 delta = ts - cpu_buffer->write_stamp;
1108
1109                 /* make sure this delta is calculated here */
1110                 barrier();
1111
1112                 /* Did the write stamp get updated already? */
1113                 if (unlikely(ts < cpu_buffer->write_stamp))
1114                         delta = 0;
1115
1116                 if (test_time_stamp(delta)) {
1117
1118                         commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1119
1120                         if (commit == -EBUSY)
1121                                 return NULL;
1122
1123                         if (commit == -EAGAIN)
1124                                 goto again;
1125
1126                         RB_WARN_ON(cpu_buffer, commit < 0);
1127                 }
1128         } else
1129                 /* Non commits have zero deltas */
1130                 delta = 0;
1131
1132         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
1133         if (PTR_ERR(event) == -EAGAIN)
1134                 goto again;
1135
1136         if (!event) {
1137                 if (unlikely(commit))
1138                         /*
1139                          * Ouch! We needed a timestamp and it was committed. But
1140                          * we didn't get our event reserved.
1141                          */
1142                         rb_set_commit_to_write(cpu_buffer);
1143                 return NULL;
1144         }
1145
1146         /*
1147          * If the timestamp was committed, make the commit our entry
1148          * now so that we will update it when needed.
1149          */
1150         if (commit)
1151                 rb_set_commit_event(cpu_buffer, event);
1152         else if (!rb_is_commit(cpu_buffer, event))
1153                 delta = 0;
1154
1155         event->time_delta = delta;
1156
1157         return event;
1158 }
1159
1160 static DEFINE_PER_CPU(int, rb_need_resched);
1161
1162 /**
1163  * ring_buffer_lock_reserve - reserve a part of the buffer
1164  * @buffer: the ring buffer to reserve from
1165  * @length: the length of the data to reserve (excluding event header)
1166  * @flags: a pointer to save the interrupt flags
1167  *
1168  * Returns a reserved event on the ring buffer to copy directly into.
1169  * The user of this interface will need to get the body to write into
1170  * and can use the ring_buffer_event_data() interface.
1171  *
1172  * The length is the length of the data needed, not the event length
1173  * which also includes the event header.
1174  *
1175  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1176  * If NULL is returned, then nothing has been allocated or locked.
1177  */
1178 struct ring_buffer_event *
1179 ring_buffer_lock_reserve(struct ring_buffer *buffer,
1180                          unsigned long length,
1181                          unsigned long *flags)
1182 {
1183         struct ring_buffer_per_cpu *cpu_buffer;
1184         struct ring_buffer_event *event;
1185         int cpu, resched;
1186
1187         if (ring_buffers_off)
1188                 return NULL;
1189
1190         if (atomic_read(&buffer->record_disabled))
1191                 return NULL;
1192
1193         /* If we are tracing schedule, we don't want to recurse */
1194         resched = ftrace_preempt_disable();
1195
1196         cpu = raw_smp_processor_id();
1197
1198         if (!cpu_isset(cpu, buffer->cpumask))
1199                 goto out;
1200
1201         cpu_buffer = buffer->buffers[cpu];
1202
1203         if (atomic_read(&cpu_buffer->record_disabled))
1204                 goto out;
1205
1206         length = rb_calculate_event_length(length);
1207         if (length > BUF_PAGE_SIZE)
1208                 goto out;
1209
1210         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
1211         if (!event)
1212                 goto out;
1213
1214         /*
1215          * Need to store resched state on this cpu.
1216          * Only the first needs to.
1217          */
1218
1219         if (preempt_count() == 1)
1220                 per_cpu(rb_need_resched, cpu) = resched;
1221
1222         return event;
1223
1224  out:
1225         ftrace_preempt_enable(resched);
1226         return NULL;
1227 }
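
/*
 * Usage sketch (illustrative only; "struct my_entry" and
 * "my_trace_value" are example names, not part of this API): reserve
 * room for an event, fill in its body via ring_buffer_event_data(),
 * then commit it.
 *
 *	struct my_entry {
 *		int	value;
 *	};
 *
 *	static void my_trace_value(struct ring_buffer *buffer, int value)
 *	{
 *		struct ring_buffer_event *event;
 *		struct my_entry *entry;
 *		unsigned long flags;
 *
 *		event = ring_buffer_lock_reserve(buffer, sizeof(*entry),
 *						 &flags);
 *		if (!event)
 *			return;
 *		entry = ring_buffer_event_data(event);
 *		entry->value = value;
 *		ring_buffer_unlock_commit(buffer, event, flags);
 *	}
 */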
1228
1229 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1230                       struct ring_buffer_event *event)
1231 {
1232         cpu_buffer->entries++;
1233
1234         /* Only process further if we own the commit */
1235         if (!rb_is_commit(cpu_buffer, event))
1236                 return;
1237
1238         cpu_buffer->write_stamp += event->time_delta;
1239
1240         rb_set_commit_to_write(cpu_buffer);
1241 }
1242
1243 /**
1244  * ring_buffer_unlock_commit - commit a reserved event
1245  * @buffer: The buffer to commit to
1246  * @event: The event pointer to commit.
1247  * @flags: the interrupt flags received from ring_buffer_lock_reserve.
1248  *
1249  * This commits the data to the ring buffer, and releases any locks held.
1250  *
1251  * Must be paired with ring_buffer_lock_reserve.
1252  */
1253 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1254                               struct ring_buffer_event *event,
1255                               unsigned long flags)
1256 {
1257         struct ring_buffer_per_cpu *cpu_buffer;
1258         int cpu = raw_smp_processor_id();
1259
1260         cpu_buffer = buffer->buffers[cpu];
1261
1262         rb_commit(cpu_buffer, event);
1263
1264         /*
1265          * Only the last preempt count needs to restore preemption.
1266          */
1267         if (preempt_count() == 1)
1268                 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1269         else
1270                 preempt_enable_no_resched_notrace();
1271
1272         return 0;
1273 }
1274
1275 /**
1276  * ring_buffer_write - write data to the buffer without reserving
1277  * @buffer: The ring buffer to write to.
1278  * @length: The length of the data being written (excluding the event header)
1279  * @data: The data to write to the buffer.
1280  *
1281  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1282  * one function. If you already have the data to write to the buffer, it
1283  * may be easier to simply call this function.
1284  *
1285  * Note, like ring_buffer_lock_reserve, the length is the length of the data
1286  * and not the length of the event which would hold the header.
1287  */
1288 int ring_buffer_write(struct ring_buffer *buffer,
1289                         unsigned long length,
1290                         void *data)
1291 {
1292         struct ring_buffer_per_cpu *cpu_buffer;
1293         struct ring_buffer_event *event;
1294         unsigned long event_length;
1295         void *body;
1296         int ret = -EBUSY;
1297         int cpu, resched;
1298
1299         if (ring_buffers_off)
1300                 return -EBUSY;
1301
1302         if (atomic_read(&buffer->record_disabled))
1303                 return -EBUSY;
1304
1305         resched = ftrace_preempt_disable();
1306
1307         cpu = raw_smp_processor_id();
1308
1309         if (!cpu_isset(cpu, buffer->cpumask))
1310                 goto out;
1311
1312         cpu_buffer = buffer->buffers[cpu];
1313
1314         if (atomic_read(&cpu_buffer->record_disabled))
1315                 goto out;
1316
1317         event_length = rb_calculate_event_length(length);
1318         event = rb_reserve_next_event(cpu_buffer,
1319                                       RINGBUF_TYPE_DATA, event_length);
1320         if (!event)
1321                 goto out;
1322
1323         body = rb_event_data(event);
1324
1325         memcpy(body, data, length);
1326
1327         rb_commit(cpu_buffer, event);
1328
1329         ret = 0;
1330  out:
1331         ftrace_preempt_enable(resched);
1332
1333         return ret;
1334 }
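
/*
 * Usage sketch (illustrative only, reusing the example "struct my_entry"
 * from the sketch above): when the data is already laid out in memory,
 * a single ring_buffer_write() call replaces the reserve/commit pair.
 * It returns -EBUSY when recording is disabled or the event cannot be
 * reserved.
 *
 *	struct my_entry entry = { .value = 42 };
 *	int ret;
 *
 *	ret = ring_buffer_write(buffer, sizeof(entry), &entry);
 *	if (ret)
 *		return ret;
 */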
1335
1336 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1337 {
1338         struct buffer_page *reader = cpu_buffer->reader_page;
1339         struct buffer_page *head = cpu_buffer->head_page;
1340         struct buffer_page *commit = cpu_buffer->commit_page;
1341
1342         return reader->read == rb_page_commit(reader) &&
1343                 (commit == reader ||
1344                  (commit == head &&
1345                   head->read == rb_page_commit(commit)));
1346 }
1347
1348 /**
1349  * ring_buffer_record_disable - stop all writes into the buffer
1350  * @buffer: The ring buffer to stop writes to.
1351  *
1352  * This prevents all writes to the buffer. Any attempt to write
1353  * to the buffer after this will fail and return NULL.
1354  *
1355  * The caller should call synchronize_sched() after this.
1356  */
1357 void ring_buffer_record_disable(struct ring_buffer *buffer)
1358 {
1359         atomic_inc(&buffer->record_disabled);
1360 }
1361
1362 /**
1363  * ring_buffer_record_enable - enable writes to the buffer
1364  * @buffer: The ring buffer to enable writes
1365  *
1366  * Note, multiple disables will need the same number of enables
1367  * to truly enable the writing (much like preempt_disable).
1368  */
1369 void ring_buffer_record_enable(struct ring_buffer *buffer)
1370 {
1371         atomic_dec(&buffer->record_disabled);
1372 }
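
/*
 * Usage sketch (illustrative only): quiescing writers before consuming
 * the buffer, following the synchronize_sched() advice in the comments
 * above.
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_sched();
 *	(read the buffer contents here)
 *	ring_buffer_record_enable(buffer);
 */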
1373
1374 /**
1375  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1376  * @buffer: The ring buffer to stop writes to.
1377  * @cpu: The CPU buffer to stop
1378  *
1379  * This prevents all writes to the buffer. Any attempt to write
1380  * to the buffer after this will fail and return NULL.
1381  *
1382  * The caller should call synchronize_sched() after this.
1383  */
1384 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1385 {
1386         struct ring_buffer_per_cpu *cpu_buffer;
1387
1388         if (!cpu_isset(cpu, buffer->cpumask))
1389                 return;
1390
1391         cpu_buffer = buffer->buffers[cpu];
1392         atomic_inc(&cpu_buffer->record_disabled);
1393 }
1394
1395 /**
1396  * ring_buffer_record_enable_cpu - enable writes to the buffer
1397  * @buffer: The ring buffer to enable writes
1398  * @cpu: The CPU to enable.
1399  *
1400  * Note, multiple disables will need the same number of enables
1401  * to truly enable the writing (much like preempt_disable).
1402  */
1403 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1404 {
1405         struct ring_buffer_per_cpu *cpu_buffer;
1406
1407         if (!cpu_isset(cpu, buffer->cpumask))
1408                 return;
1409
1410         cpu_buffer = buffer->buffers[cpu];
1411         atomic_dec(&cpu_buffer->record_disabled);
1412 }
1413
1414 /**
1415  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1416  * @buffer: The ring buffer
1417  * @cpu: The per CPU buffer to get the entries from.
1418  */
1419 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1420 {
1421         struct ring_buffer_per_cpu *cpu_buffer;
1422
1423         if (!cpu_isset(cpu, buffer->cpumask))
1424                 return 0;
1425
1426         cpu_buffer = buffer->buffers[cpu];
1427         return cpu_buffer->entries;
1428 }
1429
1430 /**
1431  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1432  * @buffer: The ring buffer
1433  * @cpu: The per CPU buffer to get the number of overruns from
1434  */
1435 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1436 {
1437         struct ring_buffer_per_cpu *cpu_buffer;
1438
1439         if (!cpu_isset(cpu, buffer->cpumask))
1440                 return 0;
1441
1442         cpu_buffer = buffer->buffers[cpu];
1443         return cpu_buffer->overrun;
1444 }
1445
1446 /**
1447  * ring_buffer_entries - get the number of entries in a buffer
1448  * @buffer: The ring buffer
1449  *
1450  * Returns the total number of entries in the ring buffer
1451  * (all CPU entries)
1452  */
1453 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1454 {
1455         struct ring_buffer_per_cpu *cpu_buffer;
1456         unsigned long entries = 0;
1457         int cpu;
1458
1459         /* if you care about this being correct, lock the buffer */
1460         for_each_buffer_cpu(buffer, cpu) {
1461                 cpu_buffer = buffer->buffers[cpu];
1462                 entries += cpu_buffer->entries;
1463         }
1464
1465         return entries;
1466 }
1467
1468 /**
1469  * ring_buffer_overruns - get the number of overruns in the buffer
1470  * @buffer: The ring buffer
1471  *
1472  * Returns the total number of overruns in the ring buffer
1473  * (all CPU entries)
1474  */
1475 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1476 {
1477         struct ring_buffer_per_cpu *cpu_buffer;
1478         unsigned long overruns = 0;
1479         int cpu;
1480
1481         /* if you care about this being correct, lock the buffer */
1482         for_each_buffer_cpu(buffer, cpu) {
1483                 cpu_buffer = buffer->buffers[cpu];
1484                 overruns += cpu_buffer->overrun;
1485         }
1486
1487         return overruns;
1488 }
1489
1490 static void rb_iter_reset(struct ring_buffer_iter *iter)
1491 {
1492         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1493
1494         /* Iterator usage is expected to have recording disabled */
1495         if (list_empty(&cpu_buffer->reader_page->list)) {
1496                 iter->head_page = cpu_buffer->head_page;
1497                 iter->head = cpu_buffer->head_page->read;
1498         } else {
1499                 iter->head_page = cpu_buffer->reader_page;
1500                 iter->head = cpu_buffer->reader_page->read;
1501         }
1502         if (iter->head)
1503                 iter->read_stamp = cpu_buffer->read_stamp;
1504         else
1505                 iter->read_stamp = iter->head_page->time_stamp;
1506 }
1507
1508 /**
1509  * ring_buffer_iter_reset - reset an iterator
1510  * @iter: The iterator to reset
1511  *
1512  * Resets the iterator, so that it will start from the beginning
1513  * again.
1514  */
1515 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1516 {
1517         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1518         unsigned long flags;
1519
1520         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1521         rb_iter_reset(iter);
1522         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1523 }
1524
1525 /**
1526  * ring_buffer_iter_empty - check if an iterator has no more to read
1527  * @iter: The iterator to check
1528  */
1529 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1530 {
1531         struct ring_buffer_per_cpu *cpu_buffer;
1532
1533         cpu_buffer = iter->cpu_buffer;
1534
1535         return iter->head_page == cpu_buffer->commit_page &&
1536                 iter->head == rb_commit_index(cpu_buffer);
1537 }
1538
1539 static void
1540 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1541                      struct ring_buffer_event *event)
1542 {
1543         u64 delta;
1544
1545         switch (event->type) {
1546         case RINGBUF_TYPE_PADDING:
1547                 return;
1548
1549         case RINGBUF_TYPE_TIME_EXTEND:
1550                 delta = event->array[0];
1551                 delta <<= TS_SHIFT;
1552                 delta += event->time_delta;
1553                 cpu_buffer->read_stamp += delta;
1554                 return;
1555
1556         case RINGBUF_TYPE_TIME_STAMP:
1557                 /* FIXME: not implemented */
1558                 return;
1559
1560         case RINGBUF_TYPE_DATA:
1561                 cpu_buffer->read_stamp += event->time_delta;
1562                 return;
1563
1564         default:
1565                 BUG();
1566         }
1567         return;
1568 }
1569
1570 static void
1571 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1572                           struct ring_buffer_event *event)
1573 {
1574         u64 delta;
1575
1576         switch (event->type) {
1577         case RINGBUF_TYPE_PADDING:
1578                 return;
1579
1580         case RINGBUF_TYPE_TIME_EXTEND:
1581                 delta = event->array[0];
1582                 delta <<= TS_SHIFT;
1583                 delta += event->time_delta;
1584                 iter->read_stamp += delta;
1585                 return;
1586
1587         case RINGBUF_TYPE_TIME_STAMP:
1588                 /* FIXME: not implemented */
1589                 return;
1590
1591         case RINGBUF_TYPE_DATA:
1592                 iter->read_stamp += event->time_delta;
1593                 return;
1594
1595         default:
1596                 BUG();
1597         }
1598         return;
1599 }
1600
1601 static struct buffer_page *
1602 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1603 {
1604         struct buffer_page *reader = NULL;
1605         unsigned long flags;
1606         int nr_loops = 0;
1607
1608         local_irq_save(flags);
1609         __raw_spin_lock(&cpu_buffer->lock);
1610
1611  again:
1612         /*
1613          * This should normally only loop twice. But because the
1614          * start of the reader inserts an empty page, it causes
1615          * a case where we will loop three times. There should be no
1616          * reason to loop four times (that I know of).
1617          */
1618         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
1619                 reader = NULL;
1620                 goto out;
1621         }
1622
1623         reader = cpu_buffer->reader_page;
1624
1625         /* If there's more to read, return this page */
1626         if (cpu_buffer->reader_page->read < rb_page_size(reader))
1627                 goto out;
1628
1629         /* Never should we have an index greater than the size */
1630         if (RB_WARN_ON(cpu_buffer,
1631                        cpu_buffer->reader_page->read > rb_page_size(reader)))
1632                 goto out;
1633
1634         /* check if we caught up to the tail */
1635         reader = NULL;
1636         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
1637                 goto out;
1638
1639         /*
1640          * Splice the empty reader page into the list around the head.
1641          * Reset the reader page to size zero.
1642          */
1643
1644         reader = cpu_buffer->head_page;
1645         cpu_buffer->reader_page->list.next = reader->list.next;
1646         cpu_buffer->reader_page->list.prev = reader->list.prev;
1647
1648         local_set(&cpu_buffer->reader_page->write, 0);
1649         local_set(&cpu_buffer->reader_page->commit, 0);
1650
1651         /* Make the reader page now replace the head */
1652         reader->list.prev->next = &cpu_buffer->reader_page->list;
1653         reader->list.next->prev = &cpu_buffer->reader_page->list;
1654
1655         /*
1656          * If the tail is on the reader, then we must set the head
1657          * to the inserted page, otherwise we set it one before.
1658          */
1659         cpu_buffer->head_page = cpu_buffer->reader_page;
1660
1661         if (cpu_buffer->commit_page != reader)
1662                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1663
1664         /* Finally update the reader page to the new head */
1665         cpu_buffer->reader_page = reader;
1666         rb_reset_reader_page(cpu_buffer);
1667
1668         goto again;
1669
1670  out:
1671         __raw_spin_unlock(&cpu_buffer->lock);
1672         local_irq_restore(flags);
1673
1674         return reader;
1675 }
1676
1677 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
1678 {
1679         struct ring_buffer_event *event;
1680         struct buffer_page *reader;
1681         unsigned length;
1682
1683         reader = rb_get_reader_page(cpu_buffer);
1684
1685         /* This function should not be called when buffer is empty */
1686         if (RB_WARN_ON(cpu_buffer, !reader))
1687                 return;
1688
1689         event = rb_reader_event(cpu_buffer);
1690
1691         if (event->type == RINGBUF_TYPE_DATA)
1692                 cpu_buffer->entries--;
1693
1694         rb_update_read_stamp(cpu_buffer, event);
1695
1696         length = rb_event_length(event);
1697         cpu_buffer->reader_page->read += length;
1698 }
1699
1700 static void rb_advance_iter(struct ring_buffer_iter *iter)
1701 {
1702         struct ring_buffer *buffer;
1703         struct ring_buffer_per_cpu *cpu_buffer;
1704         struct ring_buffer_event *event;
1705         unsigned length;
1706
1707         cpu_buffer = iter->cpu_buffer;
1708         buffer = cpu_buffer->buffer;
1709
1710         /*
1711          * Check if we are at the end of the buffer.
1712          */
1713         if (iter->head >= rb_page_size(iter->head_page)) {
1714                 if (RB_WARN_ON(buffer,
1715                                iter->head_page == cpu_buffer->commit_page))
1716                         return;
1717                 rb_inc_iter(iter);
1718                 return;
1719         }
1720
1721         event = rb_iter_head_event(iter);
1722
1723         length = rb_event_length(event);
1724
1725         /*
1726          * This should not be called to advance the iterator if we are
1727          * at the tail of the buffer.
1728          */
1729         if (RB_WARN_ON(cpu_buffer,
1730                        (iter->head_page == cpu_buffer->commit_page) &&
1731                        (iter->head + length > rb_commit_index(cpu_buffer))))
1732                 return;
1733
1734         rb_update_iter_read_stamp(iter, event);
1735
1736         iter->head += length;
1737
1738         /* check for end of page padding */
1739         if ((iter->head >= rb_page_size(iter->head_page)) &&
1740             (iter->head_page != cpu_buffer->commit_page))
1741                 rb_advance_iter(iter);
1742 }
1743
1744 static struct ring_buffer_event *
1745 rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1746 {
1747         struct ring_buffer_per_cpu *cpu_buffer;
1748         struct ring_buffer_event *event;
1749         struct buffer_page *reader;
1750         int nr_loops = 0;
1751
1752         if (!cpu_isset(cpu, buffer->cpumask))
1753                 return NULL;
1754
1755         cpu_buffer = buffer->buffers[cpu];
1756
1757  again:
1758         /*
1759          * We repeat when a timestamp is encountered. It is possible
1760          * to get multiple timestamps from an interrupt entering just
1761          * as one timestamp is about to be written. The most times
1762          * this can happen is the number of nested interrupts we
1763          * can have.  Nesting interrupts 10 levels deep is clearly
1764          * an anomaly.
1765          */
1766         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1767                 return NULL;
1768
1769         reader = rb_get_reader_page(cpu_buffer);
1770         if (!reader)
1771                 return NULL;
1772
1773         event = rb_reader_event(cpu_buffer);
1774
1775         switch (event->type) {
1776         case RINGBUF_TYPE_PADDING:
1777                 RB_WARN_ON(cpu_buffer, 1);
1778                 rb_advance_reader(cpu_buffer);
1779                 return NULL;
1780
1781         case RINGBUF_TYPE_TIME_EXTEND:
1782                 /* Internal data, OK to advance */
1783                 rb_advance_reader(cpu_buffer);
1784                 goto again;
1785
1786         case RINGBUF_TYPE_TIME_STAMP:
1787                 /* FIXME: not implemented */
1788                 rb_advance_reader(cpu_buffer);
1789                 goto again;
1790
1791         case RINGBUF_TYPE_DATA:
1792                 if (ts) {
1793                         *ts = cpu_buffer->read_stamp + event->time_delta;
1794                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1795                 }
1796                 return event;
1797
1798         default:
1799                 BUG();
1800         }
1801
1802         return NULL;
1803 }
1804
1805 static struct ring_buffer_event *
1806 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1807 {
1808         struct ring_buffer *buffer;
1809         struct ring_buffer_per_cpu *cpu_buffer;
1810         struct ring_buffer_event *event;
1811         int nr_loops = 0;
1812
1813         if (ring_buffer_iter_empty(iter))
1814                 return NULL;
1815
1816         cpu_buffer = iter->cpu_buffer;
1817         buffer = cpu_buffer->buffer;
1818
1819  again:
1820         /*
1821          * We repeat when a timestamp is encountered. It is possible
1822          * to get multiple timestamps from an interrupt entering just
1823          * as one timestamp is about to be written. The most times
1824          * this can happen is the number of nested interrupts we
1825          * can have. Nesting interrupts 10 levels deep is clearly
1826          * an anomaly.
1827          */
1828         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
1829                 return NULL;
1830
1831         if (rb_per_cpu_empty(cpu_buffer))
1832                 return NULL;
1833
1834         event = rb_iter_head_event(iter);
1835
1836         switch (event->type) {
1837         case RINGBUF_TYPE_PADDING:
1838                 rb_inc_iter(iter);
1839                 goto again;
1840
1841         case RINGBUF_TYPE_TIME_EXTEND:
1842                 /* Internal data, OK to advance */
1843                 rb_advance_iter(iter);
1844                 goto again;
1845
1846         case RINGBUF_TYPE_TIME_STAMP:
1847                 /* FIXME: not implemented */
1848                 rb_advance_iter(iter);
1849                 goto again;
1850
1851         case RINGBUF_TYPE_DATA:
1852                 if (ts) {
1853                         *ts = iter->read_stamp + event->time_delta;
1854                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1855                 }
1856                 return event;
1857
1858         default:
1859                 BUG();
1860         }
1861
1862         return NULL;
1863 }
1864
1865 /**
1866  * ring_buffer_peek - peek at the next event to be read
1867  * @buffer: The ring buffer to read
1868  * @cpu: The cpu to peek at
1869  * @ts: The timestamp counter of this event.
1870  *
1871  * This will return the event that will be read next, but does
1872  * not consume the data.
1873  */
1874 struct ring_buffer_event *
1875 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1876 {
1877         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1878         struct ring_buffer_event *event;
1879         unsigned long flags;
1880
1881         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1882         event = rb_buffer_peek(buffer, cpu, ts);
1883         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1884
1885         return event;
1886 }
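
/*
 * A minimal sketch of peeking (the function name and printk() are
 * illustrative only): the returned event is not consumed, so a later
 * ring_buffer_consume() on the same cpu will return the same event.
 */
static void rb_example_peek(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	u64 ts;

	event = ring_buffer_peek(buffer, cpu, &ts);
	if (!event)
		return;

	printk(KERN_INFO "next event on cpu %d: %u bytes at %llu\n",
	       cpu, ring_buffer_event_length(event),
	       (unsigned long long)ts);
}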
1887
1888 /**
1889  * ring_buffer_iter_peek - peek at the next event to be read
1890  * @iter: The ring buffer iterator
1891  * @ts: The timestamp counter of this event.
1892  *
1893  * This will return the event that will be read next, but does
1894  * not increment the iterator.
1895  */
1896 struct ring_buffer_event *
1897 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1898 {
1899         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1900         struct ring_buffer_event *event;
1901         unsigned long flags;
1902
1903         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1904         event = rb_iter_peek(iter, ts);
1905         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1906
1907         return event;
1908 }
1909
1910 /**
1911  * ring_buffer_consume - return an event and consume it
1912  * @buffer: The ring buffer to get the next event from
1913  *
1914  * Returns the next event in the ring buffer, and that event is consumed.
1915  * Meaning that sequential reads will keep returning different events,
1916  * and will eventually empty the ring buffer if the producer is slower.
1917  */
1918 struct ring_buffer_event *
1919 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1920 {
1921         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1922         struct ring_buffer_event *event;
1923         unsigned long flags;
1924
1925         if (!cpu_isset(cpu, buffer->cpumask))
1926                 return NULL;
1927
1928         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1929
1930         event = rb_buffer_peek(buffer, cpu, ts);
1931         if (!event)
1932                 goto out;
1933
1934         rb_advance_reader(cpu_buffer);
1935
1936  out:
1937         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1938
1939         return event;
1940 }
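
/*
 * A minimal consuming-read sketch (the helper name is illustrative and
 * the payload handling is left as a printk()): each call returns the
 * oldest unread event on @cpu and consumes it, so the loop drains the
 * cpu buffer until it catches up with the writer.
 */
static void rb_example_drain(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	u64 ts;

	while ((event = ring_buffer_consume(buffer, cpu, &ts))) {
		/* The payload starts at ring_buffer_event_data(event). */
		printk(KERN_INFO "consumed %u bytes stamped %llu on cpu %d\n",
		       ring_buffer_event_length(event),
		       (unsigned long long)ts, cpu);
	}
}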
1941
1942 /**
1943  * ring_buffer_read_start - start a non consuming read of the buffer
1944  * @buffer: The ring buffer to read from
1945  * @cpu: The cpu buffer to iterate over
1946  *
1947  * This starts up an iteration through the buffer. It also disables
1948  * recording to the cpu buffer until the reading is finished.
1949  * This prevents the read from being corrupted. This is not
1950  * a consuming read, so a producer is not expected.
1951  *
1952  * Must be paired with ring_buffer_read_finish.
1953  */
1954 struct ring_buffer_iter *
1955 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1956 {
1957         struct ring_buffer_per_cpu *cpu_buffer;
1958         struct ring_buffer_iter *iter;
1959         unsigned long flags;
1960
1961         if (!cpu_isset(cpu, buffer->cpumask))
1962                 return NULL;
1963
1964         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1965         if (!iter)
1966                 return NULL;
1967
1968         cpu_buffer = buffer->buffers[cpu];
1969
1970         iter->cpu_buffer = cpu_buffer;
1971
1972         atomic_inc(&cpu_buffer->record_disabled);
1973         synchronize_sched();
1974
1975         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1976         __raw_spin_lock(&cpu_buffer->lock);
1977         rb_iter_reset(iter);
1978         __raw_spin_unlock(&cpu_buffer->lock);
1979         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1980
1981         return iter;
1982 }
1983
1984 /**
1985  * ring_buffer_read_finish - finish reading the iterator of the buffer
1986  * @iter: The iterator retrieved by ring_buffer_read_start
1987  *
1988  * This re-enables the recording to the buffer, and frees the
1989  * iterator.
1990  */
1991 void
1992 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1993 {
1994         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1995
1996         atomic_dec(&cpu_buffer->record_disabled);
1997         kfree(iter);
1998 }
1999
2000 /**
2001  * ring_buffer_read - read the next item in the ring buffer by the iterator
2002  * @iter: The ring buffer iterator
2003  * @ts: The time stamp of the event read.
2004  *
2005  * This reads the next event in the ring buffer and increments the iterator.
2006  */
2007 struct ring_buffer_event *
2008 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
2009 {
2010         struct ring_buffer_event *event;
2011         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2012         unsigned long flags;
2013
2014         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2015         event = rb_iter_peek(iter, ts);
2016         if (!event)
2017                 goto out;
2018
2019         rb_advance_iter(iter);
2020  out:
2021         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2022
2023         return event;
2024 }
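
/*
 * A minimal sketch of a full non consuming read (the function name and
 * printk() are illustrative only): ring_buffer_read_start() disables
 * recording on the cpu buffer, ring_buffer_read() walks the events, and
 * ring_buffer_read_finish() re-enables recording and frees the iterator.
 */
static void rb_example_dump_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_iter *iter;
	struct ring_buffer_event *event;
	u64 ts;

	iter = ring_buffer_read_start(buffer, cpu);
	if (!iter)
		return;

	while ((event = ring_buffer_read(iter, &ts)))
		printk(KERN_INFO "cpu %d: %u byte event at %llu\n",
		       cpu, ring_buffer_event_length(event),
		       (unsigned long long)ts);

	ring_buffer_read_finish(iter);
}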
2025
2026 /**
2027  * ring_buffer_size - return the size of the ring buffer (in bytes)
2028  * @buffer: The ring buffer.
2029  */
2030 unsigned long ring_buffer_size(struct ring_buffer *buffer)
2031 {
2032         return BUF_PAGE_SIZE * buffer->pages;
2033 }
2034
2035 static void
2036 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2037 {
2038         cpu_buffer->head_page
2039                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
2040         local_set(&cpu_buffer->head_page->write, 0);
2041         local_set(&cpu_buffer->head_page->commit, 0);
2042
2043         cpu_buffer->head_page->read = 0;
2044
2045         cpu_buffer->tail_page = cpu_buffer->head_page;
2046         cpu_buffer->commit_page = cpu_buffer->head_page;
2047
2048         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
2049         local_set(&cpu_buffer->reader_page->write, 0);
2050         local_set(&cpu_buffer->reader_page->commit, 0);
2051         cpu_buffer->reader_page->read = 0;
2052
2053         cpu_buffer->overrun = 0;
2054         cpu_buffer->entries = 0;
2055 }
2056
2057 /**
2058  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
2059  * @buffer: The ring buffer to reset a per cpu buffer of
2060  * @cpu: The CPU buffer to be reset
2061  */
2062 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2063 {
2064         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2065         unsigned long flags;
2066
2067         if (!cpu_isset(cpu, buffer->cpumask))
2068                 return;
2069
2070         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2071
2072         __raw_spin_lock(&cpu_buffer->lock);
2073
2074         rb_reset_cpu(cpu_buffer);
2075
2076         __raw_spin_unlock(&cpu_buffer->lock);
2077
2078         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2079 }
2080
2081 /**
2082  * ring_buffer_reset - reset a ring buffer
2083  * @buffer: The ring buffer to reset all cpu buffers
2084  */
2085 void ring_buffer_reset(struct ring_buffer *buffer)
2086 {
2087         int cpu;
2088
2089         for_each_buffer_cpu(buffer, cpu)
2090                 ring_buffer_reset_cpu(buffer, cpu);
2091 }
2092
2093 /**
2094  * ring_buffer_empty - is the ring buffer empty?
2095  * @buffer: The ring buffer to test
2096  */
2097 int ring_buffer_empty(struct ring_buffer *buffer)
2098 {
2099         struct ring_buffer_per_cpu *cpu_buffer;
2100         int cpu;
2101
2102         /* yes this is racy, but if you don't like the race, lock the buffer */
2103         for_each_buffer_cpu(buffer, cpu) {
2104                 cpu_buffer = buffer->buffers[cpu];
2105                 if (!rb_per_cpu_empty(cpu_buffer))
2106                         return 0;
2107         }
2108         return 1;
2109 }
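
/*
 * A minimal reset sketch (the helper name is illustrative, and it is
 * assumed recording was already stopped, e.g. with tracing_off(), so no
 * writer races with the reset): after ring_buffer_reset() every per cpu
 * buffer should report empty.
 */
static void rb_example_reset_and_check(struct ring_buffer *buffer)
{
	ring_buffer_reset(buffer);

	if (!ring_buffer_empty(buffer))
		printk(KERN_WARNING "ring buffer not empty after reset\n");
}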
2110
2111 /**
2112  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
2113  * @buffer: The ring buffer
2114  * @cpu: The CPU buffer to test
2115  */
2116 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2117 {
2118         struct ring_buffer_per_cpu *cpu_buffer;
2119
2120         if (!cpu_isset(cpu, buffer->cpumask))
2121                 return 1;
2122
2123         cpu_buffer = buffer->buffers[cpu];
2124         return rb_per_cpu_empty(cpu_buffer);
2125 }
2126
2127 /**
2128  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2129  * @buffer_a: One buffer to swap with
2130  * @buffer_b: The other buffer to swap with
2131  *
2132  * This function is useful for tracers that want to take a "snapshot"
2133  * of a CPU buffer and have another backup buffer lying around.
2134  * It is expected that the tracer handles the cpu buffer not being
2135  * used at the moment.
2136  */
2137 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
2138                          struct ring_buffer *buffer_b, int cpu)
2139 {
2140         struct ring_buffer_per_cpu *cpu_buffer_a;
2141         struct ring_buffer_per_cpu *cpu_buffer_b;
2142
2143         if (!cpu_isset(cpu, buffer_a->cpumask) ||
2144             !cpu_isset(cpu, buffer_b->cpumask))
2145                 return -EINVAL;
2146
2147         /* At least make sure the two buffers are somewhat the same */
2148         if (buffer_a->size != buffer_b->size ||
2149             buffer_a->pages != buffer_b->pages)
2150                 return -EINVAL;
2151
2152         cpu_buffer_a = buffer_a->buffers[cpu];
2153         cpu_buffer_b = buffer_b->buffers[cpu];
2154
2155         /*
2156          * We can't do a synchronize_sched here because this
2157          * function can be called in atomic context.
2158          * Normally this will be called from the same CPU as cpu.
2159          * If not it's up to the caller to protect this.
2160          */
2161         atomic_inc(&cpu_buffer_a->record_disabled);
2162         atomic_inc(&cpu_buffer_b->record_disabled);
2163
2164         buffer_a->buffers[cpu] = cpu_buffer_b;
2165         buffer_b->buffers[cpu] = cpu_buffer_a;
2166
2167         cpu_buffer_b->buffer = buffer_a;
2168         cpu_buffer_a->buffer = buffer_b;
2169
2170         atomic_dec(&cpu_buffer_a->record_disabled);
2171         atomic_dec(&cpu_buffer_b->record_disabled);
2172
2173         return 0;
2174 }
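
/*
 * A minimal snapshot sketch (the function and parameter names are
 * illustrative): a spare buffer of the same size takes the place of the
 * live cpu buffer, and the old live buffer can then be read without the
 * writer adding new events to it.
 */
static int rb_example_snapshot_cpu(struct ring_buffer *live,
				   struct ring_buffer *spare, int cpu)
{
	int ret;

	ret = ring_buffer_swap_cpu(live, spare, cpu);
	if (ret)
		return ret;

	/* @spare now holds what was recorded on @cpu; read it at leisure. */
	return 0;
}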
2175
2176 static ssize_t
2177 rb_simple_read(struct file *filp, char __user *ubuf,
2178                size_t cnt, loff_t *ppos)
2179 {
2180         int *p = filp->private_data;
2181         char buf[64];
2182         int r;
2183
2184         /* !ring_buffers_off == tracing_on */
2185         r = sprintf(buf, "%d\n", !*p);
2186
2187         return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
2188 }
2189
2190 static ssize_t
2191 rb_simple_write(struct file *filp, const char __user *ubuf,
2192                 size_t cnt, loff_t *ppos)
2193 {
2194         int *p = filp->private_data;
2195         char buf[64];
2196         long val;
2197         int ret;
2198
2199         if (cnt >= sizeof(buf))
2200                 return -EINVAL;
2201
2202         if (copy_from_user(&buf, ubuf, cnt))
2203                 return -EFAULT;
2204
2205         buf[cnt] = 0;
2206
2207         ret = strict_strtoul(buf, 10, &val);
2208         if (ret < 0)
2209                 return ret;
2210
2211         /* !ring_buffers_off == tracing_on */
2212         *p = !val;
2213
2214         (*ppos)++;
2215
2216         return cnt;
2217 }
2218
2219 static struct file_operations rb_simple_fops = {
2220         .open           = tracing_open_generic,
2221         .read           = rb_simple_read,
2222         .write          = rb_simple_write,
2223 };
2224
2225
2226 static __init int rb_init_debugfs(void)
2227 {
2228         struct dentry *d_tracer;
2229         struct dentry *entry;
2230
2231         d_tracer = tracing_init_dentry();
2232
2233         entry = debugfs_create_file("tracing_on", 0644, d_tracer,
2234                                     &ring_buffers_off, &rb_simple_fops);
2235         if (!entry)
2236                 pr_warning("Could not create debugfs 'tracing_on' entry\n");
2237
2238         return 0;
2239 }
2240
2241 fs_initcall(rb_init_debugfs);
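
/*
 * Usage note (paths are illustrative; the directory returned by
 * tracing_init_dentry() depends on where debugfs is mounted, commonly
 * /sys/kernel/debug): the file created above toggles all ring buffer
 * recording from user space, mirroring tracing_on()/tracing_off():
 *
 *   echo 0 > /sys/kernel/debug/tracing/tracing_on   # stop recording
 *   echo 1 > /sys/kernel/debug/tracing/tracing_on   # resume recording
 *   cat /sys/kernel/debug/tracing/tracing_on        # prints 1 while enabled
 */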