1 /*
2  *
3  * SPDX-License-Identifier: Apache-2.0
4  *
5  * Copyright (c) 2025 Jorge Ramirez-Ortiz <jorge.ramirez@oss.qualcomm.com>
6  */
7 
8  #include <zephyr/logging/log.h>
9 LOG_MODULE_REGISTER(latmon, CONFIG_LATMON_LOG_LEVEL);
10 
11 #include <zephyr/net/latmon.h>
12 #include <zephyr/net/socket.h>
13 
/*
 * Latmon <-> Latmus interface: the port used both by the TCP listener and by
 * the UDP broadcast that advertises Latmon's address.
 */
#define LATMON_NET_PORT CONFIG_NET_LATMON_PORT
16 
/* Session request received from Latmus; both fields are in network byte order. */
struct latmon_net_request {
	uint32_t period_usecs;    /* sampling period, in microseconds */
	uint32_t histogram_cells; /* histogram size; 0 disables the histogram */
} __packed;
21 
/* Statistics frame sent to Latmus; every field is in network byte order. */
struct latmon_net_data {
	int32_t sum_lat_hi;  /* upper 32 bits of the latency sum (ns) */
	int32_t sum_lat_lo;  /* lower 32 bits of the latency sum (ns) */
	int32_t min_lat;     /* smallest latency seen (ns) */
	int32_t max_lat;     /* largest latency seen (ns) */
	uint32_t overruns;   /* overruns accounted in this frame */
	uint32_t samples;    /* samples accounted in this frame */
} __packed;
30 
31 /* Private IPC: Zephyr application to Latmon service */
32 struct latmon_message {
33 	net_latmon_measure_t measure_func;
34 	int latmus; /* latmus connection */
35 };
36 
37 K_MSGQ_DEFINE(latmon_msgq, sizeof(struct latmon_message), 2, 4);
38 
/*
 * Session configuration derived from a validated Latmus request.
 *
 * Note: a small period (e.g. less than 100 microseconds) may report
 * unrealistically good interrupt latencies during a short test, because of
 * cache effects.
 */
struct latmus_conf {
	uint32_t max_samples; /* samples per frame: 1 s worth of periods */
	uint32_t period;      /* sampling period, in usecs */
	uint32_t cells;       /* number of histogram cells in use */
};
49 
/*
 * Limits accepted from Latmus. Each histogram cell covers 1 usec, and the
 * sampling period may not exceed 1 second.
 */
#define MAX_SAMPLING_PERIOD_USEC 1000000
#define HISTOGRAM_CELLS_MAX 1000

/* Statistics accumulated by the monitor thread. */
struct latmon_data {
	bool warmed;                              /* warm-up done: samples are reported */
	uint32_t histogram[HISTOGRAM_CELLS_MAX];  /* one counter per usec */
	uint32_t current_samples;                 /* samples in the current frame */
	uint32_t overruns;                        /* overruns in the current frame */
	uint32_t min_lat;                         /* ns */
	uint32_t max_lat;                         /* ns */
	uint64_t sum_lat;                         /* ns */
};
65 
66 /* Message queue for sample data transfers */
67 K_MSGQ_DEFINE(xfer_msgq, sizeof(struct latmon_data), 10, 4);
68 
69 /* Network transfer thread: sends data to Latmus  */
70 #define XFER_THREAD_STACK_SIZE CONFIG_NET_LATMON_XFER_THREAD_STACK_SIZE
71 #define XFER_THREAD_PRIORITY CONFIG_NET_LATMON_XFER_THREAD_PRIORITY
72 K_THREAD_STACK_DEFINE(xfer_thread_stack, XFER_THREAD_STACK_SIZE);
73 static struct k_thread xfer_thread;
74 
75 /* Latmon thread: receives application requests */
76 #define LATMON_THREAD_PRIORITY CONFIG_NET_LATMON_THREAD_PRIORITY
77 #define LATMON_STACK_SIZE CONFIG_NET_LATMON_THREAD_STACK_SIZE
78 
79 /* Monitor thread: performs the sampling */
80 #define MONITOR_THREAD_PRIORITY CONFIG_NET_LATMON_MONITOR_THREAD_PRIORITY
81 #define MONITOR_STACK_SIZE CONFIG_NET_LATMON_MONITOR_THREAD_STACK_SIZE
82 static K_THREAD_STACK_DEFINE(monitor_stack, MONITOR_STACK_SIZE);
83 
84 static struct k_thread monitor_thread;
85 static k_tid_t monitor_tid;
86 static bool abort_monitor;
87 
88 /* Synchronization */
89 static K_SEM_DEFINE(latmon_done, 0, 1);
90 static K_SEM_DEFINE(monitor_done, 0, 1);
91 
/*
 * Write @count bytes from @buf to the Latmus socket. Partial writes are
 * retried, and so is a call interrupted with EINTR.
 *
 * Returns the number of bytes actually written. This can be less than @count
 * if zsock_send() reports a zero-length write. Returns -1 on any other send
 * error.
 */
