ソースを参照

Merge branch 'tipc-next'

Ying Xue says:

====================
tipc: fix two corner issues

The patch set aims at resolving the following two critical issues:

Patch #1: Resolve a deadlock which happens while all links are reset
Patch #2: Correct a mistake usage of RCU lock which is used to protect
          node list
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 10 年 前
コミット
ae7633c841
7 ファイル変更87 行追加62 行削除
  1. 4 24
      net/tipc/bcast.c
  2. 0 4
      net/tipc/bcast.h
  3. 1 0
      net/tipc/discover.c
  4. 6 6
      net/tipc/link.c
  5. 2 0
      net/tipc/name_distr.c
  6. 65 25
      net/tipc/node.c
  7. 9 3
      net/tipc/node.h

+ 4 - 24
net/tipc/bcast.c

@@ -62,21 +62,8 @@ static void tipc_bclink_lock(struct net *net)
 static void tipc_bclink_unlock(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_node *node = NULL;
 
-	if (likely(!tn->bclink->flags)) {
-		spin_unlock_bh(&tn->bclink->lock);
-		return;
-	}
-
-	if (tn->bclink->flags & TIPC_BCLINK_RESET) {
-		tn->bclink->flags &= ~TIPC_BCLINK_RESET;
-		node = tipc_bclink_retransmit_to(net);
-	}
 	spin_unlock_bh(&tn->bclink->lock);
-
-	if (node)
-		tipc_link_reset_all(node);
 }
 
 void tipc_bclink_input(struct net *net)
@@ -91,13 +78,6 @@ uint  tipc_bclink_get_mtu(void)
 	return MAX_PKT_DEFAULT_MCAST;
 }
 
-void tipc_bclink_set_flags(struct net *net, unsigned int flags)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	tn->bclink->flags |= flags;
-}
-
 static u32 bcbuf_acks(struct sk_buff *buf)
 {
 	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
@@ -156,7 +136,6 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
 						seqno : node->bclink.last_sent;
 }
 
-
 /**
  * tipc_bclink_retransmit_to - get most recent node to request retransmission
  *
@@ -350,13 +329,12 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
 		return;
 
 	tipc_node_lock(n_ptr);
-
 	if (n_ptr->bclink.recv_permitted &&
 	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
 	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
 		n_ptr->bclink.oos_state = 2;
-
 	tipc_node_unlock(n_ptr);
+	tipc_node_put(n_ptr);
 }
 
 /* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
@@ -476,17 +454,18 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 			goto unlock;
 		if (msg_destnode(msg) == tn->own_addr) {
 			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
-			tipc_node_unlock(node);
 			tipc_bclink_lock(net);
 			bcl->stats.recv_nacks++;
 			tn->bclink->retransmit_to = node;
 			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
 					      msg_bcgap_to(msg));
 			tipc_bclink_unlock(net);
+			tipc_node_unlock(node);
 		} else {
 			tipc_node_unlock(node);
 			bclink_peek_nack(net, msg);
 		}
+		tipc_node_put(node);
 		goto exit;
 	}
 
@@ -591,6 +570,7 @@ receive:
 
 unlock:
 	tipc_node_unlock(node);
+	tipc_node_put(node);
 exit:
 	kfree_skb(buf);
 }

+ 0 - 4
net/tipc/bcast.h

@@ -55,7 +55,6 @@ struct tipc_bcbearer_pair {
 	struct tipc_bearer *secondary;
 };
 
-#define TIPC_BCLINK_RESET	1
 #define	BCBEARER		MAX_BEARERS
 
 /**
@@ -86,7 +85,6 @@ struct tipc_bcbearer {
  * @lock: spinlock governing access to structure
  * @link: (non-standard) broadcast link structure
  * @node: (non-standard) node structure representing b'cast link's peer node
- * @flags: represent bclink states
  * @bcast_nodes: map of broadcast-capable nodes
  * @retransmit_to: node that most recently requested a retransmit
  *
@@ -96,7 +94,6 @@ struct tipc_bclink {
 	spinlock_t lock;
 	struct tipc_link link;
 	struct tipc_node node;
-	unsigned int flags;
 	struct sk_buff_head arrvq;
 	struct sk_buff_head inputq;
 	struct tipc_node_map bcast_nodes;
@@ -117,7 +114,6 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
 
 int tipc_bclink_init(struct net *net);
 void tipc_bclink_stop(struct net *net);
-void tipc_bclink_set_flags(struct net *tn, unsigned int flags);
 void tipc_bclink_add_node(struct net *net, u32 addr);
 void tipc_bclink_remove_node(struct net *net, u32 addr);
 struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);

+ 1 - 0
net/tipc/discover.c

@@ -260,6 +260,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 		}
 	}
 	tipc_node_unlock(node);
+	tipc_node_put(node);
 }
 
 /**

+ 6 - 6
net/tipc/link.c

@@ -854,6 +854,7 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
 		if (link)
 			rc = __tipc_link_xmit(net, link, list);
 		tipc_node_unlock(node);
+		tipc_node_put(node);
 	}
 	if (link)
 		return rc;
@@ -980,7 +981,6 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 			(unsigned long) TIPC_SKB_CB(buf)->handle);
 
 		n_ptr = tipc_bclink_retransmit_to(net);
-		tipc_node_lock(n_ptr);
 
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
 		pr_info("Broadcast link info for %s\n", addr_string);
@@ -992,9 +992,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 			n_ptr->bclink.oos_state,
 			n_ptr->bclink.last_sent);
 
-		tipc_node_unlock(n_ptr);
-
-		tipc_bclink_set_flags(net, TIPC_BCLINK_RESET);
+		n_ptr->action_flags |= TIPC_BCAST_RESET;
 		l_ptr->stale_count = 0;
 	}
 }
@@ -1119,8 +1117,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		n_ptr = tipc_node_find(net, msg_prevnode(msg));
 		if (unlikely(!n_ptr))
 			goto discard;
-		tipc_node_lock(n_ptr);
 
+		tipc_node_lock(n_ptr);
 		/* Locate unicast link endpoint that should handle message */
 		l_ptr = n_ptr->links[b_ptr->identity];
 		if (unlikely(!l_ptr))
