/*
 * Copyright (c) 2015 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/logging/log.h>
LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);

#include <zephyr/kernel.h>

#include <errno.h>

#include <zephyr/net/socket.h>
#include <zephyr/net/zperf.h>

#include "zperf_internal.h"
#include "zperf_session.h"

static char sample_packet[PACKET_SIZE_MAX];

#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
static struct zperf_async_upload_context tcp_async_upload_ctx;
#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */

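/* Send the whole buffer, looping until every byte has been written
 * or the socket reports an error.
 */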
static ssize_t sendall(int sock, const void *buf, size_t len)
{
	while (len) {
		ssize_t out_len = zsock_send(sock, buf, len, 0);

		if (out_len < 0) {
			return out_len;
		}

		buf = (const char *)buf + out_len;
		len -= out_len;
	}

	return 0;
}

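/* Run one upload round: send sample_packet back-to-back until
 * duration_in_ms has elapsed and record the outcome in results.
 * The data offset is carried across rounds so that a custom
 * data_loader sees a continuous stream.
 */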
static int tcp_upload(int sock,
		      unsigned int duration_in_ms,
		      const struct zperf_upload_params *param,
		      struct zperf_results *results,
		      uint64_t *data_offset)
{
	k_timepoint_t end = sys_timepoint_calc(K_MSEC(duration_in_ms));
	int64_t start_time, end_time;
	uint32_t nb_packets = 0U, nb_errors = 0U;
	uint32_t packet_size = param->packet_size;
	uint32_t alloc_errors = 0U;
	int ret = 0;

	if (packet_size > PACKET_SIZE_MAX) {
		NET_WARN("Packet size too large! max size: %u\n", PACKET_SIZE_MAX);
		packet_size = PACKET_SIZE_MAX;
	}

	/* Start the loop */
	start_time = k_uptime_ticks();

	/* Default data payload */
	(void)memset(sample_packet, 'z', sizeof(sample_packet));

	/* Zero the "flags" field at the start of the packet. As the
	 * protocol is not properly described anywhere, it is not certain
	 * whether this is the right thing to do.
	 */
	(void)memset(sample_packet, 0, sizeof(uint32_t));

	do {
		/* Load custom data payload if requested */
		if (param->data_loader != NULL) {
			ret = param->data_loader(param->data_loader_ctx, *data_offset,
				sample_packet, packet_size);
			if (ret < 0) {
				NET_ERR("Failed to load data for offset %llu", *data_offset);
				return ret;
			}
		}
		*data_offset += packet_size;

		/* Send the packet */
		ret = sendall(sock, sample_packet, packet_size);
		if (ret < 0) {
			if (nb_errors == 0 && errno != ENOMEM) {
				NET_ERR("Failed to send the packet (%d)", errno);
			}

			nb_errors++;

			if (errno == ENOMEM) {
				/* Ignore memory errors as we just run out of
				 * buffers which is kind of expected if the
				 * buffer count is not optimized for the test
				 * and device.
				 */
				alloc_errors++;
			} else {
				ret = -errno;
				break;
			}
		} else {
			nb_packets++;
		}

#if defined(CONFIG_ARCH_POSIX)
		k_busy_wait(100 * USEC_PER_MSEC);
#else
		k_yield();
#endif

	} while (!sys_timepoint_expired(end));

	end_time = k_uptime_ticks();

	/* Add result coming from the client */
	results->nb_packets_sent = nb_packets;
	results->client_time_in_us =
				k_ticks_to_us_ceil64(end_time - start_time);
	results->packet_size = packet_size;
	results->nb_packets_errors = nb_errors;
	results->total_len = (uint64_t)nb_packets * packet_size;

	if (alloc_errors > 0) {
		NET_WARN("There were %u network buffer allocation "
			 "errors during send.\nConsider increasing the "
			 "value of CONFIG_NET_BUF_TX_COUNT and\n"
			 "optionally CONFIG_NET_PKT_TX_COUNT Kconfig "
			 "options.",
			 alloc_errors);
	}

	if (ret < 0) {
		return ret;
	}

	return 0;
}

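/* Synchronous (blocking) TCP upload: prepare the socket, run a single
 * upload round for the requested duration and close the socket.
 */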
int zperf_tcp_upload(const struct zperf_upload_params *param,
		     struct zperf_results *result)
{
	uint64_t data_offset = 0;
	int sock;
	int ret;

	if (param == NULL || result == NULL) {
		return -EINVAL;
	}

	sock = zperf_prepare_upload_sock(&param->peer_addr, param->options.tos,
					 param->options.priority, param->options.tcp_nodelay,
					 IPPROTO_TCP);
	if (sock < 0) {
		return sock;
	}

	ret = tcp_upload(sock, param->duration_ms, param, result, &data_offset);

	zsock_close(sock);

	return ret;
}

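/* Work handler that performs the upload in the background and reports
 * progress through the user supplied callback. When a report interval
 * is configured, the upload is split into rounds and a periodic result
 * is delivered after each one.
 */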
static void tcp_upload_async_work(struct k_work *work)
{
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
	struct session *ses;
	struct zperf_async_upload_context *upload_ctx;
	struct zperf_results *result;

	ses = CONTAINER_OF(work, struct session, async_upload_ctx.work);
	upload_ctx = &ses->async_upload_ctx;

	if (ses->wait_for_start) {
		NET_INFO("[%d] %s waiting for start", ses->id, "TCP");

		/* Wait for the start event to be set */
		k_event_wait(ses->zperf->start_event, START_EVENT, true, K_FOREVER);

		NET_INFO("[%d] %s starting", ses->id, "TCP");
	}

	NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
		k_thread_priority_get(k_current_get()),
		k_thread_name_get(k_current_get()));

	result = &ses->result;

	ses->in_progress = true;
#else
	struct zperf_async_upload_context *upload_ctx = &tcp_async_upload_ctx;
	struct zperf_results result_storage = { 0 };
	struct zperf_results *result = &result_storage;
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */

	int ret;
	struct zperf_upload_params param = upload_ctx->param;
	uint64_t data_offset = 0;
	int sock;

	upload_ctx->callback(ZPERF_SESSION_STARTED, NULL,
			     upload_ctx->user_data);

	sock = zperf_prepare_upload_sock(&param.peer_addr, param.options.tos,
					 param.options.priority, param.options.tcp_nodelay,
					 IPPROTO_TCP);

	if (sock < 0) {
		upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
				     upload_ctx->user_data);
		return;
	}

	if (param.options.report_interval_ms > 0) {
		uint32_t report_interval = param.options.report_interval_ms;
		uint32_t duration = param.duration_ms;

		/* Compute how many upload rounds will be executed and the duration
		 * of the last round when total duration isn't divisible by interval
		 */
		uint32_t rounds = (duration + report_interval - 1) / report_interval;
		uint32_t last_round_duration = duration - ((rounds - 1) * report_interval);

		struct zperf_results periodic_result;

		for (; rounds > 0; rounds--) {
			uint32_t round_duration;

			if (rounds == 1) {
				round_duration = last_round_duration;
			} else {
				round_duration = report_interval;
			}
			ret = tcp_upload(sock, round_duration, &param, &periodic_result,
				&data_offset);
			if (ret < 0) {
				upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
						     upload_ctx->user_data);
				goto cleanup;
			}
			upload_ctx->callback(ZPERF_SESSION_PERIODIC_RESULT, &periodic_result,
					     upload_ctx->user_data);

			result->nb_packets_sent += periodic_result.nb_packets_sent;
			result->client_time_in_us += periodic_result.client_time_in_us;
			result->nb_packets_errors += periodic_result.nb_packets_errors;
			result->total_len += periodic_result.total_len;
		}

		result->packet_size = periodic_result.packet_size;

	} else {
		ret = tcp_upload(sock, param.duration_ms, &param, result, &data_offset);
		if (ret < 0) {
			upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
					     upload_ctx->user_data);
			goto cleanup;
		}
	}

	upload_ctx->callback(ZPERF_SESSION_FINISHED, result,
			     upload_ctx->user_data);
cleanup:
	zsock_close(sock);
}

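/* Start a non-blocking TCP upload. The actual transfer is performed by
 * tcp_upload_async_work() on a work queue and the caller is notified
 * of progress and completion through the callback.
 */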
int zperf_tcp_upload_async(const struct zperf_upload_params *param,
			   zperf_callback callback, void *user_data)
{
	if (param == NULL || callback == NULL) {
		return -EINVAL;
	}

#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
	struct k_work_q *queue;
	struct zperf_work *zperf;
	struct session *ses;
	k_tid_t tid;

	ses = get_free_session(&param->peer_addr, SESSION_TCP);
	if (ses == NULL) {
		NET_ERR("Cannot get a session!");
		return -ENOENT;
	}

	if (k_work_is_pending(&ses->async_upload_ctx.work)) {
		NET_ERR("[%d] upload already in progress", ses->id);
		return -EBUSY;
	}

	memcpy(&ses->async_upload_ctx.param, param, sizeof(*param));

	ses->proto = SESSION_TCP;
	ses->async_upload_ctx.callback = callback;
	ses->async_upload_ctx.user_data = user_data;

	zperf = get_queue(SESSION_TCP, ses->id);

	queue = zperf->queue;
	if (queue == NULL) {
		NET_ERR("Cannot get a work queue!");
		return -ENOENT;
	}

	tid = k_work_queue_thread_get(queue);
	k_thread_priority_set(tid, ses->async_upload_ctx.param.options.thread_priority);

	k_work_init(&ses->async_upload_ctx.work, tcp_upload_async_work);

	ses->start_time = k_uptime_ticks();
	ses->zperf = zperf;
	ses->wait_for_start = param->options.wait_for_start;

	zperf_async_work_submit(SESSION_TCP, ses->id, &ses->async_upload_ctx.work);

	NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
		k_thread_priority_get(k_current_get()),
		k_thread_name_get(k_current_get()));

#else /* CONFIG_ZPERF_SESSION_PER_THREAD */

	if (k_work_is_pending(&tcp_async_upload_ctx.work)) {
		return -EBUSY;
	}

	memcpy(&tcp_async_upload_ctx.param, param, sizeof(*param));
	tcp_async_upload_ctx.callback = callback;
	tcp_async_upload_ctx.user_data = user_data;

	zperf_async_work_submit(SESSION_TCP, -1, &tcp_async_upload_ctx.work);

#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */

	return 0;
}

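/* One-time initialization of the shared upload work item, used when
 * sessions do not run in their own threads.
 */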
void zperf_tcp_uploader_init(void)
{
#if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
	k_work_init(&tcp_async_upload_ctx.work, tcp_upload_async_work);
#endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */
}