1 // Copyright 2018 The Fuchsia Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <atomic>
6 #include <stdint.h>
7 #include <string.h>
8 #include <threads.h>
9 #include <unistd.h>
10 
11 #include <cobalt-client/cpp/counter-internal.h>
12 #include <cobalt-client/cpp/counter.h>
13 #include <fbl/auto_call.h>
14 #include <fbl/auto_lock.h>
15 #include <fbl/mutex.h>
16 #include <fbl/string.h>
17 #include <fbl/string_printf.h>
18 #include <fuchsia/cobalt/c/fidl.h>
19 #include <lib/fidl/cpp/vector_view.h>
20 #include <lib/sync/completion.h>
21 #include <lib/zx/time.h>
22 #include <unittest/unittest.h>
23 
24 #include "fake_logger.h"
25 
26 namespace cobalt_client {
27 namespace {
28 
// Metric id placed in the MetricOptions used by every test in this file.
constexpr uint64_t kMetricId{1};

// How many worker threads each multi-threaded test spawns.
constexpr uint64_t kThreads{20};

// Component name placed in the MetricOptions used by every test in this file.
constexpr char kComponent[] = "SomeRamdomCounterComponent";

// Event code placed in the MetricOptions used by every test in this file.
constexpr uint32_t kEventCode{2};
38 
39 } // namespace
40 
41 namespace internal {
42 namespace {
43 
MakeMetricOptions()44 MetricOptions MakeMetricOptions() {
45     MetricOptions options;
46     options.SetMode(MetricOptions::Mode::kRemote);
47     options.metric_id = kMetricId;
48     options.component = kComponent;
49     options.event_code = kEventCode;
50     return options;
51 }
52 
MakeRemoteMetricInfo()53 RemoteMetricInfo MakeRemoteMetricInfo() {
54     return RemoteMetricInfo::From(MakeMetricOptions());
55 }
56 
MakeRemoteCounter()57 RemoteCounter MakeRemoteCounter() {
58     return RemoteCounter(MakeRemoteMetricInfo());
59 }
60 
61 // Verify that increments increases the underlying count by 1.
62 // This is veryfying the default behaviour.
TestIncrement()63 bool TestIncrement() {
64     BEGIN_TEST;
65     BaseCounter<uint64_t> counter;
66 
67     ASSERT_EQ(counter.Load(), 0);
68     counter.Increment();
69     ASSERT_EQ(counter.Load(), 1);
70     counter.Increment();
71     ASSERT_EQ(counter.Load(), 2);
72     END_TEST;
73 }
74 
75 // Verify that increments increases the underlying count by val.
TestIncrementByVal()76 bool TestIncrementByVal() {
77     BEGIN_TEST;
78     BaseCounter<uint64_t> counter;
79 
80     ASSERT_EQ(counter.Load(), 0);
81     counter.Increment(23);
82     ASSERT_EQ(counter.Load(), 23);
83     END_TEST;
84 }
85 
86 // Verify that exchangest the underlying count to 0, and returns the current value.
87 // This is veryfying the default behaviour.
TestExchange()88 bool TestExchange() {
89     BEGIN_TEST;
90     BaseCounter<uint64_t> counter;
91 
92     counter.Increment(24);
93     ASSERT_EQ(counter.Load(), 24);
94     EXPECT_EQ(counter.Exchange(), 24);
95     ASSERT_EQ(counter.Load(), 0);
96     END_TEST;
97 }
98 
99 // Verify that exchangest the underlying count to 0, and returns the current value.
100 // This is veryfying the default behaviour.
TestExchangeByVal()101 bool TestExchangeByVal() {
102     BEGIN_TEST;
103     BaseCounter<uint64_t> counter;
104 
105     counter.Increment(4);
106     ASSERT_EQ(counter.Load(), 4);
107     EXPECT_EQ(counter.Exchange(3), 4);
108     ASSERT_EQ(counter.Load(), 3);
109     EXPECT_EQ(counter.Exchange(2), 3);
110     ASSERT_EQ(counter.Load(), 2);
111     END_TEST;
112 }
113 
114 struct IncrementArgs {
115     // Counter to be operated on.
116     BaseCounter<uint64_t>* counter;
117 
118     // Wait for main thread to signal before we start.
119     sync_completion_t* start;
120 
121     // Amount to increment the counter with.
122     BaseCounter<uint64_t>::Type value;
123 };
124 
IncrementFn(void * args)125 int IncrementFn(void* args) {
126     IncrementArgs* increment_args = static_cast<IncrementArgs*>(args);
127     sync_completion_wait(increment_args->start, zx::sec(20).get());
128     for (uint64_t i = 0; i < increment_args->value; ++i) {
129         increment_args->counter->Increment(increment_args->value);
130     }
131     return thrd_success;
132 }
133 
TestIncrementMultiThread()134 bool TestIncrementMultiThread() {
135     BEGIN_TEST;
136     sync_completion_t start;
137     BaseCounter<uint64_t> counter;
138     fbl::Vector<thrd_t> thread_ids;
139     IncrementArgs args[kThreads];
140 
141     thread_ids.reserve(kThreads);
142     for (uint64_t i = 0; i < kThreads; ++i) {
143         thread_ids.push_back({});
144     }
145 
146     for (uint64_t i = 0; i < kThreads; ++i) {
147         auto& thread_id = thread_ids[i];
148         args[i].counter = &counter;
149         args[i].value = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
150         args[i].start = &start;
151         ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success);
152     }
153 
154     // Notify threads to start incrementing the count.
155     sync_completion_signal(&start);
156 
157     // Wait for all threads to finish.
158     for (const auto& thread_id : thread_ids) {
159         thrd_join(thread_id, nullptr);
160     }
161 
162     // Each thread should increase the counter by a total of value^2, which yields a total of:
163     // kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2
164     ASSERT_EQ(counter.Load(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6);
165     END_TEST;
166 }
167 
168 struct ExchangeArgs {
169     // Counter to be operated on.
170     BaseCounter<uint64_t>* counter;
171 
172     // Accumulated value of exchanged values in the counter.
173     std::atomic<BaseCounter<uint64_t>::Type>* accumulated;
174 
175     // Wait for main thread to signal before we start.
176     sync_completion_t* start;
177 
178     // Amount to increment the counter with.
179     BaseCounter<uint64_t>::Type value;
180 };
181 
182 // After all threads exit, all but one value has been added to the accumulated var,
183 // this is the last thread to call exchange, which is why test should add the current
184 // value of the counter to the accumulated atomic obtain a deterministic result.
ExchangeFn(void * args)185 int ExchangeFn(void* args) {
186     ExchangeArgs* exchange_args = static_cast<ExchangeArgs*>(args);
187     sync_completion_wait(exchange_args->start, zx::sec(20).get());
188     BaseCounter<uint64_t>::Type value = exchange_args->counter->Exchange(exchange_args->value);
189     exchange_args->accumulated->fetch_add(value, std::memory_order_relaxed);
190     return thrd_success;
191 }
192 
193 // Verify that when exchanging all intermediate values are seen by exactly 1 thread.
194 // Everythread will exhange the seen value with their value, and add it to an atomic,
195 // the result should be the same as above except that we need to add counter.Load() +
196 // accumulated_value.
TestExchangeMultiThread()197 bool TestExchangeMultiThread() {
198     BEGIN_TEST;
199     sync_completion_t start;
200     BaseCounter<uint64_t> counter;
201     std::atomic<BaseCounter<uint64_t>::Type> accumulated(0);
202     fbl::Vector<thrd_t> thread_ids;
203     ExchangeArgs args[kThreads];
204 
205     thread_ids.reserve(kThreads);
206     for (uint64_t i = 0; i < kThreads; ++i) {
207         thread_ids.push_back({});
208     }
209 
210     for (uint64_t i = 0; i < kThreads; ++i) {
211         auto& thread_id = thread_ids[i];
212         args[i].counter = &counter;
213         args[i].value = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
214         args[i].start = &start;
215         args[i].accumulated = &accumulated;
216         ASSERT_EQ(thrd_create(&thread_id, ExchangeFn, &args[i]), thrd_success);
217     }
218 
219     // Notify threads to start incrementing the count.
220     sync_completion_signal(&start);
221 
222     // Wait for all threads to finish.
223     for (const auto& thread_id : thread_ids) {
224         thrd_join(thread_id, nullptr);
225     }
226 
227     // Each thread should increase the counter by a total of value, which yields a total of:
228     // kThreads * (kThreads + 1)/ 2 = Sum(i=1, kThreads) i
229     ASSERT_EQ(counter.Load() + accumulated.load(std::memory_order_relaxed),
230               kThreads * (kThreads + 1) / 2);
231     END_TEST;
232 }
233 
234 // Verify flushed values match and the delta is set to 0.
TestFlush()235 bool TestFlush() {
236     BEGIN_TEST;
237     RemoteCounter counter = MakeRemoteCounter();
238     counter.Increment(20);
239     RemoteMetricInfo actual_metric_info;
240     RemoteMetricInfo expected_metric_info = MakeRemoteMetricInfo();
241     FakeLogger logger;
242     logger.set_should_fail(false);
243     int64_t actual_count;
244 
245     // Check that all data is present, we abuse some implementation details which guarantee
246     // that metadata is first in the flushed values, and the last element is the event_data we
247     // are measuring, which adds some restrictions to the internal implementation, but makes the
248     // test cleaner and readable.
249     ASSERT_TRUE(counter.Flush(&logger));
250 
251     actual_metric_info = logger.logged_counts()[0].metric_info;
252     actual_count = logger.logged_counts()[0].count;
253 
254     // We capture the values and then verify outside to avoid having to pass flag around,
255     // and have more descriptive messages on errors.
256     ASSERT_TRUE(actual_metric_info == expected_metric_info);
257     ASSERT_EQ(actual_count, 20);
258     ASSERT_EQ(counter.Load(), 0);
259     END_TEST;
260 }
261 
262 // Verify that the metadata used to create the counter is part of the flushes observation
263 // and that the current value of the counter is correct, plus resets to 0 after flush.
TestUndoFlush()264 bool TestUndoFlush() {
265     BEGIN_TEST;
266     RemoteCounter counter = MakeRemoteCounter();
267     FakeLogger logger;
268     logger.set_should_fail(false);
269 
270     counter.Increment(20);
271     ASSERT_TRUE(counter.Flush(&logger));
272     ASSERT_EQ(counter.Load(), 0);
273     counter.UndoFlush();
274     ASSERT_EQ(counter.Load(), 20);
275     END_TEST;
276 }
277 
278 struct FlushArgs {
279     // Counter to be incremented or flushed by a given thread.
280     RemoteCounter* counter;
281 
282     // Used to make the threads wait until all have been initialized.
283     sync_completion_t* start;
284 
285     // Flushed accumulated value.
286     std::atomic<RemoteCounter::Type>* accumulated;
287 
288     // Number of times to perform the operation.
289     size_t operation_count = 0;
290 
291     // Flush operations are thread-compatible and we use a mutex
292     // to provide a barrier. Interaction between other operations
293     // and flushes should remain thread-safe and eventually consistent.
294     fbl::Mutex* flush_mutex = nullptr;
295 
296     // Whether the thread should flush or increment.
297     bool flush = false;
298 };
299 
FlushFn(void * args)300 int FlushFn(void* args) {
301     FlushArgs* flush_args = static_cast<FlushArgs*>(args);
302     sync_completion_wait(flush_args->start, zx::sec(20).get());
303     FakeLogger logger;
304     for (size_t i = 0; i < flush_args->operation_count; ++i) {
305         if (flush_args->flush) {
306             fbl::AutoLock lock(flush_args->flush_mutex);
307             flush_args->counter->Flush(&logger);
308         } else {
309             flush_args->counter->Increment();
310         }
311     }
312 
313     if (flush_args->flush) {
314         for (auto count_entry : logger.logged_counts()) {
315             flush_args->accumulated->fetch_add(count_entry.count, std::memory_order_relaxed);
316         }
317     }
318     return thrd_success;
319 }
320 
321 // Verify the consistency calling flush from multiple threads. There will be kThreads incrementing
322 // the counter, kThreads flushing, and at the end we flush again, and the accumulated counter should
323 // be equal to the total |kThreads| (|kThreads| + 1) / 2.
TestFlushMultithread()324 bool TestFlushMultithread() {
325     BEGIN_TEST;
326     sync_completion_t start;
327     RemoteCounter counter = MakeRemoteCounter();
328     std::atomic<BaseCounter<int64_t>::Type> accumulated(0);
329     fbl::Vector<thrd_t> thread_ids;
330     fbl::Mutex flush_mutex;
331     FlushArgs args[kThreads];
332 
333     thread_ids.reserve(kThreads);
334     for (uint64_t i = 0; i < kThreads; ++i) {
335         thread_ids.push_back({});
336     }
337 
338     for (uint64_t i = 0; i < kThreads; ++i) {
339         auto& thread_id = thread_ids[i];
340         args[i].counter = &counter;
341         args[i].operation_count = static_cast<BaseCounter<uint64_t>::Type>(i + 1);
342         args[i].start = &start;
343         args[i].accumulated = &accumulated;
344         args[i].flush = i % 2;
345         args[i].flush_mutex = &flush_mutex;
346         ASSERT_EQ(thrd_create(&thread_id, FlushFn, &args[i]), thrd_success);
347     }
348 
349     // Notify threads to start incrementing the count.
350     sync_completion_signal(&start);
351 
352     // Wait for all threads to finish.
353     for (const auto& thread_id : thread_ids) {
354         thrd_join(thread_id, nullptr);
355     }
356 
357     // The total number of increment is the sum of odd numbers from 1 to 20 so
358     // |ceil(kThreads/2)|^2.
359     constexpr size_t ceil_threads = (kThreads / 2) + kThreads % 2;
360 
361     // Since the last thread to finish might not have flushed, we verify that the total of whats
362     // left, plust what we have accumulated equals the expected amount.
363     ASSERT_EQ(counter.Load() + accumulated.load(std::memory_order_relaxed),
364               ceil_threads * ceil_threads);
365     END_TEST;
366 }
367 
// Test case for BaseCounter<uint64_t>: single-threaded increment/exchange
// checks followed by their multi-threaded counterparts.
BEGIN_TEST_CASE(BaseCounterTest)
RUN_TEST(TestIncrement)
RUN_TEST(TestIncrementByVal)
RUN_TEST(TestExchange)
RUN_TEST(TestExchangeByVal)
RUN_TEST(TestIncrementMultiThread)
RUN_TEST(TestExchangeMultiThread)
END_TEST_CASE(BaseCounterTest)
376 
// Test case for RemoteCounter: flush, undo-flush, and flushing concurrently
// with increments.
BEGIN_TEST_CASE(RemoteCounterTest)
RUN_TEST(TestFlush)
RUN_TEST(TestUndoFlush)
RUN_TEST(TestFlushMultithread)
END_TEST_CASE(RemoteCounterTest)
382 
383 } // namespace
384 } // namespace internal
385 
386 namespace {
387 
TestIncrement()388 bool TestIncrement() {
389     BEGIN_TEST;
390     Counter counter(internal::MakeMetricOptions());
391 
392     ASSERT_EQ(counter.GetRemoteCount(), 0);
393     counter.Increment();
394     ASSERT_EQ(counter.GetRemoteCount(), 1);
395     counter.Increment(24);
396     ASSERT_EQ(counter.GetRemoteCount(), 25);
397     END_TEST;
398 }
399 
400 struct IncrementArgs {
401     // Counter to be incremented.
402     Counter* counter;
403 
404     // Number of times to call increment.
405     size_t times = 0;
406 
407     // Signals threads to start incrementing.
408     sync_completion_t* start;
409 };
410 
IncrementFn(void * args)411 int IncrementFn(void* args) {
412     IncrementArgs* increment_args = static_cast<IncrementArgs*>(args);
413     sync_completion_wait(increment_args->start, zx::sec(20).get());
414 
415     for (size_t i = 0; i < increment_args->times; ++i) {
416         increment_args->counter->Increment(increment_args->times);
417     }
418     return thrd_success;
419 }
420 
TestIncrementMultiThread()421 bool TestIncrementMultiThread() {
422     BEGIN_TEST;
423     sync_completion_t start;
424     Counter counter(internal::MakeMetricOptions());
425     fbl::Vector<thrd_t> thread_ids;
426     IncrementArgs args[kThreads];
427 
428     thread_ids.reserve(kThreads);
429     for (uint64_t i = 0; i < kThreads; ++i) {
430         thread_ids.push_back({});
431     }
432 
433     for (uint64_t i = 0; i < kThreads; ++i) {
434         auto& thread_id = thread_ids[i];
435         args[i].counter = &counter;
436         args[i].times = static_cast<Counter::Count>(i + 1);
437         args[i].start = &start;
438         ASSERT_EQ(thrd_create(&thread_id, IncrementFn, &args[i]), thrd_success);
439     }
440 
441     // Notify threads to start incrementing the count.
442     sync_completion_signal(&start);
443 
444     // Wait for all threads to finish.
445     for (const auto& thread_id : thread_ids) {
446         thrd_join(thread_id, nullptr);
447     }
448 
449     // Each thread should increase the counter by a total of value^2, which yields a total of:
450     // kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6 = Sum(i=1, kThreads) i^2
451     ASSERT_EQ(counter.GetRemoteCount(), kThreads * (kThreads + 1) * (2 * kThreads + 1) / 6);
452     END_TEST;
453 }
454 
// Test case for the public Counter: single-threaded and multi-threaded
// increments observed through GetRemoteCount().
BEGIN_TEST_CASE(CounterTest)
RUN_TEST(TestIncrement)
RUN_TEST(TestIncrementMultiThread)
END_TEST_CASE(CounterTest)
459 
460 } // namespace
461 } // namespace cobalt_client
462