1 /*
2 * Copyright (c) 2015 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
9
10 #include <zephyr/kernel.h>
11
12 #include <errno.h>
13
14 #include <zephyr/net/socket.h>
15 #include <zephyr/net/zperf.h>
16
17 #include "zperf_internal.h"
18 #include "zperf_session.h"
19
20 static char sample_packet[PACKET_SIZE_MAX];
21
22 #if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
23 static struct zperf_async_upload_context tcp_async_upload_ctx;
24 #endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */
25
sendall(int sock,const void * buf,size_t len)26 static ssize_t sendall(int sock, const void *buf, size_t len)
27 {
28 while (len) {
29 ssize_t out_len = zsock_send(sock, buf, len, 0);
30
31 if (out_len < 0) {
32 return out_len;
33 }
34
35 buf = (const char *)buf + out_len;
36 len -= out_len;
37 }
38
39 return 0;
40 }
41
tcp_upload(int sock,unsigned int duration_in_ms,const struct zperf_upload_params * param,struct zperf_results * results,uint64_t * data_offset)42 static int tcp_upload(int sock,
43 unsigned int duration_in_ms,
44 const struct zperf_upload_params *param,
45 struct zperf_results *results,
46 uint64_t *data_offset)
47 {
48 k_timepoint_t end = sys_timepoint_calc(K_MSEC(duration_in_ms));
49 int64_t start_time, end_time;
50 uint32_t nb_packets = 0U, nb_errors = 0U;
51 uint32_t packet_size = param->packet_size;
52 uint32_t alloc_errors = 0U;
53 int ret = 0;
54
55 if (packet_size > PACKET_SIZE_MAX) {
56 NET_WARN("Packet size too large! max size: %u\n", PACKET_SIZE_MAX);
57 packet_size = PACKET_SIZE_MAX;
58 }
59
60 /* Start the loop */
61 start_time = k_uptime_ticks();
62
63 /* Default data payload */
64 (void)memset(sample_packet, 'z', sizeof(sample_packet));
65
66 /* Set the "flags" field in start of the packet to be 0.
67 * As the protocol is not properly described anywhere, it is
68 * not certain if this is a proper thing to do.
69 */
70 (void)memset(sample_packet, 0, sizeof(uint32_t));
71
72 do {
73 /* Load custom data payload if requested */
74 if (param->data_loader != NULL) {
75 ret = param->data_loader(param->data_loader_ctx, *data_offset,
76 sample_packet, packet_size);
77 if (ret < 0) {
78 NET_ERR("Failed to load data for offset %llu", *data_offset);
79 return ret;
80 }
81 }
82 *data_offset += packet_size;
83
84 /* Send the packet */
85 ret = sendall(sock, sample_packet, packet_size);
86 if (ret < 0) {
87 if (nb_errors == 0 && ret != -ENOMEM) {
88 NET_ERR("Failed to send the packet (%d)", errno);
89 }
90
91 nb_errors++;
92
93 if (errno == -ENOMEM) {
94 /* Ignore memory errors as we just run out of
95 * buffers which is kind of expected if the
96 * buffer count is not optimized for the test
97 * and device.
98 */
99 alloc_errors++;
100 } else {
101 ret = -errno;
102 break;
103 }
104 } else {
105 nb_packets++;
106 }
107
108 #if defined(CONFIG_ARCH_POSIX)
109 k_busy_wait(100 * USEC_PER_MSEC);
110 #else
111 k_yield();
112 #endif
113
114 } while (!sys_timepoint_expired(end));
115
116 end_time = k_uptime_ticks();
117
118 /* Add result coming from the client */
119 results->nb_packets_sent = nb_packets;
120 results->client_time_in_us =
121 k_ticks_to_us_ceil64(end_time - start_time);
122 results->packet_size = packet_size;
123 results->nb_packets_errors = nb_errors;
124 results->total_len = (uint64_t)nb_packets * packet_size;
125
126 if (alloc_errors > 0) {
127 NET_WARN("There was %u network buffer allocation "
128 "errors during send.\nConsider increasing the "
129 "value of CONFIG_NET_BUF_TX_COUNT and\n"
130 "optionally CONFIG_NET_PKT_TX_COUNT Kconfig "
131 "options.",
132 alloc_errors);
133 }
134
135 if (ret < 0) {
136 return ret;
137 }
138
139 return 0;
140 }
141
zperf_tcp_upload(const struct zperf_upload_params * param,struct zperf_results * result)142 int zperf_tcp_upload(const struct zperf_upload_params *param,
143 struct zperf_results *result)
144 {
145 uint64_t data_offset = 0;
146 int sock;
147 int ret;
148
149 if (param == NULL || result == NULL) {
150 return -EINVAL;
151 }
152
153 sock = zperf_prepare_upload_sock(¶m->peer_addr, param->options.tos,
154 param->options.priority, param->options.tcp_nodelay,
155 IPPROTO_TCP);
156 if (sock < 0) {
157 return sock;
158 }
159
160 ret = tcp_upload(sock, param->duration_ms, param, result, &data_offset);
161
162 zsock_close(sock);
163
164 return ret;
165 }
166
tcp_upload_async_work(struct k_work * work)167 static void tcp_upload_async_work(struct k_work *work)
168 {
169 #ifdef CONFIG_ZPERF_SESSION_PER_THREAD
170 struct session *ses;
171 struct zperf_async_upload_context *upload_ctx;
172 struct zperf_results *result;
173
174 ses = CONTAINER_OF(work, struct session, async_upload_ctx.work);
175 upload_ctx = &ses->async_upload_ctx;
176
177 if (ses->wait_for_start) {
178 NET_INFO("[%d] %s waiting for start", ses->id, "TCP");
179
180 /* Wait for the start event to be set */
181 k_event_wait(ses->zperf->start_event, START_EVENT, true, K_FOREVER);
182
183 NET_INFO("[%d] %s starting", ses->id, "TCP");
184 }
185
186 NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
187 k_thread_priority_get(k_current_get()),
188 k_thread_name_get(k_current_get()));
189
190 result = &ses->result;
191
192 ses->in_progress = true;
193 #else
194 struct zperf_async_upload_context *upload_ctx = &tcp_async_upload_ctx;
195 struct zperf_results result_storage = { 0 };
196 struct zperf_results *result = &result_storage;
197 #endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
198
199 int ret;
200 struct zperf_upload_params param = upload_ctx->param;
201 uint64_t data_offset = 0;
202 int sock;
203
204 upload_ctx->callback(ZPERF_SESSION_STARTED, NULL,
205 upload_ctx->user_data);
206
207 sock = zperf_prepare_upload_sock(¶m.peer_addr, param.options.tos,
208 param.options.priority, param.options.tcp_nodelay,
209 IPPROTO_TCP);
210
211 if (sock < 0) {
212 upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
213 upload_ctx->user_data);
214 return;
215 }
216
217 if (param.options.report_interval_ms > 0) {
218 uint32_t report_interval = param.options.report_interval_ms;
219 uint32_t duration = param.duration_ms;
220
221 /* Compute how many upload rounds will be executed and the duration
222 * of the last round when total duration isn't divisible by interval
223 */
224 uint32_t rounds = (duration + report_interval - 1) / report_interval;
225 uint32_t last_round_duration = duration - ((rounds - 1) * report_interval);
226
227 struct zperf_results periodic_result;
228
229 for (; rounds > 0; rounds--) {
230 uint32_t round_duration;
231
232 if (rounds == 1) {
233 round_duration = last_round_duration;
234 } else {
235 round_duration = report_interval;
236 }
237 ret = tcp_upload(sock, round_duration, ¶m, &periodic_result,
238 &data_offset);
239 if (ret < 0) {
240 upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
241 upload_ctx->user_data);
242 goto cleanup;
243 }
244 upload_ctx->callback(ZPERF_SESSION_PERIODIC_RESULT, &periodic_result,
245 upload_ctx->user_data);
246
247 result->nb_packets_sent += periodic_result.nb_packets_sent;
248 result->client_time_in_us += periodic_result.client_time_in_us;
249 result->nb_packets_errors += periodic_result.nb_packets_errors;
250 }
251
252 result->packet_size = periodic_result.packet_size;
253
254 } else {
255 ret = tcp_upload(sock, param.duration_ms, ¶m, result, &data_offset);
256 if (ret < 0) {
257 upload_ctx->callback(ZPERF_SESSION_ERROR, NULL,
258 upload_ctx->user_data);
259 goto cleanup;
260 }
261 }
262
263 upload_ctx->callback(ZPERF_SESSION_FINISHED, result,
264 upload_ctx->user_data);
265 cleanup:
266 zsock_close(sock);
267 }
268
zperf_tcp_upload_async(const struct zperf_upload_params * param,zperf_callback callback,void * user_data)269 int zperf_tcp_upload_async(const struct zperf_upload_params *param,
270 zperf_callback callback, void *user_data)
271 {
272 if (param == NULL || callback == NULL) {
273 return -EINVAL;
274 }
275
276 #ifdef CONFIG_ZPERF_SESSION_PER_THREAD
277 struct k_work_q *queue;
278 struct zperf_work *zperf;
279 struct session *ses;
280 k_tid_t tid;
281
282 ses = get_free_session(¶m->peer_addr, SESSION_TCP);
283 if (ses == NULL) {
284 NET_ERR("Cannot get a session!");
285 return -ENOENT;
286 }
287
288 if (k_work_is_pending(&ses->async_upload_ctx.work)) {
289 NET_ERR("[%d] upload already in progress", ses->id);
290 return -EBUSY;
291 }
292
293 memcpy(&ses->async_upload_ctx.param, param, sizeof(*param));
294
295 ses->proto = SESSION_TCP;
296 ses->async_upload_ctx.callback = callback;
297 ses->async_upload_ctx.user_data = user_data;
298
299 zperf = get_queue(SESSION_TCP, ses->id);
300
301 queue = zperf->queue;
302 if (queue == NULL) {
303 NET_ERR("Cannot get a work queue!");
304 return -ENOENT;
305 }
306
307 tid = k_work_queue_thread_get(queue);
308 k_thread_priority_set(tid, ses->async_upload_ctx.param.options.thread_priority);
309
310 k_work_init(&ses->async_upload_ctx.work, tcp_upload_async_work);
311
312 ses->start_time = k_uptime_ticks();
313 ses->zperf = zperf;
314 ses->wait_for_start = param->options.wait_for_start;
315
316 zperf_async_work_submit(SESSION_TCP, ses->id, &ses->async_upload_ctx.work);
317
318 NET_DBG("[%d] thread %p priority %d name %s", ses->id, k_current_get(),
319 k_thread_priority_get(k_current_get()),
320 k_thread_name_get(k_current_get()));
321
322 #else /* CONFIG_ZPERF_SESSION_PER_THREAD */
323
324 if (k_work_is_pending(&tcp_async_upload_ctx.work)) {
325 return -EBUSY;
326 }
327
328 memcpy(&tcp_async_upload_ctx.param, param, sizeof(*param));
329 tcp_async_upload_ctx.callback = callback;
330 tcp_async_upload_ctx.user_data = user_data;
331
332 zperf_async_work_submit(SESSION_TCP, -1, &tcp_async_upload_ctx.work);
333
334 #endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
335
336 return 0;
337 }
338
zperf_tcp_uploader_init(void)339 void zperf_tcp_uploader_init(void)
340 {
341 #if !defined(CONFIG_ZPERF_SESSION_PER_THREAD)
342 k_work_init(&tcp_async_upload_ctx.work, tcp_upload_async_work);
343 #endif /* !CONFIG_ZPERF_SESSION_PER_THREAD */
344 }
345