// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Notes on buffering modes
// ------------------------
//
// Threads and strings are cached to improve performance and reduce buffer
// usage. The caching involves emitting separate records that identify
// threads/strings and then referring to them by a numeric id. For performance,
// each thread in the application maintains its own cache.
//
// Oneshot: The trace buffer is just one large buffer, and records are written
// until the buffer is full, after which all further records are dropped.
//
// Circular:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other one. This results in half the buffer being dropped at every switch,
// but simplifies things because we don't have to worry about varying record
// lengths.
//
// Streaming:
// The trace buffer is effectively divided into two pieces, and tracing begins
// by writing to the first piece. Once one buffer fills we start writing to the
// other buffer, if it is available, and notify the handler that the buffer is
// full. If the other buffer is not available, then records are dropped until
// it becomes available. The other buffer is unavailable between the point when
// it filled and when the handler reports back that the buffer's contents have
// been saved.
//
// There are two important properties we wish to preserve in circular and
// streaming modes:
// 1) We don't want records describing threads and strings to be dropped:
//    otherwise records referring to them will have nothing to refer to.
// 2) We don't want thread records to be dropped at all: Fidelity of recording
//    of all traced threads is important, even if some of their records are
//    dropped.
// To implement both (1) and (2) we introduce a third buffer that holds
// records we don't want to drop, called the "durable buffer". Threads and
// small strings are recorded there. The two buffers holding normal trace
// output are called "rolling buffers"; as they fill, we roll from one to the
// next. Thread and string records typically aren't very large, so the durable
// buffer can hold a lot of records. To keep things simple, until there's a
// compelling reason to do something more, once the durable buffer fills
// tracing effectively stops, and all further records are dropped.
// Note: The term "rolling buffer" is intended to be internal to the trace
// engine/reader/manager and is not intended to appear in public APIs
// (at least not today).
//
// The protocol between the trace engine and the handler for saving buffers in
// streaming mode is as follows:
// 1) Buffer fills -> handler gets notified via
//    |trace_handler_ops::notify_buffer_full()|. Two arguments are passed
//    along with this request:
//    |wrapped_count| - records how many times tracing has wrapped from one
//    buffer to the next, and also identifies the current buffer, which is the
//    one needing saving. Since there are two rolling buffers, the buffer to
//    save is |wrapped_count & 1|.
//    |durable_data_end| - records how much data has been written to the
//    durable buffer thus far. This data needs to be written before data from
//    the rolling buffers is written so that string and thread references work.
// 2) The handler receives the "notify_buffer_full" request.
// 3) The handler saves the new durable data written since the last save, saves
//    the rolling buffer, and replies back to the engine via
//    |trace_engine_mark_buffer_saved()|.
// 4) The engine receives this notification and marks the buffer as now empty.
//    The next time the engine tries to allocate space from this buffer it will
//    succeed.
// Note that the handler is free to save buffers at whatever rate it can
// manage. The protocol allows for records to be dropped if buffers can't be
// saved fast enough.
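//
// As an illustration only (not this file's API), a handler's side of steps
// (2)-(4) might look roughly like the sketch below. |MyHandler|,
// |SaveDurableData| and |SaveRollingBuffer| are hypothetical helpers; see
// <trace-engine/handler.h> for the real declarations:
//
//   void MyHandler::NotifyBufferFull(uint32_t wrapped_count,
//                                    uint64_t durable_data_end) {
//     // Save durable data up to |durable_data_end| first so that thread and
//     // string references in the rolling buffer can be resolved.
//     SaveDurableData(durable_data_end);
//     // The buffer needing saving is |wrapped_count & 1|.
//     SaveRollingBuffer(wrapped_count & 1);
//     // Tell the engine the buffer may be reused.
//     trace_engine_mark_buffer_saved(wrapped_count, durable_data_end);
//   }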

#include "context_impl.h"

#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

#include <fbl/auto_lock.h>
#include <trace-engine/fields.h>
#include <trace-engine/handler.h>

#include <atomic>

namespace trace {
namespace {

// The next context generation number.
std::atomic<uint32_t> g_next_generation{1u};

} // namespace
} // namespace trace

trace_context::trace_context(void* buffer, size_t buffer_num_bytes,
                             trace_buffering_mode_t buffering_mode,
                             trace_handler_t* handler)
    : generation_(trace::g_next_generation.fetch_add(1u, std::memory_order_relaxed) + 1u),
      buffering_mode_(buffering_mode),
      buffer_start_(reinterpret_cast<uint8_t*>(buffer)),
      buffer_end_(buffer_start_ + buffer_num_bytes),
      header_(reinterpret_cast<trace_buffer_header*>(buffer)),
      handler_(handler) {
    ZX_DEBUG_ASSERT(buffer_num_bytes >= kMinPhysicalBufferSize);
    ZX_DEBUG_ASSERT(buffer_num_bytes <= kMaxPhysicalBufferSize);
    ZX_DEBUG_ASSERT(generation_ != 0u);
    ComputeBufferSizes();
}

trace_context::~trace_context() = default;

uint64_t* trace_context::AllocRecord(size_t num_bytes) {
    ZX_DEBUG_ASSERT((num_bytes & 7) == 0);
    if (unlikely(num_bytes > TRACE_ENCODED_RECORD_MAX_LENGTH))
        return nullptr;
    static_assert(TRACE_ENCODED_RECORD_MAX_LENGTH < kMaxRollingBufferSize, "");

    // For the circular and streaming cases, try at most once for each buffer.
    // Note: Keep the normal case of one successful pass as the fast path;
    // e.g., we don't do a mode comparison unless we have to.

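    // Illustrative walk-through of the loop below (circular mode, sizes are
    // hypothetical): with wrapped_count == 0, records are allocated from
    // rolling buffer [0]. When that buffer fills, SwitchRollingBuffer() bumps
    // wrapped_count to 1 and allocation continues in buffer [1]
    // (buffer number == wrapped_count & 1). When buffer [1] fills,
    // wrapped_count becomes 2 and buffer [0] is reused, discarding its
    // previous contents.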
    for (int iter = 0; iter < 2; ++iter) {
        // TODO(dje): This can be optimized a bit. Later.
        uint64_t offset_plus_counter =
            rolling_buffer_current_.fetch_add(num_bytes,
                                              std::memory_order_relaxed);
        uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
        int buffer_number = GetBufferNumber(wrapped_count);
        uint64_t buffer_offset = GetBufferOffset(offset_plus_counter);
        // Note: There's no worry of an overflow in the calcs here.
        if (likely(buffer_offset + num_bytes <= rolling_buffer_size_)) {
            uint8_t* ptr = rolling_buffer_start_[buffer_number] + buffer_offset;
            return reinterpret_cast<uint64_t*>(ptr); // success!
        }

        // Buffer is full!

        switch (buffering_mode_) {
        case TRACE_BUFFERING_MODE_ONESHOT:
            ZX_DEBUG_ASSERT(iter == 0);
            ZX_DEBUG_ASSERT(wrapped_count == 0);
            ZX_DEBUG_ASSERT(buffer_number == 0);
            MarkOneshotBufferFull(buffer_offset);
            return nullptr;
        case TRACE_BUFFERING_MODE_STREAMING: {
            MarkRollingBufferFull(wrapped_count, buffer_offset);
            // If the TraceManager is slow in saving buffers we could get
            // here a lot. Do a quick check and early exit for this case.
            if (unlikely(!IsOtherRollingBufferReady(buffer_number))) {
                MarkRecordDropped();
                StreamingBufferFullCheck(wrapped_count, buffer_offset);
                return nullptr;
            }
            break;
        }
        case TRACE_BUFFERING_MODE_CIRCULAR:
            MarkRollingBufferFull(wrapped_count, buffer_offset);
            break;
        default:
            __UNREACHABLE;
        }

        if (iter == 1) {
            // Second time through. We tried one buffer, it was full.
            // We then switched to the other buffer, which was empty at
            // the time, and now it is full too. This is technically
            // possible in either circular or streaming modes, but rare.
            // There are two possibilities here:
            // 1) Keep trying (gated by some means).
            // 2) Drop the record.
            // In order to not introduce excessive latency into the app
            // we choose (2). To assist the developer we at least provide
            // a record that this happened, but since it's rare we keep
            // it simple and maintain just a global count and no time
            // information.
            num_records_dropped_after_buffer_switch_.fetch_add(1, std::memory_order_relaxed);
            return nullptr;
        }

        if (!SwitchRollingBuffer(wrapped_count, buffer_offset)) {
            MarkRecordDropped();
            return nullptr;
        }

        // Loop and try again.
    }

    __UNREACHABLE;
}

void trace_context::StreamingBufferFullCheck(uint32_t wrapped_count,
                                             uint64_t buffer_offset) {
    // We allow the current offset to grow and grow as each
    // new tracing request is made: It's a trade-off to not penalize
    // performance in this case. The number of counter bits is enough
    // to not make this a concern: See the comments for
    // |kUsableBufferOffsetBits|.
    //
    // As an absolute paranoia check, if the current buffer offset
    // approaches overflow, grab the lock and snap the offset back
    // to the end of the buffer. We grab the lock so that the
    // buffer can't change while we're doing this.
    if (unlikely(buffer_offset > MaxUsableBufferOffset())) {
        fbl::AutoLock lock(&buffer_switch_mutex_);
        uint32_t current_wrapped_count = CurrentWrappedCount();
        if (GetBufferNumber(current_wrapped_count) ==
            GetBufferNumber(wrapped_count)) {
            SnapToEnd(wrapped_count);
        }
    }
}

// Returns false if there's some reason to not record this record.
bool trace_context::SwitchRollingBuffer(uint32_t wrapped_count,
                                        uint64_t buffer_offset) {
    // While atomic variables are used to track things, we switch
    // buffers under the lock due to multiple pieces of state being
    // changed.
    fbl::AutoLock lock(&buffer_switch_mutex_);

    // If the durable buffer happened to fill while we were waiting for
    // the lock we're done.
    if (unlikely(tracing_artificially_stopped_)) {
        return false;
    }

    uint32_t current_wrapped_count = CurrentWrappedCount();
    // Anything allocated to the durable buffer after this point
    // won't be for this buffer. This is racy, but all we need is
    // some usable value for where the durable pointer is.
    uint64_t durable_data_end = DurableBytesAllocated();

    ZX_DEBUG_ASSERT(wrapped_count <= current_wrapped_count);
    if (likely(wrapped_count == current_wrapped_count)) {
        // Haven't switched buffers yet.
        if (buffering_mode_ == TRACE_BUFFERING_MODE_STREAMING) {
            // Is the other buffer ready?
            if (!IsOtherRollingBufferReady(GetBufferNumber(wrapped_count))) {
                // Nope. There are two possibilities here:
                // 1) Wait for the other buffer to be saved.
                // 2) Start dropping records until the other buffer is
                //    saved.
                // In order to not introduce excessive latency into the
                // app we choose (2). To assist the developer we at
                // least provide a record that indicates the window
                // during which we dropped records.
                // TODO(dje): Maybe have a future mode where we block
                // until there's space. This is useful during some
                // kinds of debugging: Something is going wrong and we
                // care less about performance and more about keeping
                // data, and the dropped data may be the clue to find
                // the bug.
                return false;
            }

            SwitchRollingBufferLocked(wrapped_count, buffer_offset);

            // Notify the handler so it starts saving the buffer if
            // we're in streaming mode.
            // Note: The actual notification must be done *after*
            // updating the buffer header: we need trace_manager to
            // see the updates. The handler will get notified on the
            // engine's async loop (and thus can't call back into us
            // while we still have the lock).
            NotifyRollingBufferFullLocked(wrapped_count, durable_data_end);
        } else {
            SwitchRollingBufferLocked(wrapped_count, buffer_offset);
        }
    } else {
        // Someone else switched buffers while we were trying to obtain
        // the lock. Nothing to do here.
    }

    return true;
}

uint64_t* trace_context::AllocDurableRecord(size_t num_bytes) {
    ZX_DEBUG_ASSERT(UsingDurableBuffer());
    ZX_DEBUG_ASSERT((num_bytes & 7) == 0);

    uint64_t buffer_offset =
        durable_buffer_current_.fetch_add(num_bytes,
                                          std::memory_order_relaxed);
    if (likely(buffer_offset + num_bytes <= durable_buffer_size_)) {
        uint8_t* ptr = durable_buffer_start_ + buffer_offset;
        return reinterpret_cast<uint64_t*>(ptr); // success!
    }

    // Buffer is full!
    MarkDurableBufferFull(buffer_offset);

    return nullptr;
}

bool trace_context::AllocThreadIndex(trace_thread_index_t* out_index) {
    trace_thread_index_t index = next_thread_index_.fetch_add(1u, std::memory_order_relaxed);
    if (unlikely(index > TRACE_ENCODED_THREAD_REF_MAX_INDEX)) {
        // Guard against possible wrapping.
        next_thread_index_.store(TRACE_ENCODED_THREAD_REF_MAX_INDEX + 1u,
                                 std::memory_order_relaxed);
        return false;
    }
    *out_index = index;
    return true;
}

bool trace_context::AllocStringIndex(trace_string_index_t* out_index) {
    trace_string_index_t index = next_string_index_.fetch_add(1u, std::memory_order_relaxed);
    if (unlikely(index > TRACE_ENCODED_STRING_REF_MAX_INDEX)) {
        // Guard against possible wrapping.
        next_string_index_.store(TRACE_ENCODED_STRING_REF_MAX_INDEX + 1u,
                                 std::memory_order_relaxed);
        return false;
    }
    *out_index = index;
    return true;
}
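
// Purely illustrative (hypothetical) caller pattern for the index allocators
// above; the real per-thread caches live elsewhere in the engine:
//
//   trace_thread_index_t index;
//   if (context->AllocThreadIndex(&index)) {
//     // Emit a durable thread record binding |index| to the thread's
//     // identity, then refer to the thread by |index| in later records.
//   } else {
//     // Index space exhausted: fall back to inline thread references.
//   }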

void trace_context::ComputeBufferSizes() {
    size_t full_buffer_size = buffer_end_ - buffer_start_;
    ZX_DEBUG_ASSERT(full_buffer_size >= kMinPhysicalBufferSize);
    ZX_DEBUG_ASSERT(full_buffer_size <= kMaxPhysicalBufferSize);
    size_t header_size = sizeof(trace_buffer_header);
    switch (buffering_mode_) {
    case TRACE_BUFFERING_MODE_ONESHOT:
        // Create one big buffer, where durable and non-durable records share
        // the same buffer. There is no separate buffer for durable records.
        durable_buffer_start_ = nullptr;
        durable_buffer_size_ = 0;
        rolling_buffer_start_[0] = buffer_start_ + header_size;
        rolling_buffer_size_ = full_buffer_size - header_size;
        // The second rolling buffer is not used.
        rolling_buffer_start_[1] = nullptr;
        break;
    case TRACE_BUFFERING_MODE_CIRCULAR:
    case TRACE_BUFFERING_MODE_STREAMING: {
        // Rather than make things more complex for the user, at least for now,
        // we choose the sizes of the durable and rolling buffers ourselves.
        // Note: The durable buffer must have enough space for at least
        // the initialization record.
        // TODO(dje): The current choices are wip.
        uint64_t avail = full_buffer_size - header_size;
        uint64_t durable_buffer_size = GET_DURABLE_BUFFER_SIZE(avail);
        if (durable_buffer_size > kMaxDurableBufferSize)
            durable_buffer_size = kMaxDurableBufferSize;
        // Further adjust |durable_buffer_size| to ensure all buffers are a
        // multiple of 8. |full_buffer_size| is guaranteed by
        // |trace_start_engine()| to be a multiple of 4096. We only assume
        // header_size is a multiple of 8. In order for rolling_buffer_size
        // to be a multiple of 8 we need (avail - durable_buffer_size) to be a
        // multiple of 16. Round durable_buffer_size up as necessary.
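        // Worked example with illustrative numbers (not the actual defaults):
        // if avail = 8192 and the durable buffer size is initially 4104, then
        // avail - 4104 = 4088, which is 8 mod 16, so we round the durable
        // buffer size up by 8 to 4112. Each rolling buffer is then
        // (8192 - 4112) / 2 = 2040 bytes, a multiple of 8, and
        // 4112 + 2 * 2040 == 8192 == avail.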
        uint64_t off_by = (avail - durable_buffer_size) & 15;
        ZX_DEBUG_ASSERT(off_by == 0 || off_by == 8);
        durable_buffer_size += off_by;
        ZX_DEBUG_ASSERT((durable_buffer_size & 7) == 0);
        // The value of |kMinPhysicalBufferSize| ensures this:
        ZX_DEBUG_ASSERT(durable_buffer_size >= kMinDurableBufferSize);
        uint64_t rolling_buffer_size = (avail - durable_buffer_size) / 2;
        ZX_DEBUG_ASSERT((rolling_buffer_size & 7) == 0);
        // We need to maintain the invariant that the entire buffer is used.
        // This works if the buffer size is a multiple of
        // sizeof(trace_buffer_header), which is true since the buffer is a
        // vmo (some number of 4K pages).
        ZX_DEBUG_ASSERT(durable_buffer_size + 2 * rolling_buffer_size == avail);
        durable_buffer_start_ = buffer_start_ + header_size;
        durable_buffer_size_ = durable_buffer_size;
        rolling_buffer_start_[0] = durable_buffer_start_ + durable_buffer_size_;
        rolling_buffer_start_[1] = rolling_buffer_start_[0] + rolling_buffer_size;
        rolling_buffer_size_ = rolling_buffer_size;
        break;
    }
    default:
        __UNREACHABLE;
    }

    durable_buffer_current_.store(0);
    durable_buffer_full_mark_.store(0);
    rolling_buffer_current_.store(0);
    rolling_buffer_full_mark_[0].store(0);
    rolling_buffer_full_mark_[1].store(0);
}

void trace_context::InitBufferHeader() {
    memset(header_, 0, sizeof(*header_));

    header_->magic = TRACE_BUFFER_HEADER_MAGIC;
    header_->version = TRACE_BUFFER_HEADER_V0;
    header_->buffering_mode = static_cast<uint8_t>(buffering_mode_);
    header_->total_size = buffer_end_ - buffer_start_;
    header_->durable_buffer_size = durable_buffer_size_;
    header_->rolling_buffer_size = rolling_buffer_size_;
}

void trace_context::UpdateBufferHeaderAfterStopped() {
    // If the buffer filled, then the current pointer is "snapped" to the end.
    // Therefore in that case we need to use the buffer_full_mark.
    uint64_t durable_last_offset = durable_buffer_current_.load(std::memory_order_relaxed);
    uint64_t durable_buffer_full_mark = durable_buffer_full_mark_.load(std::memory_order_relaxed);
    if (durable_buffer_full_mark != 0)
        durable_last_offset = durable_buffer_full_mark;
    header_->durable_data_end = durable_last_offset;

    uint64_t offset_plus_counter =
        rolling_buffer_current_.load(std::memory_order_relaxed);
    uint64_t last_offset = GetBufferOffset(offset_plus_counter);
    uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
    header_->wrapped_count = wrapped_count;
    int buffer_number = GetBufferNumber(wrapped_count);
    uint64_t buffer_full_mark = rolling_buffer_full_mark_[buffer_number].load(std::memory_order_relaxed);
    if (buffer_full_mark != 0)
        last_offset = buffer_full_mark;
    header_->rolling_data_end[buffer_number] = last_offset;

    header_->num_records_dropped = num_records_dropped();
}

size_t trace_context::RollingBytesAllocated() const {
    switch (buffering_mode_) {
    case TRACE_BUFFERING_MODE_ONESHOT: {
        // There is a window during the processing of buffer-full where
        // |rolling_buffer_current_| may point beyond the end of the buffer.
        // This is ok, we don't promise anything better.
        uint64_t full_bytes = rolling_buffer_full_mark_[0].load(std::memory_order_relaxed);
        if (full_bytes != 0)
            return full_bytes;
        return rolling_buffer_current_.load(std::memory_order_relaxed);
    }
    case TRACE_BUFFERING_MODE_CIRCULAR:
    case TRACE_BUFFERING_MODE_STREAMING: {
        // Obtain the lock so that the buffers aren't switched on us while
        // we're trying to compute the total.
        fbl::AutoLock lock(&buffer_switch_mutex_);
        uint64_t offset_plus_counter =
            rolling_buffer_current_.load(std::memory_order_relaxed);
        uint32_t wrapped_count = GetWrappedCount(offset_plus_counter);
        int buffer_number = GetBufferNumber(wrapped_count);
        // Note: If we catch things at the point where the buffer has
        // filled, but before we swap buffers, then |buffer_offset| can point
        // beyond the end. This is ok, we don't promise anything better.
        uint64_t buffer_offset = GetBufferOffset(offset_plus_counter);
        if (wrapped_count == 0)
            return buffer_offset;
        // We've wrapped at least once, so the other buffer's "full mark"
        // must be set. However, it may be zero in streaming mode if we
        // happened to stop at a point where the buffer was saved and hasn't
        // subsequently been written to.
        uint64_t full_mark_other_buffer = rolling_buffer_full_mark_[!buffer_number].load(std::memory_order_relaxed);
        return full_mark_other_buffer + buffer_offset;
    }
    default:
        __UNREACHABLE;
    }
}

size_t trace_context::DurableBytesAllocated() const {
    // Note: This will return zero in oneshot mode (as it should).
    uint64_t offset = durable_buffer_full_mark_.load(std::memory_order_relaxed);
    if (offset == 0)
        offset = durable_buffer_current_.load(std::memory_order_relaxed);
    return offset;
}

void trace_context::MarkDurableBufferFull(uint64_t last_offset) {
    // Snap to the endpoint to reduce likelihood of pointer wrap-around.
    // Otherwise each new attempt will continually increase the offset.
    durable_buffer_current_.store(reinterpret_cast<uint64_t>(durable_buffer_size_),
                                  std::memory_order_relaxed);

    // Mark the end point if not already marked.
    uintptr_t expected_mark = 0u;
    if (durable_buffer_full_mark_.compare_exchange_strong(
            expected_mark, last_offset,
            std::memory_order_relaxed, std::memory_order_relaxed)) {
        fprintf(stderr, "TraceEngine: durable buffer full @offset %" PRIu64 "\n",
                last_offset);
        header_->durable_data_end = last_offset;

        // A record may be written that relies on this durable record.
        // To preserve data integrity, we disable all further tracing.
        // There is a small window where a non-durable record could get
        // emitted that depends on this durable record. It's rare
        // enough and inconsequential enough that we ignore it.
        // TODO(dje): Another possibility is we could let tracing
        // continue and start allocating future durable records in the
        // rolling buffers, and accept potentially lost durable
        // records. Another possibility is to remove the durable buffer,
        // and, say, have separate caches for each rolling buffer.
        MarkTracingArtificiallyStopped();
    }
}

void trace_context::MarkOneshotBufferFull(uint64_t last_offset) {
    SnapToEnd(0);

    // Mark the end point if not already marked.
    uintptr_t expected_mark = 0u;
    if (rolling_buffer_full_mark_[0].compare_exchange_strong(
            expected_mark, last_offset,
            std::memory_order_relaxed, std::memory_order_relaxed)) {
        header_->rolling_data_end[0] = last_offset;
    }

    MarkRecordDropped();
}

void trace_context::MarkRollingBufferFull(uint32_t wrapped_count, uint64_t last_offset) {
    // Mark the end point if not already marked.
    int buffer_number = GetBufferNumber(wrapped_count);
    uint64_t expected_mark = 0u;
    if (rolling_buffer_full_mark_[buffer_number].compare_exchange_strong(
            expected_mark, last_offset,
            std::memory_order_relaxed, std::memory_order_relaxed)) {
        header_->rolling_data_end[buffer_number] = last_offset;
    }
}

void trace_context::SwitchRollingBufferLocked(uint32_t prev_wrapped_count,
                                              uint64_t prev_last_offset) {
    // This has already been done in streaming mode when the buffer was marked
    // as saved, but hasn't been done yet for circular mode. KISS and just do
    // it again. It's ok to do again as we don't resume allocating trace
    // records until we update |rolling_buffer_current_|.
    uint32_t new_wrapped_count = prev_wrapped_count + 1;
    int next_buffer = GetBufferNumber(new_wrapped_count);
    rolling_buffer_full_mark_[next_buffer].store(0, std::memory_order_relaxed);
    header_->rolling_data_end[next_buffer] = 0;

    // Do this last: after this, tracing resumes in the new buffer.
    uint64_t new_offset_plus_counter = MakeOffsetPlusCounter(0, new_wrapped_count);
    rolling_buffer_current_.store(new_offset_plus_counter,
                                  std::memory_order_relaxed);
}

void trace_context::MarkTracingArtificiallyStopped() {
    // Grab the lock in part so that we don't switch buffers between
    // |CurrentWrappedCount()| and |SnapToEnd()|.
    fbl::AutoLock lock(&buffer_switch_mutex_);

    // Disable tracing by making it look like the current rolling
    // buffer is full. AllocRecord, on seeing the buffer is full, will
    // then check |tracing_artificially_stopped_|.
    tracing_artificially_stopped_ = true;
    SnapToEnd(CurrentWrappedCount());
}

void trace_context::NotifyRollingBufferFullLocked(uint32_t wrapped_count,
                                                  uint64_t durable_data_end) {
    // The notification is handled on the engine's event loop as
    // we need this done outside of the lock: Certain handlers
    // (e.g., trace-benchmark) just want to immediately call
    // |trace_engine_mark_buffer_saved()| which wants to reacquire
    // the lock. Secondly, if we choose to wait until the buffer context is
    // released before notifying the handler then we can't do so now as we
    // still have a reference to the buffer context.
    trace_engine_request_save_buffer(wrapped_count, durable_data_end);
}

void trace_context::HandleSaveRollingBufferRequest(uint32_t wrapped_count,
                                                   uint64_t durable_data_end) {
    // TODO(dje): An open issue is solving the problem of TraceManager
    // prematurely reading the buffer: We know the buffer is full, but
    // the only way we know existing writers have completed is when
    // they release their trace context. Fortunately we know when all
    // context acquisitions for the purpose of writing to the buffer
    // have been released. The question is how to use this info.
    // For now we punt the problem to the handler. Ultimately we could
    // provide callers with a way to wait, and have trace_release_context()
    // check for waiters and if any are present send a signal like it does
    // for SIGNAL_CONTEXT_RELEASED.
    handler_->ops->notify_buffer_full(handler_, wrapped_count,
                                      durable_data_end);
}

void trace_context::MarkRollingBufferSaved(uint32_t wrapped_count,
                                           uint64_t durable_data_end) {
    fbl::AutoLock lock(&buffer_switch_mutex_);

    int buffer_number = GetBufferNumber(wrapped_count);
    {
        // TODO(dje): Manage bad responses from TraceManager.
        int current_buffer_number = GetBufferNumber(
            GetWrappedCount(rolling_buffer_current_.load(std::memory_order_relaxed)));
        ZX_DEBUG_ASSERT(buffer_number != current_buffer_number);
    }
    rolling_buffer_full_mark_[buffer_number].store(0, std::memory_order_relaxed);
    header_->rolling_data_end[buffer_number] = 0;
    // Don't update |rolling_buffer_current_| here, that is done when we
    // successfully allocate the next record. Until then we want to keep
    // dropping records.
}
