1 /*
2 *
3 * SPDX-License-Identifier: Apache-2.0
4 *
5 * Copyright (c) 2025 Jorge Ramirez-Ortiz <jorge.ramirez@oss.qualcomm.com>
6 */
7
8 #include <zephyr/logging/log.h>
9 LOG_MODULE_REGISTER(latmon, CONFIG_LATMON_LOG_LEVEL);
10
11 #include <zephyr/net/latmon.h>
12 #include <zephyr/net/socket.h>
13
/*
 * Latmon <--> Latmus interface.
 *
 * Port taken from Kconfig. It is the default port of the listening TCP
 * socket and the destination port of the UDP address broadcast.
 */
#define LATMON_NET_PORT CONFIG_NET_LATMON_PORT
16
/*
 * Request received from Latmus. Both fields are decoded with ntohl() before
 * use, i.e. they travel in network byte order.
 */
struct latmon_net_request {
	uint32_t period_usecs;    /* sampling period in microseconds */
	uint32_t histogram_cells; /* number of histogram cells requested */
} __packed;
21
/*
 * Sample frame sent to Latmus. Every field is filled with htonl() by
 * send_sample_data(); the 64-bit latency sum is split in two 32-bit halves.
 */
struct latmon_net_data {
	int32_t sum_lat_hi; /* upper 32 bits of the latency sum */
	int32_t sum_lat_lo; /* lower 32 bits of the latency sum */
	int32_t min_lat;    /* smallest latency of the frame */
	int32_t max_lat;    /* largest latency of the frame */
	uint32_t overruns;  /* overruns counted in the frame */
	uint32_t samples;   /* number of samples in the frame */
} __packed;
30
/*
 * Private IPC: Zephyr application to Latmon service.
 *
 * Posted by net_latmon_start() and consumed by the Latmon server thread.
 */
struct latmon_message {
	net_latmon_measure_t measure_func; /* application callback producing one sample */
	int latmus; /* latmus connection */
};
36
/* Request queue between net_latmon_start() and the Latmon server thread */
K_MSGQ_DEFINE(latmon_msgq, sizeof(struct latmon_message), 2, 4);
38
/*
 * Monitoring configuration derived from the Latmus request.
 *
 * Note: using a small period (e.g., less than 100 microseconds) may result in
 * overly optimistic interrupt latencies being reported during a short test,
 * due to cache effects.
 */
struct latmus_conf {
	uint32_t max_samples; /* samples per frame: MAX_SAMPLING_PERIOD_USEC / period */
	uint32_t period; /* in usecs */
	uint32_t cells; /* histogram cells in use, 0 disables the histogram */
};
49
/*
 * Sampling state accumulated by the monitor thread.
 *
 * Each histogram cell represents a 1 usec timespan.
 * Note: the sampling period cannot be longer than 1 sec.
 */
#define MAX_SAMPLING_PERIOD_USEC 1000000
#define HISTOGRAM_CELLS_MAX 1000
struct latmon_data {
	bool warmed; /* sample data can be used */
	uint32_t histogram[HISTOGRAM_CELLS_MAX]; /* per-cell sample counters */
	uint32_t current_samples; /* samples accumulated since the last reset */
	uint32_t overruns; /* overruns since the last reset */
	uint32_t min_lat; /* smallest latency seen, UINT32_MAX after a reset */
	uint32_t max_lat; /* largest latency seen, 0 after a reset */
	uint64_t sum_lat; /* sum of the latencies since the last reset */
};
65
/*
 * Message queue for sample data transfers.
 * Every entry is a full copy of struct latmon_data (histogram included).
 */
K_MSGQ_DEFINE(xfer_msgq, sizeof(struct latmon_data), 10, 4);

/* Network transfer thread: sends data to Latmus */
#define XFER_THREAD_STACK_SIZE CONFIG_NET_LATMON_XFER_THREAD_STACK_SIZE
#define XFER_THREAD_PRIORITY CONFIG_NET_LATMON_XFER_THREAD_PRIORITY
K_THREAD_STACK_DEFINE(xfer_thread_stack, XFER_THREAD_STACK_SIZE);
static struct k_thread xfer_thread;

/* Latmon thread: receives application requests */
#define LATMON_THREAD_PRIORITY CONFIG_NET_LATMON_THREAD_PRIORITY
#define LATMON_STACK_SIZE CONFIG_NET_LATMON_THREAD_STACK_SIZE