@@ -1208,6 +1206,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		skb = NULL;
 unlock:
 		tipc_node_unlock(n_ptr);
+		tipc_node_put(n_ptr);
 discard:
 		if (unlikely(skb))
 			kfree_skb(skb);
@@ -2239,7 +2238,6 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
 	msg.seq = cb->nlh->nlmsg_seq;
 
 	rcu_read_lock();
-
 	if (prev_node) {
 		node = tipc_node_find(net, prev_node);
 		if (!node) {
@@ -2252,6 +2250,7 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
 			cb->prev_seq = 1;
 			goto out;
 		}
+		tipc_node_put(node);
 
 		list_for_each_entry_continue_rcu(node, &tn->node_list,
 						 list) {
@@ -2259,6 +2258,7 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
 			err = __tipc_nl_add_node_links(net, &msg, node,
 						       &prev_link);
 			tipc_node_unlock(node);
+			tipc_node_put(node);
 			if (err)
 				goto out;
 

+ 2 - 0
net/tipc/name_distr.c

@@ -244,6 +244,7 @@ static void tipc_publ_subscribe(struct net *net, struct publication *publ,
 	tipc_node_lock(node);
 	list_add_tail(&publ->nodesub_list, &node->publ_list);
 	tipc_node_unlock(node);
+	tipc_node_put(node);
 }
 
 static void tipc_publ_unsubscribe(struct net *net, struct publication *publ,
@@ -258,6 +259,7 @@ static void tipc_publ_unsubscribe(struct net *net, struct publication *publ,
 	tipc_node_lock(node);
 	list_del_init(&publ->nodesub_list);
 	tipc_node_unlock(node);
+	tipc_node_put(node);
 }
 
 /**

+ 65 - 25
net/tipc/node.c

@@ -42,6 +42,7 @@
 
 static void node_lost_contact(struct tipc_node *n_ptr);
 static void node_established_contact(struct tipc_node *n_ptr);
+static void tipc_node_delete(struct tipc_node *node);
 
 struct tipc_sock_conn {
 	u32 port;
@@ -67,6 +68,23 @@ static unsigned int tipc_hashfn(u32 addr)
 	return addr & (NODE_HTABLE_SIZE - 1);
 }
 
+static void tipc_node_kref_release(struct kref *kref)
+{
+	struct tipc_node *node = container_of(kref, struct tipc_node, kref);
+
+	tipc_node_delete(node);
+}
+
+void tipc_node_put(struct tipc_node *node)
+{
+	kref_put(&node->kref, tipc_node_kref_release);
+}
+
+static void tipc_node_get(struct tipc_node *node)
+{
+	kref_get(&node->kref);
+}
+
 /*
  * tipc_node_find - locate specified node object, if it exists
  */
@@ -82,6 +100,7 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)
 	hlist_for_each_entry_rcu(node, &tn->node_htable[tipc_hashfn(addr)],
 				 hash) {
 		if (node->addr == addr) {
+			tipc_node_get(node);
 			rcu_read_unlock();
 			return node;
 		}
@@ -106,6 +125,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 	}
 	n_ptr->addr = addr;
 	n_ptr->net = net;
+	kref_init(&n_ptr->kref);
 	spin_lock_init(&n_ptr->lock);
 	INIT_HLIST_NODE(&n_ptr->hash);
 	INIT_LIST_HEAD(&n_ptr->list);
@@ -120,16 +140,17 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 	list_add_tail_rcu(&n_ptr->list, &temp_node->list);
 	n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN;
 	n_ptr->signature = INVALID_NODE_SIG;
+	tipc_node_get(n_ptr);
 exit:
 	spin_unlock_bh(&tn->node_list_lock);
 	return n_ptr;
 }
 
-static void tipc_node_delete(struct tipc_net *tn, struct tipc_node *n_ptr)
+static void tipc_node_delete(struct tipc_node *node)
 {
-	list_del_rcu(&n_ptr->list);
-	hlist_del_rcu(&n_ptr->hash);
-	kfree_rcu(n_ptr, rcu);
+	list_del_rcu(&node->list);
+	hlist_del_rcu(&node->hash);
+	kfree_rcu(node, rcu);
 }
 
 void tipc_node_stop(struct net *net)
@@ -139,7 +160,7 @@ void tipc_node_stop(struct net *net)
 
 	spin_lock_bh(&tn->node_list_lock);
 	list_for_each_entry_safe(node, t_node, &tn->node_list, list)
-		tipc_node_delete(tn, node);
+		tipc_node_put(node);
 	spin_unlock_bh(&tn->node_list_lock);
 }
 
