1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC Tx data buffering.
3  *
4  * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/slab.h>
11 #include "ar-internal.h"
12 
/* Counter handed out, via atomic_inc_return(), as each new txbuf's debug_id. */
static atomic_t rxrpc_txbuf_debug_ids;
/* Incremented when a txbuf is allocated and decremented when one is freed. */
atomic_t rxrpc_nr_txbuf;
15 
16 /*
17  * Allocate and partially initialise an I/O request structure.
18  */
rxrpc_alloc_txbuf(struct rxrpc_call * call,u8 packet_type,gfp_t gfp)19 struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type,
20 				      gfp_t gfp)
21 {
22 	struct rxrpc_txbuf *txb;
23 
24 	txb = kmalloc(sizeof(*txb), gfp);
25 	if (txb) {
26 		INIT_LIST_HEAD(&txb->call_link);
27 		INIT_LIST_HEAD(&txb->tx_link);
28 		refcount_set(&txb->ref, 1);
29 		txb->call_debug_id	= call->debug_id;
30 		txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
31 		txb->space		= sizeof(txb->data);
32 		txb->len		= 0;
33 		txb->offset		= 0;
34 		txb->flags		= 0;
35 		txb->ack_why		= 0;
36 		txb->seq		= call->tx_prepared + 1;
37 		txb->wire.epoch		= htonl(call->conn->proto.epoch);
38 		txb->wire.cid		= htonl(call->cid);
39 		txb->wire.callNumber	= htonl(call->call_id);
40 		txb->wire.seq		= htonl(txb->seq);
41 		txb->wire.type		= packet_type;
42 		txb->wire.flags		= call->conn->out_clientflag;
43 		txb->wire.userStatus	= 0;
44 		txb->wire.securityIndex	= call->security_ix;
45 		txb->wire._rsvd		= 0;
46 		txb->wire.serviceId	= htons(call->dest_srx.srx_service);
47 
48 		trace_rxrpc_txbuf(txb->debug_id,
49 				  txb->call_debug_id, txb->seq, 1,
50 				  packet_type == RXRPC_PACKET_TYPE_DATA ?
51 				  rxrpc_txbuf_alloc_data :
52 				  rxrpc_txbuf_alloc_ack);
53 		atomic_inc(&rxrpc_nr_txbuf);
54 	}
55 
56 	return txb;
57 }
58 
rxrpc_get_txbuf(struct rxrpc_txbuf * txb,enum rxrpc_txbuf_trace what)59 void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
60 {
61 	int r;
62 
63 	__refcount_inc(&txb->ref, &r);
64 	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what);
65 }
66 
rxrpc_see_txbuf(struct rxrpc_txbuf * txb,enum rxrpc_txbuf_trace what)67 void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
68 {
69 	int r = refcount_read(&txb->ref);
70 
71 	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what);
72 }
73 
rxrpc_free_txbuf(struct rcu_head * rcu)74 static void rxrpc_free_txbuf(struct rcu_head *rcu)
75 {
76 	struct rxrpc_txbuf *txb = container_of(rcu, struct rxrpc_txbuf, rcu);
77 
78 	trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
79 			  rxrpc_txbuf_free);
80 	kfree(txb);
81 	atomic_dec(&rxrpc_nr_txbuf);
82 }
83 
rxrpc_put_txbuf(struct rxrpc_txbuf * txb,enum rxrpc_txbuf_trace what)84 void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
85 {
86 	unsigned int debug_id, call_debug_id;
87 	rxrpc_seq_t seq;
88 	bool dead;
89 	int r;
90 
91 	if (txb) {
92 		debug_id = txb->debug_id;
93 		call_debug_id = txb->call_debug_id;
94 		seq = txb->seq;
95 		dead = __refcount_dec_and_test(&txb->ref, &r);
96 		trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what);
97 		if (dead)
98 			call_rcu(&txb->rcu, rxrpc_free_txbuf);
99 	}
100 }
101 
102 /*
103  * Shrink the transmit buffer.
104  */
rxrpc_shrink_call_tx_buffer(struct rxrpc_call * call)105 void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
106 {
107 	struct rxrpc_txbuf *txb;
108 	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
109 	bool wake = false;
110 
111 	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
112 
113 	while ((txb = list_first_entry_or_null(&call->tx_buffer,
114 					       struct rxrpc_txbuf, call_link))) {
115 		hard_ack = smp_load_acquire(&call->acks_hard_ack);
116 		if (before(hard_ack, txb->seq))
117 			break;
118 
119 		if (txb->seq != call->tx_bottom + 1)
120 			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
121 		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
122 		smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
123 		list_del_rcu(&txb->call_link);
124 
125 		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
126 
127 		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
128 		if (after(call->acks_hard_ack, call->tx_bottom + 128))
129 			wake = true;
130 	}
131 
132 	if (wake)
133 		wake_up(&call->waitq);
134 }
135