/* Monitor thread: performs the sampling */
#define MONITOR_THREAD_PRIORITY CONFIG_NET_LATMON_MONITOR_THREAD_PRIORITY
#define MONITOR_STACK_SIZE CONFIG_NET_LATMON_MONITOR_THREAD_STACK_SIZE
static K_THREAD_STACK_DEFINE(monitor_stack, MONITOR_STACK_SIZE);

static struct k_thread monitor_thread;
/* Non-NULL while a monitor thread exists; cleared by the monitor on exit */
static k_tid_t monitor_tid;
/* Set by stop_monitoring(), polled by the sampling loop */
static bool abort_monitor;

/*
 * Synchronization:
 * - latmon_done: given when a connection has been fully handled
 * - monitor_done: given by the monitor thread when it stops sampling
 */
static K_SEM_DEFINE(latmon_done, 0, 1);
static K_SEM_DEFINE(monitor_done, 0, 1);
91
/*
 * Push count bytes from buf to the Latmus socket, looping over short writes.
 *
 * A send failing with EINTR is retried; any other failure returns -1.
 * If the socket reports zero bytes written the loop stops early, so the
 * returned byte count may be smaller than count.
 */
static ssize_t send_net_data(int latmus, const void *buf, size_t count)
{
	const char *cursor = buf;
	size_t remaining = count;
	ssize_t sent = 0;

	while (remaining > 0) {
		ssize_t n = zsock_send(latmus, cursor, remaining, 0);

		if (n == 0) {
			break;
		}

		if (n < 0) {
			if (errno != EINTR) {
				return -1;
			}
			continue;
		}

		cursor += n;
		remaining -= n;
		sent += n;
	}

	return sent;
}
117
send_sample_data(int latmus,struct latmon_data * data)118 static int send_sample_data(int latmus, struct latmon_data *data)
119 {
120 struct latmon_net_data ndata = {
121 .sum_lat_lo = htonl(data->sum_lat & 0xffffffff),
122 .sum_lat_hi = htonl(data->sum_lat >> 32),
123 .samples = htonl(data->current_samples),
124 .overruns = htonl(data->overruns),
125 .min_lat = htonl(data->min_lat),
126 .max_lat = htonl(data->max_lat),
127 };
128
129 /* Reset the data */
130 data->min_lat = UINT32_MAX;
131 data->current_samples = 0;
132 data->overruns = 0;
133 data->max_lat = 0;
134 data->sum_lat = 0;
135
136 return (send_net_data(latmus, &ndata, sizeof(ndata)) <= 0 ? -1 : 0);
137 }
138
send_trailing_data(int latmus,struct latmus_conf * conf,struct latmon_data * data)139 static int send_trailing_data(int latmus, struct latmus_conf *conf,
140 struct latmon_data *data)
141 {
142 int count = conf->cells * sizeof(data->histogram[0]);
143 ssize_t ret = 0;
144
145 if (data->current_samples != 0 && send_sample_data(latmus, data) < 0) {
146 return -1;
147 }
148
149 /* send empty frame */
150 if (send_sample_data(latmus, data) < 0) {
151 return -1;
152 }
153
154 /* send histogram if enabled (ie, conf->cells > 0) */
155 for (int cell = 0; cell < conf->cells; cell++) {
156 data->histogram[cell] = htonl(data->histogram[cell]);
157 }
158
159 ret = send_net_data(latmus, data->histogram, count);
160 memset(data->histogram, 0, count);
161
162 if (ret < 0) {
163 LOG_INF("failed tx histogram (ret=%d, errno %d)", ret, errno);
164 return -1;
165 }
166
167 return 0;
168 }
169
prepare_sample_data(uint32_t delta,struct latmus_conf * conf,struct latmon_data * data)170 static int prepare_sample_data(uint32_t delta, struct latmus_conf *conf,
171 struct latmon_data *data)
172 {
173 uint32_t delta_ns = k_cyc_to_ns_floor64(delta);
174 uint32_t delta_us = delta_ns / 1000;
175
176 data->sum_lat += delta_ns;
177
178 if (delta_ns < data->min_lat) {
179 data->min_lat = delta_ns;
180 }
181
182 if (delta_ns > data->max_lat) {
183 data->max_lat = delta_ns;
184 }
185
186 while (delta_us > conf->period) {
187 data->overruns++;
188 delta_us -= conf->period;
189 }
190
191 if (conf->cells != 0) {
192 if (delta_us >= conf->cells) {
193 /* histogram outlier */
194 delta_us = conf->cells - 1;
195 }
196
197 data->histogram[delta_us]++;
198 }
199
200 return ++data->current_samples < conf->max_samples ? -EAGAIN : 0;
201 }
202
enqueue_sample_data(struct latmon_data * data)203 static int enqueue_sample_data(struct latmon_data *data)
204 {
205 int ret = 0;
206
207 /* Drop the warming samples */
208 if (data->warmed == false) {
209 data->warmed = true;
210 goto out;
211 }
212
213 /* Enqueue the data for transfer */
214 ret = k_msgq_put(&xfer_msgq, data, K_NO_WAIT);
215 if (ret < 0) {
216 LOG_ERR("Failed to enqueue netdata (queue full)");
217 }
218 out:
219 /* Reset the data */
220 data->min_lat = UINT32_MAX;
221 data->current_samples = 0;
222 data->overruns = 0;
223 data->max_lat = 0;
224 data->sum_lat = 0;
225
226 return ret;
227 }
228
xfer_thread_func(void * p1,void * p2,void * p3)229 static void xfer_thread_func(void *p1, void *p2, void *p3)
230 {
231 int latmus = *(int *)p1;
232 struct latmon_data sample;
233
234 LOG_INF("Transfer thread priority: %d", XFER_THREAD_PRIORITY);
235
236 for (;;) {
237 if (k_msgq_get(&xfer_msgq, &sample, K_FOREVER) != 0) {
238 LOG_ERR("Failed to get sample data to transfer");
239 continue;
240 }
241
242 if (send_sample_data(latmus, &sample) < 0) {
243 LOG_ERR("Failed to transfer sample data");
244 break;
245 }
246 }
247 }
248
/*
 * Create the transfer thread. latmus points to the socket descriptor and is
 * passed to the thread as its first argument; the thread dereferences it when
 * it starts. The thread is scheduled with a 10 ms start delay.
 */