@@ -147,6 +168,7 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 {
 	struct tipc_node *node;
 	struct tipc_sock_conn *conn;
+	int err = 0;
 
 	if (in_own_node(net, dnode))
 		return 0;
@@ -157,8 +179,10 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 		return -EHOSTUNREACH;
 	}
 	conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
-	if (!conn)
-		return -EHOSTUNREACH;
+	if (!conn) {
+		err = -EHOSTUNREACH;
+		goto exit;
+	}
 	conn->peer_node = dnode;
 	conn->port = port;
 	conn->peer_port = peer_port;
@@ -166,7 +190,9 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 	tipc_node_lock(node);
 	list_add_tail(&conn->list, &node->conn_sks);
 	tipc_node_unlock(node);
-	return 0;
+exit:
+	tipc_node_put(node);
+	return err;
 }
 
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
@@ -189,6 +215,7 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
 		kfree(conn);
 	}
 	tipc_node_unlock(node);
+	tipc_node_put(node);
 }
 
 /**
@@ -417,19 +444,25 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
 			   char *linkname, size_t len)
 {
 	struct tipc_link *link;
+	int err = -EINVAL;
 	struct tipc_node *node = tipc_node_find(net, addr);
 
-	if ((bearer_id >= MAX_BEARERS) || !node)
-		return -EINVAL;
+	if (!node)
+		return err;
+
+	if (bearer_id >= MAX_BEARERS)
+		goto exit;
+
 	tipc_node_lock(node);
 	link = node->links[bearer_id];
 	if (link) {
 		strncpy(linkname, link->name, len);
-		tipc_node_unlock(node);
-		return 0;
+		err = 0;
 	}
+exit:
 	tipc_node_unlock(node);
-	return -EINVAL;
+	tipc_node_put(node);
+	return err;
 }
 
 void tipc_node_unlock(struct tipc_node *node)
@@ -459,7 +492,7 @@ void tipc_node_unlock(struct tipc_node *node)
 				TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
 				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
 				TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
-				TIPC_NAMED_MSG_EVT);
+				TIPC_NAMED_MSG_EVT | TIPC_BCAST_RESET);
 
 	spin_unlock_bh(&node->lock);
 
@@ -488,6 +521,9 @@ void tipc_node_unlock(struct tipc_node *node)
 
 	if (flags & TIPC_BCAST_MSG_EVT)
 		tipc_bclink_input(net);
+
+	if (flags & TIPC_BCAST_RESET)
+		tipc_link_reset_all(node);
 }
 
 /* Caller should hold node lock for the passed node */