static ssize_t send_net_data(int latmus, const void *buf, size_t count)
{
	const char *cursor = buf;
	size_t remaining = count;
	ssize_t sent;

	while (remaining != 0) {
		sent = zsock_send(latmus, cursor, remaining, 0);

		if (sent > 0) {
			cursor += sent;
			remaining -= (size_t)sent;
			continue;
		}

		if (sent == 0) {
			/* Nothing more can be pushed: report what went out */
			break;
		}

		if (errno != EINTR) {
			return -1;
		}
	}

	return (ssize_t)(count - remaining);
}
117 
send_sample_data(int latmus,struct latmon_data * data)118 static int send_sample_data(int latmus, struct latmon_data *data)
119 {
120 	struct latmon_net_data ndata = {
121 		.sum_lat_lo = htonl(data->sum_lat & 0xffffffff),
122 		.sum_lat_hi = htonl(data->sum_lat >> 32),
123 		.samples = htonl(data->current_samples),
124 		.overruns = htonl(data->overruns),
125 		.min_lat = htonl(data->min_lat),
126 		.max_lat = htonl(data->max_lat),
127 	};
128 
129 	/* Reset the data */
130 	data->min_lat = UINT32_MAX;
131 	data->current_samples = 0;
132 	data->overruns = 0;
133 	data->max_lat = 0;
134 	data->sum_lat = 0;
135 
136 	return (send_net_data(latmus, &ndata, sizeof(ndata)) <= 0 ? -1 : 0);
137 }
138 
send_trailing_data(int latmus,struct latmus_conf * conf,struct latmon_data * data)139 static int send_trailing_data(int latmus, struct latmus_conf *conf,
140 			struct latmon_data *data)
141 {
142 	int count = conf->cells * sizeof(data->histogram[0]);
143 	ssize_t ret = 0;
144 
145 	if (data->current_samples != 0 && send_sample_data(latmus, data) < 0) {
146 		return -1;
147 	}
148 
149 	/* send empty frame */
150 	if (send_sample_data(latmus, data) < 0) {
151 		return -1;
152 	}
153 
154 	/* send histogram if enabled (ie, conf->cells > 0) */
155 	for (int cell = 0; cell < conf->cells; cell++) {
156 		data->histogram[cell] = htonl(data->histogram[cell]);
157 	}
158 
159 	ret = send_net_data(latmus, data->histogram, count);
160 	memset(data->histogram, 0, count);
161 
162 	if (ret < 0) {
163 		LOG_INF("failed tx histogram (ret=%d, errno %d)", ret, errno);
164 		return -1;
165 	}
166 
167 	return 0;
168 }
169 
prepare_sample_data(uint32_t delta,struct latmus_conf * conf,struct latmon_data * data)170 static int prepare_sample_data(uint32_t delta, struct latmus_conf *conf,
171 			struct latmon_data *data)
172 {
173 	uint32_t delta_ns = k_cyc_to_ns_floor64(delta);
174 	uint32_t delta_us = delta_ns / 1000;
175 
176 	data->sum_lat += delta_ns;
177 
178 	if (delta_ns < data->min_lat) {
179 		data->min_lat = delta_ns;
180 	}
181 
182 	if (delta_ns > data->max_lat) {
183 		data->max_lat = delta_ns;
184 	}
185 
186 	while (delta_us > conf->period) {
187 		data->overruns++;
188 		delta_us -= conf->period;
189 	}
190 
191 	if (conf->cells != 0) {
192 		if (delta_us >= conf->cells) {
193 			/* histogram outlier */
194 			delta_us = conf->cells - 1;
195 		}
196 
197 		data->histogram[delta_us]++;
198 	}
199 
200 	return ++data->current_samples < conf->max_samples ? -EAGAIN : 0;
201 }
202 
enqueue_sample_data(struct latmon_data * data)203 static int enqueue_sample_data(struct latmon_data *data)
204 {
205 	int ret = 0;
206 
207 	/* Drop the warming samples */
208 	if (data->warmed == false) {
209 		data->warmed = true;
210 		goto out;
211 	}
212 
213 	/* Enqueue the data for transfer */
214 	ret = k_msgq_put(&xfer_msgq, data, K_NO_WAIT);
215 	if (ret < 0) {
216 		LOG_ERR("Failed to enqueue netdata (queue full)");
217 	}
218 out:
219 	/* Reset the data */
220 	data->min_lat = UINT32_MAX;
221 	data->current_samples = 0;
222 	data->overruns = 0;
223 	data->max_lat = 0;
224 	data->sum_lat = 0;
225 
226 	return ret;
227 }
228 
xfer_thread_func(void * p1,void * p2,void * p3)229 static void xfer_thread_func(void *p1, void *p2, void *p3)
230 {
231 	int latmus = *(int *)p1;
232 	struct latmon_data sample;
233 
234 	LOG_INF("Transfer thread priority: %d", XFER_THREAD_PRIORITY);
235 
236 	for (;;) {
237 		if (k_msgq_get(&xfer_msgq, &sample, K_FOREVER) != 0) {
238 			LOG_ERR("Failed to get sample data to transfer");
239 			continue;
240 		}
241 
242 		if (send_sample_data(latmus, &sample) < 0) {
243 			LOG_ERR("Failed to transfer sample data");
244 			break;
245 		}
246 	}
247 }
248 
start_xfer_thread(int * latmus)249 static void start_xfer_thread(int *latmus)
250 {
251 	k_thread_create(&xfer_thread, xfer_thread_stack, XFER_THREAD_STACK_SIZE,
252 			xfer_thread_func, latmus, NULL, NULL,
253 			XFER_THREAD_PRIORITY, 0, K_MSEC(10));
254 }
255 
abort_xfer_thread(void)256 static void abort_xfer_thread(void)
257 {
258 	k_thread_abort(&xfer_thread);
259 }
260 
measure(uint32_t * delta,struct latmon_message * msg,struct latmon_data * data,struct latmus_conf * conf)261 static int measure(uint32_t *delta, struct latmon_message *msg,
262 		struct latmon_data *data,
263 		struct latmus_conf *conf)
264 {
265 	if (data->warmed == true) {
266 		k_usleep(conf->period);
267 	}
268 
269 	if (msg->measure_func(delta) < 0) {
270 		if (data->overruns++ > conf->max_samples / 2) {
271 			return -1;
272 		}
273 		/* Just an overrun */
274 		return 1;
275 	}
276 	return 0;
277 }
278 
monitor_thread_func(void * p1,void * p2,void * p3)279 static void monitor_thread_func(void *p1, void *p2, void *p3)
280 {
281 	struct latmon_message *msg = p1;
282 	struct latmus_conf *conf = p2;
283 	struct latmon_data *data = p3;
284 	uint32_t delta = 0;
285 	int ret = 0;
286 
287 	LOG_INF("Monitor thread priority: %d", MONITOR_THREAD_PRIORITY);
288 
289 	/* Prepare transfer thread */
290 	start_xfer_thread(&msg->latmus);
291 
292 	LOG_INF("\tmonitoring started:");
293 	LOG_INF("\t - samples per period: %u", conf->max_samples);
294 	LOG_INF("\t - period: %u usecs", conf->period);
295 	LOG_INF("\t - histogram cells: %u", conf->cells);
296 
297 	/* Sampling loop */
298 	memset(data, 0, sizeof(*data));
299 	data->min_lat = UINT32_MAX;
300 	data->warmed = false;
301 	delta = 0;
302 	do {
303 		ret = measure(&delta, msg, data, conf);
304 		if (ret != 0) {
305 			if (ret < 0) {
306 				LOG_ERR("\tExcessive overruns, abort!");
307 				goto out;
308 			}
309 			continue;
310 		}
311 
312 		if (prepare_sample_data(delta, conf, data) == -EAGAIN) {
313 			continue;
314 		}
315 
316 		ret = enqueue_sample_data(data);
317 		/* Abort allowed after all samples have been queued */
318 	} while (abort_monitor == false && ret == 0);
319 out:
320 	abort_xfer_thread();
321 	k_sem_give(&monitor_done);
322 	monitor_tid = NULL;
323 
324 	LOG_INF("\tmonitoring stopped");
325 }
326 
broadcast_ip_address(struct in_addr * ip_addr)327 static int broadcast_ip_address(struct in_addr *ip_addr)
328 {
329 	char ip_str[NET_IPV4_ADDR_LEN];
330 	struct sockaddr_in broadcast;
331 	int sock = -1;
332 	int ret = -1;
333 
334 	if (ip_addr == NULL || ip_addr->s_addr == INADDR_ANY) {
335 		LOG_ERR("Invalid IP address for broadcast");
336 		return -1;
337 	}
338 
339 	sock = zsock_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
340 	if (sock < 0) {
341 		LOG_ERR("Failed to create broadcast socket : %d", errno);
342 		return -1;
343 	}
344 
345 	broadcast.sin_addr.s_addr = htonl(INADDR_BROADCAST);
346 	broadcast.sin_port = htons(LATMON_NET_PORT);
347 	broadcast.sin_family = AF_INET;
348 
349 	if (net_addr_ntop(AF_INET, ip_addr, ip_str, sizeof(ip_str)) == NULL) {
350 		LOG_ERR("Failed to convert IP address to string");
351 		ret = -1;
352 		goto out;
353 	}
354 
355 	ret = zsock_sendto(sock, ip_str, strlen(ip_str), 0,
356 			(struct sockaddr *)&broadcast, sizeof(broadcast));
357 
358 out:
359 	zsock_close(sock);
360 
361 	return ret;
362 }
363 
364 /* Get a socket to listen to Latmus requests */
net_latmon_get_socket(struct sockaddr * connection_addr)365 int net_latmon_get_socket(struct sockaddr *connection_addr)
366 {
367 	struct sockaddr_in addr = {
368 		.sin_family = AF_INET,
369 		.sin_addr.s_addr = htonl(INADDR_ANY),
370 		.sin_port = htons(LATMON_NET_PORT)
371 	};
372 	int s, on = 1;
373 
374 	if (connection_addr != NULL) {
375 		memcpy(&addr, connection_addr, sizeof(addr));
376 	}
377 
378 	s = zsock_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
379 	if (s < 0) {
380 		LOG_ERR("failed to create latmon socket : %d", errno);
381 		return -1;
382 	}
383 
384 	zsock_setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
385 	if (zsock_bind(s, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
386 		LOG_ERR("failed to bind latmon socket : %d", errno);
387 		zsock_close(s);
388 		return -1;
389 	}
390 
391 	if (zsock_listen(s, 1) < 0) {
392 		LOG_ERR("failed to listen on latmon socket : %d", errno);
393 		zsock_close(s);
394 		return -1;
395 	}
396 
397 	return s;
398 }
399 
400 /* Waits for connection from Latmus */
net_latmon_connect(int socket,struct in_addr * ip)401 int net_latmon_connect(int socket, struct in_addr *ip)
402 {
403 	struct zsock_pollfd fd[1] = { {.fd = socket, .events = ZSOCK_POLLIN } };
404 	struct sockaddr_in clnt_addr;
405 	const int timeout = 5000;
406 	socklen_t len;
407 	int latmus = -1;
408 	int ret;
409 
410 	LOG_INF("Waiting for Latmus ... ");
411 
412 	/* Broadcast Latmon's address every timeout seconds until connected */
413 	ret = zsock_poll(fd, 1, timeout);
414 	if (ret < 0) {
415 		LOG_ERR("Poll error: %d", errno);
416 		return -1;
417 	} else if (ret == 0) {
418 		/* Timeout waiting for connection */
419 		if (broadcast_ip_address(ip) < 0) {
420 			LOG_ERR("Broadcast error");
421 			return -1;
422 		}
423 
424 		/* Client should retry the connection if broadcast succeeded */
425 		return -EAGAIN;
426 	}
427 
428 	/*
429 	 * As per MISRA guidelines, an 'else' clause is required. However, we
430 	 * chose to prioritize adherence to the project's code style guidelines.
431 	 */
432 	len = sizeof(clnt_addr);
433 	latmus = zsock_accept(socket, (struct sockaddr *)&clnt_addr, &len);
434 	if (latmus < 0) {
435 		LOG_INF("Failed accepting new connection...");
436 		return -1;
437 	}
438 
439 	return latmus;
440 }
441 
/*
 * Hand the connected Latmus socket @latmus and the latency probe @measure_f
 * to the Latmon service. Block until the service has finished with the
 * connection; the service closes the socket.
 *
 * If the request cannot be queued, nobody would ever give latmon_done, so
 * waiting would block forever. In that case the socket is closed here, to
 * keep the same "start consumes the socket" behaviour, and the function
 * returns immediately.
 * NOTE(review): confirm that callers never close @latmus themselves.
 */
void net_latmon_start(int latmus, net_latmon_measure_t measure_f)
{
	struct latmon_message msg = {
		.measure_func = measure_f,
		.latmus = latmus,
	};

	if (k_msgq_put(&latmon_msgq, &msg, K_NO_WAIT) != 0) {
		LOG_ERR("Latmon service busy, dropping connection");
		zsock_close(latmus);
		return;
	}

	k_sem_take(&latmon_done, K_FOREVER);
}
452 
net_latmon_running(void)453 bool net_latmon_running(void)
454 {
455 	return monitor_tid ? true : false;
456 }
457 
get_latmus_conf(ssize_t len,struct latmon_net_request * req,struct latmus_conf * conf)458 static int get_latmus_conf(ssize_t len, struct latmon_net_request *req,
459 			struct latmus_conf *conf)
460 {
461 	if (len != sizeof(*req)) {
462 		return -1;
463 	}
464 
465 	if (ntohl(req->period_usecs) == 0) {
466 		LOG_ERR("null period received, invalid\n");
467 		return -1;
468 	}
469 
470 	if (ntohl(req->period_usecs) > MAX_SAMPLING_PERIOD_USEC) {
471 		LOG_ERR("invalid period received: %u usecs\n",
472 			ntohl(req->period_usecs));
473 		return -1;
474 	}
475 
476 	if (ntohl(req->histogram_cells) > HISTOGRAM_CELLS_MAX) {
477 		LOG_ERR("invalid histogram size received: %u > %u cells\n",
478 			ntohl(req->histogram_cells), HISTOGRAM_CELLS_MAX);
479 		return -1;
480 	}
481 
482 	conf->period = ntohl(req->period_usecs);
483 	conf->cells = ntohl(req->histogram_cells);
484 	conf->max_samples = MAX_SAMPLING_PERIOD_USEC / conf->period;
485 
486 	return 0;
487 }
488 
start_monitoring(struct latmon_message * msg,struct latmus_conf * conf,struct latmon_data * data)489 static void start_monitoring(struct latmon_message *msg,
490 			struct latmus_conf *conf,
491 			struct latmon_data *data)
492 {
493 	k_sem_reset(&monitor_done);
494 	abort_monitor = false;
495 
496 	memset(data, 0, sizeof(*data));
497 	monitor_tid = k_thread_create(&monitor_thread, monitor_stack,
498 			MONITOR_STACK_SIZE, monitor_thread_func,
499 			msg, conf, data, MONITOR_THREAD_PRIORITY, 0, K_NO_WAIT);
500 }
501 
stop_monitoring(void)502 static void stop_monitoring(void)
503 {
504 	if (monitor_tid == 0) {
505 		return;
506 	}
507 
508 	abort_monitor = true;
509 	k_sem_take(&monitor_done, K_FOREVER);
510 }
511 
handle_connection(struct latmon_message * msg)512 static void handle_connection(struct latmon_message *msg)
513 {
514 #if (K_HEAP_MEM_POOL_SIZE > 0)
515 	struct latmus_conf *conf = k_malloc(sizeof(*conf));
516 	struct latmon_data *data = k_malloc(sizeof(*data));
517 	struct latmon_net_request req;
518 	ssize_t len;
519 
520 	if (conf == 0 || data == 0) {
521 		LOG_ERR("Failed to allocate memory, check HEAP_MEM_POOL_SIZE");
522 		goto out;
523 	}
524 
525 	memset(conf, 0, sizeof(*conf));
526 
527 	for (;;) {
528 		len = zsock_recv(msg->latmus, &req, sizeof(req), 0);
529 		stop_monitoring();
530 		if (get_latmus_conf(len, &req, conf) < 0) {
531 			/* Send the histogram */
532 			if (send_trailing_data(msg->latmus, conf, data) < 0) {
533 				break;
534 			}
535 			memset(conf, 0, sizeof(*conf));
536 			continue;
537 		}
538 		start_monitoring(msg, conf, data);
539 	}
540 out:
541 	k_free(conf);
542 	k_free(data);
543 	zsock_close(msg->latmus);
544 	k_sem_give(&latmon_done);
545 #else
546 	LOG_ERR("No heap configured");
547 #endif
548 }
549 
latmon_server_thread_func(void * p1,void * p2,void * p3)550 static int latmon_server_thread_func(void *p1, void *p2, void *p3)
551 {
552 	struct latmon_message msg = { };
553 
554 	LOG_INF("Latmon server thread priority: %d", LATMON_THREAD_PRIORITY);
555 
556 	for (;;) {
557 		k_msgq_get(&latmon_msgq, &msg, K_FOREVER);
558 
559 		/* Only latmus can stop the monitoring, so hang in there */
560 		handle_connection(&msg);
561 	}
562 
563 	return 0;
564 }
565 
566 K_THREAD_DEFINE(latmon_server_id, LATMON_STACK_SIZE,
567 		latmon_server_thread_func, NULL, NULL, NULL,
568 		LATMON_THREAD_PRIORITY, 0, 0);
569