static void start_xfer_thread(int *latmus)
{
	k_thread_create(&xfer_thread, xfer_thread_stack, XFER_THREAD_STACK_SIZE,
			xfer_thread_func, latmus, NULL, NULL,
			XFER_THREAD_PRIORITY, 0, K_MSEC(10));
}
255
/* Abort the transfer thread; called by the monitor thread when it stops. */
static void abort_xfer_thread(void)
{
	k_thread_abort(&xfer_thread);
}
260
measure(uint32_t * delta,struct latmon_message * msg,struct latmon_data * data,struct latmus_conf * conf)261 static int measure(uint32_t *delta, struct latmon_message *msg,
262 struct latmon_data *data,
263 struct latmus_conf *conf)
264 {
265 if (data->warmed == true) {
266 k_usleep(conf->period);
267 }
268
269 if (msg->measure_func(delta) < 0) {
270 if (data->overruns++ > conf->max_samples / 2) {
271 return -1;
272 }
273 /* Just an overrun */
274 return 1;
275 }
276 return 0;
277 }
278
monitor_thread_func(void * p1,void * p2,void * p3)279 static void monitor_thread_func(void *p1, void *p2, void *p3)
280 {
281 struct latmon_message *msg = p1;
282 struct latmus_conf *conf = p2;
283 struct latmon_data *data = p3;
284 uint32_t delta = 0;
285 int ret = 0;
286
287 LOG_INF("Monitor thread priority: %d", MONITOR_THREAD_PRIORITY);
288
289 /* Prepare transfer thread */
290 start_xfer_thread(&msg->latmus);
291
292 LOG_INF("\tmonitoring started:");
293 LOG_INF("\t - samples per period: %u", conf->max_samples);
294 LOG_INF("\t - period: %u usecs", conf->period);
295 LOG_INF("\t - histogram cells: %u", conf->cells);
296
297 /* Sampling loop */
298 memset(data, 0, sizeof(*data));
299 data->min_lat = UINT32_MAX;
300 data->warmed = false;
301 delta = 0;
302 do {
303 ret = measure(&delta, msg, data, conf);
304 if (ret != 0) {
305 if (ret < 0) {
306 LOG_ERR("\tExcessive overruns, abort!");
307 goto out;
308 }
309 continue;
310 }
311
312 if (prepare_sample_data(delta, conf, data) == -EAGAIN) {
313 continue;
314 }
315
316 ret = enqueue_sample_data(data);
317 /* Abort allowed after all samples have been queued */
318 } while (abort_monitor == false && ret == 0);
319 out:
320 abort_xfer_thread();
321 k_sem_give(&monitor_done);
322 monitor_tid = NULL;
323
324 LOG_INF("\tmonitoring stopped");
325 }
326
broadcast_ip_address(struct in_addr * ip_addr)327 static int broadcast_ip_address(struct in_addr *ip_addr)
328 {
329 char ip_str[NET_IPV4_ADDR_LEN];
330 struct sockaddr_in broadcast;
331 int sock = -1;
332 int ret = -1;
333
334 if (ip_addr == NULL || ip_addr->s_addr == INADDR_ANY) {
335 LOG_ERR("Invalid IP address for broadcast");
336 return -1;
337 }
338
339 sock = zsock_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
340 if (sock < 0) {
341 LOG_ERR("Failed to create broadcast socket : %d", errno);
342 return -1;
343 }
344
345 broadcast.sin_addr.s_addr = htonl(INADDR_BROADCAST);
346 broadcast.sin_port = htons(LATMON_NET_PORT);
347 broadcast.sin_family = AF_INET;
348
349 if (net_addr_ntop(AF_INET, ip_addr, ip_str, sizeof(ip_str)) == NULL) {
350 LOG_ERR("Failed to convert IP address to string");
351 ret = -1;
352 goto out;
353 }
354
355 ret = zsock_sendto(sock, ip_str, strlen(ip_str), 0,
356 (struct sockaddr *)&broadcast, sizeof(broadcast));
357
358 out:
359 zsock_close(sock);
360
361 return ret;
362 }
363
364 /* Get a socket to listen to Latmus requests */
net_latmon_get_socket(struct sockaddr * connection_addr)365 int net_latmon_get_socket(struct sockaddr *connection_addr)
366 {
367 struct sockaddr_in addr = {
368 .sin_family = AF_INET,
369 .sin_addr.s_addr = htonl(INADDR_ANY),
370 .sin_port = htons(LATMON_NET_PORT)
371 };
372 int s, on = 1;
373
374 if (connection_addr != NULL) {
375 memcpy(&addr, connection_addr, sizeof(addr));
376 }
377
378 s = zsock_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
379 if (s < 0) {
380 LOG_ERR("failed to create latmon socket : %d", errno);
381 return -1;
382 }
383
384 zsock_setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
385 if (zsock_bind(s, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
386 LOG_ERR("failed to bind latmon socket : %d", errno);
387 zsock_close(s);
388 return -1;
389 }
390
391 if (zsock_listen(s, 1) < 0) {
392 LOG_ERR("failed to listen on latmon socket : %d", errno);
393 zsock_close(s);
394 return -1;
395 }
396
397 return s;
398 }
399
400 /* Waits for connection from Latmus */
net_latmon_connect(int socket,struct in_addr * ip)401 int net_latmon_connect(int socket, struct in_addr *ip)
402 {
403 struct zsock_pollfd fd[1] = { {.fd = socket, .events = ZSOCK_POLLIN } };
404 struct sockaddr_in clnt_addr;
405 const int timeout = 5000;
406 socklen_t len;
407 int latmus = -1;
408 int ret;
409
410 LOG_INF("Waiting for Latmus ... ");
411
412 /* Broadcast Latmon's address every timeout seconds until connected */
413 ret = zsock_poll(fd, 1, timeout);
414 if (ret < 0) {
415 LOG_ERR("Poll error: %d", errno);
416 return -1;
417 } else if (ret == 0) {
418 /* Timeout waiting for connection */
419 if (broadcast_ip_address(ip) < 0) {
420 LOG_ERR("Broadcast error");
421 return -1;
422 }
423
424 /* Client should retry the connection if broadcast succeeded */
425 return -EAGAIN;
426 }
427
428 /*
429 * As per MISRA guidelines, an 'else' clause is required. However, we
430 * chose to prioritize adherence to the project's code style guidelines.
431 */
432 len = sizeof(clnt_addr);
433 latmus = zsock_accept(socket, (struct sockaddr *)&clnt_addr, &len);
434 if (latmus < 0) {
435 LOG_INF("Failed accepting new connection...");
436 return -1;
437 }
438
439 return latmus;
440 }
441
/*
 * Hand a Latmus connection over to the Latmon service and block until the
 * service is done with it.
 *
 * latmus:    connected socket; the service closes it when the connection has
 *            been handled
 * measure_f: application callback producing one latency sample
 *
 * If the request cannot be queued, nobody will ever give latmon_done: the
 * socket is closed here and the function returns instead of blocking forever.
 */
void net_latmon_start(int latmus, net_latmon_measure_t measure_f)
{
	struct latmon_message msg = {
		.measure_func = measure_f,
		.latmus = latmus,
	};

	if (k_msgq_put(&latmon_msgq, &msg, K_NO_WAIT) != 0) {
		LOG_ERR("Failed to queue the latmon request");
		zsock_close(latmus);
		return;
	}

	k_sem_take(&latmon_done, K_FOREVER);
}
452
net_latmon_running(void)453 bool net_latmon_running(void)
454 {
455 return monitor_tid ? true : false;
456 }
457
get_latmus_conf(ssize_t len,struct latmon_net_request * req,struct latmus_conf * conf)458 static int get_latmus_conf(ssize_t len, struct latmon_net_request *req,
459 struct latmus_conf *conf)
460 {
461 if (len != sizeof(*req)) {
462 return -1;
463 }
464
465 if (ntohl(req->period_usecs) == 0) {
466 LOG_ERR("null period received, invalid\n");
467 return -1;
468 }
469
470 if (ntohl(req->period_usecs) > MAX_SAMPLING_PERIOD_USEC) {
471 LOG_ERR("invalid period received: %u usecs\n",
472 ntohl(req->period_usecs));
473 return -1;
474 }
475
476 if (ntohl(req->histogram_cells) > HISTOGRAM_CELLS_MAX) {
477 LOG_ERR("invalid histogram size received: %u > %u cells\n",
478 ntohl(req->histogram_cells), HISTOGRAM_CELLS_MAX);
479 return -1;
480 }
481
482 conf->period = ntohl(req->period_usecs);
483 conf->cells = ntohl(req->histogram_cells);
484 conf->max_samples = MAX_SAMPLING_PERIOD_USEC / conf->period;
485
486 return 0;
487 }
488
/*
 * Spawn the monitor thread for the given connection and configuration.
 *
 * monitor_done is reset and the abort flag cleared before the thread is
 * created, and data is zeroed. msg, conf and data are handed to the thread as
 * its three arguments, so they must stay valid while it runs. The thread
 * starts without delay and its id is recorded in monitor_tid.
 */
static void start_monitoring(struct latmon_message *msg,
			     struct latmus_conf *conf,
			     struct latmon_data *data)
{
	k_sem_reset(&monitor_done);
	abort_monitor = false;

	memset(data, 0, sizeof(*data));
	monitor_tid = k_thread_create(&monitor_thread, monitor_stack,
			MONITOR_STACK_SIZE, monitor_thread_func,
			msg, conf, data, MONITOR_THREAD_PRIORITY, 0, K_NO_WAIT);
}
501
stop_monitoring(void)502 static void stop_monitoring(void)
503 {
504 if (monitor_tid == 0) {
505 return;
506 }
507
508 abort_monitor = true;
509 k_sem_take(&monitor_done, K_FOREVER);
510 }
511
handle_connection(struct latmon_message * msg)512 static void handle_connection(struct latmon_message *msg)
513 {
514 #if (K_HEAP_MEM_POOL_SIZE > 0)
515 struct latmus_conf *conf = k_malloc(sizeof(*conf));
516 struct latmon_data *data = k_malloc(sizeof(*data));
517 struct latmon_net_request req;
518 ssize_t len;
519
520 if (conf == 0 || data == 0) {
521 LOG_ERR("Failed to allocate memory, check HEAP_MEM_POOL_SIZE");
522 goto out;
523 }
524
525 memset(conf, 0, sizeof(*conf));
526
527 for (;;) {
528 len = zsock_recv(msg->latmus, &req, sizeof(req), 0);
529 stop_monitoring();
530 if (get_latmus_conf(len, &req, conf) < 0) {
531 /* Send the histogram */
532 if (send_trailing_data(msg->latmus, conf, data) < 0) {
533 break;
534 }
535 memset(conf, 0, sizeof(*conf));
536 continue;
537 }
538 start_monitoring(msg, conf, data);
539 }
540 out:
541 k_free(conf);
542 k_free(data);
543 zsock_close(msg->latmus);
544 k_sem_give(&latmon_done);
545 #else
546 LOG_ERR("No heap configured");
547 #endif
548 }
549
latmon_server_thread_func(void * p1,void * p2,void * p3)550 static int latmon_server_thread_func(void *p1, void *p2, void *p3)
551 {
552 struct latmon_message msg = { };
553
554 LOG_INF("Latmon server thread priority: %d", LATMON_THREAD_PRIORITY);
555
556 for (;;) {
557 k_msgq_get(&latmon_msgq, &msg, K_FOREVER);
558
559 /* Only latmus can stop the monitoring, so hang in there */
560 handle_connection(&msg);
561 }
562
563 return 0;
564 }
565
/* Latmon server thread: statically defined, started without delay */
K_THREAD_DEFINE(latmon_server_id, LATMON_STACK_SIZE,
		latmon_server_thread_func, NULL, NULL, NULL,
		LATMON_THREAD_PRIORITY, 0, 0);
569