@@ -542,17 +578,21 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 	msg.seq = cb->nlh->nlmsg_seq;
 
 	rcu_read_lock();
-
-	if (last_addr && !tipc_node_find(net, last_addr)) {
-		rcu_read_unlock();
-		/* We never set seq or call nl_dump_check_consistent() this
-		 * means that setting prev_seq here will cause the consistence
-		 * check to fail in the netlink callback handler. Resulting in
-		 * the NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if
-		 * the node state changed while we released the lock.
-		 */
-		cb->prev_seq = 1;
-		return -EPIPE;
+	if (last_addr) {
+		node = tipc_node_find(net, last_addr);
+		if (!node) {
+			rcu_read_unlock();
+			/* We never set seq or call nl_dump_check_consistent()
+			 * this means that setting prev_seq here will cause the
+			 * consistence check to fail in the netlink callback
+			 * handler. Resulting in the NLMSG_DONE message having
+			 * the NLM_F_DUMP_INTR flag set if the node state
+			 * changed while we released the lock.
+			 */
+			cb->prev_seq = 1;
+			return -EPIPE;
+		}
+		tipc_node_put(node);
 	}
 
 	list_for_each_entry_rcu(node, &tn->node_list, list) {

+ 9 - 3
net/tipc/node.h

@@ -64,7 +64,8 @@ enum {
 	TIPC_NOTIFY_LINK_UP		= (1 << 6),
 	TIPC_NOTIFY_LINK_DOWN		= (1 << 7),
 	TIPC_NAMED_MSG_EVT		= (1 << 8),
-	TIPC_BCAST_MSG_EVT		= (1 << 9)
+	TIPC_BCAST_MSG_EVT		= (1 << 9),
+	TIPC_BCAST_RESET		= (1 << 10)
 };
 
 /**
@@ -93,6 +94,7 @@ struct tipc_node_bclink {
 /**
  * struct tipc_node - TIPC node structure
  * @addr: network address of node
+ * @ref: reference counter to node object
  * @lock: spinlock governing access to structure
  * @net: the applicable net namespace
  * @hash: links to adjacent nodes in unsorted hash chain
@@ -114,6 +116,7 @@ struct tipc_node_bclink {
  */
 struct tipc_node {
 	u32 addr;
+	struct kref kref;
 	spinlock_t lock;
 	struct net *net;
 	struct hlist_node hash;
@@ -136,6 +139,7 @@ struct tipc_node {
 };
 
 struct tipc_node *tipc_node_find(struct net *net, u32 addr);
+void tipc_node_put(struct tipc_node *node);
 struct tipc_node *tipc_node_create(struct net *net, u32 addr);
 void tipc_node_stop(struct net *net);
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
@@ -170,10 +174,12 @@ static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector)
 
 	node = tipc_node_find(net, addr);
 
-	if (likely(node))
+	if (likely(node)) {
 		mtu = node->act_mtus[selector & 1];
-	else
+		tipc_node_put(node);
+	} else {
 		mtu = MAX_MSG_SIZE;
+	}
 
 	return mtu;
 }