Merge branches 'tracing/docs', 'tracing/filters', 'tracing/ftrace', 'tracing/kprobes...
[linux-2.6] / kernel / trace / ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/trace_clock.h>
8 #include <linux/ftrace_irq.h>
9 #include <linux/spinlock.h>
10 #include <linux/debugfs.h>
11 #include <linux/uaccess.h>
12 #include <linux/hardirq.h>
13 #include <linux/module.h>
14 #include <linux/percpu.h>
15 #include <linux/mutex.h>
16 #include <linux/init.h>
17 #include <linux/hash.h>
18 #include <linux/list.h>
19 #include <linux/cpu.h>
20 #include <linux/fs.h>
21
22 #include "trace.h"
23
24 /*
25  * The ring buffer is made up of a list of pages. A separate list of pages is
26  * allocated for each CPU. A writer may only write to a buffer that is
27  * associated with the CPU it is currently executing on.  A reader may read
28  * from any per cpu buffer.
29  *
30  * The reader is special. For each per cpu buffer, the reader has its own
31  * reader page. When a reader has read the entire reader page, this reader
32  * page is swapped with another page in the ring buffer.
33  *
34  * Now, as long as the writer is off the reader page, the reader can do what
35  * ever it wants with that page. The writer will never write to that page
36  * again (as long as it is out of the ring buffer).
37  *
38  * Here's some silly ASCII art.
39  *
40  *   +------+
41  *   |reader|          RING BUFFER
42  *   |page  |
43  *   +------+        +---+   +---+   +---+
44  *                   |   |-->|   |-->|   |
45  *                   +---+   +---+   +---+
46  *                     ^               |
47  *                     |               |
48  *                     +---------------+
49  *
50  *
51  *   +------+
52  *   |reader|          RING BUFFER
53  *   |page  |------------------v
54  *   +------+        +---+   +---+   +---+
55  *                   |   |-->|   |-->|   |
56  *                   +---+   +---+   +---+
57  *                     ^               |
58  *                     |               |
59  *                     +---------------+
60  *
61  *
62  *   +------+
63  *   |reader|          RING BUFFER
64  *   |page  |------------------v
65  *   +------+        +---+   +---+   +---+
66  *      ^            |   |-->|   |-->|   |
67  *      |            +---+   +---+   +---+
68  *      |                              |
69  *      |                              |
70  *      +------------------------------+
71  *
72  *
73  *   +------+
74  *   |buffer|          RING BUFFER
75  *   |page  |------------------v
76  *   +------+        +---+   +---+   +---+
77  *      ^            |   |   |   |-->|   |
78  *      |   New      +---+   +---+   +---+
79  *      |  Reader------^               |
80  *      |   page                       |
81  *      +------------------------------+
82  *
83  *
84  * After we make this swap, the reader can hand this page off to the splice
85  * code and be done with it. It can even allocate a new page if it needs to
86  * and swap that into the ring buffer.
87  *
88  * We will be using cmpxchg soon to make all this lockless.
89  *
90  */
91
92 /*
93  * A fast way to enable or disable all ring buffers is to
94  * call tracing_on or tracing_off. Turning off the ring buffers
95  * prevents all ring buffers from being recorded to.
96  * Turning this switch on, makes it OK to write to the
97  * ring buffer, if the ring buffer is enabled itself.
98  *
99  * There's three layers that must be on in order to write
100  * to the ring buffer.
101  *
102  * 1) This global flag must be set.
103  * 2) The ring buffer must be enabled for recording.
104  * 3) The per cpu buffer must be enabled for recording.
105  *
106  * In case of an anomaly, this global flag has a bit set that
107  * will permantly disable all ring buffers.
108  */
109
110 /*
111  * Global flag to disable all recording to ring buffers
112  *  This has two bits: ON, DISABLED
113  *
114  *  ON   DISABLED
115  * ---- ----------
116  *   0      0        : ring buffers are off
117  *   1      0        : ring buffers are on
118  *   X      1        : ring buffers are permanently disabled
119  */
120
121 enum {
122         RB_BUFFERS_ON_BIT       = 0,
123         RB_BUFFERS_DISABLED_BIT = 1,
124 };
125
126 enum {
127         RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
128         RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
129 };
130
131 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
132
133 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
134
135 /**
136  * tracing_on - enable all tracing buffers
137  *
138  * This function enables all tracing buffers that may have been
139  * disabled with tracing_off.
140  */
141 void tracing_on(void)
142 {
143         set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
144 }
145 EXPORT_SYMBOL_GPL(tracing_on);
146
147 /**
148  * tracing_off - turn off all tracing buffers
149  *
150  * This function stops all tracing buffers from recording data.
151  * It does not disable any overhead the tracers themselves may
152  * be causing. This function simply causes all recording to
153  * the ring buffers to fail.
154  */
155 void tracing_off(void)
156 {
157         clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
158 }
159 EXPORT_SYMBOL_GPL(tracing_off);
160
161 /**
162  * tracing_off_permanent - permanently disable ring buffers
163  *
164  * This function, once called, will disable all ring buffers
165  * permanently.
166  */
167 void tracing_off_permanent(void)
168 {
169         set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
170 }
171
172 /**
173  * tracing_is_on - show state of ring buffers enabled
174  */
175 int tracing_is_on(void)
176 {
177         return ring_buffer_flags == RB_BUFFERS_ON;
178 }
179 EXPORT_SYMBOL_GPL(tracing_is_on);
180
181 #include "trace.h"
182
183 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
184 #define RB_ALIGNMENT            4U
185 #define RB_MAX_SMALL_DATA       28
186
187 enum {
188         RB_LEN_TIME_EXTEND = 8,
189         RB_LEN_TIME_STAMP = 16,
190 };
191
192 static inline int rb_null_event(struct ring_buffer_event *event)
193 {
194         return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
195 }
196
197 static inline int rb_discarded_event(struct ring_buffer_event *event)
198 {
199         return event->type == RINGBUF_TYPE_PADDING && event->time_delta;
200 }
201
202 static void rb_event_set_padding(struct ring_buffer_event *event)
203 {
204         event->type = RINGBUF_TYPE_PADDING;
205         event->time_delta = 0;
206 }
207
208 /**
209  * ring_buffer_event_discard - discard an event in the ring buffer
210  * @buffer: the ring buffer
211  * @event: the event to discard
212  *
213  * Sometimes a event that is in the ring buffer needs to be ignored.
214  * This function lets the user discard an event in the ring buffer
215  * and then that event will not be read later.
216  *
217  * Note, it is up to the user to be careful with this, and protect
218  * against races. If the user discards an event that has been consumed
219  * it is possible that it could corrupt the ring buffer.
220  */
221 void ring_buffer_event_discard(struct ring_buffer_event *event)
222 {
223         event->type = RINGBUF_TYPE_PADDING;
224         /* time delta must be non zero */
225         if (!event->time_delta)
226                 event->time_delta = 1;
227 }
228
229 static unsigned
230 rb_event_data_length(struct ring_buffer_event *event)
231 {
232         unsigned length;
233
234         if (event->len)
235                 length = event->len * RB_ALIGNMENT;
236         else
237                 length = event->array[0];
238         return length + RB_EVNT_HDR_SIZE;
239 }
240
241 /* inline for ring buffer fast paths */
242 static unsigned
243 rb_event_length(struct ring_buffer_event *event)
244 {
245         switch (event->type) {
246         case RINGBUF_TYPE_PADDING:
247                 if (rb_null_event(event))
248                         /* undefined */
249                         return -1;
250                 return rb_event_data_length(event);
251
252         case RINGBUF_TYPE_TIME_EXTEND:
253                 return RB_LEN_TIME_EXTEND;
254
255         case RINGBUF_TYPE_TIME_STAMP:
256                 return RB_LEN_TIME_STAMP;
257
258         case RINGBUF_TYPE_DATA:
259                 return rb_event_data_length(event);
260         default:
261                 BUG();
262         }
263         /* not hit */
264         return 0;
265 }
266
267 /**
268  * ring_buffer_event_length - return the length of the event
269  * @event: the event to get the length of
270  */
271 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
272 {
273         unsigned length = rb_event_length(event);
274         if (event->type != RINGBUF_TYPE_DATA)
275                 return length;
276         length -= RB_EVNT_HDR_SIZE;
277         if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
278                 length -= sizeof(event->array[0]);
279         return length;
280 }
281 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
282
283 /* inline for ring buffer fast paths */
284 static void *
285 rb_event_data(struct ring_buffer_event *event)
286 {
287         BUG_ON(event->type != RINGBUF_TYPE_DATA);
288         /* If length is in len field, then array[0] has the data */
289         if (event->len)
290                 return (void *)&event->array[0];
291         /* Otherwise length is in array[0] and array[1] has the data */
292         return (void *)&event->array[1];
293 }
294
295 /**
296  * ring_buffer_event_data - return the data of the event
297  * @event: the event to get the data from
298  */
299 void *ring_buffer_event_data(struct ring_buffer_event *event)
300 {
301         return rb_event_data(event);
302 }
303 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
304
305 #define for_each_buffer_cpu(buffer, cpu)                \
306         for_each_cpu(cpu, buffer->cpumask)
307
308 #define TS_SHIFT        27
309 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
310 #define TS_DELTA_TEST   (~TS_MASK)
311
312 struct buffer_data_page {
313         u64              time_stamp;    /* page time stamp */
314         local_t          commit;        /* write committed index */
315         unsigned char    data[];        /* data of buffer page */
316 };
317
318 struct buffer_page {
319         local_t          write;         /* index for next write */
320         unsigned         read;          /* index for next read */
321         struct list_head list;          /* list of free pages */
322         struct buffer_data_page *page;  /* Actual data page */
323 };
324
325 static void rb_init_page(struct buffer_data_page *bpage)
326 {
327         local_set(&bpage->commit, 0);
328 }
329
330 /**
331  * ring_buffer_page_len - the size of data on the page.
332  * @page: The page to read
333  *
334  * Returns the amount of data on the page, including buffer page header.
335  */
336 size_t ring_buffer_page_len(void *page)
337 {
338         return local_read(&((struct buffer_data_page *)page)->commit)
339                 + BUF_PAGE_HDR_SIZE;
340 }
341
342 /*
343  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
344  * this issue out.
345  */
346 static void free_buffer_page(struct buffer_page *bpage)
347 {
348         free_page((unsigned long)bpage->page);
349         kfree(bpage);
350 }
351
352 /*
353  * We need to fit the time_stamp delta into 27 bits.
354  */
355 static inline int test_time_stamp(u64 delta)
356 {
357         if (delta & TS_DELTA_TEST)
358                 return 1;
359         return 0;
360 }
361
362 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
363
364 /*
365  * head_page == tail_page && head == tail then buffer is empty.
366  */
367 struct ring_buffer_per_cpu {
368         int                             cpu;
369         struct ring_buffer              *buffer;
370         spinlock_t                      reader_lock; /* serialize readers */
371         raw_spinlock_t                  lock;
372         struct lock_class_key           lock_key;
373         struct list_head                pages;
374         struct buffer_page              *head_page;     /* read from head */
375         struct buffer_page              *tail_page;     /* write to tail */
376         struct buffer_page              *commit_page;   /* committed pages */
377         struct buffer_page              *reader_page;
378         unsigned long                   overrun;
379         unsigned long                   entries;
380         u64                             write_stamp;
381         u64                             read_stamp;
382         atomic_t                        record_disabled;
383 };
384
385 struct ring_buffer {
386         unsigned                        pages;
387         unsigned                        flags;
388         int                             cpus;
389         atomic_t                        record_disabled;
390         cpumask_var_t                   cpumask;
391
392         struct mutex                    mutex;
393
394         struct ring_buffer_per_cpu      **buffers;
395
396 #ifdef CONFIG_HOTPLUG_CPU
397         struct notifier_block           cpu_notify;
398 #endif
399         u64                             (*clock)(void);
400 };
401
402 struct ring_buffer_iter {
403         struct ring_buffer_per_cpu      *cpu_buffer;
404         unsigned long                   head;
405         struct buffer_page              *head_page;
406         u64                             read_stamp;
407 };
408
409 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
410 #define RB_WARN_ON(buffer, cond)                                \
411         ({                                                      \
412                 int _____ret = unlikely(cond);                  \
413                 if (_____ret) {                                 \
414                         atomic_inc(&buffer->record_disabled);   \
415                         WARN_ON(1);                             \
416                 }                                               \
417                 _____ret;                                       \
418         })
419
420 /* Up this if you want to test the TIME_EXTENTS and normalization */
421 #define DEBUG_SHIFT 0
422
423 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
424 {
425         u64 time;
426
427         preempt_disable_notrace();
428         /* shift to debug/test normalization and TIME_EXTENTS */
429         time = buffer->clock() << DEBUG_SHIFT;
430         preempt_enable_no_resched_notrace();
431
432         return time;
433 }
434 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
435
436 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
437                                       int cpu, u64 *ts)
438 {
439         /* Just stupid testing the normalize function and deltas */
440         *ts >>= DEBUG_SHIFT;
441 }
442 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
443
444 /**
445  * check_pages - integrity check of buffer pages
446  * @cpu_buffer: CPU buffer with pages to test
447  *
448  * As a safety measure we check to make sure the data pages have not
449  * been corrupted.
450  */
451 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
452 {
453         struct list_head *head = &cpu_buffer->pages;
454         struct buffer_page *bpage, *tmp;
455
456         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
457                 return -1;
458         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
459                 return -1;
460
461         list_for_each_entry_safe(bpage, tmp, head, list) {
462                 if (RB_WARN_ON(cpu_buffer,
463                                bpage->list.next->prev != &bpage->list))
464                         return -1;
465                 if (RB_WARN_ON(cpu_buffer,
466                                bpage->list.prev->next != &bpage->list))
467                         return -1;
468         }
469
470         return 0;
471 }
472
473 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
474                              unsigned nr_pages)
475 {
476         struct list_head *head = &cpu_buffer->pages;
477         struct buffer_page *bpage, *tmp;
478         unsigned long addr;
479         LIST_HEAD(pages);
480         unsigned i;
481
482         for (i = 0; i < nr_pages; i++) {
483                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
484                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
485                 if (!bpage)
486                         goto free_pages;
487                 list_add(&bpage->list, &pages);
488
489                 addr = __get_free_page(GFP_KERNEL);
490                 if (!addr)
491                         goto free_pages;
492                 bpage->page = (void *)addr;
493                 rb_init_page(bpage->page);
494         }
495
496         list_splice(&pages, head);
497
498         rb_check_pages(cpu_buffer);
499
500         return 0;
501
502  free_pages:
503         list_for_each_entry_safe(bpage, tmp, &pages, list) {
504                 list_del_init(&bpage->list);
505                 free_buffer_page(bpage);
506         }
507         return -ENOMEM;
508 }
509
510 static struct ring_buffer_per_cpu *
511 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
512 {
513         struct ring_buffer_per_cpu *cpu_buffer;
514         struct buffer_page *bpage;
515         unsigned long addr;
516         int ret;
517
518         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
519                                   GFP_KERNEL, cpu_to_node(cpu));
520         if (!cpu_buffer)
521                 return NULL;
522
523         cpu_buffer->cpu = cpu;
524         cpu_buffer->buffer = buffer;
525         spin_lock_init(&cpu_buffer->reader_lock);
526         cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
527         INIT_LIST_HEAD(&cpu_buffer->pages);
528
529         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
530                             GFP_KERNEL, cpu_to_node(cpu));
531         if (!bpage)
532                 goto fail_free_buffer;
533
534         cpu_buffer->reader_page = bpage;
535         addr = __get_free_page(GFP_KERNEL);
536         if (!addr)
537                 goto fail_free_reader;
538         bpage->page = (void *)addr;
539         rb_init_page(bpage->page);
540
541         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
542
543         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
544         if (ret < 0)
545                 goto fail_free_reader;
546
547         cpu_buffer->head_page
548                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
549         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
550
551         return cpu_buffer;
552
553  fail_free_reader:
554         free_buffer_page(cpu_buffer->reader_page);
555
556  fail_free_buffer:
557         kfree(cpu_buffer);
558         return NULL;
559 }
560
561 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
562 {
563         struct list_head *head = &cpu_buffer->pages;
564         struct buffer_page *bpage, *tmp;
565
566         list_del_init(&cpu_buffer->reader_page->list);
567         free_buffer_page(cpu_buffer->reader_page);
568
569         list_for_each_entry_safe(bpage, tmp, head, list) {
570                 list_del_init(&bpage->list);
571                 free_buffer_page(bpage);
572         }
573         kfree(cpu_buffer);
574 }
575
576 /*
577  * Causes compile errors if the struct buffer_page gets bigger
578  * than the struct page.
579  */
580 extern int ring_buffer_page_too_big(void);
581
582 #ifdef CONFIG_HOTPLUG_CPU
583 static int rb_cpu_notify(struct notifier_block *self,
584                          unsigned long action, void *hcpu);
585 #endif
586
587 /**
588  * ring_buffer_alloc - allocate a new ring_buffer
589  * @size: the size in bytes per cpu that is needed.
590  * @flags: attributes to set for the ring buffer.
591  *
592  * Currently the only flag that is available is the RB_FL_OVERWRITE
593  * flag. This flag means that the buffer will overwrite old data
594  * when the buffer wraps. If this flag is not set, the buffer will
595  * drop data when the tail hits the head.
596  */
597 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
598 {
599         struct ring_buffer *buffer;
600         int bsize;
601         int cpu;
602
603         /* Paranoid! Optimizes out when all is well */
604         if (sizeof(struct buffer_page) > sizeof(struct page))
605                 ring_buffer_page_too_big();
606
607
608         /* keep it in its own cache line */
609         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
610                          GFP_KERNEL);
611         if (!buffer)
612                 return NULL;
613
614         if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
615                 goto fail_free_buffer;
616
617         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
618         buffer->flags = flags;
619         buffer->clock = trace_clock_local;
620
621         /* need at least two pages */
622         if (buffer->pages == 1)
623                 buffer->pages++;
624
625         /*
626          * In case of non-hotplug cpu, if the ring-buffer is allocated
627          * in early initcall, it will not be notified of secondary cpus.
628          * In that off case, we need to allocate for all possible cpus.
629          */
630 #ifdef CONFIG_HOTPLUG_CPU
631         get_online_cpus();
632         cpumask_copy(buffer->cpumask, cpu_online_mask);
633 #else
634         cpumask_copy(buffer->cpumask, cpu_possible_mask);
635 #endif
636         buffer->cpus = nr_cpu_ids;
637
638         bsize = sizeof(void *) * nr_cpu_ids;
639         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
640                                   GFP_KERNEL);
641         if (!buffer->buffers)
642                 goto fail_free_cpumask;
643
644         for_each_buffer_cpu(buffer, cpu) {
645                 buffer->buffers[cpu] =
646                         rb_allocate_cpu_buffer(buffer, cpu);
647                 if (!buffer->buffers[cpu])
648                         goto fail_free_buffers;
649         }
650
651 #ifdef CONFIG_HOTPLUG_CPU
652         buffer->cpu_notify.notifier_call = rb_cpu_notify;
653         buffer->cpu_notify.priority = 0;
654         register_cpu_notifier(&buffer->cpu_notify);
655 #endif
656
657         put_online_cpus();
658         mutex_init(&buffer->mutex);
659
660         return buffer;
661
662  fail_free_buffers:
663         for_each_buffer_cpu(buffer, cpu) {
664                 if (buffer->buffers[cpu])
665                         rb_free_cpu_buffer(buffer->buffers[cpu]);
666         }
667         kfree(buffer->buffers);
668
669  fail_free_cpumask:
670         free_cpumask_var(buffer->cpumask);
671         put_online_cpus();
672
673  fail_free_buffer:
674         kfree(buffer);
675         return NULL;
676 }
677 EXPORT_SYMBOL_GPL(ring_buffer_alloc);
678
679 /**
680  * ring_buffer_free - free a ring buffer.
681  * @buffer: the buffer to free.
682  */
683 void
684 ring_buffer_free(struct ring_buffer *buffer)
685 {
686         int cpu;
687
688         get_online_cpus();
689
690 #ifdef CONFIG_HOTPLUG_CPU
691         unregister_cpu_notifier(&buffer->cpu_notify);
692 #endif
693
694         for_each_buffer_cpu(buffer, cpu)
695                 rb_free_cpu_buffer(buffer->buffers[cpu]);
696
697         put_online_cpus();
698
699         free_cpumask_var(buffer->cpumask);
700
701         kfree(buffer);
702 }
703 EXPORT_SYMBOL_GPL(ring_buffer_free);
704
705 void ring_buffer_set_clock(struct ring_buffer *buffer,
706                            u64 (*clock)(void))
707 {
708         buffer->clock = clock;
709 }
710
711 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
712
713 static void
714 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
715 {
716         struct buffer_page *bpage;
717         struct list_head *p;
718         unsigned i;
719
720         atomic_inc(&cpu_buffer->record_disabled);
721         synchronize_sched();
722
723         for (i = 0; i < nr_pages; i++) {
724                 if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
725                         return;
726                 p = cpu_buffer->pages.next;
727                 bpage = list_entry(p, struct buffer_page, list);
728                 list_del_init(&bpage->list);
729                 free_buffer_page(bpage);
730         }
731         if (RB_WARN_ON(cpu_buffer, list_empty(&cpu_buffer->pages)))
732                 return;
733
734         rb_reset_cpu(cpu_buffer);
735
736         rb_check_pages(cpu_buffer);
737
738         atomic_dec(&cpu_buffer->record_disabled);
739
740 }
741
742 static void
743 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
744                 struct list_head *pages, unsigned nr_pages)
745 {
746         struct buffer_page *bpage;
747         struct list_head *p;
748         unsigned i;
749
750         atomic_inc(&cpu_buffer->record_disabled);
751         synchronize_sched();
752
753         for (i = 0; i < nr_pages; i++) {
754                 if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
755                         return;
756                 p = pages->next;
757                 bpage = list_entry(p, struct buffer_page, list);
758                 list_del_init(&bpage->list);
759                 list_add_tail(&bpage->list, &cpu_buffer->pages);
760         }
761         rb_reset_cpu(cpu_buffer);
762
763         rb_check_pages(cpu_buffer);
764
765         atomic_dec(&cpu_buffer->record_disabled);
766 }
767
768 /**
769  * ring_buffer_resize - resize the ring buffer
770  * @buffer: the buffer to resize.
771  * @size: the new size.
772  *
773  * The tracer is responsible for making sure that the buffer is
774  * not being used while changing the size.
775  * Note: We may be able to change the above requirement by using
776  *  RCU synchronizations.
777  *
778  * Minimum size is 2 * BUF_PAGE_SIZE.
779  *
780  * Returns -1 on failure.
781  */
782 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
783 {
784         struct ring_buffer_per_cpu *cpu_buffer;
785         unsigned nr_pages, rm_pages, new_pages;
786         struct buffer_page *bpage, *tmp;
787         unsigned long buffer_size;
788         unsigned long addr;
789         LIST_HEAD(pages);
790         int i, cpu;
791
792         /*
793          * Always succeed at resizing a non-existent buffer:
794          */
795         if (!buffer)
796                 return size;
797
798         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
799         size *= BUF_PAGE_SIZE;
800         buffer_size = buffer->pages * BUF_PAGE_SIZE;
801
802         /* we need a minimum of two pages */
803         if (size < BUF_PAGE_SIZE * 2)
804                 size = BUF_PAGE_SIZE * 2;
805
806         if (size == buffer_size)
807                 return size;
808
809         mutex_lock(&buffer->mutex);
810         get_online_cpus();
811
812         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
813
814         if (size < buffer_size) {
815
816                 /* easy case, just free pages */
817                 if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
818                         goto out_fail;
819
820                 rm_pages = buffer->pages - nr_pages;
821
822                 for_each_buffer_cpu(buffer, cpu) {
823                         cpu_buffer = buffer->buffers[cpu];
824                         rb_remove_pages(cpu_buffer, rm_pages);
825                 }
826                 goto out;
827         }
828
829         /*
830          * This is a bit more difficult. We only want to add pages
831          * when we can allocate enough for all CPUs. We do this
832          * by allocating all the pages and storing them on a local
833          * link list. If we succeed in our allocation, then we
834          * add these pages to the cpu_buffers. Otherwise we just free
835          * them all and return -ENOMEM;
836          */
837         if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
838                 goto out_fail;
839
840         new_pages = nr_pages - buffer->pages;
841
842         for_each_buffer_cpu(buffer, cpu) {
843                 for (i = 0; i < new_pages; i++) {
844                         bpage = kzalloc_node(ALIGN(sizeof(*bpage),
845                                                   cache_line_size()),
846                                             GFP_KERNEL, cpu_to_node(cpu));
847                         if (!bpage)
848                                 goto free_pages;
849                         list_add(&bpage->list, &pages);
850                         addr = __get_free_page(GFP_KERNEL);
851                         if (!addr)
852                                 goto free_pages;
853                         bpage->page = (void *)addr;
854                         rb_init_page(bpage->page);
855                 }
856         }
857
858         for_each_buffer_cpu(buffer, cpu) {
859                 cpu_buffer = buffer->buffers[cpu];
860                 rb_insert_pages(cpu_buffer, &pages, new_pages);
861         }
862
863         if (RB_WARN_ON(buffer, !list_empty(&pages)))
864                 goto out_fail;
865
866  out:
867         buffer->pages = nr_pages;
868         put_online_cpus();
869         mutex_unlock(&buffer->mutex);
870
871         return size;
872
873  free_pages:
874         list_for_each_entry_safe(bpage, tmp, &pages, list) {
875                 list_del_init(&bpage->list);
876                 free_buffer_page(bpage);
877         }
878         put_online_cpus();
879         mutex_unlock(&buffer->mutex);
880         return -ENOMEM;
881
882         /*
883          * Something went totally wrong, and we are too paranoid
884          * to even clean up the mess.
885          */
886  out_fail:
887         put_online_cpus();
888         mutex_unlock(&buffer->mutex);
889         return -1;
890 }
891 EXPORT_SYMBOL_GPL(ring_buffer_resize);
892
893 static inline void *
894 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
895 {
896         return bpage->data + index;
897 }
898
899 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
900 {
901         return bpage->page->data + index;
902 }
903
904 static inline struct ring_buffer_event *
905 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
906 {
907         return __rb_page_index(cpu_buffer->reader_page,
908                                cpu_buffer->reader_page->read);
909 }
910
911 static inline struct ring_buffer_event *
912 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
913 {
914         return __rb_page_index(cpu_buffer->head_page,
915                                cpu_buffer->head_page->read);
916 }
917
918 static inline struct ring_buffer_event *
919 rb_iter_head_event(struct ring_buffer_iter *iter)
920 {
921         return __rb_page_index(iter->head_page, iter->head);
922 }
923
924 static inline unsigned rb_page_write(struct buffer_page *bpage)
925 {
926         return local_read(&bpage->write);
927 }
928
929 static inline unsigned rb_page_commit(struct buffer_page *bpage)
930 {
931         return local_read(&bpage->page->commit);
932 }
933
934 /* Size is determined by what has been commited */
935 static inline unsigned rb_page_size(struct buffer_page *bpage)
936 {
937         return rb_page_commit(bpage);
938 }
939
940 static inline unsigned
941 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
942 {
943         return rb_page_commit(cpu_buffer->commit_page);
944 }
945
946 static inline unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
947 {
948         return rb_page_commit(cpu_buffer->head_page);
949 }
950
951 /*
952  * When the tail hits the head and the buffer is in overwrite mode,
953  * the head jumps to the next page and all content on the previous
954  * page is discarded. But before doing so, we update the overrun
955  * variable of the buffer.
956  */
957 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
958 {
959         struct ring_buffer_event *event;
960         unsigned long head;
961
962         for (head = 0; head < rb_head_size(cpu_buffer);
963              head += rb_event_length(event)) {
964
965                 event = __rb_page_index(cpu_buffer->head_page, head);
966                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
967                         return;
968                 /* Only count data entries */
969                 if (event->type != RINGBUF_TYPE_DATA)
970                         continue;
971                 cpu_buffer->overrun++;
972                 cpu_buffer->entries--;
973         }
974 }
975
976 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
977                                struct buffer_page **bpage)
978 {
979         struct list_head *p = (*bpage)->list.next;
980
981         if (p == &cpu_buffer->pages)
982                 p = p->next;
983
984         *bpage = list_entry(p, struct buffer_page, list);
985 }
986
987 static inline unsigned
988 rb_event_index(struct ring_buffer_event *event)
989 {
990         unsigned long addr = (unsigned long)event;
991
992         return (addr & ~PAGE_MASK) - (PAGE_SIZE - BUF_PAGE_SIZE);
993 }
994
995 static int
996 rb_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
997              struct ring_buffer_event *event)
998 {
999         unsigned long addr = (unsigned long)event;
1000         unsigned long index;
1001
1002         index = rb_event_index(event);
1003         addr &= PAGE_MASK;
1004
1005         return cpu_buffer->commit_page->page == (void *)addr &&
1006                 rb_commit_index(cpu_buffer) == index;
1007 }
1008
1009 static void
1010 rb_set_commit_event(struct ring_buffer_per_cpu *cpu_buffer,
1011                     struct ring_buffer_event *event)
1012 {
1013         unsigned long addr = (unsigned long)event;
1014         unsigned long index;
1015
1016         index = rb_event_index(event);
1017         addr &= PAGE_MASK;
1018
1019         while (cpu_buffer->commit_page->page != (void *)addr) {
1020                 if (RB_WARN_ON(cpu_buffer,
1021                           cpu_buffer->commit_page == cpu_buffer->tail_page))
1022                         return;
1023                 cpu_buffer->commit_page->page->commit =
1024                         cpu_buffer->commit_page->write;
1025                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1026                 cpu_buffer->write_stamp =
1027                         cpu_buffer->commit_page->page->time_stamp;
1028         }
1029
1030         /* Now set the commit to the event's index */
1031         local_set(&cpu_buffer->commit_page->page->commit, index);
1032 }
1033
1034 static void
1035 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1036 {
1037         /*
1038          * We only race with interrupts and NMIs on this CPU.
1039          * If we own the commit event, then we can commit
1040          * all others that interrupted us, since the interruptions
1041          * are in stack format (they finish before they come
1042          * back to us). This allows us to do a simple loop to
1043          * assign the commit to the tail.
1044          */
1045  again:
1046         while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
1047                 cpu_buffer->commit_page->page->commit =
1048                         cpu_buffer->commit_page->write;
1049                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1050                 cpu_buffer->write_stamp =
1051                         cpu_buffer->commit_page->page->time_stamp;
1052                 /* add barrier to keep gcc from optimizing too much */
1053                 barrier();
1054         }
1055         while (rb_commit_index(cpu_buffer) !=
1056                rb_page_write(cpu_buffer->commit_page)) {
1057                 cpu_buffer->commit_page->page->commit =
1058                         cpu_buffer->commit_page->write;
1059                 barrier();
1060         }
1061
1062         /* again, keep gcc from optimizing */
1063         barrier();
1064
1065         /*
1066          * If an interrupt came in just after the first while loop
1067          * and pushed the tail page forward, we will be left with
1068          * a dangling commit that will never go forward.
1069          */
1070         if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
1071                 goto again;
1072 }
1073
1074 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1075 {
1076         cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
1077         cpu_buffer->reader_page->read = 0;
1078 }
1079
1080 static void rb_inc_iter(struct ring_buffer_iter *iter)
1081 {
1082         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1083
1084         /*
1085          * The iterator could be on the reader page (it starts there).
1086          * But the head could have moved, since the reader was
1087          * found. Check for this case and assign the iterator
1088          * to the head page instead of next.
1089          */
1090         if (iter->head_page == cpu_buffer->reader_page)
1091                 iter->head_page = cpu_buffer->head_page;
1092         else
1093                 rb_inc_page(cpu_buffer, &iter->head_page);
1094
1095         iter->read_stamp = iter->head_page->page->time_stamp;
1096         iter->head = 0;
1097 }
1098
1099 /**
1100  * ring_buffer_update_event - update event type and data
1101  * @event: the even to update
1102  * @type: the type of event
1103  * @length: the size of the event field in the ring buffer
1104  *
1105  * Update the type and data fields of the event. The length
1106  * is the actual size that is written to the ring buffer,
1107  * and with this, we can determine what to place into the
1108  * data field.
1109  */
1110 static void
1111 rb_update_event(struct ring_buffer_event *event,
1112                          unsigned type, unsigned length)
1113 {
1114         event->type = type;
1115
1116         switch (type) {
1117
1118         case RINGBUF_TYPE_PADDING:
1119                 break;
1120
1121         case RINGBUF_TYPE_TIME_EXTEND:
1122                 event->len = DIV_ROUND_UP(RB_LEN_TIME_EXTEND, RB_ALIGNMENT);
1123                 break;
1124
1125         case RINGBUF_TYPE_TIME_STAMP:
1126                 event->len = DIV_ROUND_UP(RB_LEN_TIME_STAMP, RB_ALIGNMENT);
1127                 break;
1128
1129         case RINGBUF_TYPE_DATA:
1130                 length -= RB_EVNT_HDR_SIZE;
1131                 if (length > RB_MAX_SMALL_DATA) {
1132                         event->len = 0;
1133                         event->array[0] = length;
1134                 } else
1135                         event->len = DIV_ROUND_UP(length, RB_ALIGNMENT);
1136                 break;
1137         default:
1138                 BUG();
1139         }
1140 }
1141
1142 static unsigned rb_calculate_event_length(unsigned length)
1143 {
1144         struct ring_buffer_event event; /* Used only for sizeof array */
1145
1146         /* zero length can cause confusions */
1147         if (!length)
1148                 length = 1;
1149
1150         if (length > RB_MAX_SMALL_DATA)
1151                 length += sizeof(event.array[0]);
1152
1153         length += RB_EVNT_HDR_SIZE;
1154         length = ALIGN(length, RB_ALIGNMENT);
1155
1156         return length;
1157 }
1158
1159 static struct ring_buffer_event *
1160 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
1161                   unsigned type, unsigned long length, u64 *ts)
1162 {
1163         struct buffer_page *tail_page, *head_page, *reader_page, *commit_page;
1164         unsigned long tail, write;
1165         struct ring_buffer *buffer = cpu_buffer->buffer;
1166         struct ring_buffer_event *event;
1167         unsigned long flags;
1168         bool lock_taken = false;
1169
1170         commit_page = cpu_buffer->commit_page;
1171         /* we just need to protect against interrupts */
1172         barrier();
1173         tail_page = cpu_buffer->tail_page;
1174         write = local_add_return(length, &tail_page->write);
1175         tail = write - length;
1176
1177         /* See if we shot pass the end of this buffer page */
1178         if (write > BUF_PAGE_SIZE) {
1179                 struct buffer_page *next_page = tail_page;
1180
1181                 local_irq_save(flags);
1182                 /*
1183                  * Since the write to the buffer is still not
1184                  * fully lockless, we must be careful with NMIs.
1185                  * The locks in the writers are taken when a write
1186                  * crosses to a new page. The locks protect against
1187                  * races with the readers (this will soon be fixed
1188                  * with a lockless solution).
1189                  *
1190                  * Because we can not protect against NMIs, and we
1191                  * want to keep traces reentrant, we need to manage
1192                  * what happens when we are in an NMI.
1193                  *
1194                  * NMIs can happen after we take the lock.
1195                  * If we are in an NMI, only take the lock
1196                  * if it is not already taken. Otherwise
1197                  * simply fail.
1198                  */
1199                 if (unlikely(in_nmi())) {
1200                         if (!__raw_spin_trylock(&cpu_buffer->lock))
1201                                 goto out_reset;
1202                 } else
1203                         __raw_spin_lock(&cpu_buffer->lock);
1204
1205                 lock_taken = true;
1206
1207                 rb_inc_page(cpu_buffer, &next_page);
1208
1209                 head_page = cpu_buffer->head_page;
1210                 reader_page = cpu_buffer->reader_page;
1211
1212                 /* we grabbed the lock before incrementing */
1213                 if (RB_WARN_ON(cpu_buffer, next_page == reader_page))
1214                         goto out_reset;
1215
1216                 /*
1217                  * If for some reason, we had an interrupt storm that made
1218                  * it all the way around the buffer, bail, and warn
1219                  * about it.
1220                  */
1221                 if (unlikely(next_page == commit_page)) {
1222                         WARN_ON_ONCE(1);
1223                         goto out_reset;
1224                 }
1225
1226                 if (next_page == head_page) {
1227                         if (!(buffer->flags & RB_FL_OVERWRITE))
1228                                 goto out_reset;
1229
1230                         /* tail_page has not moved yet? */
1231                         if (tail_page == cpu_buffer->tail_page) {
1232                                 /* count overflows */
1233                                 rb_update_overflow(cpu_buffer);
1234
1235                                 rb_inc_page(cpu_buffer, &head_page);
1236                                 cpu_buffer->head_page = head_page;
1237                                 cpu_buffer->head_page->read = 0;
1238                         }
1239                 }
1240
1241                 /*
1242                  * If the tail page is still the same as what we think
1243                  * it is, then it is up to us to update the tail
1244                  * pointer.
1245                  */
1246                 if (tail_page == cpu_buffer->tail_page) {
1247                         local_set(&next_page->write, 0);
1248                         local_set(&next_page->page->commit, 0);
1249                         cpu_buffer->tail_page = next_page;
1250
1251                         /* reread the time stamp */
1252                         *ts = ring_buffer_time_stamp(buffer, cpu_buffer->cpu);
1253                         cpu_buffer->tail_page->page->time_stamp = *ts;
1254                 }
1255
1256                 /*
1257                  * The actual tail page has moved forward.
1258                  */
1259                 if (tail < BUF_PAGE_SIZE) {
1260                         /* Mark the rest of the page with padding */
1261                         event = __rb_page_index(tail_page, tail);
1262                         rb_event_set_padding(event);
1263                 }
1264
1265                 if (tail <= BUF_PAGE_SIZE)
1266                         /* Set the write back to the previous setting */
1267                         local_set(&tail_page->write, tail);
1268
1269                 /*
1270                  * If this was a commit entry that failed,
1271                  * increment that too
1272                  */
1273                 if (tail_page == cpu_buffer->commit_page &&
1274                     tail == rb_commit_index(cpu_buffer)) {
1275                         rb_set_commit_to_write(cpu_buffer);
1276                 }
1277
1278                 __raw_spin_unlock(&cpu_buffer->lock);
1279                 local_irq_restore(flags);
1280
1281                 /* fail and let the caller try again */
1282                 return ERR_PTR(-EAGAIN);
1283         }
1284
1285         /* We reserved something on the buffer */
1286
1287         if (RB_WARN_ON(cpu_buffer, write > BUF_PAGE_SIZE))
1288                 return NULL;
1289
1290         event = __rb_page_index(tail_page, tail);
1291         rb_update_event(event, type, length);
1292
1293         /*
1294          * If this is a commit and the tail is zero, then update
1295          * this page's time stamp.
1296          */
1297         if (!tail && rb_is_commit(cpu_buffer, event))
1298                 cpu_buffer->commit_page->page->time_stamp = *ts;
1299
1300         return event;
1301
1302  out_reset:
1303         /* reset write */
1304         if (tail <= BUF_PAGE_SIZE)
1305                 local_set(&tail_page->write, tail);
1306
1307         if (likely(lock_taken))
1308                 __raw_spin_unlock(&cpu_buffer->lock);
1309         local_irq_restore(flags);
1310         return NULL;
1311 }
1312
1313 static int
1314 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1315                   u64 *ts, u64 *delta)
1316 {
1317         struct ring_buffer_event *event;
1318         static int once;
1319         int ret;
1320
1321         if (unlikely(*delta > (1ULL << 59) && !once++)) {
1322                 printk(KERN_WARNING "Delta way too big! %llu"
1323                        " ts=%llu write stamp = %llu\n",
1324                        (unsigned long long)*delta,
1325                        (unsigned long long)*ts,
1326                        (unsigned long long)cpu_buffer->write_stamp);
1327                 WARN_ON(1);
1328         }
1329
1330         /*
1331          * The delta is too big, we to add a
1332          * new timestamp.
1333          */
1334         event = __rb_reserve_next(cpu_buffer,
1335                                   RINGBUF_TYPE_TIME_EXTEND,
1336                                   RB_LEN_TIME_EXTEND,
1337                                   ts);
1338         if (!event)
1339                 return -EBUSY;
1340
1341         if (PTR_ERR(event) == -EAGAIN)
1342                 return -EAGAIN;
1343
1344         /* Only a commited time event can update the write stamp */
1345         if (rb_is_commit(cpu_buffer, event)) {
1346                 /*
1347                  * If this is the first on the page, then we need to
1348                  * update the page itself, and just put in a zero.
1349                  */
1350                 if (rb_event_index(event)) {
1351                         event->time_delta = *delta & TS_MASK;
1352                         event->array[0] = *delta >> TS_SHIFT;
1353                 } else {
1354                         cpu_buffer->commit_page->page->time_stamp = *ts;
1355                         event->time_delta = 0;
1356                         event->array[0] = 0;
1357                 }
1358                 cpu_buffer->write_stamp = *ts;
1359                 /* let the caller know this was the commit */
1360                 ret = 1;
1361         } else {
1362                 /* Darn, this is just wasted space */
1363                 event->time_delta = 0;
1364                 event->array[0] = 0;
1365                 ret = 0;
1366         }
1367
1368         *delta = 0;
1369
1370         return ret;
1371 }
1372
1373 static struct ring_buffer_event *
1374 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
1375                       unsigned type, unsigned long length)
1376 {
1377         struct ring_buffer_event *event;
1378         u64 ts, delta;
1379         int commit = 0;
1380         int nr_loops = 0;
1381
1382  again:
1383         /*
1384          * We allow for interrupts to reenter here and do a trace.
1385          * If one does, it will cause this original code to loop
1386          * back here. Even with heavy interrupts happening, this
1387          * should only happen a few times in a row. If this happens
1388          * 1000 times in a row, there must be either an interrupt
1389          * storm or we have something buggy.
1390          * Bail!
1391          */
1392         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
1393                 return NULL;
1394
1395         ts = ring_buffer_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu);
1396
1397         /*
1398          * Only the first commit can update the timestamp.
1399          * Yes there is a race here. If an interrupt comes in
1400          * just after the conditional and it traces too, then it
1401          * will also check the deltas. More than one timestamp may
1402          * also be made. But only the entry that did the actual
1403          * commit will be something other than zero.
1404          */
1405         if (cpu_buffer->tail_page == cpu_buffer->commit_page &&
1406             rb_page_write(cpu_buffer->tail_page) ==
1407             rb_commit_index(cpu_buffer)) {
1408
1409                 delta = ts - cpu_buffer->write_stamp;
1410
1411                 /* make sure this delta is calculated here */
1412                 barrier();
1413
1414                 /* Did the write stamp get updated already? */
1415                 if (unlikely(ts < cpu_buffer->write_stamp))
1416                         delta = 0;
1417
1418                 if (test_time_stamp(delta)) {
1419
1420                         commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
1421
1422                         if (commit == -EBUSY)
1423                                 return NULL;
1424
1425                         if (commit == -EAGAIN)
1426                                 goto again;
1427
1428                         RB_WARN_ON(cpu_buffer, commit < 0);
1429                 }
1430         } else
1431                 /* Non commits have zero deltas */
1432                 delta = 0;
1433
1434         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
1435         if (PTR_ERR(event) == -EAGAIN)
1436                 goto again;
1437
1438         if (!event) {
1439                 if (unlikely(commit))
1440                         /*
1441                          * Ouch! We needed a timestamp and it was commited. But
1442                          * we didn't get our event reserved.
1443                          */
1444                         rb_set_commit_to_write(cpu_buffer);
1445                 return NULL;
1446         }
1447
1448         /*
1449          * If the timestamp was commited, make the commit our entry
1450          * now so that we will update it when needed.
1451          */
1452         if (commit)
1453                 rb_set_commit_event(cpu_buffer, event);
1454         else if (!rb_is_commit(cpu_buffer, event))
1455                 delta = 0;
1456
1457         event->time_delta = delta;
1458
1459         return event;
1460 }
1461
1462 static DEFINE_PER_CPU(int, rb_need_resched);
1463
1464 /**
1465  * ring_buffer_lock_reserve - reserve a part of the buffer
1466  * @buffer: the ring buffer to reserve from
1467  * @length: the length of the data to reserve (excluding event header)
1468  *
1469  * Returns a reseverd event on the ring buffer to copy directly to.
1470  * The user of this interface will need to get the body to write into
1471  * and can use the ring_buffer_event_data() interface.
1472  *
1473  * The length is the length of the data needed, not the event length
1474  * which also includes the event header.
1475  *
1476  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1477  * If NULL is returned, then nothing has been allocated or locked.
1478  */
1479 struct ring_buffer_event *
1480 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
1481 {
1482         struct ring_buffer_per_cpu *cpu_buffer;
1483         struct ring_buffer_event *event;
1484         int cpu, resched;
1485
1486         if (ring_buffer_flags != RB_BUFFERS_ON)
1487                 return NULL;
1488
1489         if (atomic_read(&buffer->record_disabled))
1490                 return NULL;
1491
1492         /* If we are tracing schedule, we don't want to recurse */
1493         resched = ftrace_preempt_disable();
1494
1495         cpu = raw_smp_processor_id();
1496
1497         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1498                 goto out;
1499
1500         cpu_buffer = buffer->buffers[cpu];
1501
1502         if (atomic_read(&cpu_buffer->record_disabled))
1503                 goto out;
1504
1505         length = rb_calculate_event_length(length);
1506         if (length > BUF_PAGE_SIZE)
1507                 goto out;
1508
1509         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
1510         if (!event)
1511                 goto out;
1512
1513         /*
1514          * Need to store resched state on this cpu.
1515          * Only the first needs to.
1516          */
1517
1518         if (preempt_count() == 1)
1519                 per_cpu(rb_need_resched, cpu) = resched;
1520
1521         return event;
1522
1523  out:
1524         ftrace_preempt_enable(resched);
1525         return NULL;
1526 }
1527 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
1528
1529 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
1530                       struct ring_buffer_event *event)
1531 {
1532         cpu_buffer->entries++;
1533
1534         /* Only process further if we own the commit */
1535         if (!rb_is_commit(cpu_buffer, event))
1536                 return;
1537
1538         cpu_buffer->write_stamp += event->time_delta;
1539
1540         rb_set_commit_to_write(cpu_buffer);
1541 }
1542
1543 /**
1544  * ring_buffer_unlock_commit - commit a reserved
1545  * @buffer: The buffer to commit to
1546  * @event: The event pointer to commit.
1547  *
1548  * This commits the data to the ring buffer, and releases any locks held.
1549  *
1550  * Must be paired with ring_buffer_lock_reserve.
1551  */
1552 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
1553                               struct ring_buffer_event *event)
1554 {
1555         struct ring_buffer_per_cpu *cpu_buffer;
1556         int cpu = raw_smp_processor_id();
1557
1558         cpu_buffer = buffer->buffers[cpu];
1559
1560         rb_commit(cpu_buffer, event);
1561
1562         /*
1563          * Only the last preempt count needs to restore preemption.
1564          */
1565         if (preempt_count() == 1)
1566                 ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
1567         else
1568                 preempt_enable_no_resched_notrace();
1569
1570         return 0;
1571 }
1572 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
1573
1574 /**
1575  * ring_buffer_write - write data to the buffer without reserving
1576  * @buffer: The ring buffer to write to.
1577  * @length: The length of the data being written (excluding the event header)
1578  * @data: The data to write to the buffer.
1579  *
1580  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1581  * one function. If you already have the data to write to the buffer, it
1582  * may be easier to simply call this function.
1583  *
1584  * Note, like ring_buffer_lock_reserve, the length is the length of the data
1585  * and not the length of the event which would hold the header.
1586  */
1587 int ring_buffer_write(struct ring_buffer *buffer,
1588                         unsigned long length,
1589                         void *data)
1590 {
1591         struct ring_buffer_per_cpu *cpu_buffer;
1592         struct ring_buffer_event *event;
1593         unsigned long event_length;
1594         void *body;
1595         int ret = -EBUSY;
1596         int cpu, resched;
1597
1598         if (ring_buffer_flags != RB_BUFFERS_ON)
1599                 return -EBUSY;
1600
1601         if (atomic_read(&buffer->record_disabled))
1602                 return -EBUSY;
1603
1604         resched = ftrace_preempt_disable();
1605
1606         cpu = raw_smp_processor_id();
1607
1608         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1609                 goto out;
1610
1611         cpu_buffer = buffer->buffers[cpu];
1612
1613         if (atomic_read(&cpu_buffer->record_disabled))
1614                 goto out;
1615
1616         event_length = rb_calculate_event_length(length);
1617         event = rb_reserve_next_event(cpu_buffer,
1618                                       RINGBUF_TYPE_DATA, event_length);
1619         if (!event)
1620                 goto out;
1621
1622         body = rb_event_data(event);
1623
1624         memcpy(body, data, length);
1625
1626         rb_commit(cpu_buffer, event);
1627
1628         ret = 0;
1629  out:
1630         ftrace_preempt_enable(resched);
1631
1632         return ret;
1633 }
1634 EXPORT_SYMBOL_GPL(ring_buffer_write);
1635
1636 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
1637 {
1638         struct buffer_page *reader = cpu_buffer->reader_page;
1639         struct buffer_page *head = cpu_buffer->head_page;
1640         struct buffer_page *commit = cpu_buffer->commit_page;
1641
1642         return reader->read == rb_page_commit(reader) &&
1643                 (commit == reader ||
1644                  (commit == head &&
1645                   head->read == rb_page_commit(commit)));
1646 }
1647
1648 /**
1649  * ring_buffer_record_disable - stop all writes into the buffer
1650  * @buffer: The ring buffer to stop writes to.
1651  *
1652  * This prevents all writes to the buffer. Any attempt to write
1653  * to the buffer after this will fail and return NULL.
1654  *
1655  * The caller should call synchronize_sched() after this.
1656  */
1657 void ring_buffer_record_disable(struct ring_buffer *buffer)
1658 {
1659         atomic_inc(&buffer->record_disabled);
1660 }
1661 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
1662
1663 /**
1664  * ring_buffer_record_enable - enable writes to the buffer
1665  * @buffer: The ring buffer to enable writes
1666  *
1667  * Note, multiple disables will need the same number of enables
1668  * to truely enable the writing (much like preempt_disable).
1669  */
1670 void ring_buffer_record_enable(struct ring_buffer *buffer)
1671 {
1672         atomic_dec(&buffer->record_disabled);
1673 }
1674 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
1675
1676 /**
1677  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1678  * @buffer: The ring buffer to stop writes to.
1679  * @cpu: The CPU buffer to stop
1680  *
1681  * This prevents all writes to the buffer. Any attempt to write
1682  * to the buffer after this will fail and return NULL.
1683  *
1684  * The caller should call synchronize_sched() after this.
1685  */
1686 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1687 {
1688         struct ring_buffer_per_cpu *cpu_buffer;
1689
1690         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1691                 return;
1692
1693         cpu_buffer = buffer->buffers[cpu];
1694         atomic_inc(&cpu_buffer->record_disabled);
1695 }
1696 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
1697
1698 /**
1699  * ring_buffer_record_enable_cpu - enable writes to the buffer
1700  * @buffer: The ring buffer to enable writes
1701  * @cpu: The CPU to enable.
1702  *
1703  * Note, multiple disables will need the same number of enables
1704  * to truely enable the writing (much like preempt_disable).
1705  */
1706 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1707 {
1708         struct ring_buffer_per_cpu *cpu_buffer;
1709
1710         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1711                 return;
1712
1713         cpu_buffer = buffer->buffers[cpu];
1714         atomic_dec(&cpu_buffer->record_disabled);
1715 }
1716 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
1717
1718 /**
1719  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1720  * @buffer: The ring buffer
1721  * @cpu: The per CPU buffer to get the entries from.
1722  */
1723 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1724 {
1725         struct ring_buffer_per_cpu *cpu_buffer;
1726         unsigned long ret;
1727
1728         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1729                 return 0;
1730
1731         cpu_buffer = buffer->buffers[cpu];
1732         ret = cpu_buffer->entries;
1733
1734         return ret;
1735 }
1736 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
1737
1738 /**
1739  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1740  * @buffer: The ring buffer
1741  * @cpu: The per CPU buffer to get the number of overruns from
1742  */
1743 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1744 {
1745         struct ring_buffer_per_cpu *cpu_buffer;
1746         unsigned long ret;
1747
1748         if (!cpumask_test_cpu(cpu, buffer->cpumask))
1749                 return 0;
1750
1751         cpu_buffer = buffer->buffers[cpu];
1752         ret = cpu_buffer->overrun;
1753
1754         return ret;
1755 }
1756 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
1757
1758 /**
1759  * ring_buffer_entries - get the number of entries in a buffer
1760  * @buffer: The ring buffer
1761  *
1762  * Returns the total number of entries in the ring buffer
1763  * (all CPU entries)
1764  */
1765 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1766 {
1767         struct ring_buffer_per_cpu *cpu_buffer;
1768         unsigned long entries = 0;
1769         int cpu;
1770
1771         /* if you care about this being correct, lock the buffer */
1772         for_each_buffer_cpu(buffer, cpu) {
1773                 cpu_buffer = buffer->buffers[cpu];
1774                 entries += cpu_buffer->entries;
1775         }
1776
1777         return entries;
1778 }
1779 EXPORT_SYMBOL_GPL(ring_buffer_entries);
1780
1781 /**
1782  * ring_buffer_overrun_cpu - get the number of overruns in buffer
1783  * @buffer: The ring buffer
1784  *
1785  * Returns the total number of overruns in the ring buffer
1786  * (all CPU entries)
1787  */
1788 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1789 {
1790         struct ring_buffer_per_cpu *cpu_buffer;
1791         unsigned long overruns = 0;
1792         int cpu;
1793
1794         /* if you care about this being correct, lock the buffer */
1795         for_each_buffer_cpu(buffer, cpu) {
1796                 cpu_buffer = buffer->buffers[cpu];
1797                 overruns += cpu_buffer->overrun;
1798         }
1799
1800         return overruns;
1801 }
1802 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
1803
1804 static void rb_iter_reset(struct ring_buffer_iter *iter)
1805 {
1806         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1807
1808         /* Iterator usage is expected to have record disabled */
1809         if (list_empty(&cpu_buffer->reader_page->list)) {
1810                 iter->head_page = cpu_buffer->head_page;
1811                 iter->head = cpu_buffer->head_page->read;
1812         } else {
1813                 iter->head_page = cpu_buffer->reader_page;
1814                 iter->head = cpu_buffer->reader_page->read;
1815         }
1816         if (iter->head)
1817                 iter->read_stamp = cpu_buffer->read_stamp;
1818         else
1819                 iter->read_stamp = iter->head_page->page->time_stamp;
1820 }
1821
1822 /**
1823  * ring_buffer_iter_reset - reset an iterator
1824  * @iter: The iterator to reset
1825  *
1826  * Resets the iterator, so that it will start from the beginning
1827  * again.
1828  */
1829 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1830 {
1831         struct ring_buffer_per_cpu *cpu_buffer;
1832         unsigned long flags;
1833
1834         if (!iter)
1835                 return;
1836
1837         cpu_buffer = iter->cpu_buffer;
1838
1839         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1840         rb_iter_reset(iter);
1841         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1842 }
1843 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
1844
1845 /**
1846  * ring_buffer_iter_empty - check if an iterator has no more to read
1847  * @iter: The iterator to check
1848  */
1849 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1850 {
1851         struct ring_buffer_per_cpu *cpu_buffer;
1852
1853         cpu_buffer = iter->cpu_buffer;
1854
1855         return iter->head_page == cpu_buffer->commit_page &&
1856                 iter->head == rb_commit_index(cpu_buffer);
1857 }
1858 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
1859
1860 static void
1861 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1862                      struct ring_buffer_event *event)
1863 {
1864         u64 delta;
1865
1866         switch (event->type) {
1867         case RINGBUF_TYPE_PADDING:
1868                 return;
1869
1870         case RINGBUF_TYPE_TIME_EXTEND:
1871                 delta = event->array[0];
1872                 delta <<= TS_SHIFT;
1873                 delta += event->time_delta;
1874                 cpu_buffer->read_stamp += delta;
1875                 return;
1876
1877         case RINGBUF_TYPE_TIME_STAMP:
1878                 /* FIXME: not implemented */
1879                 return;
1880
1881         case RINGBUF_TYPE_DATA:
1882                 cpu_buffer->read_stamp += event->time_delta;
1883                 return;
1884
1885         default:
1886                 BUG();
1887         }
1888         return;
1889 }
1890
1891 static void
1892 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1893                           struct ring_buffer_event *event)
1894 {
1895         u64 delta;
1896
1897         switch (event->type) {
1898         case RINGBUF_TYPE_PADDING:
1899                 return;
1900
1901         case RINGBUF_TYPE_TIME_EXTEND:
1902                 delta = event->array[0];
1903                 delta <<= TS_SHIFT;
1904                 delta += event->time_delta;
1905                 iter->read_stamp += delta;
1906                 return;
1907
1908         case RINGBUF_TYPE_TIME_STAMP:
1909                 /* FIXME: not implemented */
1910                 return;
1911
1912         case RINGBUF_TYPE_DATA:
1913                 iter->read_stamp += event->time_delta;
1914                 return;
1915
1916         default:
1917                 BUG();
1918         }
1919         return;
1920 }
1921
1922 static struct buffer_page *
1923 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1924 {
1925         struct buffer_page *reader = NULL;
1926         unsigned long flags;
1927         int nr_loops = 0;
1928
1929         local_irq_save(flags);
1930         __raw_spin_lock(&cpu_buffer->lock);
1931
1932  again:
1933         /*
1934          * This should normally only loop twice. But because the
1935          * start of the reader inserts an empty page, it causes
1936          * a case where we will loop three times. There should be no
1937          * reason to loop four times (that I know of).
1938          */
1939         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
1940                 reader = NULL;
1941                 goto out;
1942         }
1943
1944         reader = cpu_buffer->reader_page;
1945
1946         /* If there's more to read, return this page */
1947         if (cpu_buffer->reader_page->read < rb_page_size(reader))
1948                 goto out;
1949
1950         /* Never should we have an index greater than the size */
1951         if (RB_WARN_ON(cpu_buffer,
1952                        cpu_buffer->reader_page->read > rb_page_size(reader)))
1953                 goto out;
1954
1955         /* check if we caught up to the tail */
1956         reader = NULL;
1957         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
1958                 goto out;
1959
1960         /*
1961          * Splice the empty reader page into the list around the head.
1962          * Reset the reader page to size zero.
1963          */
1964
1965         reader = cpu_buffer->head_page;
1966         cpu_buffer->reader_page->list.next = reader->list.next;
1967         cpu_buffer->reader_page->list.prev = reader->list.prev;
1968
1969         local_set(&cpu_buffer->reader_page->write, 0);
1970         local_set(&cpu_buffer->reader_page->page->commit, 0);
1971
1972         /* Make the reader page now replace the head */
1973         reader->list.prev->next = &cpu_buffer->reader_page->list;
1974         reader->list.next->prev = &cpu_buffer->reader_page->list;
1975
1976         /*
1977          * If the tail is on the reader, then we must set the head
1978          * to the inserted page, otherwise we set it one before.
1979          */
1980         cpu_buffer->head_page = cpu_buffer->reader_page;
1981
1982         if (cpu_buffer->commit_page != reader)
1983                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1984
1985         /* Finally update the reader page to the new head */
1986         cpu_buffer->reader_page = reader;
1987         rb_reset_reader_page(cpu_buffer);
1988
1989         goto again;
1990
1991  out:
1992         __raw_spin_unlock(&cpu_buffer->lock);
1993         local_irq_restore(flags);
1994
1995         return reader;
1996 }
1997
1998 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
1999 {
2000         struct ring_buffer_event *event;
2001         struct buffer_page *reader;
2002         unsigned length;
2003
2004         reader = rb_get_reader_page(cpu_buffer);
2005
2006         /* This function should not be called when buffer is empty */
2007         if (RB_WARN_ON(cpu_buffer, !reader))
2008                 return;
2009
2010         event = rb_reader_event(cpu_buffer);
2011
2012         if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
2013                 cpu_buffer->entries--;
2014
2015         rb_update_read_stamp(cpu_buffer, event);
2016
2017         length = rb_event_length(event);
2018         cpu_buffer->reader_page->read += length;
2019 }
2020
2021 static void rb_advance_iter(struct ring_buffer_iter *iter)
2022 {
2023         struct ring_buffer *buffer;
2024         struct ring_buffer_per_cpu *cpu_buffer;
2025         struct ring_buffer_event *event;
2026         unsigned length;
2027
2028         cpu_buffer = iter->cpu_buffer;
2029         buffer = cpu_buffer->buffer;
2030
2031         /*
2032          * Check if we are at the end of the buffer.
2033          */
2034         if (iter->head >= rb_page_size(iter->head_page)) {
2035                 if (RB_WARN_ON(buffer,
2036                                iter->head_page == cpu_buffer->commit_page))
2037                         return;
2038                 rb_inc_iter(iter);
2039                 return;
2040         }
2041
2042         event = rb_iter_head_event(iter);
2043
2044         length = rb_event_length(event);
2045
2046         /*
2047          * This should not be called to advance the header if we are
2048          * at the tail of the buffer.
2049          */
2050         if (RB_WARN_ON(cpu_buffer,
2051                        (iter->head_page == cpu_buffer->commit_page) &&
2052                        (iter->head + length > rb_commit_index(cpu_buffer))))
2053                 return;
2054
2055         rb_update_iter_read_stamp(iter, event);
2056
2057         iter->head += length;
2058
2059         /* check for end of page padding */
2060         if ((iter->head >= rb_page_size(iter->head_page)) &&
2061             (iter->head_page != cpu_buffer->commit_page))
2062                 rb_advance_iter(iter);
2063 }
2064
2065 static struct ring_buffer_event *
2066 rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2067 {
2068         struct ring_buffer_per_cpu *cpu_buffer;
2069         struct ring_buffer_event *event;
2070         struct buffer_page *reader;
2071         int nr_loops = 0;
2072
2073         cpu_buffer = buffer->buffers[cpu];
2074
2075  again:
2076         /*
2077          * We repeat when a timestamp is encountered. It is possible
2078          * to get multiple timestamps from an interrupt entering just
2079          * as one timestamp is about to be written. The max times
2080          * that this can happen is the number of nested interrupts we
2081          * can have.  Nesting 10 deep of interrupts is clearly
2082          * an anomaly.
2083          */
2084         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
2085                 return NULL;
2086
2087         reader = rb_get_reader_page(cpu_buffer);
2088         if (!reader)
2089                 return NULL;
2090
2091         event = rb_reader_event(cpu_buffer);
2092
2093         switch (event->type) {
2094         case RINGBUF_TYPE_PADDING:
2095                 if (rb_null_event(event))
2096                         RB_WARN_ON(cpu_buffer, 1);
2097                 /*
2098                  * Because the writer could be discarding every
2099                  * event it creates (which would probably be bad)
2100                  * if we were to go back to "again" then we may never
2101                  * catch up, and will trigger the warn on, or lock
2102                  * the box. Return the padding, and we will release
2103                  * the current locks, and try again.
2104                  */
2105                 rb_advance_reader(cpu_buffer);
2106                 return event;
2107
2108         case RINGBUF_TYPE_TIME_EXTEND:
2109                 /* Internal data, OK to advance */
2110                 rb_advance_reader(cpu_buffer);
2111                 goto again;
2112
2113         case RINGBUF_TYPE_TIME_STAMP:
2114                 /* FIXME: not implemented */
2115                 rb_advance_reader(cpu_buffer);
2116                 goto again;
2117
2118         case RINGBUF_TYPE_DATA:
2119                 if (ts) {
2120                         *ts = cpu_buffer->read_stamp + event->time_delta;
2121                         ring_buffer_normalize_time_stamp(buffer,
2122                                                          cpu_buffer->cpu, ts);
2123                 }
2124                 return event;
2125
2126         default:
2127                 BUG();
2128         }
2129
2130         return NULL;
2131 }
2132 EXPORT_SYMBOL_GPL(ring_buffer_peek);
2133
2134 static struct ring_buffer_event *
2135 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2136 {
2137         struct ring_buffer *buffer;
2138         struct ring_buffer_per_cpu *cpu_buffer;
2139         struct ring_buffer_event *event;
2140         int nr_loops = 0;
2141
2142         if (ring_buffer_iter_empty(iter))
2143                 return NULL;
2144
2145         cpu_buffer = iter->cpu_buffer;
2146         buffer = cpu_buffer->buffer;
2147
2148  again:
2149         /*
2150          * We repeat when a timestamp is encountered. It is possible
2151          * to get multiple timestamps from an interrupt entering just
2152          * as one timestamp is about to be written. The max times
2153          * that this can happen is the number of nested interrupts we
2154          * can have. Nesting 10 deep of interrupts is clearly
2155          * an anomaly.
2156          */
2157         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 10))
2158                 return NULL;
2159
2160         if (rb_per_cpu_empty(cpu_buffer))
2161                 return NULL;
2162
2163         event = rb_iter_head_event(iter);
2164
2165         switch (event->type) {
2166         case RINGBUF_TYPE_PADDING:
2167                 if (rb_null_event(event)) {
2168                         rb_inc_iter(iter);
2169                         goto again;
2170                 }
2171                 rb_advance_iter(iter);
2172                 return event;
2173
2174         case RINGBUF_TYPE_TIME_EXTEND:
2175                 /* Internal data, OK to advance */
2176                 rb_advance_iter(iter);
2177                 goto again;
2178
2179         case RINGBUF_TYPE_TIME_STAMP:
2180                 /* FIXME: not implemented */
2181                 rb_advance_iter(iter);
2182                 goto again;
2183
2184         case RINGBUF_TYPE_DATA:
2185                 if (ts) {
2186                         *ts = iter->read_stamp + event->time_delta;
2187                         ring_buffer_normalize_time_stamp(buffer,
2188                                                          cpu_buffer->cpu, ts);
2189                 }
2190                 return event;
2191
2192         default:
2193                 BUG();
2194         }
2195
2196         return NULL;
2197 }
2198 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
2199
2200 /**
2201  * ring_buffer_peek - peek at the next event to be read
2202  * @buffer: The ring buffer to read
2203  * @cpu: The cpu to peak at
2204  * @ts: The timestamp counter of this event.
2205  *
2206  * This will return the event that will be read next, but does
2207  * not consume the data.
2208  */
2209 struct ring_buffer_event *
2210 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
2211 {
2212         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2213         struct ring_buffer_event *event;
2214         unsigned long flags;
2215
2216         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2217                 return NULL;
2218
2219  again:
2220         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2221         event = rb_buffer_peek(buffer, cpu, ts);
2222         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2223
2224         if (event && event->type == RINGBUF_TYPE_PADDING) {
2225                 cpu_relax();
2226                 goto again;
2227         }
2228
2229         return event;
2230 }
2231
2232 /**
2233  * ring_buffer_iter_peek - peek at the next event to be read
2234  * @iter: The ring buffer iterator
2235  * @ts: The timestamp counter of this event.
2236  *
2237  * This will return the event that will be read next, but does
2238  * not increment the iterator.
2239  */
2240 struct ring_buffer_event *
2241 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
2242 {
2243         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2244         struct ring_buffer_event *event;
2245         unsigned long flags;
2246
2247  again:
2248         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2249         event = rb_iter_peek(iter, ts);
2250         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2251
2252         if (event && event->type == RINGBUF_TYPE_PADDING) {
2253                 cpu_relax();
2254                 goto again;
2255         }
2256
2257         return event;
2258 }
2259
2260 /**
2261  * ring_buffer_consume - return an event and consume it
2262  * @buffer: The ring buffer to get the next event from
2263  *
2264  * Returns the next event in the ring buffer, and that event is consumed.
2265  * Meaning, that sequential reads will keep returning a different event,
2266  * and eventually empty the ring buffer if the producer is slower.
2267  */
2268 struct ring_buffer_event *
2269 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
2270 {
2271         struct ring_buffer_per_cpu *cpu_buffer;
2272         struct ring_buffer_event *event = NULL;
2273         unsigned long flags;
2274
2275  again:
2276         /* might be called in atomic */
2277         preempt_disable();
2278
2279         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2280                 goto out;
2281
2282         cpu_buffer = buffer->buffers[cpu];
2283         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2284
2285         event = rb_buffer_peek(buffer, cpu, ts);
2286         if (!event)
2287                 goto out_unlock;
2288
2289         rb_advance_reader(cpu_buffer);
2290
2291  out_unlock:
2292         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2293
2294  out:
2295         preempt_enable();
2296
2297         if (event && event->type == RINGBUF_TYPE_PADDING) {
2298                 cpu_relax();
2299                 goto again;
2300         }
2301
2302         return event;
2303 }
2304 EXPORT_SYMBOL_GPL(ring_buffer_consume);
2305
2306 /**
2307  * ring_buffer_read_start - start a non consuming read of the buffer
2308  * @buffer: The ring buffer to read from
2309  * @cpu: The cpu buffer to iterate over
2310  *
2311  * This starts up an iteration through the buffer. It also disables
2312  * the recording to the buffer until the reading is finished.
2313  * This prevents the reading from being corrupted. This is not
2314  * a consuming read, so a producer is not expected.
2315  *
2316  * Must be paired with ring_buffer_finish.
2317  */
2318 struct ring_buffer_iter *
2319 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
2320 {
2321         struct ring_buffer_per_cpu *cpu_buffer;
2322         struct ring_buffer_iter *iter;
2323         unsigned long flags;
2324
2325         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2326                 return NULL;
2327
2328         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
2329         if (!iter)
2330                 return NULL;
2331
2332         cpu_buffer = buffer->buffers[cpu];
2333
2334         iter->cpu_buffer = cpu_buffer;
2335
2336         atomic_inc(&cpu_buffer->record_disabled);
2337         synchronize_sched();
2338
2339         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2340         __raw_spin_lock(&cpu_buffer->lock);
2341         rb_iter_reset(iter);
2342         __raw_spin_unlock(&cpu_buffer->lock);
2343         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2344
2345         return iter;
2346 }
2347 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
2348
2349 /**
2350  * ring_buffer_finish - finish reading the iterator of the buffer
2351  * @iter: The iterator retrieved by ring_buffer_start
2352  *
2353  * This re-enables the recording to the buffer, and frees the
2354  * iterator.
2355  */
2356 void
2357 ring_buffer_read_finish(struct ring_buffer_iter *iter)
2358 {
2359         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2360
2361         atomic_dec(&cpu_buffer->record_disabled);
2362         kfree(iter);
2363 }
2364 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
2365
2366 /**
2367  * ring_buffer_read - read the next item in the ring buffer by the iterator
2368  * @iter: The ring buffer iterator
2369  * @ts: The time stamp of the event read.
2370  *
2371  * This reads the next event in the ring buffer and increments the iterator.
2372  */
2373 struct ring_buffer_event *
2374 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
2375 {
2376         struct ring_buffer_event *event;
2377         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2378         unsigned long flags;
2379
2380  again:
2381         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2382         event = rb_iter_peek(iter, ts);
2383         if (!event)
2384                 goto out;
2385
2386         rb_advance_iter(iter);
2387  out:
2388         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2389
2390         if (event && event->type == RINGBUF_TYPE_PADDING) {
2391                 cpu_relax();
2392                 goto again;
2393         }
2394
2395         return event;
2396 }
2397 EXPORT_SYMBOL_GPL(ring_buffer_read);
2398
2399 /**
2400  * ring_buffer_size - return the size of the ring buffer (in bytes)
2401  * @buffer: The ring buffer.
2402  */
2403 unsigned long ring_buffer_size(struct ring_buffer *buffer)
2404 {
2405         return BUF_PAGE_SIZE * buffer->pages;
2406 }
2407 EXPORT_SYMBOL_GPL(ring_buffer_size);
2408
2409 static void
2410 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
2411 {
2412         cpu_buffer->head_page
2413                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
2414         local_set(&cpu_buffer->head_page->write, 0);
2415         local_set(&cpu_buffer->head_page->page->commit, 0);
2416
2417         cpu_buffer->head_page->read = 0;
2418
2419         cpu_buffer->tail_page = cpu_buffer->head_page;
2420         cpu_buffer->commit_page = cpu_buffer->head_page;
2421
2422         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
2423         local_set(&cpu_buffer->reader_page->write, 0);
2424         local_set(&cpu_buffer->reader_page->page->commit, 0);
2425         cpu_buffer->reader_page->read = 0;
2426
2427         cpu_buffer->overrun = 0;
2428         cpu_buffer->entries = 0;
2429
2430         cpu_buffer->write_stamp = 0;
2431         cpu_buffer->read_stamp = 0;
2432 }
2433
2434 /**
2435  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
2436  * @buffer: The ring buffer to reset a per cpu buffer of
2437  * @cpu: The CPU buffer to be reset
2438  */
2439 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
2440 {
2441         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2442         unsigned long flags;
2443
2444         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2445                 return;
2446
2447         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2448
2449         __raw_spin_lock(&cpu_buffer->lock);
2450
2451         rb_reset_cpu(cpu_buffer);
2452
2453         __raw_spin_unlock(&cpu_buffer->lock);
2454
2455         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2456 }
2457 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
2458
2459 /**
2460  * ring_buffer_reset - reset a ring buffer
2461  * @buffer: The ring buffer to reset all cpu buffers
2462  */
2463 void ring_buffer_reset(struct ring_buffer *buffer)
2464 {
2465         int cpu;
2466
2467         for_each_buffer_cpu(buffer, cpu)
2468                 ring_buffer_reset_cpu(buffer, cpu);
2469 }
2470 EXPORT_SYMBOL_GPL(ring_buffer_reset);
2471
2472 /**
2473  * rind_buffer_empty - is the ring buffer empty?
2474  * @buffer: The ring buffer to test
2475  */
2476 int ring_buffer_empty(struct ring_buffer *buffer)
2477 {
2478         struct ring_buffer_per_cpu *cpu_buffer;
2479         int cpu;
2480
2481         /* yes this is racy, but if you don't like the race, lock the buffer */
2482         for_each_buffer_cpu(buffer, cpu) {
2483                 cpu_buffer = buffer->buffers[cpu];
2484                 if (!rb_per_cpu_empty(cpu_buffer))
2485                         return 0;
2486         }
2487
2488         return 1;
2489 }
2490 EXPORT_SYMBOL_GPL(ring_buffer_empty);
2491
2492 /**
2493  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
2494  * @buffer: The ring buffer
2495  * @cpu: The CPU buffer to test
2496  */
2497 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
2498 {
2499         struct ring_buffer_per_cpu *cpu_buffer;
2500         int ret;
2501
2502         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2503                 return 1;
2504
2505         cpu_buffer = buffer->buffers[cpu];
2506         ret = rb_per_cpu_empty(cpu_buffer);
2507
2508
2509         return ret;
2510 }
2511 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
2512
2513 /**
2514  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2515  * @buffer_a: One buffer to swap with
2516  * @buffer_b: The other buffer to swap with
2517  *
2518  * This function is useful for tracers that want to take a "snapshot"
2519  * of a CPU buffer and has another back up buffer lying around.
2520  * it is expected that the tracer handles the cpu buffer not being
2521  * used at the moment.
2522  */
2523 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
2524                          struct ring_buffer *buffer_b, int cpu)
2525 {
2526         struct ring_buffer_per_cpu *cpu_buffer_a;
2527         struct ring_buffer_per_cpu *cpu_buffer_b;
2528         int ret = -EINVAL;
2529
2530         if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
2531             !cpumask_test_cpu(cpu, buffer_b->cpumask))
2532                 goto out;
2533
2534         /* At least make sure the two buffers are somewhat the same */
2535         if (buffer_a->pages != buffer_b->pages)
2536                 goto out;
2537
2538         ret = -EAGAIN;
2539
2540         if (ring_buffer_flags != RB_BUFFERS_ON)
2541                 goto out;
2542
2543         if (atomic_read(&buffer_a->record_disabled))
2544                 goto out;
2545
2546         if (atomic_read(&buffer_b->record_disabled))
2547                 goto out;
2548
2549         cpu_buffer_a = buffer_a->buffers[cpu];
2550         cpu_buffer_b = buffer_b->buffers[cpu];
2551
2552         if (atomic_read(&cpu_buffer_a->record_disabled))
2553                 goto out;
2554
2555         if (atomic_read(&cpu_buffer_b->record_disabled))
2556                 goto out;
2557
2558         /*
2559          * We can't do a synchronize_sched here because this
2560          * function can be called in atomic context.
2561          * Normally this will be called from the same CPU as cpu.
2562          * If not it's up to the caller to protect this.
2563          */
2564         atomic_inc(&cpu_buffer_a->record_disabled);
2565         atomic_inc(&cpu_buffer_b->record_disabled);
2566
2567         buffer_a->buffers[cpu] = cpu_buffer_b;
2568         buffer_b->buffers[cpu] = cpu_buffer_a;
2569
2570         cpu_buffer_b->buffer = buffer_a;
2571         cpu_buffer_a->buffer = buffer_b;
2572
2573         atomic_dec(&cpu_buffer_a->record_disabled);
2574         atomic_dec(&cpu_buffer_b->record_disabled);
2575
2576         ret = 0;
2577 out:
2578         return ret;
2579 }
2580 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
2581
2582 static void rb_remove_entries(struct ring_buffer_per_cpu *cpu_buffer,
2583                               struct buffer_data_page *bpage,
2584                               unsigned int offset)
2585 {
2586         struct ring_buffer_event *event;
2587         unsigned long head;
2588
2589         __raw_spin_lock(&cpu_buffer->lock);
2590         for (head = offset; head < local_read(&bpage->commit);
2591              head += rb_event_length(event)) {
2592
2593                 event = __rb_data_page_index(bpage, head);
2594                 if (RB_WARN_ON(cpu_buffer, rb_null_event(event)))
2595                         return;
2596                 /* Only count data entries */
2597                 if (event->type != RINGBUF_TYPE_DATA)
2598                         continue;
2599                 cpu_buffer->entries--;
2600         }
2601         __raw_spin_unlock(&cpu_buffer->lock);
2602 }
2603
2604 /**
2605  * ring_buffer_alloc_read_page - allocate a page to read from buffer
2606  * @buffer: the buffer to allocate for.
2607  *
2608  * This function is used in conjunction with ring_buffer_read_page.
2609  * When reading a full page from the ring buffer, these functions
2610  * can be used to speed up the process. The calling function should
2611  * allocate a few pages first with this function. Then when it
2612  * needs to get pages from the ring buffer, it passes the result
2613  * of this function into ring_buffer_read_page, which will swap
2614  * the page that was allocated, with the read page of the buffer.
2615  *
2616  * Returns:
2617  *  The page allocated, or NULL on error.
2618  */
2619 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
2620 {
2621         struct buffer_data_page *bpage;
2622         unsigned long addr;
2623
2624         addr = __get_free_page(GFP_KERNEL);
2625         if (!addr)
2626                 return NULL;
2627
2628         bpage = (void *)addr;
2629
2630         rb_init_page(bpage);
2631
2632         return bpage;
2633 }
2634
2635 /**
2636  * ring_buffer_free_read_page - free an allocated read page
2637  * @buffer: the buffer the page was allocate for
2638  * @data: the page to free
2639  *
2640  * Free a page allocated from ring_buffer_alloc_read_page.
2641  */
2642 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
2643 {
2644         free_page((unsigned long)data);
2645 }
2646
2647 /**
2648  * ring_buffer_read_page - extract a page from the ring buffer
2649  * @buffer: buffer to extract from
2650  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
2651  * @len: amount to extract
2652  * @cpu: the cpu of the buffer to extract
2653  * @full: should the extraction only happen when the page is full.
2654  *
2655  * This function will pull out a page from the ring buffer and consume it.
2656  * @data_page must be the address of the variable that was returned
2657  * from ring_buffer_alloc_read_page. This is because the page might be used
2658  * to swap with a page in the ring buffer.
2659  *
2660  * for example:
2661  *      rpage = ring_buffer_alloc_read_page(buffer);
2662  *      if (!rpage)
2663  *              return error;
2664  *      ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
2665  *      if (ret >= 0)
2666  *              process_page(rpage, ret);
2667  *
2668  * When @full is set, the function will not return true unless
2669  * the writer is off the reader page.
2670  *
2671  * Note: it is up to the calling functions to handle sleeps and wakeups.
2672  *  The ring buffer can be used anywhere in the kernel and can not
2673  *  blindly call wake_up. The layer that uses the ring buffer must be
2674  *  responsible for that.
2675  *
2676  * Returns:
2677  *  >=0 if data has been transferred, returns the offset of consumed data.
2678  *  <0 if no data has been transferred.
2679  */
2680 int ring_buffer_read_page(struct ring_buffer *buffer,
2681                           void **data_page, size_t len, int cpu, int full)
2682 {
2683         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
2684         struct ring_buffer_event *event;
2685         struct buffer_data_page *bpage;
2686         struct buffer_page *reader;
2687         unsigned long flags;
2688         unsigned int commit;
2689         unsigned int read;
2690         u64 save_timestamp;
2691         int ret = -1;
2692
2693         if (!cpumask_test_cpu(cpu, buffer->cpumask))
2694                 goto out;
2695
2696         /*
2697          * If len is not big enough to hold the page header, then
2698          * we can not copy anything.
2699          */
2700         if (len <= BUF_PAGE_HDR_SIZE)
2701                 goto out;
2702
2703         len -= BUF_PAGE_HDR_SIZE;
2704
2705         if (!data_page)
2706                 goto out;
2707
2708         bpage = *data_page;
2709         if (!bpage)
2710                 goto out;
2711
2712         spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2713
2714         reader = rb_get_reader_page(cpu_buffer);
2715         if (!reader)
2716                 goto out_unlock;
2717
2718         event = rb_reader_event(cpu_buffer);
2719
2720         read = reader->read;
2721         commit = rb_page_commit(reader);
2722
2723         /*
2724          * If this page has been partially read or
2725          * if len is not big enough to read the rest of the page or
2726          * a writer is still on the page, then
2727          * we must copy the data from the page to the buffer.
2728          * Otherwise, we can simply swap the page with the one passed in.
2729          */
2730         if (read || (len < (commit - read)) ||
2731             cpu_buffer->reader_page == cpu_buffer->commit_page) {
2732                 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
2733                 unsigned int rpos = read;
2734                 unsigned int pos = 0;
2735                 unsigned int size;
2736
2737                 if (full)
2738                         goto out_unlock;
2739
2740                 if (len > (commit - read))
2741                         len = (commit - read);
2742
2743                 size = rb_event_length(event);
2744
2745                 if (len < size)
2746                         goto out_unlock;
2747
2748                 /* save the current timestamp, since the user will need it */
2749                 save_timestamp = cpu_buffer->read_stamp;
2750
2751                 /* Need to copy one event at a time */
2752                 do {
2753                         memcpy(bpage->data + pos, rpage->data + rpos, size);
2754
2755                         len -= size;
2756
2757                         rb_advance_reader(cpu_buffer);
2758                         rpos = reader->read;
2759                         pos += size;
2760
2761                         event = rb_reader_event(cpu_buffer);
2762                         size = rb_event_length(event);
2763                 } while (len > size);
2764
2765                 /* update bpage */
2766                 local_set(&bpage->commit, pos);
2767                 bpage->time_stamp = save_timestamp;
2768
2769                 /* we copied everything to the beginning */
2770                 read = 0;
2771         } else {
2772                 /* swap the pages */
2773                 rb_init_page(bpage);
2774                 bpage = reader->page;
2775                 reader->page = *data_page;
2776                 local_set(&reader->write, 0);
2777                 reader->read = 0;
2778                 *data_page = bpage;
2779
2780                 /* update the entry counter */
2781                 rb_remove_entries(cpu_buffer, bpage, read);
2782         }
2783         ret = read;
2784
2785  out_unlock:
2786         spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2787
2788  out:
2789         return ret;
2790 }
2791
2792 static ssize_t
2793 rb_simple_read(struct file *filp, char __user *ubuf,
2794                size_t cnt, loff_t *ppos)
2795 {
2796         unsigned long *p = filp->private_data;
2797         char buf[64];
2798         int r;
2799
2800         if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
2801                 r = sprintf(buf, "permanently disabled\n");
2802         else
2803                 r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));
2804
2805         return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
2806 }
2807
2808 static ssize_t
2809 rb_simple_write(struct file *filp, const char __user *ubuf,
2810                 size_t cnt, loff_t *ppos)
2811 {
2812         unsigned long *p = filp->private_data;
2813         char buf[64];
2814         unsigned long val;
2815         int ret;
2816
2817         if (cnt >= sizeof(buf))
2818                 return -EINVAL;
2819
2820         if (copy_from_user(&buf, ubuf, cnt))
2821                 return -EFAULT;
2822
2823         buf[cnt] = 0;
2824
2825         ret = strict_strtoul(buf, 10, &val);
2826         if (ret < 0)
2827                 return ret;
2828
2829         if (val)
2830                 set_bit(RB_BUFFERS_ON_BIT, p);
2831         else
2832                 clear_bit(RB_BUFFERS_ON_BIT, p);
2833
2834         (*ppos)++;
2835
2836         return cnt;
2837 }
2838
2839 static const struct file_operations rb_simple_fops = {
2840         .open           = tracing_open_generic,
2841         .read           = rb_simple_read,
2842         .write          = rb_simple_write,
2843 };
2844
2845
2846 static __init int rb_init_debugfs(void)
2847 {
2848         struct dentry *d_tracer;
2849         struct dentry *entry;
2850
2851         d_tracer = tracing_init_dentry();
2852
2853         entry = debugfs_create_file("tracing_on", 0644, d_tracer,
2854                                     &ring_buffer_flags, &rb_simple_fops);
2855         if (!entry)
2856                 pr_warning("Could not create debugfs 'tracing_on' entry\n");
2857
2858         return 0;
2859 }
2860
2861 fs_initcall(rb_init_debugfs);
2862
2863 #ifdef CONFIG_HOTPLUG_CPU
2864 static int rb_cpu_notify(struct notifier_block *self,
2865                          unsigned long action, void *hcpu)
2866 {
2867         struct ring_buffer *buffer =
2868                 container_of(self, struct ring_buffer, cpu_notify);
2869         long cpu = (long)hcpu;
2870
2871         switch (action) {
2872         case CPU_UP_PREPARE:
2873         case CPU_UP_PREPARE_FROZEN:
2874                 if (cpu_isset(cpu, *buffer->cpumask))
2875                         return NOTIFY_OK;
2876
2877                 buffer->buffers[cpu] =
2878                         rb_allocate_cpu_buffer(buffer, cpu);
2879                 if (!buffer->buffers[cpu]) {
2880                         WARN(1, "failed to allocate ring buffer on CPU %ld\n",
2881                              cpu);
2882                         return NOTIFY_OK;
2883                 }
2884                 smp_wmb();
2885                 cpu_set(cpu, *buffer->cpumask);
2886                 break;
2887         case CPU_DOWN_PREPARE:
2888         case CPU_DOWN_PREPARE_FROZEN:
2889                 /*
2890                  * Do nothing.
2891                  *  If we were to free the buffer, then the user would
2892                  *  lose any trace that was in the buffer.
2893                  */
2894                 break;
2895         default:
2896                 break;
2897         }
2898         return NOTIFY_OK;
2899 }
2900 #endif