/*
 * Copyright (c) 2015 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/logging/log.h>
LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);

#include <zephyr/kernel.h>

#include <zephyr/net/socket.h>
#include <zephyr/net/zperf.h>

#include "zperf_internal.h"
#include "zperf_session.h"

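/* Shared TX buffer: UDP datagram header, client header and up to
 * PACKET_SIZE_MAX bytes of payload.
 */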
static uint8_t sample_packet[sizeof(struct zperf_udp_datagram) +
			     sizeof(struct zperf_client_hdr_v1) +
			     PACKET_SIZE_MAX];

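/* Single shared upload context, used when per-thread sessions are disabled. */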
#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
static struct zperf_async_upload_context udp_async_upload_ctx;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */

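/* Decode the server statistics report (zperf_server_hdr) that follows the
 * UDP datagram header in the received packet and fill in the results.
 */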
static inline void zperf_upload_decode_stat(const uint8_t *data,
					    size_t datalen,
					    struct zperf_results *results)
{
	struct zperf_server_hdr *stat;
	uint32_t flags;

	if (datalen < sizeof(struct zperf_udp_datagram) +
		      sizeof(struct zperf_server_hdr)) {
		NET_WARN("Network packet too short");
	}

	stat = (struct zperf_server_hdr *)
			(data + sizeof(struct zperf_udp_datagram));
	flags = ntohl(UNALIGNED_GET(&stat->flags));
	if (!(flags & ZPERF_FLAGS_VERSION1)) {
		NET_WARN("Unexpected response flags");
	}

	results->nb_packets_rcvd = ntohl(UNALIGNED_GET(&stat->datagrams));
	results->nb_packets_lost = ntohl(UNALIGNED_GET(&stat->error_cnt));
	results->nb_packets_outorder =
		ntohl(UNALIGNED_GET(&stat->outorder_cnt));
	results->total_len = (((uint64_t)ntohl(UNALIGNED_GET(&stat->total_len1))) << 32) +
		ntohl(UNALIGNED_GET(&stat->total_len2));
	results->time_in_us = ntohl(UNALIGNED_GET(&stat->stop_usec)) +
		ntohl(UNALIGNED_GET(&stat->stop_sec)) * USEC_PER_SEC;
	results->jitter_in_us = ntohl(UNALIGNED_GET(&stat->jitter2)) +
		ntohl(UNALIGNED_GET(&stat->jitter1)) * USEC_PER_SEC;
}

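/* Terminate the upload: send a final datagram with a negated sequence id
 * (the iperf end-of-test convention), then wait for the server statistics
 * report, retrying the exchange up to the configured number of times.
 * For multicast destinations no report is expected, only the final
 * datagram retransmissions are sent.
 */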
static inline int zperf_upload_fin(int sock,
				   uint32_t nb_packets,
				   uint64_t end_time_us,
				   uint32_t packet_size,
				   struct zperf_results *results,
				   bool is_mcast_pkt)
{
	uint8_t stats[sizeof(struct zperf_udp_datagram) +
		      sizeof(struct zperf_server_hdr)] = { 0 };
	struct zperf_udp_datagram *datagram;
	struct zperf_client_hdr_v1 *hdr;
	uint32_t secs = end_time_us / USEC_PER_SEC;
	uint32_t usecs = end_time_us % USEC_PER_SEC;
	int loop = CONFIG_NET_ZPERF_UDP_REPORT_RETANSMISSION_COUNT;
	int ret = 0;
	struct timeval rcvtimeo = {
		.tv_sec = 2,
		.tv_usec = 0,
	};

	while (ret <= 0 && loop-- > 0) {
		datagram = (struct zperf_udp_datagram *)sample_packet;

		/* Fill the packet header */
		datagram->id = htonl(-nb_packets);
		datagram->tv_sec = htonl(secs);
		datagram->tv_usec = htonl(usecs);

		hdr = (struct zperf_client_hdr_v1 *)(sample_packet +
						     sizeof(*datagram));

		/* According to the iperf documentation (in include/Settings.hpp),
		 * if flags == 0, the other header values are ignored.
		 * Still, set meaningful values in the header anyway.
		 */
		hdr->flags = 0;
		hdr->num_of_threads = htonl(1);
		hdr->port = 0;
		hdr->buffer_len = sizeof(sample_packet) -
			sizeof(*datagram) - sizeof(*hdr);
		hdr->bandwidth = 0;
		hdr->num_of_bytes = htonl(packet_size);

		/* Send the packet */
		ret = zsock_send(sock, sample_packet, packet_size, 0);
		if (ret < 0) {
			NET_ERR("Failed to send the packet (%d)", errno);
			continue;
		}

		/* For multicast, do not wait for a server ack. Keep resending FIN
		 * for the configured number of attempts by forcing another loop
		 * iteration.
		 */
		if (is_mcast_pkt) {
			ret = 0;
			continue;
		} else {
			/* Receive statistics */
			ret = zsock_setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &rcvtimeo,
					       sizeof(rcvtimeo));
			if (ret < 0) {
				NET_ERR("setsockopt error (%d)", errno);
				continue;
			}

			ret = zsock_recv(sock, stats, sizeof(stats), 0);
			if (ret < 0 && errno == EAGAIN) {
				NET_WARN("Stats receive timeout");
			} else if (ret < 0) {
				NET_ERR("Failed to receive packet (%d)", errno);
			}
		}
	}

	/* In multicast, we never expect a stats reply. Stop here. */
	if (is_mcast_pkt) {
		return 0;
	}

	/* Decode statistics */
	if (ret > 0) {
		zperf_upload_decode_stat(stats, ret, results);
	} else {
		return ret;
	}

	/* Drain RX */
	while (true) {
		ret = zsock_recv(sock, stats, sizeof(stats), ZSOCK_MSG_DONTWAIT);
		if (ret < 0) {
			break;
		}

		NET_WARN("Drained a spurious stat packet!");
	}

	return 0;
}

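/* Main upload loop: build iperf-compatible UDP datagrams in sample_packet,
 * pace transmission to the requested rate by adjusting the inter-packet
 * delay, then run the final report exchange and fill in the results.
 */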
static int udp_upload(int sock, int port,
		      const struct zperf_upload_params *param,
		      struct zperf_results *results)
{
	size_t header_size =
		sizeof(struct zperf_udp_datagram) + sizeof(struct zperf_client_hdr_v1);
	uint32_t duration_in_ms = param->duration_ms;
	uint32_t packet_size = param->packet_size;
	uint32_t rate_in_kbps = param->rate_kbps;
	uint32_t packet_duration_us = zperf_packet_duration(packet_size, rate_in_kbps);
	uint32_t packet_duration = k_us_to_ticks_ceil32(packet_duration_us);
	uint32_t delay = packet_duration;
	uint64_t data_offset = 0U;
	uint32_t nb_packets = 0U;
	uint64_t usecs64;
	int64_t start_time, end_time;
	int64_t print_time, last_loop_time;
	uint32_t print_period;
	bool is_mcast_pkt = false;
	int ret;

	if (packet_size > PACKET_SIZE_MAX) {
		NET_WARN("Packet size too large! max size: %u", PACKET_SIZE_MAX);
		packet_size = PACKET_SIZE_MAX;
	} else if (packet_size < sizeof(struct zperf_udp_datagram)) {
		NET_WARN("Packet size set to the min size: %zu", header_size);
		packet_size = header_size;
	}

	/* Start the loop */
	start_time = k_uptime_ticks();
	last_loop_time = start_time;
	end_time = start_time + k_ms_to_ticks_ceil64(duration_in_ms);

	/* Print a log message every second */
	print_period = k_ms_to_ticks_ceil32(MSEC_PER_SEC);
	print_time = start_time + print_period;

	/* Default data payload */
	(void)memset(sample_packet, 'z', sizeof(sample_packet));

	do {
		struct zperf_udp_datagram *datagram;
		struct zperf_client_hdr_v1 *hdr;
		uint32_t secs, usecs;
		int64_t loop_time;
		int32_t adjust;

		/* Timestamp */
		loop_time = k_uptime_ticks();

		/* Algorithm to maintain a given baud rate */
		if (last_loop_time != loop_time) {
			adjust = packet_duration;
			adjust -= (int32_t)(loop_time - last_loop_time);
		} else {
			/* It's the first iteration so no need for adjustment
			 */
			adjust = 0;
		}

		if ((adjust >= 0) || (-adjust < delay)) {
			delay += adjust;
		} else {
			delay = 0U; /* delay should never be negative */
		}

		last_loop_time = loop_time;

		usecs64 = param->unix_offset_us + k_ticks_to_us_floor64(loop_time - start_time);
		secs = usecs64 / USEC_PER_SEC;
		usecs = usecs64 % USEC_PER_SEC;

		/* Fill the packet header */
		datagram = (struct zperf_udp_datagram *)sample_packet;

		datagram->id = htonl(nb_packets);
		datagram->tv_sec = htonl(secs);
		datagram->tv_usec = htonl(usecs);

		hdr = (struct zperf_client_hdr_v1 *)(sample_packet +
						     sizeof(*datagram));
		hdr->flags = 0;
		hdr->num_of_threads = htonl(1);
		hdr->port = htonl(port);
		hdr->buffer_len = sizeof(sample_packet) -
			sizeof(*datagram) - sizeof(*hdr);
		hdr->bandwidth = htonl(rate_in_kbps);
		hdr->num_of_bytes = htonl(packet_size);

		/* Load custom data payload if requested */
		if (param->data_loader != NULL) {
			ret = param->data_loader(param->data_loader_ctx, data_offset,
				sample_packet + header_size, packet_size - header_size);
			if (ret < 0) {
				NET_ERR("Failed to load data for offset %llu", data_offset);
				return ret;
			}
		}
		data_offset += packet_size - header_size;

		/* Send the packet */
		ret = zsock_send(sock, sample_packet, packet_size, 0);
		if (ret < 0) {
			NET_ERR("Failed to send the packet (%d)", errno);
			return -errno;
		} else {
			nb_packets++;
		}

		if (IS_ENABLED(CONFIG_NET_ZPERF_LOG_LEVEL_DBG)) {
			if (loop_time >= print_time) {
				NET_DBG("nb_packets=%u\tdelay=%u\tadjust=%d",
					nb_packets, (unsigned int)delay,
					(int)adjust);
				print_time += print_period;
			}
		}

		/* Wait */
#if defined(CONFIG_ARCH_POSIX)
		k_busy_wait(USEC_PER_MSEC);
#else
		if (delay != 0) {
			k_sleep(K_TICKS(delay));
		}
#endif
	} while (last_loop_time < end_time);

	end_time = k_uptime_ticks();
	usecs64 = param->unix_offset_us + k_ticks_to_us_floor64(end_time - start_time);

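	/* Determine whether the destination is multicast; the final report
	 * exchange differs because no server reply is expected in that case.
	 */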
	if (param->peer_addr.sa_family == AF_INET) {
		if (net_ipv4_is_addr_mcast(&net_sin(&param->peer_addr)->sin_addr)) {
			is_mcast_pkt = true;
		}
	} else if (param->peer_addr.sa_family == AF_INET6) {
		if (net_ipv6_is_addr_mcast(&net_sin6(&param->peer_addr)->sin6_addr)) {
			is_mcast_pkt = true;
		}
	} else {
		return -EINVAL;
	}
	ret = zperf_upload_fin(sock, nb_packets, usecs64, packet_size, results, is_mcast_pkt);
	if (ret < 0) {
		return ret;
	}

	/* Add result coming from the client */
	results->nb_packets_sent = nb_packets;
	results->client_time_in_us =
				k_ticks_to_us_ceil64(end_time - start_time);
	results->packet_size = packet_size;
	results->is_multicast = is_mcast_pkt;

	return 0;
}

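/* Public synchronous entry point: validate the parameters, create the UDP
 * socket, optionally bind it to a network interface, run the upload and
 * close the socket.
 */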
int zperf_udp_upload(const struct zperf_upload_params *param,
		     struct zperf_results *result)
{
	int port = 0;
	int sock;
	int ret;
	struct ifreq req;

	if (param == NULL || result == NULL) {
		return -EINVAL;
	}

	if (param->peer_addr.sa_family == AF_INET) {
		port = ntohs(net_sin(&param->peer_addr)->sin_port);
	} else if (param->peer_addr.sa_family == AF_INET6) {
		port = ntohs(net_sin6(&param->peer_addr)->sin6_port);
	} else {
		NET_ERR("Invalid address family (%d)",
			param->peer_addr.sa_family);
		return -EINVAL;
	}

	sock = zperf_prepare_upload_sock(&param->peer_addr, param->options.tos,
					 param->options.priority, 0, IPPROTO_UDP);
	if (sock < 0) {
		return sock;
	}

	if (param->if_name[0]) {
		(void)memset(req.ifr_name, 0, sizeof(req.ifr_name));
		strncpy(req.ifr_name, param->if_name, IFNAMSIZ);
		req.ifr_name[IFNAMSIZ - 1] = 0;

		if (zsock_setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &req,
				     sizeof(struct ifreq)) != 0) {
			NET_WARN("setsockopt SO_BINDTODEVICE error (%d)", -errno);
		}
	}

	ret = udp_upload(sock, port, param, result);

	zsock_close(sock);

	return ret;
}

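/* Work handler for asynchronous uploads: runs the blocking upload on the
 * work queue thread and reports progress through the user callback.
 */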
static void udp_upload_async_work(struct k_work *work)
{
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
	struct session *ses;
	struct zperf_async_upload_context *upload_ctx;
	struct zperf_results *result;

	ses = CONTAINER_OF(work, struct session, async_upload_ctx.work);
	upload_ctx = &ses->async_upload_ctx;

	if (ses->wait_for_start) {
		NET_INFO("[%d] %s waiting for start", ses->id, "UDP");

		/* Wait for the start event to be set */
		k_event_wait(ses->zperf->start_event, START_EVENT, true, K_FOREVER);

		NET_INFO("[%d] %s starting", ses->id, "UDP");
	}

	NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
		k_thread_priority_get(k_current_get()),
		k_thread_name_get(k_current_get()));

	result = &ses->result;

	ses->in_progress = true;
#else
	struct zperf_async_upload_context *upload_ctx = &udp_async_upload_ctx;
	struct zperf_results result_storage = { 0 };
	struct zperf_results *result = &result_storage;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */

	int ret;

	upload_ctx->callback(ZPERF_SESSION_STARTED, NULL,
			     upload_ctx->user_data);

	ret = zperf_udp_upload(&upload_ctx->param, result);
	if (ret < 0) {
		upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
				     upload_ctx->user_data);
	} else {
		upload_ctx->callback(ZPERF_SESSION_FINISHED, result,
				     upload_ctx->user_data);
	}
}

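/* Public asynchronous entry point: set up the upload context (per-session
 * or the single shared one) and submit the upload work item.
 */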
int zperf_udp_upload_async(const struct zperf_upload_params *param,
			   zperf_callback callback, void *user_data)
{
	if (param == NULL || callback == NULL) {
		return -EINVAL;
	}

#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
	struct k_work_q *queue;
	struct zperf_work *zperf;
	struct session *ses;
	k_tid_t tid;

	ses = get_free_session(&param->peer_addr, SESSION_UDP);
	if (ses == NULL) {
		NET_ERR("Cannot get a session!");
		return -ENOENT;
	}

	if (k_work_is_pending(&ses->async_upload_ctx.work)) {
		NET_ERR("[%d] upload already in progress", ses->id);
		return -EBUSY;
	}

	memcpy(&ses->async_upload_ctx.param, param, sizeof(*param));

	ses->proto = SESSION_UDP;
	ses->async_upload_ctx.callback = callback;
	ses->async_upload_ctx.user_data = user_data;

	zperf = get_queue(SESSION_UDP, ses->id);

	queue = zperf->queue;
	if (queue == NULL) {
		NET_ERR("Cannot get a work queue!");
		return -ENOENT;
	}

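	/* Run the upload work at the priority requested for this session. */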
	tid = k_work_queue_thread_get(queue);
	k_thread_priority_set(tid, ses->async_upload_ctx.param.options.thread_priority);

	k_work_init(&ses->async_upload_ctx.work, udp_upload_async_work);

	ses->start_time = k_uptime_ticks();
	ses->zperf = zperf;
	ses->wait_for_start = param->options.wait_for_start;

	zperf_async_work_submit(SESSION_UDP, ses->id, &ses->async_upload_ctx.work);

	NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
		k_thread_priority_get(k_current_get()),
		k_thread_name_get(k_current_get()));

#else /* CONFIG_ZPERF_SESSION_PER_THREAD */

	if (k_work_is_pending(&udp_async_upload_ctx.work)) {
		return -EBUSY;
	}

	memcpy(&udp_async_upload_ctx.param, param, sizeof(*param));
	udp_async_upload_ctx.callback = callback;
	udp_async_upload_ctx.user_data = user_data;

	zperf_async_work_submit(SESSION_UDP, -1, &udp_async_upload_ctx.work);

#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */

	return 0;
}

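/* One-time initialization of the shared asynchronous upload work item,
 * only needed when per-thread sessions are disabled.
 */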
void zperf_udp_uploader_init(void)
{
#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
	k_work_init(&udp_async_upload_ctx.work, udp_upload_async_work);
#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */
}