Lines Matching refs:con
120 struct connection *con; member
151 int (*connect)(struct connection *con, struct socket *sock,
159 void (*shutdown_action)(struct connection *con);
161 bool (*eof_condition)(struct connection *con);
193 static struct writequeue_entry *con_next_wq(struct connection *con) in con_next_wq() argument
197 if (list_empty(&con->writequeue)) in con_next_wq()
200 e = list_first_entry(&con->writequeue, struct writequeue_entry, in con_next_wq()
210 struct connection *con; in __find_con() local
212 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { in __find_con()
213 if (con->nodeid == nodeid) in __find_con()
214 return con; in __find_con()
220 static bool tcp_eof_condition(struct connection *con) in tcp_eof_condition() argument
222 return atomic_read(&con->writequeue_cnt); in tcp_eof_condition()
225 static int dlm_con_init(struct connection *con, int nodeid) in dlm_con_init() argument
227 con->rx_buflen = dlm_config.ci_buffer_size; in dlm_con_init()
228 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); in dlm_con_init()
229 if (!con->rx_buf) in dlm_con_init()
232 con->nodeid = nodeid; in dlm_con_init()
233 mutex_init(&con->sock_mutex); in dlm_con_init()
234 INIT_LIST_HEAD(&con->writequeue); in dlm_con_init()
235 spin_lock_init(&con->writequeue_lock); in dlm_con_init()
236 atomic_set(&con->writequeue_cnt, 0); in dlm_con_init()
237 INIT_WORK(&con->swork, process_send_sockets); in dlm_con_init()
238 INIT_WORK(&con->rwork, process_recv_sockets); in dlm_con_init()
239 init_waitqueue_head(&con->shutdown_wait); in dlm_con_init()
250 struct connection *con, *tmp; in nodeid2con() local
254 con = __find_con(nodeid, r); in nodeid2con()
255 if (con || !alloc) in nodeid2con()
256 return con; in nodeid2con()
258 con = kzalloc(sizeof(*con), alloc); in nodeid2con()
259 if (!con) in nodeid2con()
262 ret = dlm_con_init(con, nodeid); in nodeid2con()
264 kfree(con); in nodeid2con()
268 mutex_init(&con->wq_alloc); in nodeid2con()
280 kfree(con->rx_buf); in nodeid2con()
281 kfree(con); in nodeid2con()
285 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
288 return con; in nodeid2con()
295 struct connection *con; in foreach_conn() local
298 hlist_for_each_entry_rcu(con, &connection_hash[i], list) in foreach_conn()
299 conn_func(con); in foreach_conn()
487 struct connection *con; in lowcomms_data_ready() local
490 con = sock2con(sk); in lowcomms_data_ready()
491 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) in lowcomms_data_ready()
492 queue_work(recv_workqueue, &con->rwork); in lowcomms_data_ready()
506 struct connection *con; in lowcomms_write_space() local
509 con = sock2con(sk); in lowcomms_write_space()
510 if (!con) in lowcomms_write_space()
513 if (!test_and_set_bit(CF_CONNECTED, &con->flags)) { in lowcomms_write_space()
514 log_print("successful connected to node %d", con->nodeid); in lowcomms_write_space()
515 queue_work(send_workqueue, &con->swork); in lowcomms_write_space()
519 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
521 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
522 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
523 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
526 queue_work(send_workqueue, &con->swork); in lowcomms_write_space()
531 static inline void lowcomms_connect_sock(struct connection *con) in lowcomms_connect_sock() argument
533 if (test_bit(CF_CLOSE, &con->flags)) in lowcomms_connect_sock()
535 queue_work(send_workqueue, &con->swork); in lowcomms_connect_sock()
556 struct connection *con; in dlm_lowcomms_connect_node() local
563 con = nodeid2con(nodeid, GFP_NOFS); in dlm_lowcomms_connect_node()
564 if (!con) { in dlm_lowcomms_connect_node()
569 lowcomms_connect_sock(con); in dlm_lowcomms_connect_node()
594 struct connection *con; in lowcomms_error_report() local
599 con = sock2con(sk); in lowcomms_error_report()
600 if (con == NULL) in lowcomms_error_report()
608 con->nodeid, dlm_config.ci_tcp_port, in lowcomms_error_report()
616 con->nodeid, &sin4->sin_addr.s_addr, in lowcomms_error_report()
625 con->nodeid, sin6->sin6_addr.s6_addr32[0], in lowcomms_error_report()
634 if (test_bit(CF_IS_OTHERCON, &con->flags)) in lowcomms_error_report()
635 con = con->sendcon; in lowcomms_error_report()
639 set_bit(CF_DELAY_CONNECT, &con->flags); in lowcomms_error_report()
645 if (!test_and_set_bit(CF_RECONNECT, &con->flags)) in lowcomms_error_report()
646 queue_work(send_workqueue, &con->swork); in lowcomms_error_report()
678 static void add_listen_sock(struct socket *sock, struct listen_connection *con) in add_listen_sock() argument
684 con->sock = sock; in add_listen_sock()
686 sk->sk_user_data = con; in add_listen_sock()
694 static void add_sock(struct socket *sock, struct connection *con) in add_sock() argument
699 con->sock = sock; in add_sock()
701 sk->sk_user_data = con; in add_sock()
762 atomic_dec(&e->con->writequeue_cnt); in free_entry()
776 static void close_connection(struct connection *con, bool and_other, in close_connection() argument
779 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); in close_connection()
782 if (tx && !closing && cancel_work_sync(&con->swork)) { in close_connection()
783 log_print("canceled swork for node %d", con->nodeid); in close_connection()
784 clear_bit(CF_WRITE_PENDING, &con->flags); in close_connection()
786 if (rx && !closing && cancel_work_sync(&con->rwork)) { in close_connection()
787 log_print("canceled rwork for node %d", con->nodeid); in close_connection()
788 clear_bit(CF_READ_PENDING, &con->flags); in close_connection()
791 mutex_lock(&con->sock_mutex); in close_connection()
792 dlm_close_sock(&con->sock); in close_connection()
794 if (con->othercon && and_other) { in close_connection()
796 close_connection(con->othercon, false, tx, rx); in close_connection()
810 spin_lock(&con->writequeue_lock); in close_connection()
811 if (!list_empty(&con->writequeue)) { in close_connection()
812 e = list_first_entry(&con->writequeue, struct writequeue_entry, in close_connection()
817 spin_unlock(&con->writequeue_lock); in close_connection()
819 con->rx_leftover = 0; in close_connection()
820 con->retries = 0; in close_connection()
821 clear_bit(CF_APP_LIMITED, &con->flags); in close_connection()
822 clear_bit(CF_CONNECTED, &con->flags); in close_connection()
823 clear_bit(CF_DELAY_CONNECT, &con->flags); in close_connection()
824 clear_bit(CF_RECONNECT, &con->flags); in close_connection()
825 clear_bit(CF_EOF, &con->flags); in close_connection()
826 mutex_unlock(&con->sock_mutex); in close_connection()
827 clear_bit(CF_CLOSING, &con->flags); in close_connection()
830 static void shutdown_connection(struct connection *con) in shutdown_connection() argument
834 flush_work(&con->swork); in shutdown_connection()
836 mutex_lock(&con->sock_mutex); in shutdown_connection()
838 if (!con->sock) { in shutdown_connection()
839 mutex_unlock(&con->sock_mutex); in shutdown_connection()
843 set_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
844 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
845 mutex_unlock(&con->sock_mutex); in shutdown_connection()
848 con, ret); in shutdown_connection()
851 ret = wait_event_timeout(con->shutdown_wait, in shutdown_connection()
852 !test_bit(CF_SHUTDOWN, &con->flags), in shutdown_connection()
856 con); in shutdown_connection()
864 clear_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
865 close_connection(con, false, true, true); in shutdown_connection()
868 static void dlm_tcp_shutdown(struct connection *con) in dlm_tcp_shutdown() argument
870 if (con->othercon) in dlm_tcp_shutdown()
871 shutdown_connection(con->othercon); in dlm_tcp_shutdown()
872 shutdown_connection(con); in dlm_tcp_shutdown()
875 static int con_realloc_receive_buf(struct connection *con, int newlen) in con_realloc_receive_buf() argument
884 if (con->rx_leftover) in con_realloc_receive_buf()
885 memmove(newbuf, con->rx_buf, con->rx_leftover); in con_realloc_receive_buf()
888 kfree(con->rx_buf); in con_realloc_receive_buf()
889 con->rx_buflen = newlen; in con_realloc_receive_buf()
890 con->rx_buf = newbuf; in con_realloc_receive_buf()
896 static int receive_from_sock(struct connection *con) in receive_from_sock() argument
902 mutex_lock(&con->sock_mutex); in receive_from_sock()
904 if (con->sock == NULL) { in receive_from_sock()
911 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { in receive_from_sock()
912 ret = con_realloc_receive_buf(con, buflen); in receive_from_sock()
921 iov.iov_base = con->rx_buf + con->rx_leftover; in receive_from_sock()
922 iov.iov_len = con->rx_buflen - con->rx_leftover; in receive_from_sock()
926 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
934 buflen = ret + con->rx_leftover; in receive_from_sock()
935 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); in receive_from_sock()
943 con->rx_leftover = buflen - ret; in receive_from_sock()
944 if (con->rx_leftover) { in receive_from_sock()
945 memmove(con->rx_buf, con->rx_buf + ret, in receive_from_sock()
946 con->rx_leftover); in receive_from_sock()
950 dlm_midcomms_receive_done(con->nodeid); in receive_from_sock()
951 mutex_unlock(&con->sock_mutex); in receive_from_sock()
955 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) in receive_from_sock()
956 queue_work(recv_workqueue, &con->rwork); in receive_from_sock()
957 mutex_unlock(&con->sock_mutex); in receive_from_sock()
963 con, con->nodeid); in receive_from_sock()
966 dlm_proto_ops->eof_condition(con)) { in receive_from_sock()
967 set_bit(CF_EOF, &con->flags); in receive_from_sock()
968 mutex_unlock(&con->sock_mutex); in receive_from_sock()
970 mutex_unlock(&con->sock_mutex); in receive_from_sock()
971 close_connection(con, false, true, false); in receive_from_sock()
974 clear_bit(CF_SHUTDOWN, &con->flags); in receive_from_sock()
975 wake_up(&con->shutdown_wait); in receive_from_sock()
981 mutex_unlock(&con->sock_mutex); in receive_from_sock()
987 static int accept_from_sock(struct listen_connection *con) in accept_from_sock() argument
998 if (!con->sock) in accept_from_sock()
1001 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); in accept_from_sock()
1180 static struct writequeue_entry *new_writequeue_entry(struct connection *con, in new_writequeue_entry() argument
1195 entry->con = con; in new_writequeue_entry()
1203 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, in new_wq_entry() argument
1210 spin_lock(&con->writequeue_lock); in new_wq_entry()
1211 if (!list_empty(&con->writequeue)) { in new_wq_entry()
1212 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); in new_wq_entry()
1222 spin_unlock(&con->writequeue_lock); in new_wq_entry()
1227 spin_unlock(&con->writequeue_lock); in new_wq_entry()
1229 e = new_writequeue_entry(con, allocation); in new_wq_entry()
1236 atomic_inc(&con->writequeue_cnt); in new_wq_entry()
1238 spin_lock(&con->writequeue_lock); in new_wq_entry()
1242 list_add_tail(&e->list, &con->writequeue); in new_wq_entry()
1243 spin_unlock(&con->writequeue_lock); in new_wq_entry()
1248 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, in dlm_lowcomms_new_msg_con() argument
1270 mutex_lock(&con->wq_alloc); in dlm_lowcomms_new_msg_con()
1274 e = new_wq_entry(con, len, allocation, ppc, cb, mh); in dlm_lowcomms_new_msg_con()
1277 mutex_unlock(&con->wq_alloc); in dlm_lowcomms_new_msg_con()
1284 mutex_unlock(&con->wq_alloc); in dlm_lowcomms_new_msg_con()
1297 struct connection *con; in dlm_lowcomms_new_msg() local
1310 con = nodeid2con(nodeid, allocation); in dlm_lowcomms_new_msg()
1311 if (!con) { in dlm_lowcomms_new_msg()
1316 msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh); in dlm_lowcomms_new_msg()
1330 struct connection *con = e->con; in _dlm_lowcomms_commit_msg() local
1333 spin_lock(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1342 spin_unlock(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1344 queue_work(send_workqueue, &con->swork); in _dlm_lowcomms_commit_msg()
1348 spin_unlock(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1372 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, in dlm_lowcomms_resend_msg()
1389 static void send_to_sock(struct connection *con) in send_to_sock() argument
1396 mutex_lock(&con->sock_mutex); in send_to_sock()
1397 if (con->sock == NULL) in send_to_sock()
1400 spin_lock(&con->writequeue_lock); in send_to_sock()
1402 e = con_next_wq(con); in send_to_sock()
1406 e = list_first_entry(&con->writequeue, struct writequeue_entry, list); in send_to_sock()
1410 spin_unlock(&con->writequeue_lock); in send_to_sock()
1412 ret = kernel_sendpage(con->sock, e->page, offset, len, in send_to_sock()
1416 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1417 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1421 set_bit(SOCK_NOSPACE, &con->sock->flags); in send_to_sock()
1422 con->sock->sk->sk_write_pending++; in send_to_sock()
1435 spin_lock(&con->writequeue_lock); in send_to_sock()
1438 spin_unlock(&con->writequeue_lock); in send_to_sock()
1441 if (test_and_clear_bit(CF_EOF, &con->flags)) { in send_to_sock()
1442 mutex_unlock(&con->sock_mutex); in send_to_sock()
1443 close_connection(con, false, false, true); in send_to_sock()
1446 clear_bit(CF_SHUTDOWN, &con->flags); in send_to_sock()
1447 wake_up(&con->shutdown_wait); in send_to_sock()
1449 mutex_unlock(&con->sock_mutex); in send_to_sock()
1455 mutex_unlock(&con->sock_mutex); in send_to_sock()
1459 mutex_unlock(&con->sock_mutex); in send_to_sock()
1460 queue_work(send_workqueue, &con->swork); in send_to_sock()
1464 static void clean_one_writequeue(struct connection *con) in clean_one_writequeue() argument
1468 spin_lock(&con->writequeue_lock); in clean_one_writequeue()
1469 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1472 spin_unlock(&con->writequeue_lock); in clean_one_writequeue()
1479 struct connection *con; in dlm_lowcomms_close() local
1485 con = nodeid2con(nodeid, 0); in dlm_lowcomms_close()
1486 if (con) { in dlm_lowcomms_close()
1487 set_bit(CF_CLOSE, &con->flags); in dlm_lowcomms_close()
1488 close_connection(con, true, true, true); in dlm_lowcomms_close()
1489 clean_one_writequeue(con); in dlm_lowcomms_close()
1490 if (con->othercon) in dlm_lowcomms_close()
1491 clean_one_writequeue(con->othercon); in dlm_lowcomms_close()
1511 struct connection *con = container_of(work, struct connection, rwork); in process_recv_sockets() local
1513 clear_bit(CF_READ_PENDING, &con->flags); in process_recv_sockets()
1514 receive_from_sock(con); in process_recv_sockets()
1522 static void dlm_connect(struct connection *con) in dlm_connect() argument
1530 if (con->retries++ > MAX_CONNECT_RETRIES) in dlm_connect()
1533 if (con->sock) { in dlm_connect()
1534 log_print("node %d already connected.", con->nodeid); in dlm_connect()
1539 result = nodeid_to_addr(con->nodeid, &addr, NULL, in dlm_connect()
1542 log_print("no address for nodeid %d", con->nodeid); in dlm_connect()
1555 add_sock(sock, con); in dlm_connect()
1561 log_print_ratelimited("connecting to %d", con->nodeid); in dlm_connect()
1563 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, in dlm_connect()
1571 dlm_close_sock(&con->sock); in dlm_connect()
1583 log_print("connect %d try %d error %d", con->nodeid, in dlm_connect()
1584 con->retries, result); in dlm_connect()
1586 lowcomms_connect_sock(con); in dlm_connect()
1593 struct connection *con = container_of(work, struct connection, swork); in process_send_sockets() local
1595 WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags)); in process_send_sockets()
1597 clear_bit(CF_WRITE_PENDING, &con->flags); in process_send_sockets()
1599 if (test_and_clear_bit(CF_RECONNECT, &con->flags)) { in process_send_sockets()
1600 close_connection(con, false, false, true); in process_send_sockets()
1601 dlm_midcomms_unack_msg_resend(con->nodeid); in process_send_sockets()
1604 if (con->sock == NULL) { in process_send_sockets()
1605 if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags)) in process_send_sockets()
1608 mutex_lock(&con->sock_mutex); in process_send_sockets()
1609 dlm_connect(con); in process_send_sockets()
1610 mutex_unlock(&con->sock_mutex); in process_send_sockets()
1613 if (!list_empty(&con->writequeue)) in process_send_sockets()
1614 send_to_sock(con); in process_send_sockets()
1649 static void shutdown_conn(struct connection *con) in shutdown_conn() argument
1652 dlm_proto_ops->shutdown_action(con); in shutdown_conn()
1676 static void _stop_conn(struct connection *con, bool and_other) in _stop_conn() argument
1678 mutex_lock(&con->sock_mutex); in _stop_conn()
1679 set_bit(CF_CLOSE, &con->flags); in _stop_conn()
1680 set_bit(CF_READ_PENDING, &con->flags); in _stop_conn()
1681 set_bit(CF_WRITE_PENDING, &con->flags); in _stop_conn()
1682 if (con->sock && con->sock->sk) { in _stop_conn()
1683 write_lock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1684 con->sock->sk->sk_user_data = NULL; in _stop_conn()
1685 write_unlock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1687 if (con->othercon && and_other) in _stop_conn()
1688 _stop_conn(con->othercon, false); in _stop_conn()
1689 mutex_unlock(&con->sock_mutex); in _stop_conn()
1692 static void stop_conn(struct connection *con) in stop_conn() argument
1694 _stop_conn(con, true); in stop_conn()
1699 struct connection *con = container_of(rcu, struct connection, rcu); in connection_release() local
1701 kfree(con->rx_buf); in connection_release()
1702 kfree(con); in connection_release()
1705 static void free_conn(struct connection *con) in free_conn() argument
1707 close_connection(con, true, true, true); in free_conn()
1709 hlist_del_rcu(&con->list); in free_conn()
1711 if (con->othercon) { in free_conn()
1712 clean_one_writequeue(con->othercon); in free_conn()
1713 call_srcu(&connections_srcu, &con->othercon->rcu, in free_conn()
1716 clean_one_writequeue(con); in free_conn()
1717 call_srcu(&connections_srcu, &con->rcu, connection_release); in free_conn()
1724 struct connection *con; in work_flush() local
1734 hlist_for_each_entry_rcu(con, &connection_hash[i], in work_flush()
1736 ok &= test_bit(CF_READ_PENDING, &con->flags); in work_flush()
1737 ok &= test_bit(CF_WRITE_PENDING, &con->flags); in work_flush()
1738 if (con->othercon) { in work_flush()
1740 &con->othercon->flags); in work_flush()
1742 &con->othercon->flags); in work_flush()
1827 static int dlm_tcp_connect(struct connection *con, struct socket *sock, in dlm_tcp_connect() argument
1894 static int dlm_sctp_connect(struct connection *con, struct socket *sock, in dlm_sctp_connect() argument
1910 if (!test_and_set_bit(CF_CONNECTED, &con->flags)) in dlm_sctp_connect()
1911 log_print("successful connected to node %d", con->nodeid); in dlm_sctp_connect()