// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Notes on buffering modes
// ------------------------
//
// Threads and strings are cached to improve performance and reduce buffer
// usage. The caching involves emitting separate records that identify
// threads/strings and then referring to them by a numeric id. For performance,
// each thread in the application maintains its own cache.
//
// Oneshot: The trace buffer is just one large buffer, and records are written
// until the buffer is full, after which all further records are dropped.
//
// Circular:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other one. This results in half the buffer being dropped at every switch,
// but simplifies things because we don't have to worry about varying record
// lengths.
//
// Streaming:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other buffer, if it is available, and notify the handler that the buffer is
// full. If the other buffer is not available, then records are dropped until
// it becomes available. The other buffer is unavailable between the point when
// it filled and when the handler reports back that the buffer's contents have
// been saved.
//
// There are two important properties we wish to preserve in circular and
// streaming modes:
// 1) We don't want records describing threads and strings to be dropped:
// otherwise records referring to them will have nothing to refer to.
// 2) We don't want thread records to be dropped at all: Fidelity of recording
// of all traced threads is important, even if some of their records are
// dropped.
// To implement both (1) and (2) we introduce a third buffer, called the
// "durable buffer", that holds records we don't want to drop. Threads and
// small strings are recorded there. The two buffers holding normal trace
// output are called "rolling buffers"; as one fills we roll to the next.
// Thread and string records typically aren't very large, so the durable
// buffer can hold a lot of records. To keep things simple, until there's a
// compelling reason to do something more, once the durable buffer fills
// tracing effectively stops, and all further records are dropped.
// Note: The term "rolling buffer" is intended to be internal to the trace
// engine/reader/manager and is not intended to appear in public APIs
// (at least not today).
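//
// For example (illustrative encoding only, not the exact wire format): the
// first event emitted on a thread might cause a thread record
// {index 1 -> (process koid, thread koid)} and a string record
// {index 1 -> "category"} to be written to the durable buffer; event records
// in the rolling buffers then refer to thread 1 and string 1 by index instead
// of repeating that data, which is why losing the durable records would
// orphan them.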
//
// The protocol between the trace engine and the handler for saving buffers in
// streaming mode is as follows:
// 1) Buffer fills -> handler gets notified via
//    |trace_handler_ops::notify_buffer_full()|. Two arguments are passed
//    along with this request:
//    |wrapped_count| - records how many times tracing has wrapped from one
//    buffer to the next, and also identifies the current buffer, which is the
//    one needing saving. Since there are two rolling buffers, the buffer to
//    save is |wrapped_count & 1|.
//    |durable_data_end| - records how much data has been written to the
//    durable buffer thus far. This data needs to be written before data from
//    the rolling buffers is written so that string and thread references work.
// 2) The handler receives the "notify_buffer_full" request.
// 3) The handler saves new durable data since the last save, saves the
//    rolling buffer, and replies back to the engine via
//    |trace_engine_mark_buffer_saved()|.
// 4) The engine receives this notification and marks the buffer as now empty.
//    The next time the engine tries to allocate space from this buffer it will
//    succeed.
// Note that the handler is free to save buffers at whatever rate it can
// manage. The protocol allows for records to be dropped if buffers can't be
// saved fast enough.
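//
// For illustration only, a handler's streaming-save path might look roughly
// like the sketch below. The saving helpers are hypothetical placeholders;
// only |notify_buffer_full| and |trace_engine_mark_buffer_saved()| come from
// the trace-engine API, and the exact signature of the latter should be taken
// from trace-engine/handler.h rather than from this sketch.
//
//   void MyHandler::OnNotifyBufferFull(uint32_t wrapped_count,
//                                      uint64_t durable_data_end) {
//       // Save durable data written since the last save (hypothetical).
//       SaveDurableData(durable_data_end);
//       // The buffer to save alternates with each wrap (hypothetical).
//       SaveRollingBuffer(wrapped_count & 1);
//       // Tell the engine the rolling buffer may be reused.
//       trace_engine_mark_buffer_saved(wrapped_count, durable_data_end);
//   }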

#include "context_impl.h"

#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

#include <fbl/auto_lock.h>
#include <trace-engine/fields.h>
#include <trace-engine/handler.h>

#include <atomic>

namespace trace {
namespace {

// The next context generation number.
std::atomic<uint32_t> g_next_generation{1u};

} // namespace
} // namespace trace

trace_context::trace_context(void* buffer, size_t buffer_num_bytes,
                             trace_buffering_mode_t buffering_mode,
                             trace_handler_t* handler)
    : generation_(trace::g_next_generation.fetch_add(1u, std::memory_order_relaxed) + 1u),
      buffering_mode_(buffering_mode),
      buffer_start_(reinterpret_cast<uint8_t*>(buffer)),
      buffer_end_(buffer_start_ + buffer_num_bytes),
      header_(reinterpret_cast<trace_buffer_header*>(buffer)),
      handler_(handler) {
    ZX_DEBUG_ASSERT(buffer_num_bytes >= kMinPhysicalBufferSize);
    ZX_DEBUG_ASSERT(buffer_num_bytes <= kMaxPhysicalBufferSize);
    ZX_DEBUG_ASSERT(generation_ != 0u);
    ComputeBufferSizes();
}

trace_context::~trace_context() = default;

uint64_t* trace_context::AllocRecord(size_t num_bytes) {
    ZX_DEBUG_ASSERT((num_bytes & 7) == 0);
    if (unlikely(num_bytes > TRACE_ENCODED_RECORD_MAX_LENGTH))
        return nullptr;
    static_assert(TRACE_ENCODED_RECORD_MAX_LENGTH < kMaxRollingBufferSize, "");

    // For the circular and streaming cases, try at most once for each buffer.
    // Note: Keep the normal case of a single successful pass on the fast path;
    // e.g., we don't do a mode comparison unless we have to.
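    //
    // Callers must pass a size that is a multiple of 8 and must tolerate a
    // nullptr return (the record is simply dropped). A hypothetical caller,
    // for illustration only:
    //
    //   size_t num_bytes = (payload_size + 7) & ~size_t{7};  // pad to 8 bytes
    //   uint64_t* ptr = context->AllocRecord(num_bytes);
    //   if (!ptr)
    //       return;  // buffer full or tracing stopped; record is dropped
    //   // ... write the record header and payload into |ptr| ...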

    for (int iter = 0; iter < 2; ++iter) {
        // TODO(dje): This can be optimized a bit. Later.
        uint64_t offset_plus_counter =
            rolling_buffer_current_.fetch_add(num_bytes,
                                              std::memory_order_relaxed);
        uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
        int buffer_number = GetBufferNumber(wrapped_count);
        uint64_t buffer_offset = GetBufferOffset(offset_plus_counter);
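        // |offset_plus_counter| packs the buffer offset and the wrapped count
        // into one atomic word (see the helpers and |kUsableBufferOffsetBits|
        // for the exact split), so a single fetch_add both reserves space and
        // tells us which rolling buffer we are writing to.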
        // Note: There's no worry of an overflow in the calcs here.
        if (likely(buffer_offset + num_bytes <= rolling_buffer_size_)) {
            uint8_t* ptr = rolling_buffer_start_[buffer_number] + buffer_offset;
            return reinterpret_cast<uint64_t*>(ptr); // success!
        }

        // Buffer is full!

        switch (buffering_mode_) {
        case TRACE_BUFFERING_MODE_ONESHOT:
            ZX_DEBUG_ASSERT(iter == 0);
            ZX_DEBUG_ASSERT(wrapped_count == 0);
            ZX_DEBUG_ASSERT(buffer_number == 0);
            MarkOneshotBufferFull(buffer_offset);
            return nullptr;
        case TRACE_BUFFERING_MODE_STREAMING: {
            MarkRollingBufferFull(wrapped_count, buffer_offset);
            // If the TraceManager is slow in saving buffers we could get
            // here a lot. Do a quick check and early exit for this case.
            if (unlikely(!IsOtherRollingBufferReady(buffer_number))) {
                MarkRecordDropped();
                StreamingBufferFullCheck(wrapped_count, buffer_offset);
                return nullptr;
            }
            break;
        }
        case TRACE_BUFFERING_MODE_CIRCULAR:
            MarkRollingBufferFull(wrapped_count, buffer_offset);
            break;
        default:
            __UNREACHABLE;
        }

        if (iter == 1) {
            // Second time through. We tried one buffer, it was full.
            // We then switched to the other buffer, which was empty at
            // the time, and now it is full too. This is technically
            // possible in either circular or streaming modes, but rare.
            // There are two possibilities here:
            // 1) Keep trying (gated by some means).
            // 2) Drop the record.
            // In order to not introduce excessive latency into the app
            // we choose (2). To assist the developer we at least provide
            // a record that this happened, but since it's rare we keep
            // it simple and maintain just a global count and no time
            // information.
            num_records_dropped_after_buffer_switch_.fetch_add(1, std::memory_order_relaxed);
            return nullptr;
        }

        if (!SwitchRollingBuffer(wrapped_count, buffer_offset)) {
            MarkRecordDropped();
            return nullptr;
        }

        // Loop and try again.
    }

    __UNREACHABLE;
}

void trace_context::StreamingBufferFullCheck(uint32_t wrapped_count,
                                             uint64_t buffer_offset) {
    // We allow the current offset to grow and grow as each
    // new tracing request is made: It's a trade-off to not penalize
    // performance in this case. The number of counter bits is enough
    // to not make this a concern: See the comments for
    // |kUsableBufferOffsetBits|.
    //
    // As an absolute paranoia check, if the current buffer offset
    // approaches overflow, grab the lock and snap the offset back
    // to the end of the buffer. We grab the lock so that the
    // buffer can't change while we're doing this.
    if (unlikely(buffer_offset > MaxUsableBufferOffset())) {
        fbl::AutoLock lock(&buffer_switch_mutex_);
        uint32_t current_wrapped_count = CurrentWrappedCount();
        if (GetBufferNumber(current_wrapped_count) ==
            GetBufferNumber(wrapped_count)) {
            SnapToEnd(wrapped_count);
        }
    }
}

// Returns false if there's some reason to not record this record.
bool trace_context::SwitchRollingBuffer(uint32_t wrapped_count,
                                        uint64_t buffer_offset) {
    // While atomic variables are used to track things, we switch
    // buffers under the lock due to multiple pieces of state being
    // changed.
    fbl::AutoLock lock(&buffer_switch_mutex_);

    // If the durable buffer happened to fill while we were waiting for
    // the lock we're done.
    if (unlikely(tracing_artificially_stopped_)) {
        return false;
    }

    uint32_t current_wrapped_count = CurrentWrappedCount();
    // Anything allocated to the durable buffer after this point
    // won't be for this buffer. This is racy, but all we need is
    // some usable value for where the durable pointer is.
    uint64_t durable_data_end = DurableBytesAllocated();

    ZX_DEBUG_ASSERT(wrapped_count <= current_wrapped_count);
    if (likely(wrapped_count == current_wrapped_count)) {
        // Haven't switched buffers yet.
        if (buffering_mode_ == TRACE_BUFFERING_MODE_STREAMING) {
            // Is the other buffer ready?
            if (!IsOtherRollingBufferReady(GetBufferNumber(wrapped_count))) {
                // Nope. There are two possibilities here:
                // 1) Wait for the other buffer to be saved.
                // 2) Start dropping records until the other buffer is
                //    saved.
                // In order to not introduce excessive latency into the
                // app we choose (2). To assist the developer we at
                // least provide a record that indicates the window
                // during which we dropped records.
                // TODO(dje): Maybe have a future mode where we block
                // until there's space. This is useful during some
                // kinds of debugging: Something is going wrong and we
                // care less about performance and more about keeping
                // data, and the dropped data may be the clue to find
                // the bug.
                return false;
            }

            SwitchRollingBufferLocked(wrapped_count, buffer_offset);

            // Notify the handler so it starts saving the buffer if
            // we're in streaming mode.
            // Note: The actual notification must be done *after*
            // updating the buffer header: we need trace_manager to
            // see the updates. The handler will get notified on the
            // engine's async loop (and thus can't call back into us
            // while we still have the lock).
            NotifyRollingBufferFullLocked(wrapped_count, durable_data_end);
        } else {
            SwitchRollingBufferLocked(wrapped_count, buffer_offset);
        }
    } else {
        // Someone else switched buffers while we were trying to obtain
        // the lock. Nothing to do here.
    }

    return true;
}

uint64_t* trace_context::AllocDurableRecord(size_t num_bytes) {
    ZX_DEBUG_ASSERT(UsingDurableBuffer());
    ZX_DEBUG_ASSERT((num_bytes & 7) == 0);

    uint64_t buffer_offset =
        durable_buffer_current_.fetch_add(num_bytes,
                                          std::memory_order_relaxed);
    if (likely(buffer_offset + num_bytes <= durable_buffer_size_)) {
        uint8_t* ptr = durable_buffer_start_ + buffer_offset;
        return reinterpret_cast<uint64_t*>(ptr); // success!
    }

    // Buffer is full!
    MarkDurableBufferFull(buffer_offset);

    return nullptr;
}

bool trace_context::AllocThreadIndex(trace_thread_index_t* out_index) {
    trace_thread_index_t index = next_thread_index_.fetch_add(1u, std::memory_order_relaxed);
    if (unlikely(index > TRACE_ENCODED_THREAD_REF_MAX_INDEX)) {
        // Guard against possible wrapping.
        next_thread_index_.store(TRACE_ENCODED_THREAD_REF_MAX_INDEX + 1u,
                                 std::memory_order_relaxed);
        return false;
    }
    *out_index = index;
    return true;
}

bool trace_context::AllocStringIndex(trace_string_index_t* out_index) {
    trace_string_index_t index = next_string_index_.fetch_add(1u, std::memory_order_relaxed);
    if (unlikely(index > TRACE_ENCODED_STRING_REF_MAX_INDEX)) {
        // Guard against possible wrapping.
        next_string_index_.store(TRACE_ENCODED_STRING_REF_MAX_INDEX + 1u,
                                 std::memory_order_relaxed);
        return false;
    }
    *out_index = index;
    return true;
}

void trace_context::ComputeBufferSizes() {
    size_t full_buffer_size = buffer_end_ - buffer_start_;
    ZX_DEBUG_ASSERT(full_buffer_size >= kMinPhysicalBufferSize);
    ZX_DEBUG_ASSERT(full_buffer_size <= kMaxPhysicalBufferSize);
    size_t header_size = sizeof(trace_buffer_header);
    switch (buffering_mode_) {
    case TRACE_BUFFERING_MODE_ONESHOT:
        // Create one big buffer, where durable and non-durable records share
        // the same buffer. There is no separate buffer for durable records.
        durable_buffer_start_ = nullptr;
        durable_buffer_size_ = 0;
        rolling_buffer_start_[0] = buffer_start_ + header_size;
        rolling_buffer_size_ = full_buffer_size - header_size;
        // The second rolling buffer is not used.
        rolling_buffer_start_[1] = nullptr;
        break;
    case TRACE_BUFFERING_MODE_CIRCULAR:
    case TRACE_BUFFERING_MODE_STREAMING: {
        // Rather than make things more complex for the user, at least for now,
        // we choose the sizes of the durable and rolling buffers.
        // Note: The durable buffer must have enough space for at least
        // the initialization record.
        // TODO(dje): The current choices are WIP.
        uint64_t avail = full_buffer_size - header_size;
        uint64_t durable_buffer_size = GET_DURABLE_BUFFER_SIZE(avail);
        if (durable_buffer_size > kMaxDurableBufferSize)
            durable_buffer_size = kMaxDurableBufferSize;
        // Further adjust |durable_buffer_size| to ensure all buffers are a
        // multiple of 8. |full_buffer_size| is guaranteed by
        // |trace_start_engine()| to be a multiple of 4096. We only assume
        // header_size is a multiple of 8. In order for rolling_buffer_size
        // to be a multiple of 8 we need (avail - durable_buffer_size) to be a
        // multiple of 16. Round durable_buffer_size up as necessary.
        uint64_t off_by = (avail - durable_buffer_size) & 15;
        ZX_DEBUG_ASSERT(off_by == 0 || off_by == 8);
        durable_buffer_size += off_by;
        ZX_DEBUG_ASSERT((durable_buffer_size & 7) == 0);
        // The value of |kMinPhysicalBufferSize| ensures this:
        ZX_DEBUG_ASSERT(durable_buffer_size >= kMinDurableBufferSize);
        uint64_t rolling_buffer_size = (avail - durable_buffer_size) / 2;
        ZX_DEBUG_ASSERT((rolling_buffer_size & 7) == 0);
        // We need to maintain the invariant that the entire buffer is used.
        // This works if the buffer size is a multiple of
        // sizeof(trace_buffer_header), which is true since the buffer is a
        // vmo (some number of 4K pages).
        ZX_DEBUG_ASSERT(durable_buffer_size + 2 * rolling_buffer_size == avail);
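        // Illustrative numbers only (the real values depend on
        // GET_DURABLE_BUFFER_SIZE() and sizeof(trace_buffer_header)): if
        // |avail| were 4032 and the initial durable_buffer_size were 248,
        // then off_by == 8, the durable buffer grows to 256, and each rolling
        // buffer is (4032 - 256) / 2 = 1888 bytes; 256 + 2 * 1888 == 4032, so
        // the entire buffer is used.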
        durable_buffer_start_ = buffer_start_ + header_size;
        durable_buffer_size_ = durable_buffer_size;
        rolling_buffer_start_[0] = durable_buffer_start_ + durable_buffer_size_;
        rolling_buffer_start_[1] = rolling_buffer_start_[0] + rolling_buffer_size;
        rolling_buffer_size_ = rolling_buffer_size;
        break;
    }
    default:
        __UNREACHABLE;
    }
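    // The resulting layout of the physical buffer is:
    //   oneshot:            [header][rolling buffer 0]
    //   circular/streaming: [header][durable buffer]
    //                       [rolling buffer 0][rolling buffer 1]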

    durable_buffer_current_.store(0);
    durable_buffer_full_mark_.store(0);
    rolling_buffer_current_.store(0);
    rolling_buffer_full_mark_[0].store(0);
    rolling_buffer_full_mark_[1].store(0);
}

void trace_context::InitBufferHeader() {
    memset(header_, 0, sizeof(*header_));

    header_->magic = TRACE_BUFFER_HEADER_MAGIC;
    header_->version = TRACE_BUFFER_HEADER_V0;
    header_->buffering_mode = static_cast<uint8_t>(buffering_mode_);
    header_->total_size = buffer_end_ - buffer_start_;
    header_->durable_buffer_size = durable_buffer_size_;
    header_->rolling_buffer_size = rolling_buffer_size_;
}

void trace_context::UpdateBufferHeaderAfterStopped() {
    // If the buffer filled, then the current pointer is "snapped" to the end.
    // Therefore in that case we need to use the buffer_full_mark.
    uint64_t durable_last_offset = durable_buffer_current_.load(std::memory_order_relaxed);
    uint64_t durable_buffer_full_mark = durable_buffer_full_mark_.load(std::memory_order_relaxed);
    if (durable_buffer_full_mark != 0)
        durable_last_offset = durable_buffer_full_mark;
    header_->durable_data_end = durable_last_offset;

    uint64_t offset_plus_counter =
        rolling_buffer_current_.load(std::memory_order_relaxed);
    uint64_t last_offset = GetBufferOffset(offset_plus_counter);
    uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
    header_->wrapped_count = wrapped_count;
    int buffer_number = GetBufferNumber(wrapped_count);
    uint64_t buffer_full_mark = rolling_buffer_full_mark_[buffer_number].load(std::memory_order_relaxed);
    if (buffer_full_mark != 0)
        last_offset = buffer_full_mark;
    header_->rolling_data_end[buffer_number] = last_offset;

    header_->num_records_dropped = num_records_dropped();
}

size_t trace_context::RollingBytesAllocated() const {
    switch (buffering_mode_) {
    case TRACE_BUFFERING_MODE_ONESHOT: {
        // There is a window during the processing of buffer-full where
        // |rolling_buffer_current_| may point beyond the end of the buffer.
        // This is ok; we don't promise anything better.
        uint64_t full_bytes = rolling_buffer_full_mark_[0].load(std::memory_order_relaxed);
        if (full_bytes != 0)
            return full_bytes;
        return rolling_buffer_current_.load(std::memory_order_relaxed);
    }
    case TRACE_BUFFERING_MODE_CIRCULAR:
    case TRACE_BUFFERING_MODE_STREAMING: {
        // Obtain the lock so that the buffers aren't switched on us while
        // we're trying to compute the total.
        fbl::AutoLock lock(&buffer_switch_mutex_);
        uint64_t offset_plus_counter =
            rolling_buffer_current_.load(std::memory_order_relaxed);
        uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
        int buffer_number = GetBufferNumber(wrapped_count);
        // Note: If we catch things at the point where the buffer has
        // filled, but before we swap buffers, then |buffer_offset| can point
        // beyond the end. This is ok; we don't promise anything better.
        uint64_t buffer_offset = GetBufferOffset(offset_plus_counter);
        if (wrapped_count == 0)
            return buffer_offset;
        // We've wrapped at least once, so the other buffer's "full mark"
        // must be set. However, it may be zero in streaming mode if we
        // happened to stop at a point where the buffer was saved and hasn't
        // subsequently been written to.
        uint64_t full_mark_other_buffer = rolling_buffer_full_mark_[!buffer_number].load(std::memory_order_relaxed);
        return full_mark_other_buffer + buffer_offset;
    }
    default:
        __UNREACHABLE;
    }
}

size_t trace_context::DurableBytesAllocated() const {
    // Note: This will return zero in oneshot mode (as it should).
    uint64_t offset = durable_buffer_full_mark_.load(std::memory_order_relaxed);
    if (offset == 0)
        offset = durable_buffer_current_.load(std::memory_order_relaxed);
    return offset;
}

void trace_context::MarkDurableBufferFull(uint64_t last_offset) {
    // Snap to the endpoint to reduce the likelihood of pointer wrap-around.
    // Otherwise each new attempt will continually increase the offset.
    durable_buffer_current_.store(static_cast<uint64_t>(durable_buffer_size_),
                                  std::memory_order_relaxed);

    // Mark the end point if not already marked.
    uint64_t expected_mark = 0u;
    if (durable_buffer_full_mark_.compare_exchange_strong(
            expected_mark, last_offset,
            std::memory_order_relaxed, std::memory_order_relaxed)) {
        fprintf(stderr, "TraceEngine: durable buffer full @offset %" PRIu64 "\n",
                last_offset);
        header_->durable_data_end = last_offset;

        // A record may be written that relies on this durable record.
        // To preserve data integrity, we disable all further tracing.
        // There is a small window where a non-durable record could get
        // emitted that depends on this durable record. It's rare
        // enough and inconsequential enough that we ignore it.
        // TODO(dje): Another possibility is we could let tracing
        // continue and start allocating future durable records in the
        // rolling buffers, and accept potentially lost durable
        // records. Another possibility is to remove the durable buffer,
        // and, say, have separate caches for each rolling buffer.
        MarkTracingArtificiallyStopped();
    }
}

void trace_context::MarkOneshotBufferFull(uint64_t last_offset) {
    SnapToEnd(0);

    // Mark the end point if not already marked.
    uint64_t expected_mark = 0u;
    if (rolling_buffer_full_mark_[0].compare_exchange_strong(
            expected_mark, last_offset,
            std::memory_order_relaxed, std::memory_order_relaxed)) {
        header_->rolling_data_end[0] = last_offset;
    }

    MarkRecordDropped();
}

void trace_context::MarkRollingBufferFull(uint32_t wrapped_count, uint64_t last_offset) {
    // Mark the end point if not already marked.
    int buffer_number = GetBufferNumber(wrapped_count);
    uint64_t expected_mark = 0u;
    if (rolling_buffer_full_mark_[buffer_number].compare_exchange_strong(
            expected_mark, last_offset,
            std::memory_order_relaxed, std::memory_order_relaxed)) {
        header_->rolling_data_end[buffer_number] = last_offset;
    }
}

void trace_context::SwitchRollingBufferLocked(uint32_t prev_wrapped_count,
                                              uint64_t prev_last_offset) {
    // This has already been done in streaming mode when the buffer was marked
    // as saved, but hasn't been done yet for circular mode. KISS and just do
    // it again. It's ok to do it again as we don't resume allocating trace
    // records until we update |rolling_buffer_current_|.
    uint32_t new_wrapped_count = prev_wrapped_count + 1;
    int next_buffer = GetBufferNumber(new_wrapped_count);
    rolling_buffer_full_mark_[next_buffer].store(0, std::memory_order_relaxed);
    header_->rolling_data_end[next_buffer] = 0;

    // Do this last: after this point tracing resumes in the new buffer.
    uint64_t new_offset_plus_counter = MakeOffsetPlusCounter(0, new_wrapped_count);
    rolling_buffer_current_.store(new_offset_plus_counter,
                                  std::memory_order_relaxed);
}

void trace_context::MarkTracingArtificiallyStopped() {
    // Grab the lock in part so that we don't switch buffers between
    // |CurrentWrappedCount()| and |SnapToEnd()|.
    fbl::AutoLock lock(&buffer_switch_mutex_);

    // Disable tracing by making it look like the current rolling
    // buffer is full. AllocRecord, on seeing the buffer is full, will
    // then check |tracing_artificially_stopped_|.
    tracing_artificially_stopped_ = true;
    SnapToEnd(CurrentWrappedCount());
}

void trace_context::NotifyRollingBufferFullLocked(uint32_t wrapped_count,
                                                  uint64_t durable_data_end) {
    // The notification is handled on the engine's event loop as
    // we need this done outside of the lock: Certain handlers
    // (e.g., trace-benchmark) just want to immediately call
    // |trace_engine_mark_buffer_saved()|, which wants to reacquire
    // the lock. Secondly, if we choose to wait until the buffer context is
    // released before notifying the handler then we can't do so now as we
    // still have a reference to the buffer context.
    trace_engine_request_save_buffer(wrapped_count, durable_data_end);
}

void trace_context::HandleSaveRollingBufferRequest(uint32_t wrapped_count,
                                                   uint64_t durable_data_end) {
    // TODO(dje): An open issue is solving the problem of TraceManager
    // prematurely reading the buffer: We know the buffer is full, but
    // the only way we know existing writers have completed is when
    // they release their trace context. Fortunately we know when all
    // context acquisitions for the purpose of writing to the buffer
    // have been released. The question is how to use this info.
    // For now we punt the problem to the handler. Ultimately we could
    // provide callers with a way to wait, and have trace_release_context()
    // check for waiters and if any are present send a signal like it does
    // for SIGNAL_CONTEXT_RELEASED.
    handler_->ops->notify_buffer_full(handler_, wrapped_count,
                                      durable_data_end);
}

void trace_context::MarkRollingBufferSaved(uint32_t wrapped_count,
                                           uint64_t durable_data_end) {
    fbl::AutoLock lock(&buffer_switch_mutex_);

    int buffer_number = GetBufferNumber(wrapped_count);
    {
        // TODO(dje): Manage bad responses from TraceManager.
        int current_buffer_number = GetBufferNumber(GetWrappedCount(rolling_buffer_current_.load(std::memory_order_relaxed)));
        ZX_DEBUG_ASSERT(buffer_number != current_buffer_number);
    }
    rolling_buffer_full_mark_[buffer_number].store(0, std::memory_order_relaxed);
    header_->rolling_data_end[buffer_number] = 0;
    // Don't update |rolling_buffer_current_| here; that is done when we
    // successfully allocate the next record. Until then we want to keep
    // dropping records.
}