Lines Matching refs:con
134 struct connection *con; member
215 static void lowcomms_queue_swork(struct connection *con) in lowcomms_queue_swork() argument
217 assert_spin_locked(&con->writequeue_lock); in lowcomms_queue_swork()
219 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_swork()
220 !test_bit(CF_APP_LIMITED, &con->flags) && in lowcomms_queue_swork()
221 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) in lowcomms_queue_swork()
222 queue_work(io_workqueue, &con->swork); in lowcomms_queue_swork()
225 static void lowcomms_queue_rwork(struct connection *con) in lowcomms_queue_rwork() argument
228 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); in lowcomms_queue_rwork()
231 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_rwork()
232 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) in lowcomms_queue_rwork()
233 queue_work(io_workqueue, &con->rwork); in lowcomms_queue_rwork()
255 static struct writequeue_entry *con_next_wq(struct connection *con) in con_next_wq() argument
259 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, in con_next_wq()
272 struct connection *con; in __find_con() local
274 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { in __find_con()
275 if (con->nodeid == nodeid) in __find_con()
276 return con; in __find_con()
282 static void dlm_con_init(struct connection *con, int nodeid) in dlm_con_init() argument
284 con->nodeid = nodeid; in dlm_con_init()
285 init_rwsem(&con->sock_lock); in dlm_con_init()
286 INIT_LIST_HEAD(&con->writequeue); in dlm_con_init()
287 spin_lock_init(&con->writequeue_lock); in dlm_con_init()
288 INIT_WORK(&con->swork, process_send_sockets); in dlm_con_init()
289 INIT_WORK(&con->rwork, process_recv_sockets); in dlm_con_init()
290 spin_lock_init(&con->addrs_lock); in dlm_con_init()
291 init_waitqueue_head(&con->shutdown_wait); in dlm_con_init()
300 struct connection *con, *tmp; in nodeid2con() local
304 con = __find_con(nodeid, r); in nodeid2con()
305 if (con || !alloc) in nodeid2con()
306 return con; in nodeid2con()
308 con = kzalloc(sizeof(*con), alloc); in nodeid2con()
309 if (!con) in nodeid2con()
312 dlm_con_init(con, nodeid); in nodeid2con()
324 kfree(con); in nodeid2con()
328 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
331 return con; in nodeid2con()
367 struct connection *con; in nodeid_to_addr() local
374 con = nodeid2con(nodeid, 0); in nodeid_to_addr()
375 if (!con) { in nodeid_to_addr()
380 spin_lock(&con->addrs_lock); in nodeid_to_addr()
381 if (!con->addr_count) { in nodeid_to_addr()
382 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
387 memcpy(&sas, &con->addr[con->curr_addr_index], in nodeid_to_addr()
391 con->curr_addr_index++; in nodeid_to_addr()
392 if (con->curr_addr_index == con->addr_count) in nodeid_to_addr()
393 con->curr_addr_index = 0; in nodeid_to_addr()
396 *mark = con->mark; in nodeid_to_addr()
397 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
424 struct connection *con; in addr_to_nodeid() local
429 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in addr_to_nodeid()
430 WARN_ON_ONCE(!con->addr_count); in addr_to_nodeid()
432 spin_lock(&con->addrs_lock); in addr_to_nodeid()
433 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { in addr_to_nodeid()
434 if (addr_compare(&con->addr[addr_i], addr)) { in addr_to_nodeid()
435 *nodeid = con->nodeid; in addr_to_nodeid()
436 *mark = con->mark; in addr_to_nodeid()
437 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
442 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
450 static bool dlm_lowcomms_con_has_addr(const struct connection *con, in dlm_lowcomms_con_has_addr() argument
455 for (i = 0; i < con->addr_count; i++) { in dlm_lowcomms_con_has_addr()
456 if (addr_compare(&con->addr[i], addr)) in dlm_lowcomms_con_has_addr()
465 struct connection *con; in dlm_lowcomms_addr() local
470 con = nodeid2con(nodeid, GFP_NOFS); in dlm_lowcomms_addr()
471 if (!con) { in dlm_lowcomms_addr()
476 spin_lock(&con->addrs_lock); in dlm_lowcomms_addr()
477 if (!con->addr_count) { in dlm_lowcomms_addr()
478 memcpy(&con->addr[0], addr, sizeof(*addr)); in dlm_lowcomms_addr()
479 con->addr_count = 1; in dlm_lowcomms_addr()
480 con->mark = dlm_config.ci_mark; in dlm_lowcomms_addr()
481 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
486 ret = dlm_lowcomms_con_has_addr(con, addr); in dlm_lowcomms_addr()
488 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
493 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { in dlm_lowcomms_addr()
494 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
499 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); in dlm_lowcomms_addr()
501 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
508 struct connection *con = sock2con(sk); in lowcomms_data_ready() local
512 set_bit(CF_RECV_INTR, &con->flags); in lowcomms_data_ready()
513 lowcomms_queue_rwork(con); in lowcomms_data_ready()
518 struct connection *con = sock2con(sk); in lowcomms_write_space() local
520 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
522 spin_lock_bh(&con->writequeue_lock); in lowcomms_write_space()
523 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
524 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
525 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
528 lowcomms_queue_swork(con); in lowcomms_write_space()
529 spin_unlock_bh(&con->writequeue_lock); in lowcomms_write_space()
550 struct connection *con; in dlm_lowcomms_connect_node() local
554 con = nodeid2con(nodeid, 0); in dlm_lowcomms_connect_node()
555 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_connect_node()
560 down_read(&con->sock_lock); in dlm_lowcomms_connect_node()
561 if (!con->sock) { in dlm_lowcomms_connect_node()
562 spin_lock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
563 lowcomms_queue_swork(con); in dlm_lowcomms_connect_node()
564 spin_unlock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
566 up_read(&con->sock_lock); in dlm_lowcomms_connect_node()
575 struct connection *con; in dlm_lowcomms_nodes_set_mark() local
579 con = nodeid2con(nodeid, 0); in dlm_lowcomms_nodes_set_mark()
580 if (!con) { in dlm_lowcomms_nodes_set_mark()
585 spin_lock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
586 con->mark = mark; in dlm_lowcomms_nodes_set_mark()
587 spin_unlock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
594 struct connection *con = sock2con(sk); in lowcomms_error_report() local
603 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
612 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
626 dlm_midcomms_unack_msg_resend(con->nodeid); in lowcomms_error_report()
645 static void add_sock(struct socket *sock, struct connection *con) in add_sock() argument
650 con->sock = sock; in add_sock()
652 sk->sk_user_data = con; in add_sock()
727 static void allow_connection_io(struct connection *con) in allow_connection_io() argument
729 if (con->othercon) in allow_connection_io()
730 clear_bit(CF_IO_STOP, &con->othercon->flags); in allow_connection_io()
731 clear_bit(CF_IO_STOP, &con->flags); in allow_connection_io()
734 static void stop_connection_io(struct connection *con) in stop_connection_io() argument
736 if (con->othercon) in stop_connection_io()
737 stop_connection_io(con->othercon); in stop_connection_io()
739 spin_lock_bh(&con->writequeue_lock); in stop_connection_io()
740 set_bit(CF_IO_STOP, &con->flags); in stop_connection_io()
741 spin_unlock_bh(&con->writequeue_lock); in stop_connection_io()
743 down_write(&con->sock_lock); in stop_connection_io()
744 if (con->sock) { in stop_connection_io()
745 lock_sock(con->sock->sk); in stop_connection_io()
746 restore_callbacks(con->sock->sk); in stop_connection_io()
747 release_sock(con->sock->sk); in stop_connection_io()
749 up_write(&con->sock_lock); in stop_connection_io()
751 cancel_work_sync(&con->swork); in stop_connection_io()
752 cancel_work_sync(&con->rwork); in stop_connection_io()
756 static void close_connection(struct connection *con, bool and_other) in close_connection() argument
760 if (con->othercon && and_other) in close_connection()
761 close_connection(con->othercon, false); in close_connection()
763 down_write(&con->sock_lock); in close_connection()
764 if (!con->sock) { in close_connection()
765 up_write(&con->sock_lock); in close_connection()
769 dlm_close_sock(&con->sock); in close_connection()
782 spin_lock_bh(&con->writequeue_lock); in close_connection()
783 if (!list_empty(&con->writequeue)) { in close_connection()
784 e = list_first_entry(&con->writequeue, struct writequeue_entry, in close_connection()
789 spin_unlock_bh(&con->writequeue_lock); in close_connection()
791 con->rx_leftover = 0; in close_connection()
792 con->retries = 0; in close_connection()
793 clear_bit(CF_APP_LIMITED, &con->flags); in close_connection()
794 clear_bit(CF_RECV_PENDING, &con->flags); in close_connection()
795 clear_bit(CF_SEND_PENDING, &con->flags); in close_connection()
796 up_write(&con->sock_lock); in close_connection()
799 static void shutdown_connection(struct connection *con, bool and_other) in shutdown_connection() argument
803 if (con->othercon && and_other) in shutdown_connection()
804 shutdown_connection(con->othercon, false); in shutdown_connection()
807 down_read(&con->sock_lock); in shutdown_connection()
809 if (!con->sock) { in shutdown_connection()
810 up_read(&con->sock_lock); in shutdown_connection()
814 ret = kernel_sock_shutdown(con->sock, dlm_proto_ops->how); in shutdown_connection()
815 up_read(&con->sock_lock); in shutdown_connection()
818 con, ret); in shutdown_connection()
821 ret = wait_event_timeout(con->shutdown_wait, !con->sock, in shutdown_connection()
825 con); in shutdown_connection()
833 close_connection(con, false); in shutdown_connection()
901 static int receive_from_sock(struct connection *con, int buflen) in receive_from_sock() argument
908 pentry = new_processqueue_entry(con->nodeid, buflen); in receive_from_sock()
912 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); in receive_from_sock()
917 iov.iov_base = pentry->buf + con->rx_leftover; in receive_from_sock()
918 iov.iov_len = buflen - con->rx_leftover; in receive_from_sock()
922 clear_bit(CF_RECV_INTR, &con->flags); in receive_from_sock()
924 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
926 trace_dlm_recv(con->nodeid, ret); in receive_from_sock()
928 lock_sock(con->sock->sk); in receive_from_sock()
929 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { in receive_from_sock()
930 release_sock(con->sock->sk); in receive_from_sock()
934 clear_bit(CF_RECV_PENDING, &con->flags); in receive_from_sock()
935 release_sock(con->sock->sk); in receive_from_sock()
948 buflen_real = ret + con->rx_leftover; in receive_from_sock()
949 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, in receive_from_sock()
962 con->rx_leftover = buflen_real - ret; in receive_from_sock()
963 memmove(con->rx_leftover_buf, pentry->buf + ret, in receive_from_sock()
964 con->rx_leftover); in receive_from_sock()
1165 static struct writequeue_entry *new_writequeue_entry(struct connection *con) in new_writequeue_entry() argument
1183 entry->con = con; in new_writequeue_entry()
1189 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, in new_wq_entry() argument
1195 spin_lock_bh(&con->writequeue_lock); in new_wq_entry()
1196 if (!list_empty(&con->writequeue)) { in new_wq_entry()
1197 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); in new_wq_entry()
1211 e = new_writequeue_entry(con); in new_wq_entry()
1221 list_add_tail(&e->list, &con->writequeue); in new_wq_entry()
1224 spin_unlock_bh(&con->writequeue_lock); in new_wq_entry()
1228 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, in dlm_lowcomms_new_msg_con() argument
1241 e = new_wq_entry(con, len, ppc, cb, data); in dlm_lowcomms_new_msg_con()
1263 struct connection *con; in dlm_lowcomms_new_msg() local
1276 con = nodeid2con(nodeid, 0); in dlm_lowcomms_new_msg()
1277 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_new_msg()
1282 msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data); in dlm_lowcomms_new_msg()
1299 struct connection *con = e->con; in _dlm_lowcomms_commit_msg() local
1302 spin_lock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1312 lowcomms_queue_swork(con); in _dlm_lowcomms_commit_msg()
1315 spin_unlock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1346 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc, in dlm_lowcomms_resend_msg()
1363 static int send_to_sock(struct connection *con) in send_to_sock() argument
1372 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1373 e = con_next_wq(con); in send_to_sock()
1375 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1376 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1383 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1387 ret = sock_sendmsg(con->sock, &msg); in send_to_sock()
1388 trace_dlm_send(con->nodeid, ret); in send_to_sock()
1390 lock_sock(con->sock->sk); in send_to_sock()
1391 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1392 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1393 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1397 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); in send_to_sock()
1398 con->sock->sk->sk_write_pending++; in send_to_sock()
1400 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1401 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1402 release_sock(con->sock->sk); in send_to_sock()
1407 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1408 release_sock(con->sock->sk); in send_to_sock()
1415 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1417 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1422 static void clean_one_writequeue(struct connection *con) in clean_one_writequeue() argument
1426 spin_lock_bh(&con->writequeue_lock); in clean_one_writequeue()
1427 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1430 spin_unlock_bh(&con->writequeue_lock); in clean_one_writequeue()
1435 struct connection *con = container_of(rcu, struct connection, rcu); in connection_release() local
1437 WARN_ON_ONCE(!list_empty(&con->writequeue)); in connection_release()
1438 WARN_ON_ONCE(con->sock); in connection_release()
1439 kfree(con); in connection_release()
1446 struct connection *con; in dlm_lowcomms_close() local
1452 con = nodeid2con(nodeid, 0); in dlm_lowcomms_close()
1453 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_close()
1458 stop_connection_io(con); in dlm_lowcomms_close()
1460 close_connection(con, true); in dlm_lowcomms_close()
1463 hlist_del_rcu(&con->list); in dlm_lowcomms_close()
1466 clean_one_writequeue(con); in dlm_lowcomms_close()
1467 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_close()
1468 if (con->othercon) { in dlm_lowcomms_close()
1469 clean_one_writequeue(con->othercon); in dlm_lowcomms_close()
1470 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); in dlm_lowcomms_close()
1486 struct connection *con = container_of(work, struct connection, rwork); in process_recv_sockets() local
1489 down_read(&con->sock_lock); in process_recv_sockets()
1490 if (!con->sock) { in process_recv_sockets()
1491 up_read(&con->sock_lock); in process_recv_sockets()
1497 ret = receive_from_sock(con, buflen); in process_recv_sockets()
1499 up_read(&con->sock_lock); in process_recv_sockets()
1506 close_connection(con, false); in process_recv_sockets()
1507 wake_up(&con->shutdown_wait); in process_recv_sockets()
1528 queue_work(io_workqueue, &con->rwork); in process_recv_sockets()
1533 if (test_bit(CF_IS_OTHERCON, &con->flags)) { in process_recv_sockets()
1534 close_connection(con, false); in process_recv_sockets()
1536 spin_lock_bh(&con->writequeue_lock); in process_recv_sockets()
1537 lowcomms_queue_swork(con); in process_recv_sockets()
1538 spin_unlock_bh(&con->writequeue_lock); in process_recv_sockets()
1568 static int dlm_connect(struct connection *con) in dlm_connect() argument
1576 result = nodeid_to_addr(con->nodeid, &addr, NULL, in dlm_connect()
1579 log_print("no address for nodeid %d", con->nodeid); in dlm_connect()
1598 add_sock(sock, con); in dlm_connect()
1600 log_print_ratelimited("connecting to %d", con->nodeid); in dlm_connect()
1611 dlm_close_sock(&con->sock); in dlm_connect()
1622 struct connection *con = container_of(work, struct connection, swork); in process_send_sockets() local
1625 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); in process_send_sockets()
1627 down_read(&con->sock_lock); in process_send_sockets()
1628 if (!con->sock) { in process_send_sockets()
1629 up_read(&con->sock_lock); in process_send_sockets()
1630 down_write(&con->sock_lock); in process_send_sockets()
1631 if (!con->sock) { in process_send_sockets()
1632 ret = dlm_connect(con); in process_send_sockets()
1638 up_write(&con->sock_lock); in process_send_sockets()
1640 con->nodeid, con->retries++, ret); in process_send_sockets()
1647 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1651 downgrade_write(&con->sock_lock); in process_send_sockets()
1655 ret = send_to_sock(con); in process_send_sockets()
1657 up_read(&con->sock_lock); in process_send_sockets()
1666 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1670 close_connection(con, false); in process_send_sockets()
1673 spin_lock_bh(&con->writequeue_lock); in process_send_sockets()
1674 lowcomms_queue_swork(con); in process_send_sockets()
1675 spin_unlock_bh(&con->writequeue_lock); in process_send_sockets()
1719 struct connection *con; in dlm_lowcomms_shutdown() local
1732 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in dlm_lowcomms_shutdown()
1733 shutdown_connection(con, true); in dlm_lowcomms_shutdown()
1734 stop_connection_io(con); in dlm_lowcomms_shutdown()
1736 close_connection(con, true); in dlm_lowcomms_shutdown()
1738 clean_one_writequeue(con); in dlm_lowcomms_shutdown()
1739 if (con->othercon) in dlm_lowcomms_shutdown()
1740 clean_one_writequeue(con->othercon); in dlm_lowcomms_shutdown()
1741 allow_connection_io(con); in dlm_lowcomms_shutdown()
1966 struct connection *con; in dlm_lowcomms_exit() local
1971 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in dlm_lowcomms_exit()
1973 hlist_del_rcu(&con->list); in dlm_lowcomms_exit()
1976 if (con->othercon) in dlm_lowcomms_exit()
1977 call_srcu(&connections_srcu, &con->othercon->rcu, in dlm_lowcomms_exit()
1979 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_exit()