// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <atomic>
#include <stdint.h>
#include <string.h>
#include <threads.h>
#include <unistd.h>

#include <cobalt-client/cpp/counter-internal.h>
#include <cobalt-client/cpp/counter.h>
#include <fbl/auto_call.h>
#include <fbl/auto_lock.h>
#include <fbl/mutex.h>
#include <fbl/string.h>
#include <fbl/string_printf.h>
#include <fbl/vector.h>
#include <fuchsia/cobalt/c/fidl.h>
#include <lib/fidl/cpp/vector_view.h>
#include <lib/sync/completion.h>
#include <lib/zx/time.h>
#include <unittest/unittest.h>

#include "fake_logger.h"

namespace cobalt_client {
namespace {

constexpr uint64_t kMetricId = 1;

// Number of threads spawned for multi-threaded tests.
constexpr uint64_t kThreads = 20;

// Component name.
constexpr char kComponent[] = "SomeRamdomCounterComponent";

constexpr uint32_t kEventCode = 2;

} // namespace

namespace internal {
namespace {

MetricOptions MakeMetricOptions() {
    MetricOptions options;
    options.SetMode(MetricOptions::Mode::kRemote);
    options.metric_id = kMetricId;
    options.component = kComponent;
    options.event_code = kEventCode;
    return options;
}

RemoteMetricInfo MakeRemoteMetricInfo() {
    return RemoteMetricInfo::From(MakeMetricOptions());
}

RemoteCounter MakeRemoteCounter() {
    return RemoteCounter(MakeRemoteMetricInfo());
}

// Verify that Increment increases the underlying count by 1.
// This verifies the default behaviour.
bool TestIncrement() {
    BEGIN_TEST;
    BaseCounter<uint64_t> counter;

    ASSERT_EQ(counter.Load(), 0);
    counter.Increment();
    ASSERT_EQ(counter.Load(), 1);
    counter.Increment();
    ASSERT_EQ(counter.Load(), 2);
    END_TEST;
}

// Verify that Increment increases the underlying count by the given value.
bool TestIncrementByVal() {
    BEGIN_TEST;
    BaseCounter<uint64_t> counter;

    ASSERT_EQ(counter.Load(), 0);
    counter.Increment(23);
    ASSERT_EQ(counter.Load(), 23);
    END_TEST;
}

// Verify that Exchange sets the underlying count to 0 and returns the previous value.
// This verifies the default behaviour.
bool TestExchange() {
    BEGIN_TEST;
    BaseCounter<uint64_t> counter;

    counter.Increment(24);
    ASSERT_EQ(counter.Load(), 24);
    EXPECT_EQ(counter.Exchange(), 24);
    ASSERT_EQ(counter.Load(), 0);
    END_TEST;
}

// Verify that Exchange sets the underlying count to the given value and returns the
// previous value.
bool TestExchangeByVal() {
    BEGIN_TEST;
    BaseCounter<uint64_t> counter;

    counter.Increment(4);
    ASSERT_EQ(counter.Load(), 4);
    EXPECT_EQ(counter.Exchange(3), 4);
    ASSERT_EQ(counter.Load(), 3);
    EXPECT_EQ(counter.Exchange(2), 3);
    ASSERT_EQ(counter.Load(), 2);
    END_TEST;
}

struct IncrementArgs {
    // Counter to be operated on.
    BaseCounter<uint64_t>* counter;

    // Wait for main thread to signal before we start.
    sync_completion_t* start;

    // Amount to increment the counter with.
    BaseCounter<uint64_t>::Type value;
};

int IncrementFn(void* args) {
    IncrementArgs* increment_args = static_cast<IncrementArgs*>(args);
    sync_completion_wait(increment_args->start, zx::sec(20).get());
    for (uint64_t i = 0; i < increment_args->value; ++i) {
        increment_args->counter->Increment(increment_args->value);
    }
    return thrd_success;
}

bool TestIncrementMultiThread() {
    BEGIN_TEST;
    sync_completion_t start;
    BaseCounter<uint64_t> counter;
    fbl::Vector<thrd_t> thread_ids;
    IncrementArgs args[kThreads];

    thread_ids.reserve(kThreads);
    for (uint64_t i = 0; i < kThreads; ++i) {
        thread_ids.push_back({});
    }

    for (uint64_t i = 0; i < kThreads; ++i) {
        auto& thread_id = thread_ids[i];
        args[i].counter = &counter;
        args[i].value = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
        args[i].start = &start;
        ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success);
    }

    // Notify threads to start incrementing the count.
    sync_completion_signal(&start);

    // Wait for all threads to finish.
    for (const auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // Each thread should increase the counter by a total of value^2, which yields a total of:
    // kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2
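    // For kThreads = 20 this sum of squares is 20 * 21 * 41 / 6 = 2870.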
    ASSERT_EQ(counter.Load(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6);
    END_TEST;
}

struct ExchangeArgs {
    // Counter to be operated on.
    BaseCounter<uint64_t>* counter;

    // Accumulated value of exchanged values in the counter.
    std::atomic<BaseCounter<uint64_t>::Type>* accumulated;

    // Wait for main thread to signal before we start.
    sync_completion_t* start;

    // Value to exchange into the counter.
    BaseCounter<uint64_t>::Type value;
};

// After all threads exit, every value but one has been added to the accumulated variable;
// the missing one belongs to the last thread to call Exchange and is still in the counter.
// This is why the test adds the counter's current value to the accumulated atomic to obtain
// a deterministic result.
int ExchangeFn(void* args) {
    ExchangeArgs* exchange_args = static_cast<ExchangeArgs*>(args);
    sync_completion_wait(exchange_args->start, zx::sec(20).get());
    BaseCounter<uint64_t>::Type value = exchange_args->counter->Exchange(exchange_args->value);
    exchange_args->accumulated->fetch_add(value, std::memory_order_relaxed);
    return thrd_success;
}

// Verify that, when exchanging, every intermediate value is seen by exactly one thread.
// Each thread exchanges the value it sees with its own value and adds the old value to an
// atomic; the total is checked the same way as above, except that we need to sum
// counter.Load() and the accumulated value.
bool TestExchangeMultiThread() {
    BEGIN_TEST;
    sync_completion_t start;
    BaseCounter<uint64_t> counter;
    std::atomic<BaseCounter<uint64_t>::Type> accumulated(0);
    fbl::Vector<thrd_t> thread_ids;
    ExchangeArgs args[kThreads];

    thread_ids.reserve(kThreads);
    for (uint64_t i = 0; i < kThreads; ++i) {
        thread_ids.push_back({});
    }

    for (uint64_t i = 0; i < kThreads; ++i) {
        auto& thread_id = thread_ids[i];
        args[i].counter = &counter;
        args[i].value = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
        args[i].start = &start;
        args[i].accumulated = &accumulated;
        ASSERT_EQ(thrd_create(&thread_id, ExchangeFn, &args[i]), thrd_success);
    }

    // Notify threads to start exchanging the count.
    sync_completion_signal(&start);

    // Wait for all threads to finish.
    for (const auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // Every exchanged value ends up counted exactly once, either in |accumulated| or still in
    // the counter, which yields a total of:
    // kThreads * (kThreads + 1) / 2 = Sum(i=1, kThreads) i
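    // For kThreads = 20 this is 20 * 21 / 2 = 210.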
    ASSERT_EQ(counter.Load() + accumulated.load(std::memory_order_relaxed),
              kThreads * (kThreads + 1) / 2);
    END_TEST;
}

// Verify flushed values match and the delta is set to 0.
bool TestFlush() {
    BEGIN_TEST;
    RemoteCounter counter = MakeRemoteCounter();
    counter.Increment(20);
    RemoteMetricInfo actual_metric_info;
    RemoteMetricInfo expected_metric_info = MakeRemoteMetricInfo();
    FakeLogger logger;
    logger.set_should_fail(false);
    int64_t actual_count;

    // Check that all data is present. We rely on an implementation detail which guarantees
    // that the metadata comes first in the flushed values and that the last element is the
    // event data we are measuring; this constrains the internal implementation, but keeps
    // the test clean and readable.
    ASSERT_TRUE(counter.Flush(&logger));

    actual_metric_info = logger.logged_counts()[0].metric_info;
    actual_count = logger.logged_counts()[0].count;

    // We capture the values and then verify them outside to avoid having to pass flags
    // around, and to get more descriptive messages on errors.
    ASSERT_TRUE(actual_metric_info == expected_metric_info);
    ASSERT_EQ(actual_count, 20);
    ASSERT_EQ(counter.Load(), 0);
    END_TEST;
}

// Verify that UndoFlush restores the flushed value, so that after a flush resets the counter
// to 0, the counter reports the pre-flush value again.
bool TestUndoFlush() {
    BEGIN_TEST;
    RemoteCounter counter = MakeRemoteCounter();
    FakeLogger logger;
    logger.set_should_fail(false);

    counter.Increment(20);
    ASSERT_TRUE(counter.Flush(&logger));
    ASSERT_EQ(counter.Load(), 0);
    counter.UndoFlush();
    ASSERT_EQ(counter.Load(), 20);
    END_TEST;
}

struct FlushArgs {
    // Counter to be incremented or flushed by a given thread.
    RemoteCounter* counter;

    // Used to make the threads wait until all have been initialized.
    sync_completion_t* start;

    // Flushed accumulated value.
    std::atomic<RemoteCounter::Type>* accumulated;

    // Number of times to perform the operation.
    size_t operation_count = 0;

    // Flush operations are thread-compatible and we use a mutex to provide a barrier.
    // Interaction between other operations and flushes should remain thread-safe and
    // eventually consistent.
    fbl::Mutex* flush_mutex = nullptr;

    // Whether the thread should flush or increment.
    bool flush = false;
};

int FlushFn(void* args) {
    FlushArgs* flush_args = static_cast<FlushArgs*>(args);
    sync_completion_wait(flush_args->start, zx::sec(20).get());
    FakeLogger logger;
    for (size_t i = 0; i < flush_args->operation_count; ++i) {
        if (flush_args->flush) {
            fbl::AutoLock lock(flush_args->flush_mutex);
            flush_args->counter->Flush(&logger);
        } else {
            flush_args->counter->Increment();
        }
    }

    if (flush_args->flush) {
        for (auto count_entry : logger.logged_counts()) {
            flush_args->accumulated->fetch_add(count_entry.count, std::memory_order_relaxed);
        }
    }
    return thrd_success;
}

// Verify the consistency of calling Flush from multiple threads. Half of the threads increment
// the counter and the other half flush it; at the end, what remains in the counter plus what
// was flushed should add up to |ceil(kThreads / 2)|^2.
bool TestFlushMultithread() {
    BEGIN_TEST;
    sync_completion_t start;
    RemoteCounter counter = MakeRemoteCounter();
    std::atomic<BaseCounter<int64_t>::Type> accumulated(0);
    fbl::Vector<thrd_t> thread_ids;
    fbl::Mutex flush_mutex;
    FlushArgs args[kThreads];

    thread_ids.reserve(kThreads);
    for (uint64_t i = 0; i < kThreads; ++i) {
        thread_ids.push_back({});
    }

    for (uint64_t i = 0; i < kThreads; ++i) {
        auto& thread_id = thread_ids[i];
        args[i].counter = &counter;
        args[i].operation_count = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
        args[i].start = &start;
        args[i].accumulated = &accumulated;
        args[i].flush = i % 2;
        args[i].flush_mutex = &flush_mutex;
        ASSERT_EQ(thrd_create(&thread_id, FlushFn, &args[i]), thrd_success);
    }

    // Notify threads to start incrementing the count.
    sync_completion_signal(&start);

    // Wait for all threads to finish.
    for (const auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // The total number of increments is the sum of the odd numbers from 1 to kThreads - 1,
    // which is |ceil(kThreads / 2)|^2.
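    // For kThreads = 20 that is 1 + 3 + ... + 19 = 10^2 = 100.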
    constexpr size_t ceil_threads = (kThreads / 2) + kThreads % 2;

    // Since the last thread to finish might not have flushed, we verify that the total of what
    // is left, plus what we have accumulated, equals the expected amount.
    ASSERT_EQ(counter.Load() + accumulated.load(std::memory_order_relaxed),
              ceil_threads * ceil_threads);
    END_TEST;
}

BEGIN_TEST_CASE(BaseCounterTest)
RUN_TEST(TestIncrement)
RUN_TEST(TestIncrementByVal)
RUN_TEST(TestExchange)
RUN_TEST(TestExchangeByVal)
RUN_TEST(TestIncrementMultiThread)
RUN_TEST(TestExchangeMultiThread)
END_TEST_CASE(BaseCounterTest)

BEGIN_TEST_CASE(RemoteCounterTest)
RUN_TEST(TestFlush)
RUN_TEST(TestUndoFlush)
RUN_TEST(TestFlushMultithread)
END_TEST_CASE(RemoteCounterTest)

} // namespace
} // namespace internal

namespace {

bool TestIncrement() {
    BEGIN_TEST;
    Counter counter(internal::MakeMetricOptions());

    ASSERT_EQ(counter.GetRemoteCount(), 0);
    counter.Increment();
    ASSERT_EQ(counter.GetRemoteCount(), 1);
    counter.Increment(24);
    ASSERT_EQ(counter.GetRemoteCount(), 25);
    END_TEST;
}

struct IncrementArgs {
    // Counter to be incremented.
    Counter* counter;

    // Number of times to call increment.
    size_t times = 0;

    // Signals threads to start incrementing.
    sync_completion_t* start;
};

int IncrementFn(void* args) {
    IncrementArgs* increment_args = static_cast<IncrementArgs*>(args);
    sync_completion_wait(increment_args->start, zx::sec(20).get());

    for (size_t i = 0; i < increment_args->times; ++i) {
        increment_args->counter->Increment(increment_args->times);
    }
    return thrd_success;
}

bool TestIncrementMultiThread() {
    BEGIN_TEST;
    sync_completion_t start;
    Counter counter(internal::MakeMetricOptions());
    fbl::Vector<thrd_t> thread_ids;
    IncrementArgs args[kThreads];

    thread_ids.reserve(kThreads);
    for (uint64_t i = 0; i < kThreads; ++i) {
        thread_ids.push_back({});
    }

    for (uint64_t i = 0; i < kThreads; ++i) {
        auto& thread_id = thread_ids[i];
        args[i].counter = &counter;
        args[i].times = static_cast<Counter::Count>(i + 1);
        args[i].start = &start;
        ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success);
    }

    // Notify threads to start incrementing the count.
    sync_completion_signal(&start);

    // Wait for all threads to finish.
    for (const auto& thread_id : thread_ids) {
        thrd_join(thread_id, nullptr);
    }

    // Each thread increments the counter by |times|, |times| times, for a total of times^2 per
    // thread, which yields an overall total of:
    // kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2
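    // As in the BaseCounter test above, for kThreads = 20 this is 2870.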
    ASSERT_EQ(counter.GetRemoteCount(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6);
    END_TEST;
}

BEGIN_TEST_CASE(CounterTest)
RUN_TEST(TestIncrement)
RUN_TEST(TestIncrementMultiThread)
END_TEST_CASE(CounterTest)

} // namespace
} // namespace cobalt_client