Browse Source

Merge branch 'tipc-separate-link-and-aggregation'

Jon Maloy says:

====================
tipc: separate link and link aggregation layer

This is the first batch of a longer series that has two main objectives:

o Finer lock granularity during message sending and reception,
  especially regarding usage of the node spinlock.

o Better separation between the link layer implementation and the link
  aggregation layer, represented by node.c::struct tipc_node.

Hopefully these changes also make this part of code somewhat easier
to comprehend and maintain.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 10 years ago
parent
commit
7781e5d10a
13 changed files with 1431 additions and 1018 deletions
  1. 26 5
      net/tipc/bcast.c
  2. 1 0
      net/tipc/bcast.h
  3. 26 0
      net/tipc/bearer.c
  4. 3 0
      net/tipc/bearer.h
  5. 5 0
      net/tipc/core.h
  6. 4 16
      net/tipc/discover.c
  7. 733 784
      net/tipc/link.c
  8. 27 47
      net/tipc/link.h
  9. 51 2
      net/tipc/msg.h
  10. 3 3
      net/tipc/name_distr.c
  11. 450 99
      net/tipc/node.c
  12. 65 28
      net/tipc/node.h
  13. 37 34
      net/tipc/socket.c

+ 26 - 5
net/tipc/bcast.c

@@ -316,6 +316,29 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 	}
 }
 
+void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
+{
+	u16 last = msg_last_bcast(hdr);
+	int mtyp = msg_type(hdr);
+
+	if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
+		return;
+	if (mtyp == STATE_MSG) {
+		tipc_bclink_update_link_state(n, last);
+		return;
+	}
+	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
+	 * and transfer synch info in LINK_PROTOCOL messages.
+	 */
+	if (tipc_node_is_up(n))
+		return;
+	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
+		return;
+	n->bclink.last_sent = last;
+	n->bclink.last_in = last;
+	n->bclink.oos_state = 0;
+}
+
 /**
  * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
@@ -358,10 +381,9 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 
 	/* Prepare clone of message for local node */
 	skb = tipc_msg_reassemble(list);
-	if (unlikely(!skb)) {
-		__skb_queue_purge(list);
+	if (unlikely(!skb))
 		return -EHOSTUNREACH;
-	}
+
 	/* Broadcast to all nodes */
 	if (likely(bclink)) {
 		tipc_bclink_lock(net);
@@ -413,7 +435,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
 	 * all nodes in the cluster don't ACK at the same time
 	 */
 	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
-		tipc_link_proto_xmit(node->active_links[node->addr & 1],
+		tipc_link_proto_xmit(node_active_link(node, node->addr),
 				     STATE_MSG, 0, 0, 0, 0);
 		tn->bcl->stats.sent_acks++;
 	}
@@ -925,7 +947,6 @@ int tipc_bclink_init(struct net *net)
 	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
 	bcl->bearer_id = MAX_BEARERS;
 	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
-	bcl->state = WORKING_WORKING;
 	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
 	msg_set_prevnode(bcl->pmsg, tn->own_addr);
 	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);

+ 1 - 0
net/tipc/bcast.h

@@ -133,5 +133,6 @@ void tipc_bclink_wakeup_users(struct net *net);
 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 void tipc_bclink_input(struct net *net);
+void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
 
 #endif

+ 26 - 0
net/tipc/bearer.c

@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
 	rcu_read_unlock();
 }
 
+/* tipc_bearer_xmit() -send buffer to destination over bearer
+ */
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+		      struct sk_buff_head *xmitq,
+		      struct tipc_media_addr *dst)
+{
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_bearer *b;
+	struct sk_buff *skb, *tmp;
+
+	if (skb_queue_empty(xmitq))
+		return;
+
+	rcu_read_lock();
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (likely(b)) {
+		skb_queue_walk_safe(xmitq, skb, tmp) {
+			__skb_dequeue(xmitq);
+			b->media->send_msg(net, skb, b, dst);
+			/* Until we remove cloning in tipc_l2_send_msg(): */
+			kfree_skb(skb);
+		}
+	}
+	rcu_read_unlock();
+}
+
 /**
  * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
  * @buf: the received packet

+ 3 - 0
net/tipc/bearer.h

@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
 void tipc_bearer_stop(struct net *net);
 void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
 		      struct tipc_media_addr *dest);
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+		      struct sk_buff_head *xmitq,
+		      struct tipc_media_addr *dst);
 
 #endif	/* _TIPC_BEARER_H */

+ 5 - 0
net/tipc/core.h

@@ -129,6 +129,11 @@ static inline int less(u16 left, u16 right)
 	return less_eq(left, right) && (mod(right) != mod(left));
 }
 
+static inline int in_range(u16 val, u16 min, u16 max)
+{
+	return !less(val, min) && !more(val, max);
+}
+
 #ifdef CONFIG_SYSCTL
 int tipc_register_sysctl(void);
 void tipc_unregister_sysctl(void);

+ 4 - 16
net/tipc/discover.c

@@ -35,7 +35,7 @@
  */
 
 #include "core.h"
-#include "link.h"
+#include "node.h"
 #include "discover.h"
 
 /* min delay during bearer start up */
@@ -125,7 +125,6 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *node;
-	struct tipc_link *link;
 	struct tipc_media_addr maddr;
 	struct sk_buff *rbuf;
 	struct tipc_msg *msg = buf_msg(buf);
@@ -170,13 +169,10 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 		return;
 	tipc_node_lock(node);
 	node->capabilities = caps;
-	link = node->links[bearer->identity];
 
 	/* Prepare to validate requesting node's signature and media address */
 	sign_match = (signature == node->signature);
-	addr_match = link && !memcmp(&link->media_addr, &maddr, sizeof(maddr));
-	link_up = link && tipc_link_is_up(link);
-
+	tipc_node_check_dest(node, bearer, &link_up, &addr_match, &maddr);
 
 	/* These three flags give us eight permutations: */
 
@@ -239,16 +235,8 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 	if (accept_sign)
 		node->signature = signature;
 
-	if (accept_addr) {
-		if (!link)
-			link = tipc_link_create(node, bearer, &maddr);
-		if (link) {
-			memcpy(&link->media_addr, &maddr, sizeof(maddr));
-			tipc_link_reset(link);
-		} else {
-			respond = false;
-		}
-	}
+	if (accept_addr && !tipc_node_update_dest(node, bearer, &maddr))
+		respond = false;
 
 	/* Send response, if necessary */
 	if (respond && (mtyp == DSC_REQ_MSG)) {

+ 733 - 784
net/tipc/link.c

@@ -77,36 +77,70 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
 };
 
 /*
- * Out-of-range value for link session numbers
+ * Interval between NACKs when packets arrive out of order
  */
-#define INVALID_SESSION 0x10000
-
+#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2)
 /*
- * Link state events:
+ * Out-of-range value for link session numbers
  */
-#define  STARTING_EVT    856384768	/* link processing trigger */
-#define  TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */
-#define  SILENCE_EVT     560817u	/* timer dicovered silence from peer */
+#define WILDCARD_SESSION 0x10000
 
-/*
- * State value stored in 'failover_pkts'
+/* State value stored in 'failover_pkts'
  */
 #define FIRST_FAILOVER 0xffffu
 
-static void link_handle_out_of_seq_msg(struct tipc_link *link,
-				       struct sk_buff *skb);
-static void tipc_link_proto_rcv(struct tipc_link *link,
-				struct sk_buff *skb);
-static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
-static void link_state_event(struct tipc_link *l_ptr, u32 event);
+/* Link FSM states and events:
+ */
+enum {
+	TIPC_LINK_WORKING,
+	TIPC_LINK_PROBING,
+	TIPC_LINK_RESETTING,
+	TIPC_LINK_ESTABLISHING
+};
+
+enum {
+	PEER_RESET_EVT    = RESET_MSG,
+	ACTIVATE_EVT      = ACTIVATE_MSG,
+	TRAFFIC_EVT,      /* Any other valid msg from peer */
+	SILENCE_EVT       /* Peer was silent during last timer interval*/
+};
+
+/* Link FSM state checking routines
+ */
+static int link_working(struct tipc_link *l)
+{
+	return l->state == TIPC_LINK_WORKING;
+}
+
+static int link_probing(struct tipc_link *l)
+{
+	return l->state == TIPC_LINK_PROBING;
+}
+
+static int link_resetting(struct tipc_link *l)
+{
+	return l->state == TIPC_LINK_RESETTING;
+}
+
+static int link_establishing(struct tipc_link *l)
+{
+	return l->state == TIPC_LINK_ESTABLISHING;
+}
+
+static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
+			       struct sk_buff_head *xmitq);
+static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
+				      u16 rcvgap, int tolerance, int priority,
+				      struct sk_buff_head *xmitq);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static void tipc_link_sync_xmit(struct tipc_link *l);
+static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
+					   struct sk_buff_head *xmitq);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
-static void link_set_timer(struct tipc_link *link, unsigned long time);
+
 /*
  *  Simple link routines
  */
@@ -115,26 +149,13 @@ static unsigned int align(unsigned int i)
 	return (i + 3) & ~3u;
 }
 
-static void tipc_link_release(struct kref *kref)
-{
-	kfree(container_of(kref, struct tipc_link, ref));
-}
-
-static void tipc_link_get(struct tipc_link *l_ptr)
-{
-	kref_get(&l_ptr->ref);
-}
-
-static void tipc_link_put(struct tipc_link *l_ptr)
-{
-	kref_put(&l_ptr->ref, tipc_link_release);
-}
-
 static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
 {
-	if (l->owner->active_links[0] != l)
-		return l->owner->active_links[0];
-	return l->owner->active_links[1];
+	struct tipc_node *n = l->owner;
+
+	if (node_active_link(n, 0) != l)
+		return node_active_link(n, 0);
+	return node_active_link(n, 1);
 }
 
 /*
@@ -144,74 +165,14 @@ int tipc_link_is_up(struct tipc_link *l_ptr)
 {
 	if (!l_ptr)
 		return 0;
-	return link_working_working(l_ptr) || link_working_unknown(l_ptr);
+	return link_working(l_ptr) || link_probing(l_ptr);
 }
 
-int tipc_link_is_active(struct tipc_link *l_ptr)
+int tipc_link_is_active(struct tipc_link *l)
 {
-	return	(l_ptr->owner->active_links[0] == l_ptr) ||
-		(l_ptr->owner->active_links[1] == l_ptr);
-}
-
-/**
- * link_timeout - handle expiration of link timer
- * @l_ptr: pointer to link
- */
-static void link_timeout(unsigned long data)
-{
-	struct tipc_link *l_ptr = (struct tipc_link *)data;
-	struct sk_buff *skb;
-
-	tipc_node_lock(l_ptr->owner);
-
-	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
-	l_ptr->stats.queue_sz_counts++;
-
-	skb = skb_peek(&l_ptr->transmq);
-	if (skb) {
-		struct tipc_msg *msg = buf_msg(skb);
-		u32 length = msg_size(msg);
-
-		if ((msg_user(msg) == MSG_FRAGMENTER) &&
-		    (msg_type(msg) == FIRST_FRAGMENT)) {
-			length = msg_size(msg_get_wrapped(msg));
-		}
-		if (length) {
-			l_ptr->stats.msg_lengths_total += length;
-			l_ptr->stats.msg_length_counts++;
-			if (length <= 64)
-				l_ptr->stats.msg_length_profile[0]++;
-			else if (length <= 256)
-				l_ptr->stats.msg_length_profile[1]++;
-			else if (length <= 1024)
-				l_ptr->stats.msg_length_profile[2]++;
-			else if (length <= 4096)
-				l_ptr->stats.msg_length_profile[3]++;
-			else if (length <= 16384)
-				l_ptr->stats.msg_length_profile[4]++;
-			else if (length <= 32768)
-				l_ptr->stats.msg_length_profile[5]++;
-			else
-				l_ptr->stats.msg_length_profile[6]++;
-		}
-	}
-
-	/* do all other link processing performed on a periodic basis */
-	if (l_ptr->silent_intv_cnt || tipc_bclink_acks_missing(l_ptr->owner))
-		link_state_event(l_ptr, SILENCE_EVT);
-	l_ptr->silent_intv_cnt++;
-	if (skb_queue_len(&l_ptr->backlogq))
-		tipc_link_push_packets(l_ptr);
-	link_set_timer(l_ptr, l_ptr->keepalive_intv);
-	tipc_node_unlock(l_ptr->owner);
-	tipc_link_put(l_ptr);
-}
+	struct tipc_node *n = l->owner;
 
-static void link_set_timer(struct tipc_link *link, unsigned long time)
-{
-	if (!mod_timer(&link->timer, jiffies + time))
-		tipc_link_get(link);
+	return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
 }
 
 /**
@@ -224,7 +185,9 @@ static void link_set_timer(struct tipc_link *link, unsigned long time)
  */
 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 				   struct tipc_bearer *b_ptr,
-				   const struct tipc_media_addr *media_addr)
+				   const struct tipc_media_addr *media_addr,
+				   struct sk_buff_head *inputq,
+				   struct sk_buff_head *namedq)
 {
 	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
 	struct tipc_link *l_ptr;
@@ -240,7 +203,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 		return NULL;
 	}
 
-	if (n_ptr->links[b_ptr->identity]) {
+	if (n_ptr->links[b_ptr->identity].link) {
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
 		pr_err("Attempt to establish second link on <%s> to %s\n",
 		       b_ptr->name, addr_string);
@@ -252,7 +215,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 		pr_warn("Link creation failed, no memory\n");
 		return NULL;
 	}
-	kref_init(&l_ptr->ref);
 	l_ptr->addr = peer;
 	if_name = strchr(b_ptr->name, ':') + 1;
 	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
@@ -263,10 +225,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 		/* note: peer i/f name is updated by reset/activate message */
 	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
 	l_ptr->owner = n_ptr;
-	l_ptr->peer_session = INVALID_SESSION;
+	l_ptr->peer_session = WILDCARD_SESSION;
 	l_ptr->bearer_id = b_ptr->identity;
-	link_set_supervision_props(l_ptr, b_ptr->tolerance);
-	l_ptr->state = RESET_UNKNOWN;
+	l_ptr->tolerance = b_ptr->tolerance;
+	l_ptr->state = TIPC_LINK_RESETTING;
 
 	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
 	msg = l_ptr->pmsg;
@@ -286,13 +248,11 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	__skb_queue_head_init(&l_ptr->backlogq);
 	__skb_queue_head_init(&l_ptr->deferdq);
 	skb_queue_head_init(&l_ptr->wakeupq);
-	skb_queue_head_init(&l_ptr->inputq);
-	skb_queue_head_init(&l_ptr->namedq);
+	l_ptr->inputq = inputq;
+	l_ptr->namedq = namedq;
+	skb_queue_head_init(l_ptr->inputq);
 	link_reset_statistics(l_ptr);
 	tipc_node_attach_link(n_ptr, l_ptr);
-	setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
-	link_state_event(l_ptr, STARTING_EVT);
-
 	return l_ptr;
 }
 
@@ -303,13 +263,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 void tipc_link_delete(struct tipc_link *l)
 {
 	tipc_link_reset(l);
-	if (del_timer(&l->timer))
-		tipc_link_put(l);
-	l->flags |= LINK_STOPPED;
-	/* Delete link now, or when timer is finished: */
 	tipc_link_reset_fragments(l);
 	tipc_node_detach_link(l->owner, l);
-	tipc_link_put(l);
 }
 
 void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
@@ -321,7 +276,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
 	rcu_read_lock();
 	list_for_each_entry_rcu(node, &tn->node_list, list) {
 		tipc_node_lock(node);
-		link = node->links[bearer_id];
+		link = node->links[bearer_id].link;
 		if (link)
 			tipc_link_delete(link);
 		tipc_node_unlock(node);
@@ -329,12 +284,219 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
 	rcu_read_unlock();
 }
 
+/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
+ *
+ * Give a newly added peer node the sequence number where it should
+ * start receiving and acking broadcast packets.
+ */
+static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
+					   struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb;
+	struct sk_buff_head list;
+
+	skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
+			      0, l->addr, link_own_addr(l), 0, 0, 0);
+	if (!skb)
+		return;
+	__skb_queue_head_init(&list);
+	__skb_queue_tail(&list, skb);
+	tipc_link_xmit(l, &list, xmitq);
+}
+
+/**
+ * tipc_link_fsm_evt - link finite state machine
+ * @l: pointer to link
+ * @evt: state machine event to be processed
+ * @xmitq: queue to prepend created protocol message, if any
+ */
+static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
+			     struct sk_buff_head *xmitq)
+{
+	int mtyp = 0, rc = 0;
+	struct tipc_link *pl;
+	enum {
+		LINK_RESET     = 1,
+		LINK_ACTIVATE  = (1 << 1),
+		SND_PROBE      = (1 << 2),
+		SND_STATE      = (1 << 3),
+		SND_RESET      = (1 << 4),
+		SND_ACTIVATE   = (1 << 5),
+		SND_BCAST_SYNC = (1 << 6)
+	} actions = 0;
+
+	if (l->exec_mode == TIPC_LINK_BLOCKED)
+		return rc;
+
+	switch (l->state) {
+	case TIPC_LINK_WORKING:
+		switch (evt) {
+		case TRAFFIC_EVT:
+		case ACTIVATE_EVT:
+			break;
+		case SILENCE_EVT:
+			l->state = TIPC_LINK_PROBING;
+			actions |= SND_PROBE;
+			break;
+		case PEER_RESET_EVT:
+			actions |= LINK_RESET | SND_ACTIVATE;
+			break;
+		default:
+			pr_debug("%s%u WORKING\n", link_unk_evt, evt);
+		}
+		break;
+	case TIPC_LINK_PROBING:
+		switch (evt) {
+		case TRAFFIC_EVT:
+		case ACTIVATE_EVT:
+			l->state = TIPC_LINK_WORKING;
+			break;
+		case PEER_RESET_EVT:
+			actions |= LINK_RESET | SND_ACTIVATE;
+			break;
+		case SILENCE_EVT:
+			if (l->silent_intv_cnt <= l->abort_limit) {
+				actions |= SND_PROBE;
+				break;
+			}
+			actions |= LINK_RESET | SND_RESET;
+			break;
+		default:
+			pr_err("%s%u PROBING\n", link_unk_evt, evt);
+		}
+		break;
+	case TIPC_LINK_RESETTING:
+		switch (evt) {
+		case TRAFFIC_EVT:
+			break;
+		case ACTIVATE_EVT:
+			pl = node_active_link(l->owner, 0);
+			if (pl && link_probing(pl))
+				break;
+			actions |= LINK_ACTIVATE;
+			if (!l->owner->working_links)
+				actions |= SND_BCAST_SYNC;
+			break;
+		case PEER_RESET_EVT:
+			l->state = TIPC_LINK_ESTABLISHING;
+			actions |= SND_ACTIVATE;
+			break;
+		case SILENCE_EVT:
+			actions |= SND_RESET;
+			break;
+		default:
+			pr_err("%s%u in RESETTING\n", link_unk_evt, evt);
+		}
+		break;
+	case TIPC_LINK_ESTABLISHING:
+		switch (evt) {
+		case TRAFFIC_EVT:
+		case ACTIVATE_EVT:
+			pl = node_active_link(l->owner, 0);
+			if (pl && link_probing(pl))
+				break;
+			actions |= LINK_ACTIVATE;
+			if (!l->owner->working_links)
+				actions |= SND_BCAST_SYNC;
+			break;
+		case PEER_RESET_EVT:
+			break;
+		case SILENCE_EVT:
+			actions |= SND_ACTIVATE;
+			break;
+		default:
+			pr_err("%s%u ESTABLISHING\n", link_unk_evt, evt);
+		}
+		break;
+	default:
+		pr_err("Unknown link state %u/%u\n", l->state, evt);
+	}
+
+	/* Perform actions as decided by FSM */
+	if (actions & LINK_RESET) {
+		l->exec_mode = TIPC_LINK_BLOCKED;
+		rc |= TIPC_LINK_DOWN_EVT;
+	}
+	if (actions & LINK_ACTIVATE) {
+		l->exec_mode = TIPC_LINK_OPEN;
+		rc |= TIPC_LINK_UP_EVT;
+	}
+	if (actions & (SND_STATE | SND_PROBE))
+		mtyp = STATE_MSG;
+	if (actions & SND_RESET)
+		mtyp = RESET_MSG;
+	if (actions & SND_ACTIVATE)
+		mtyp = ACTIVATE_MSG;
+	if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
+		tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
+					  0, 0, 0, xmitq);
+	if (actions & SND_BCAST_SYNC)
+		tipc_link_build_bcast_sync_msg(l, xmitq);
+	return rc;
+}
+
+/* link_profile_stats - update statistical profiling of traffic
+ */
+static void link_profile_stats(struct tipc_link *l)
+{
+	struct sk_buff *skb;
+	struct tipc_msg *msg;
+	int length;
+
+	/* Update counters used in statistical profiling of send traffic */
+	l->stats.accu_queue_sz += skb_queue_len(&l->transmq);
+	l->stats.queue_sz_counts++;
+
+	skb = skb_peek(&l->transmq);
+	if (!skb)
+		return;
+	msg = buf_msg(skb);
+	length = msg_size(msg);
+
+	if (msg_user(msg) == MSG_FRAGMENTER) {
+		if (msg_type(msg) != FIRST_FRAGMENT)
+			return;
+		length = msg_size(msg_get_wrapped(msg));
+	}
+	l->stats.msg_lengths_total += length;
+	l->stats.msg_length_counts++;
+	if (length <= 64)
+		l->stats.msg_length_profile[0]++;
+	else if (length <= 256)
+		l->stats.msg_length_profile[1]++;
+	else if (length <= 1024)
+		l->stats.msg_length_profile[2]++;
+	else if (length <= 4096)
+		l->stats.msg_length_profile[3]++;
+	else if (length <= 16384)
+		l->stats.msg_length_profile[4]++;
+	else if (length <= 32768)
+		l->stats.msg_length_profile[5]++;
+	else
+		l->stats.msg_length_profile[6]++;
+}
+
+/* tipc_link_timeout - perform periodic task as instructed from node timeout
+ */
+int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
+{
+	int rc = 0;
+
+	link_profile_stats(l);
+	if (l->silent_intv_cnt)
+		rc = tipc_link_fsm_evt(l, SILENCE_EVT, xmitq);
+	else if (link_working(l) && tipc_bclink_acks_missing(l->owner))
+		tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
+	l->silent_intv_cnt++;
+	return rc;
+}
+
 /**
  * link_schedule_user - schedule a message sender for wakeup after congestion
  * @link: congested link
  * @list: message that was attempted sent
  * Create pseudo msg to send back to user when congestion abates
- * Only consumes message if there is an error
+ * Does not consume buffer list
  */
 static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 {
@@ -347,8 +509,7 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 	/* This really cannot happen...  */
 	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
-		goto err;
+		return -ENOBUFS;
 	}
 	/* Non-blocking sender: */
 	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
@@ -358,15 +519,12 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
 			      addr, addr, oport, 0, 0);
 	if (!skb)
-		goto err;
+		return -ENOBUFS;
 	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
 	TIPC_SKB_CB(skb)->chain_imp = imp;
 	skb_queue_tail(&link->wakeupq, skb);
 	link->stats.link_congs++;
 	return -ELINKCONG;
-err:
-	__skb_queue_purge(list);
-	return -ENOBUFS;
 }
 
 /**
@@ -388,8 +546,8 @@ void link_prepare_wakeup(struct tipc_link *l)
 		if ((pnd[imp] + l->backlog[imp].len) >= lim)
 			break;
 		skb_unlink(skb, &l->wakeupq);
-		skb_queue_tail(&l->inputq, skb);
-		l->owner->inputq = &l->inputq;
+		skb_queue_tail(l->inputq, skb);
+		l->owner->inputq = l->inputq;
 		l->owner->action_flags |= TIPC_MSG_EVT;
 	}
 }
@@ -436,21 +594,22 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 
 	/* Link is down, accept any session */
-	l_ptr->peer_session = INVALID_SESSION;
+	l_ptr->peer_session = WILDCARD_SESSION;
 
 	/* Prepare for renewed mtu size negotiation */
 	l_ptr->mtu = l_ptr->advertised_mtu;
 
-	l_ptr->state = RESET_UNKNOWN;
+	l_ptr->state = TIPC_LINK_RESETTING;
 
-	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
+	if ((prev_state == TIPC_LINK_RESETTING) ||
+	    (prev_state == TIPC_LINK_ESTABLISHING))
 		return;
 
-	tipc_node_link_down(l_ptr->owner, l_ptr);
+	tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
 	tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
 
 	if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
-		l_ptr->flags |= LINK_FAILINGOVER;
+		l_ptr->exec_mode = TIPC_LINK_BLOCKED;
 		l_ptr->failover_checkpt = l_ptr->rcv_nxt;
 		pl->failover_pkts = FIRST_FAILOVER;
 		pl->failover_checkpt = l_ptr->rcv_nxt;
@@ -462,7 +621,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	__skb_queue_purge(&l_ptr->transmq);
 	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
-		owner->inputq = &l_ptr->inputq;
+		owner->inputq = l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
@@ -470,173 +629,32 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	l_ptr->reasm_buf = NULL;
 	l_ptr->rcv_unacked = 0;
 	l_ptr->snd_nxt = 1;
+	l_ptr->rcv_nxt = 1;
 	l_ptr->silent_intv_cnt = 0;
+	l_ptr->stats.recv_info = 0;
 	l_ptr->stale_count = 0;
 	link_reset_statistics(l_ptr);
 }
 
-static void link_activate(struct tipc_link *link)
+void tipc_link_activate(struct tipc_link *link)
 {
 	struct tipc_node *node = link->owner;
 
 	link->rcv_nxt = 1;
 	link->stats.recv_info = 1;
 	link->silent_intv_cnt = 0;
-	tipc_node_link_up(node, link);
+	link->state = TIPC_LINK_WORKING;
+	link->exec_mode = TIPC_LINK_OPEN;
+	tipc_node_link_up(node, link->bearer_id);
 	tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
 }
 
-/**
- * link_state_event - link finite state machine
- * @l_ptr: pointer to link
- * @event: state machine event to process
- */
-static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
-{
-	struct tipc_link *other;
-	unsigned long timer_intv = l_ptr->keepalive_intv;
-
-	if (l_ptr->flags & LINK_STOPPED)
-		return;
-
-	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
-		return;		/* Not yet. */
-
-	if (l_ptr->flags & LINK_FAILINGOVER)
-		return;
-
-	switch (l_ptr->state) {
-	case WORKING_WORKING:
-		switch (event) {
-		case TRAFFIC_MSG_EVT:
-		case ACTIVATE_MSG:
-			l_ptr->silent_intv_cnt = 0;
-			break;
-		case SILENCE_EVT:
-			if (!l_ptr->silent_intv_cnt) {
-				if (tipc_bclink_acks_missing(l_ptr->owner))
-					tipc_link_proto_xmit(l_ptr, STATE_MSG,
-							     0, 0, 0, 0);
-				break;
-			}
-			l_ptr->state = WORKING_UNKNOWN;
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
-			break;
-		case RESET_MSG:
-			pr_debug("%s<%s>, requested by peer\n",
-				 link_rst_msg, l_ptr->name);
-			tipc_link_reset(l_ptr);
-			l_ptr->state = RESET_RESET;
-			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-					     0, 0, 0, 0);
-			break;
-		default:
-			pr_debug("%s%u in WW state\n", link_unk_evt, event);
-		}
-		break;
-	case WORKING_UNKNOWN:
-		switch (event) {
-		case TRAFFIC_MSG_EVT:
-		case ACTIVATE_MSG:
-			l_ptr->state = WORKING_WORKING;
-			l_ptr->silent_intv_cnt = 0;
-			break;
-		case RESET_MSG:
-			pr_debug("%s<%s>, requested by peer while probing\n",
-				 link_rst_msg, l_ptr->name);
-			tipc_link_reset(l_ptr);
-			l_ptr->state = RESET_RESET;
-			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-					     0, 0, 0, 0);
-			break;
-		case SILENCE_EVT:
-			if (!l_ptr->silent_intv_cnt) {
-				l_ptr->state = WORKING_WORKING;
-				if (tipc_bclink_acks_missing(l_ptr->owner))
-					tipc_link_proto_xmit(l_ptr, STATE_MSG,
-							     0, 0, 0, 0);
-			} else if (l_ptr->silent_intv_cnt <
-				   l_ptr->abort_limit) {
-				tipc_link_proto_xmit(l_ptr, STATE_MSG,
-						     1, 0, 0, 0);
-			} else {	/* Link has failed */
-				pr_debug("%s<%s>, peer not responding\n",
-					 link_rst_msg, l_ptr->name);
-				tipc_link_reset(l_ptr);
-				l_ptr->state = RESET_UNKNOWN;
-				tipc_link_proto_xmit(l_ptr, RESET_MSG,
-						     0, 0, 0, 0);
-			}
-			break;
-		default:
-			pr_err("%s%u in WU state\n", link_unk_evt, event);
-		}
-		break;
-	case RESET_UNKNOWN:
-		switch (event) {
-		case TRAFFIC_MSG_EVT:
-			break;
-		case ACTIVATE_MSG:
-			other = l_ptr->owner->active_links[0];
-			if (other && link_working_unknown(other))
-				break;
-			l_ptr->state = WORKING_WORKING;
-			link_activate(l_ptr);
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
-			if (l_ptr->owner->working_links == 1)
-				tipc_link_sync_xmit(l_ptr);
-			break;
-		case RESET_MSG:
-			l_ptr->state = RESET_RESET;
-			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-					     1, 0, 0, 0);
-			break;
-		case STARTING_EVT:
-			l_ptr->flags |= LINK_STARTED;
-			link_set_timer(l_ptr, timer_intv);
-			break;
-		case SILENCE_EVT:
-			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0);
-			break;
-		default:
-			pr_err("%s%u in RU state\n", link_unk_evt, event);
-		}
-		break;
-	case RESET_RESET:
-		switch (event) {
-		case TRAFFIC_MSG_EVT:
-		case ACTIVATE_MSG:
-			other = l_ptr->owner->active_links[0];
-			if (other && link_working_unknown(other))
-				break;
-			l_ptr->state = WORKING_WORKING;
-			link_activate(l_ptr);
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
-			if (l_ptr->owner->working_links == 1)
-				tipc_link_sync_xmit(l_ptr);
-			break;
-		case RESET_MSG:
-			break;
-		case SILENCE_EVT:
-			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-					     0, 0, 0, 0);
-			break;
-		default:
-			pr_err("%s%u in RR state\n", link_unk_evt, event);
-		}
-		break;
-	default:
-		pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
-	}
-}
-
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @list: chain of buffers containing message
  *
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain, except when returning an error code,
  * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
  * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
@@ -660,10 +678,9 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
 			return link_schedule_user(link, list);
 	}
-	if (unlikely(msg_size(msg) > mtu)) {
-		__skb_queue_purge(list);
+	if (unlikely(msg_size(msg) > mtu))
 		return -EMSGSIZE;
-	}
+
 	/* Prepare each packet for sending, and add to relevant queue: */
 	while (skb_queue_len(list)) {
 		skb = skb_peek(list);
@@ -700,101 +717,90 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 	return 0;
 }
 
-static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
-{
-	skb_queue_head_init(list);
-	__skb_queue_tail(list, skb);
-}
-
-static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
-{
-	struct sk_buff_head head;
-
-	skb2list(skb, &head);
-	return __tipc_link_xmit(link->owner->net, link, &head);
-}
-
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not be rejected
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
-		       u32 selector)
-{
-	struct sk_buff_head head;
-	int rc;
-
-	skb2list(skb, &head);
-	rc = tipc_link_xmit(net, &head, dnode, selector);
-	if (rc == -ELINKCONG)
-		kfree_skb(skb);
-	return 0;
-}
-
 /**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @link: link to use
  * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
-		   u32 selector)
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+		   struct sk_buff_head *xmitq)
 {
-	struct tipc_link *link = NULL;
-	struct tipc_node *node;
-	int rc = -EHOSTUNREACH;
+	struct tipc_msg *hdr = buf_msg(skb_peek(list));
+	unsigned int maxwin = l->window;
+	unsigned int i, imp = msg_importance(hdr);
+	unsigned int mtu = l->mtu;
+	u16 ack = l->rcv_nxt - 1;
+	u16 seqno = l->snd_nxt;
+	u16 bc_last_in = l->owner->bclink.last_in;
+	struct sk_buff_head *transmq = &l->transmq;
+	struct sk_buff_head *backlogq = &l->backlogq;
+	struct sk_buff *skb, *_skb, *bskb;
 
-	node = tipc_node_find(net, dnode);
-	if (node) {
-		tipc_node_lock(node);
-		link = node->active_links[selector & 1];
-		if (link)
-			rc = __tipc_link_xmit(net, link, list);
-		tipc_node_unlock(node);
-		tipc_node_put(node);
+	/* Match msg importance against this and all higher backlog limits: */
+	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+		if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+			return link_schedule_user(l, list);
 	}
-	if (link)
-		return rc;
+	if (unlikely(msg_size(hdr) > mtu))
+		return -EMSGSIZE;
 
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
+	/* Prepare each packet for sending, and add to relevant queue: */
+	while (skb_queue_len(list)) {
+		skb = skb_peek(list);
+		hdr = buf_msg(skb);
+		msg_set_seqno(hdr, seqno);
+		msg_set_ack(hdr, ack);
+		msg_set_bcast_ack(hdr, bc_last_in);
 
-	__skb_queue_purge(list);
-	return rc;
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			_skb = skb_clone(skb, GFP_ATOMIC);
+			if (!_skb)
+				return -ENOBUFS;
+			__skb_dequeue(list);
+			__skb_queue_tail(transmq, skb);
+			__skb_queue_tail(xmitq, _skb);
+			l->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+			kfree_skb(__skb_dequeue(list));
+			l->stats.sent_bundled++;
+			continue;
+		}
+		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+			kfree_skb(__skb_dequeue(list));
+			__skb_queue_tail(backlogq, bskb);
+			l->backlog[msg_importance(buf_msg(bskb))].len++;
+			l->stats.sent_bundled++;
+			l->stats.sent_bundles++;
+			continue;
+		}
+		l->backlog[imp].len += skb_queue_len(list);
+		skb_queue_splice_tail_init(list, backlogq);
+	}
+	l->snd_nxt = seqno;
+	return 0;
 }
 
-/*
- * tipc_link_sync_xmit - synchronize broadcast link endpoints.
- *
- * Give a newly added peer node the sequence number where it should
- * start receiving and acking broadcast packets.
- *
- * Called with node locked
- */
-static void tipc_link_sync_xmit(struct tipc_link *link)
+static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
-	struct sk_buff *skb;
-	struct tipc_msg *msg;
+	skb_queue_head_init(list);
+	__skb_queue_tail(list, skb);
+}
 
-	skb = tipc_buf_acquire(INT_H_SIZE);
-	if (!skb)
-		return;
+static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
+{
+	struct sk_buff_head head;
 
-	msg = buf_msg(skb);
-	tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
-		      INT_H_SIZE, link->addr);
-	msg_set_last_bcast(msg, link->owner->bclink.acked);
-	__tipc_link_xmit_skb(link, skb);
+	skb2list(skb, &head);
+	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
 /*
@@ -847,6 +853,34 @@ void tipc_link_push_packets(struct tipc_link *link)
 	link->snd_nxt = seqno;
 }
 
+void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb, *_skb;
+	struct tipc_msg *hdr;
+	u16 seqno = l->snd_nxt;
+	u16 ack = l->rcv_nxt - 1;
+
+	while (skb_queue_len(&l->transmq) < l->window) {
+		skb = skb_peek(&l->backlogq);
+		if (!skb)
+			break;
+		_skb = skb_clone(skb, GFP_ATOMIC);
+		if (!_skb)
+			break;
+		__skb_dequeue(&l->backlogq);
+		hdr = buf_msg(skb);
+		l->backlog[msg_importance(hdr)].len--;
+		__skb_queue_tail(&l->transmq, skb);
+		__skb_queue_tail(xmitq, _skb);
+		msg_set_ack(hdr, ack);
+		msg_set_seqno(hdr, seqno);
+		msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+		l->rcv_unacked = 0;
+		seqno++;
+	}
+	l->snd_nxt = seqno;
+}
+
 void tipc_link_reset_all(struct tipc_node *node)
 {
 	char addr_string[16];
@@ -858,9 +892,9 @@ void tipc_link_reset_all(struct tipc_node *node)
 		tipc_addr_string_fill(addr_string, node->addr));
 
 	for (i = 0; i < MAX_BEARERS; i++) {
-		if (node->links[i]) {
-			link_print(node->links[i], "Resetting link\n");
-			tipc_link_reset(node->links[i]);
+		if (node->links[i].link) {
+			link_print(node->links[i].link, "Resetting link\n");
+			tipc_link_reset(node->links[i].link);
 		}
 	}
 
@@ -877,9 +911,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 
 	if (l_ptr->addr) {
 		/* Handle failure on standard link */
-		link_print(l_ptr, "Resetting link\n");
+		link_print(l_ptr, "Resetting link ");
+		pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
+			msg_user(msg), msg_type(msg), msg_size(msg),
+			msg_errcode(msg));
+		pr_info("sqno %u, prev: %x, src: %x\n",
+			msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
 		tipc_link_reset(l_ptr);
-
 	} else {
 		/* Handle failure on broadcast link */
 		struct tipc_node *n_ptr;
@@ -940,6 +978,41 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 	}
 }
 
+static int tipc_link_retransm(struct tipc_link *l, int retransm,
+			      struct sk_buff_head *xmitq)
+{
+	struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
+	struct tipc_msg *hdr;
+
+	if (!skb)
+		return 0;
+
+	/* Detect repeated retransmit failures on same packet */
+	if (likely(l->last_retransm != buf_seqno(skb))) {
+		l->last_retransm = buf_seqno(skb);
+		l->stale_count = 1;
+	} else if (++l->stale_count > 100) {
+		link_retransmit_failure(l, skb);
+		return TIPC_LINK_DOWN_EVT;
+	}
+	skb_queue_walk(&l->transmq, skb) {
+		if (!retransm)
+			return 0;
+		hdr = buf_msg(skb);
+		_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+		if (!_skb)
+			return 0;
+		hdr = buf_msg(_skb);
+		msg_set_ack(hdr, l->rcv_nxt - 1);
+		msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+		_skb->priority = TC_PRIO_CONTROL;
+		__skb_queue_tail(xmitq, _skb);
+		retransm--;
+		l->stats.retransmitted++;
+	}
+	return 0;
+}
+
 /* link_synch(): check if all packets arrived before the synch
  *               point have been consumed
  * Returns true if the parallel links are synched, otherwise false
@@ -959,168 +1032,13 @@ static bool link_synch(struct tipc_link *l)
 
 	/* Is it still in the input queue ? */
 	post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
-	if (skb_queue_len(&pl->inputq) > post_synch)
+	if (skb_queue_len(pl->inputq) > post_synch)
 		return false;
 synched:
-	l->flags &= ~LINK_SYNCHING;
+	l->exec_mode = TIPC_LINK_OPEN;
 	return true;
 }
 
-static void link_retrieve_defq(struct tipc_link *link,
-			       struct sk_buff_head *list)
-{
-	u16 seq_no;
-
-	if (skb_queue_empty(&link->deferdq))
-		return;
-
-	seq_no = buf_seqno(skb_peek(&link->deferdq));
-	if (seq_no == link->rcv_nxt)
-		skb_queue_splice_tail_init(&link->deferdq, list);
-}
-
-/**
- * tipc_rcv - process TIPC packets/messages arriving from off-node
- * @net: the applicable net namespace
- * @skb: TIPC packet
- * @b_ptr: pointer to bearer message arrived on
- *
- * Invoked with no locks held.  Bearer pointer must point to a valid bearer
- * structure (i.e. cannot be NULL), but bearer can be inactive.
- */
-void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct sk_buff_head head;
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *skb1, *tmp;
-	struct tipc_msg *msg;
-	u16 seq_no;
-	u16 ackd;
-	u32 released;
-
-	skb2list(skb, &head);
-
-	while ((skb = __skb_dequeue(&head))) {
-		/* Ensure message is well-formed */
-		if (unlikely(!tipc_msg_validate(skb)))
-			goto discard;
-
-		/* Handle arrival of a non-unicast link message */
-		msg = buf_msg(skb);
-		if (unlikely(msg_non_seq(msg))) {
-			if (msg_user(msg) ==  LINK_CONFIG)
-				tipc_disc_rcv(net, skb, b_ptr);
-			else
-				tipc_bclink_rcv(net, skb);
-			continue;
-		}
-
-		/* Discard unicast link messages destined for another node */
-		if (unlikely(!msg_short(msg) &&
-			     (msg_destnode(msg) != tn->own_addr)))
-			goto discard;
-
-		/* Locate neighboring node that sent message */
-		n_ptr = tipc_node_find(net, msg_prevnode(msg));
-		if (unlikely(!n_ptr))
-			goto discard;
-
-		tipc_node_lock(n_ptr);
-		/* Locate unicast link endpoint that should handle message */
-		l_ptr = n_ptr->links[b_ptr->identity];
-		if (unlikely(!l_ptr))
-			goto unlock;
-
-		/* Verify that communication with node is currently allowed */
-		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
-		    msg_user(msg) == LINK_PROTOCOL &&
-		    (msg_type(msg) == RESET_MSG ||
-		    msg_type(msg) == ACTIVATE_MSG) &&
-		    !msg_redundant_link(msg))
-			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
-
-		if (tipc_node_blocked(n_ptr))
-			goto unlock;
-
-		/* Validate message sequence number info */
-		seq_no = msg_seqno(msg);
-		ackd = msg_ack(msg);
-
-		/* Release acked messages */
-		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
-			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
-
-		released = 0;
-		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
-			if (more(buf_seqno(skb1), ackd))
-				break;
-			 __skb_unlink(skb1, &l_ptr->transmq);
-			 kfree_skb(skb1);
-			 released = 1;
-		}
-
-		/* Try sending any messages link endpoint has pending */
-		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
-			tipc_link_push_packets(l_ptr);
-
-		if (released && !skb_queue_empty(&l_ptr->wakeupq))
-			link_prepare_wakeup(l_ptr);
-
-		/* Process the incoming packet */
-		if (unlikely(!link_working_working(l_ptr))) {
-			if (msg_user(msg) == LINK_PROTOCOL) {
-				tipc_link_proto_rcv(l_ptr, skb);
-				link_retrieve_defq(l_ptr, &head);
-				skb = NULL;
-				goto unlock;
-			}
-
-			/* Traffic message. Conditionally activate link */
-			link_state_event(l_ptr, TRAFFIC_MSG_EVT);
-
-			if (link_working_working(l_ptr)) {
-				/* Re-insert buffer in front of queue */
-				__skb_queue_head(&head, skb);
-				skb = NULL;
-				goto unlock;
-			}
-			goto unlock;
-		}
-
-		/* Link is now in state WORKING_WORKING */
-		if (unlikely(seq_no != l_ptr->rcv_nxt)) {
-			link_handle_out_of_seq_msg(l_ptr, skb);
-			link_retrieve_defq(l_ptr, &head);
-			skb = NULL;
-			goto unlock;
-		}
-		l_ptr->silent_intv_cnt = 0;
-
-		/* Synchronize with parallel link if applicable */
-		if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
-			if (!link_synch(l_ptr))
-				goto unlock;
-		}
-		l_ptr->rcv_nxt++;
-		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
-			link_retrieve_defq(l_ptr, &head);
-		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
-			l_ptr->stats.sent_acks++;
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
-		}
-		tipc_link_input(l_ptr, skb);
-		skb = NULL;
-unlock:
-		tipc_node_unlock(n_ptr);
-		tipc_node_put(n_ptr);
-discard:
-		if (unlikely(skb))
-			kfree_skb(skb);
-	}
-}
-
 /* tipc_data_input - deliver data and name distr msgs to upper layer
  *
  * Consumes buffer if message is of right type
@@ -1138,16 +1056,16 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
 	case TIPC_HIGH_IMPORTANCE:
 	case TIPC_CRITICAL_IMPORTANCE:
 	case CONN_MANAGER:
-		if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
-			node->inputq = &link->inputq;
+		if (tipc_skb_queue_tail(link->inputq, skb, dport)) {
+			node->inputq = link->inputq;
 			node->action_flags |= TIPC_MSG_EVT;
 		}
 		return true;
 	case NAME_DISTRIBUTOR:
 		node->bclink.recv_permitted = true;
-		node->namedq = &link->namedq;
-		skb_queue_tail(&link->namedq, skb);
-		if (skb_queue_len(&link->namedq) == 1)
+		node->namedq = link->namedq;
+		skb_queue_tail(link->namedq, skb);
+		if (skb_queue_len(link->namedq) == 1)
 			node->action_flags |= TIPC_NAMED_MSG_EVT;
 		return true;
 	case MSG_BUNDLER:
@@ -1174,13 +1092,10 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 	struct sk_buff *iskb;
 	int pos = 0;
 
-	if (likely(tipc_data_input(link, skb)))
-		return;
-
 	switch (msg_user(msg)) {
 	case TUNNEL_PROTOCOL:
 		if (msg_dup(msg)) {
-			link->flags |= LINK_SYNCHING;
+			link->exec_mode = TIPC_LINK_TUNNEL;
 			link->synch_point = msg_seqno(msg_get_wrapped(msg));
 			kfree_skb(skb);
 			break;
@@ -1215,6 +1130,110 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 	};
 }
 
+static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
+{
+	bool released = false;
+	struct sk_buff *skb, *tmp;
+
+	skb_queue_walk_safe(&l->transmq, skb, tmp) {
+		if (more(buf_seqno(skb), acked))
+			break;
+		__skb_unlink(skb, &l->transmq);
+		kfree_skb(skb);
+		released = true;
+	}
+	return released;
+}
+
+/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
+ * @link: the link that should handle the message
+ * @skb: TIPC packet
+ * @xmitq: queue to place packets to be sent after this call
+ */
+int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
+		  struct sk_buff_head *xmitq)
+{
+	struct sk_buff_head *arrvq = &l->deferdq;
+	struct sk_buff *tmp;
+	struct tipc_msg *hdr;
+	u16 seqno, rcv_nxt;
+	int rc = 0;
+
+	if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
+		if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
+			tipc_link_build_proto_msg(l, STATE_MSG, 0,
+						  0, 0, 0, xmitq);
+		return rc;
+	}
+
+	skb_queue_walk_safe(arrvq, skb, tmp) {
+		hdr = buf_msg(skb);
+
+		/* Verify and update link state */
+		if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
+			__skb_dequeue(arrvq);
+			rc |= tipc_link_proto_rcv(l, skb, xmitq);
+			continue;
+		}
+
+		if (unlikely(!link_working(l))) {
+			rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
+			if (!link_working(l)) {
+				kfree_skb(__skb_dequeue(arrvq));
+				return rc;
+			}
+		}
+
+		l->silent_intv_cnt = 0;
+
+		/* Forward queues and wake up waiting users */
+		if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
+			tipc_link_advance_backlog(l, xmitq);
+			if (unlikely(!skb_queue_empty(&l->wakeupq)))
+				link_prepare_wakeup(l);
+		}
+
+		/* Defer reception if there is a gap in the sequence */
+		seqno = msg_seqno(hdr);
+		rcv_nxt = l->rcv_nxt;
+		if (unlikely(less(rcv_nxt, seqno))) {
+			l->stats.deferred_recv++;
+			return rc;
+		}
+
+		__skb_dequeue(arrvq);
+
+		/* Drop if packet already received */
+		if (unlikely(more(rcv_nxt, seqno))) {
+			l->stats.duplicates++;
+			kfree_skb(skb);
+			return rc;
+		}
+
+		/* Synchronize with parallel link if applicable */
+		if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL))
+			if (!msg_dup(hdr) && !link_synch(l)) {
+				kfree_skb(skb);
+				return rc;
+			}
+
+		/* Packet can be delivered */
+		l->rcv_nxt++;
+		l->stats.recv_info++;
+		if (unlikely(!tipc_data_input(l, skb)))
+			tipc_link_input(l, skb);
+
+		/* Ack at regular intervals */
+		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
+			l->rcv_unacked = 0;
+			l->stats.sent_acks++;
+			tipc_link_build_proto_msg(l, STATE_MSG,
+						  0, 0, 0, 0, xmitq);
+		}
+	}
+	return rc;
+}
+
 /**
  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
  *
@@ -1255,235 +1274,85 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
 }
 
 /*
- * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
+ * Send protocol message to the other endpoint.
  */
-static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
-				       struct sk_buff *buf)
+void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
+			  u32 gap, u32 tolerance, u32 priority)
 {
-	u32 seq_no = buf_seqno(buf);
-
-	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
-		tipc_link_proto_rcv(l_ptr, buf);
-		return;
-	}
-
-	/* Record OOS packet arrival */
-	l_ptr->silent_intv_cnt = 0;
+	struct sk_buff *skb = NULL;
+	struct sk_buff_head xmitq;
 
-	/*
-	 * Discard packet if a duplicate; otherwise add it to deferred queue
-	 * and notify peer of gap as per protocol specification
-	 */
-	if (less(seq_no, l_ptr->rcv_nxt)) {
-		l_ptr->stats.duplicates++;
-		kfree_skb(buf);
+	__skb_queue_head_init(&xmitq);
+	tipc_link_build_proto_msg(l, msg_typ, probe_msg, gap,
+				  tolerance, priority, &xmitq);
+	skb = __skb_dequeue(&xmitq);
+	if (!skb)
 		return;
-	}
-
-	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
-		l_ptr->stats.deferred_recv++;
-		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
-	} else {
-		l_ptr->stats.duplicates++;
-	}
+	tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
+	l->rcv_unacked = 0;
+	kfree_skb(skb);
 }
 
-/*
- * Send protocol message to the other endpoint.
+/* tipc_link_build_proto_msg: prepare link protocol message for transmission
  */
-void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
-			  u32 gap, u32 tolerance, u32 priority)
+static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
+				      u16 rcvgap, int tolerance, int priority,
+				      struct sk_buff_head *xmitq)
 {
-	struct sk_buff *buf = NULL;
-	struct tipc_msg *msg = l_ptr->pmsg;
-	u32 msg_size = sizeof(l_ptr->proto_msg);
-	int r_flag;
-	u16 last_rcv;
-
-	/* Don't send protocol message during link failover */
-	if (l_ptr->flags & LINK_FAILINGOVER)
-		return;
-
-	/* Abort non-RESET send if communication with node is prohibited */
-	if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
+	struct sk_buff *skb = NULL;
+	struct tipc_msg *hdr = l->pmsg;
+	u16 snd_nxt = l->snd_nxt;
+	u16 rcv_nxt = l->rcv_nxt;
+	u16 rcv_last = rcv_nxt - 1;
+	int node_up = l->owner->bclink.recv_permitted;
+
+	/* Don't send protocol message during reset or link failover */
+	if (l->exec_mode == TIPC_LINK_BLOCKED)
 		return;
 
-	/* Create protocol message with "out-of-sequence" sequence number */
-	msg_set_type(msg, msg_typ);
-	msg_set_net_plane(msg, l_ptr->net_plane);
-	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-	msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net));
+	msg_set_type(hdr, mtyp);
+	msg_set_net_plane(hdr, l->net_plane);
+	msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+	msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net));
+	msg_set_link_tolerance(hdr, tolerance);
+	msg_set_linkprio(hdr, priority);
+	msg_set_redundant_link(hdr, node_up);
+	msg_set_seq_gap(hdr, 0);
 
-	if (msg_typ == STATE_MSG) {
-		u16 next_sent = l_ptr->snd_nxt;
+	/* Compatibility: created msg must not be in sequence with pkt flow */
+	msg_set_seqno(hdr, snd_nxt + U16_MAX / 2);
 
-		if (!tipc_link_is_up(l_ptr))
+	if (mtyp == STATE_MSG) {
+		if (!tipc_link_is_up(l))
 			return;
-		msg_set_next_sent(msg, next_sent);
-		if (!skb_queue_empty(&l_ptr->deferdq)) {
-			last_rcv = buf_seqno(skb_peek(&l_ptr->deferdq));
-			gap = mod(last_rcv - l_ptr->rcv_nxt);
+		msg_set_next_sent(hdr, snd_nxt);
+
+		/* Override rcvgap if there are packets in deferred queue */
+		if (!skb_queue_empty(&l->deferdq))
+			rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt;
+		if (rcvgap) {
+			msg_set_seq_gap(hdr, rcvgap);
+			l->stats.sent_nacks++;
 		}
-		msg_set_seq_gap(msg, gap);
-		if (gap)
-			l_ptr->stats.sent_nacks++;
-		msg_set_link_tolerance(msg, tolerance);
-		msg_set_linkprio(msg, priority);
-		msg_set_max_pkt(msg, l_ptr->mtu);
-		msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
-		msg_set_probe(msg, probe_msg != 0);
-		if (probe_msg)
-			l_ptr->stats.sent_probes++;
-		l_ptr->stats.sent_states++;
-	} else {		/* RESET_MSG or ACTIVATE_MSG */
-		msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1));
-		msg_set_seq_gap(msg, 0);
-		msg_set_next_sent(msg, 1);
-		msg_set_probe(msg, 0);
-		msg_set_link_tolerance(msg, l_ptr->tolerance);
-		msg_set_linkprio(msg, l_ptr->priority);
-		msg_set_max_pkt(msg, l_ptr->advertised_mtu);
+		msg_set_ack(hdr, rcv_last);
+		msg_set_probe(hdr, probe);
+		if (probe)
+			l->stats.sent_probes++;
+		l->stats.sent_states++;
+	} else {
+		/* RESET_MSG or ACTIVATE_MSG */
+		msg_set_max_pkt(hdr, l->advertised_mtu);
+		msg_set_ack(hdr, l->failover_checkpt - 1);
+		msg_set_next_sent(hdr, 1);
 	}
-
-	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
-	msg_set_redundant_link(msg, r_flag);
-	msg_set_linkprio(msg, l_ptr->priority);
-	msg_set_size(msg, msg_size);
-
-	msg_set_seqno(msg, mod(l_ptr->snd_nxt + (0xffff / 2)));
-
-	buf = tipc_buf_acquire(msg_size);
-	if (!buf)
+	skb = tipc_buf_acquire(msg_size(hdr));
+	if (!skb)
 		return;
-
-	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
-	buf->priority = TC_PRIO_CONTROL;
-	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
-			 &l_ptr->media_addr);
-	l_ptr->rcv_unacked = 0;
-	kfree_skb(buf);
+	skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
+	skb->priority = TC_PRIO_CONTROL;
+	__skb_queue_head(xmitq, skb);
 }
 
-/*
- * Receive protocol message :
- * Note that network plane id propagates through the network, and may
- * change at any time. The node with lowest address rules
- */
-static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
-				struct sk_buff *buf)
-{
-	u32 rec_gap = 0;
-	u32 msg_tol;
-	struct tipc_msg *msg = buf_msg(buf);
-
-	if (l_ptr->flags & LINK_FAILINGOVER)
-		goto exit;
-
-	if (l_ptr->net_plane != msg_net_plane(msg))
-		if (link_own_addr(l_ptr) > msg_prevnode(msg))
-			l_ptr->net_plane = msg_net_plane(msg);
-
-	switch (msg_type(msg)) {
-
-	case RESET_MSG:
-		if (!link_working_unknown(l_ptr) &&
-		    (l_ptr->peer_session != INVALID_SESSION)) {
-			if (less_eq(msg_session(msg), l_ptr->peer_session))
-				break; /* duplicate or old reset: ignore */
-		}
-
-		if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
-				link_working_unknown(l_ptr))) {
-			/*
-			 * peer has lost contact -- don't allow peer's links
-			 * to reactivate before we recognize loss & clean up
-			 */
-			l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
-		}
-
-		link_state_event(l_ptr, RESET_MSG);
-
-		/* fall thru' */
-	case ACTIVATE_MSG:
-		/* Update link settings according other endpoint's values */
-		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
-
-		msg_tol = msg_link_tolerance(msg);
-		if (msg_tol > l_ptr->tolerance)
-			link_set_supervision_props(l_ptr, msg_tol);
-
-		if (msg_linkprio(msg) > l_ptr->priority)
-			l_ptr->priority = msg_linkprio(msg);
-
-		if (l_ptr->mtu > msg_max_pkt(msg))
-			l_ptr->mtu = msg_max_pkt(msg);
-
-		/* Synchronize broadcast link info, if not done previously */
-		if (!tipc_node_is_up(l_ptr->owner)) {
-			l_ptr->owner->bclink.last_sent =
-				l_ptr->owner->bclink.last_in =
-				msg_last_bcast(msg);
-			l_ptr->owner->bclink.oos_state = 0;
-		}
-
-		l_ptr->peer_session = msg_session(msg);
-		l_ptr->peer_bearer_id = msg_bearer_id(msg);
-
-		if (msg_type(msg) == ACTIVATE_MSG)
-			link_state_event(l_ptr, ACTIVATE_MSG);
-		break;
-	case STATE_MSG:
-
-		msg_tol = msg_link_tolerance(msg);
-		if (msg_tol)
-			link_set_supervision_props(l_ptr, msg_tol);
-
-		if (msg_linkprio(msg) &&
-		    (msg_linkprio(msg) != l_ptr->priority)) {
-			pr_debug("%s<%s>, priority change %u->%u\n",
-				 link_rst_msg, l_ptr->name,
-				 l_ptr->priority, msg_linkprio(msg));
-			l_ptr->priority = msg_linkprio(msg);
-			tipc_link_reset(l_ptr); /* Enforce change to take effect */
-			break;
-		}
-
-		/* Record reception; force mismatch at next timeout: */
-		l_ptr->silent_intv_cnt = 0;
-
-		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
-		l_ptr->stats.recv_states++;
-		if (link_reset_unknown(l_ptr))
-			break;
-
-		if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
-			rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt);
-
-		if (msg_probe(msg))
-			l_ptr->stats.recv_probes++;
-
-		/* Protocol message before retransmits, reduce loss risk */
-		if (l_ptr->owner->bclink.recv_permitted)
-			tipc_bclink_update_link_state(l_ptr->owner,
-						      msg_last_bcast(msg));
-
-		if (rec_gap || (msg_probe(msg))) {
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
-					     rec_gap, 0, 0);
-		}
-		if (msg_seq_gap(msg)) {
-			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
-					     msg_seq_gap(msg));
-		}
-		break;
-	}
-exit:
-	kfree_skb(buf);
-}
-
-
 /* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
  * a different bearer. Owner node is locked.
  */
@@ -1496,7 +1365,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 	struct sk_buff *skb;
 	u32 length = msg_size(msg);
 
-	tunnel = l_ptr->owner->active_links[selector & 1];
+	tunnel = node_active_link(l_ptr->owner, selector & 1);
 	if (!tipc_link_is_up(tunnel)) {
 		pr_warn("%stunnel link no longer available\n", link_co_err);
 		return;
@@ -1522,7 +1391,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
 	int msgcount;
-	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
+	struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0);
 	struct tipc_msg tunnel_hdr;
 	struct sk_buff *skb;
 	int split_bundles;
@@ -1556,8 +1425,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 		return;
 	}
 
-	split_bundles = (l_ptr->owner->active_links[0] !=
-			 l_ptr->owner->active_links[1]);
+	split_bundles = (node_active_link(l_ptr->owner, 0) !=
+			 node_active_link(l_ptr->owner, 0));
 
 	skb_queue_walk(&l_ptr->transmq, skb) {
 		struct tipc_msg *msg = buf_msg(skb);
@@ -1660,7 +1529,7 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
 	if (bearer_id == link->bearer_id)
 		goto exit;
 
-	pl = link->owner->links[bearer_id];
+	pl = link->owner->links[bearer_id].link;
 	if (pl && tipc_link_is_up(pl))
 		tipc_link_reset(pl);
 
@@ -1691,22 +1560,100 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
 	}
 exit:
 	if (!link->failover_pkts && pl)
-		pl->flags &= ~LINK_FAILINGOVER;
+		pl->exec_mode = TIPC_LINK_OPEN;
 	kfree_skb(*skb);
 	*skb = iskb;
 	return *skb;
 }
 
-static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
+/* tipc_link_proto_rcv(): receive link level protocol message :
+ * Note that network plane id propagates through the network, and may
+ * change at any time. The node with lowest numerical id determines
+ * network plane
+ */
+static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
+			       struct sk_buff_head *xmitq)
 {
-	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
+	struct tipc_msg *hdr = buf_msg(skb);
+	u16 rcvgap = 0;
+	u16 nacked_gap = msg_seq_gap(hdr);
+	u16 peers_snd_nxt =  msg_next_sent(hdr);
+	u16 peers_tol = msg_link_tolerance(hdr);
+	u16 peers_prio = msg_linkprio(hdr);
+	char *if_name;
+	int rc = 0;
 
-	if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
-		return;
+	if (l->exec_mode == TIPC_LINK_BLOCKED)
+		goto exit;
+
+	if (link_own_addr(l) > msg_prevnode(hdr))
+		l->net_plane = msg_net_plane(hdr);
+
+	switch (msg_type(hdr)) {
+	case RESET_MSG:
+
+		/* Ignore duplicate RESET with old session number */
+		if ((less_eq(msg_session(hdr), l->peer_session)) &&
+		    (l->peer_session != WILDCARD_SESSION))
+			break;
+		/* fall thru' */
+	case ACTIVATE_MSG:
+
+		/* Complete own link name with peer's interface name */
+		if_name =  strrchr(l->name, ':') + 1;
+		if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME)
+			break;
+		if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)
+			break;
+		strncpy(if_name, msg_data(hdr),	TIPC_MAX_IF_NAME);
+
+		/* Update own tolerance if peer indicates a non-zero value */
+		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
+			l->tolerance = peers_tol;
 
-	l_ptr->tolerance = tol;
-	l_ptr->keepalive_intv = msecs_to_jiffies(intv);
-	l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->keepalive_intv));
+		/* Update own priority if peer's priority is higher */
+		if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI))
+			l->priority = peers_prio;
+
+		l->peer_session = msg_session(hdr);
+		l->peer_bearer_id = msg_bearer_id(hdr);
+		rc = tipc_link_fsm_evt(l, msg_type(hdr), xmitq);
+		if (l->mtu > msg_max_pkt(hdr))
+			l->mtu = msg_max_pkt(hdr);
+		break;
+	case STATE_MSG:
+		/* Update own tolerance if peer indicates a non-zero value */
+		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
+			l->tolerance = peers_tol;
+
+		l->silent_intv_cnt = 0;
+		l->stats.recv_states++;
+		if (msg_probe(hdr))
+			l->stats.recv_probes++;
+		rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
+		if (!tipc_link_is_up(l))
+			break;
+
+		/* Has peer sent packets we haven't received yet ? */
+		if (more(peers_snd_nxt, l->rcv_nxt))
+			rcvgap = peers_snd_nxt - l->rcv_nxt;
+		if (rcvgap || (msg_probe(hdr)))
+			tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
+						  0, l->mtu, xmitq);
+		tipc_link_release_pkts(l, msg_ack(hdr));
+
+		/* If NACK, retransmit will now start at right position */
+		if (nacked_gap) {
+			rc |= tipc_link_retransm(l, nacked_gap, xmitq);
+			l->stats.recv_nacks++;
+		}
+		tipc_link_advance_backlog(l, xmitq);
+		if (unlikely(!skb_queue_empty(&l->wakeupq)))
+			link_prepare_wakeup(l);
+	}
+exit:
+	kfree_skb(skb);
+	return rc;
 }
 
 void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
@@ -1743,7 +1690,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,
 	list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
 		tipc_node_lock(n_ptr);
 		for (i = 0; i < MAX_BEARERS; i++) {
-			l_ptr = n_ptr->links[i];
+			l_ptr = n_ptr->links[i].link;
 			if (l_ptr && !strcmp(l_ptr->name, link_name)) {
 				*bearer_id = i;
 				found_node = n_ptr;
@@ -1770,27 +1717,28 @@ static void link_reset_statistics(struct tipc_link *l_ptr)
 	l_ptr->stats.recv_info = l_ptr->rcv_nxt;
 }
 
-static void link_print(struct tipc_link *l_ptr, const char *str)
+static void link_print(struct tipc_link *l, const char *str)
 {
-	struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
-
-	rcu_read_lock();
-	b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
-	if (b_ptr)
-		pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
-	rcu_read_unlock();
-
-	if (link_working_unknown(l_ptr))
-		pr_cont(":WU\n");
-	else if (link_reset_reset(l_ptr))
-		pr_cont(":RR\n");
-	else if (link_reset_unknown(l_ptr))
-		pr_cont(":RU\n");
-	else if (link_working_working(l_ptr))
-		pr_cont(":WW\n");
+	struct sk_buff *hskb = skb_peek(&l->transmq);
+	u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt;
+	u16 tail = l->snd_nxt - 1;
+
+	pr_info("%s Link <%s>:", str, l->name);
+
+	if (link_probing(l))
+		pr_cont(":P\n");
+	else if (link_establishing(l))
+		pr_cont(":E\n");
+	else if (link_resetting(l))
+		pr_cont(":R\n");
+	else if (link_working(l))
+		pr_cont(":W\n");
 	else
 		pr_cont("\n");
+
+	pr_info("XMTQ: %u [%u-%u], BKLGQ: %u, SNDNX: %u, RCVNX: %u\n",
+		skb_queue_len(&l->transmq), head, tail,
+		skb_queue_len(&l->backlogq), l->snd_nxt, l->rcv_nxt);
 }
 
 /* Parse and validate nested (link) properties valid for media, bearer and link
@@ -1865,7 +1813,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
 
 	tipc_node_lock(node);
 
-	link = node->links[bearer_id];
+	link = node->links[bearer_id].link;
 	if (!link) {
 		res = -EINVAL;
 		goto out;
@@ -1885,7 +1833,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
 			u32 tol;
 
 			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
-			link_set_supervision_props(link, tol);
+			link->tolerance = tol;
 			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);
 		}
 		if (props[TIPC_NLA_PROP_PRIO]) {
@@ -2055,10 +2003,11 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
 	for (i = *prev_link; i < MAX_BEARERS; i++) {
 		*prev_link = i;
 
-		if (!node->links[i])
+		if (!node->links[i].link)
 			continue;
 
-		err = __tipc_nl_add_link(net, msg, node->links[i], NLM_F_MULTI);
+		err = __tipc_nl_add_link(net, msg,
+					 node->links[i].link, NLM_F_MULTI);
 		if (err)
 			return err;
 	}
@@ -2172,7 +2121,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
 			return -EINVAL;
 
 		tipc_node_lock(node);
-		link = node->links[bearer_id];
+		link = node->links[bearer_id].link;
 		if (!link) {
 			tipc_node_unlock(node);
 			nlmsg_free(msg.skb);
@@ -2227,7 +2176,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
 
 	tipc_node_lock(node);
 
-	link = node->links[bearer_id];
+	link = node->links[bearer_id].link;
 	if (!link) {
 		tipc_node_unlock(node);
 		return -EINVAL;

+ 27 - 47
net/tipc/link.h

@@ -49,19 +49,21 @@
  */
 #define INVALID_LINK_SEQ 0x10000
 
-/* Link working states
+
+/* Link endpoint receive states
  */
-#define WORKING_WORKING 560810u
-#define WORKING_UNKNOWN 560811u
-#define RESET_UNKNOWN   560812u
-#define RESET_RESET     560813u
+enum {
+	TIPC_LINK_OPEN,
+	TIPC_LINK_BLOCKED,
+	TIPC_LINK_TUNNEL
+};
 
-/* Link endpoint execution states
+/* Events returned from link at packet reception or at timeout
  */
-#define LINK_STARTED     0x0001
-#define LINK_STOPPED     0x0002
-#define LINK_SYNCHING    0x0004
-#define LINK_FAILINGOVER 0x0008
+enum {
+	TIPC_LINK_UP_EVT       = 1,
+	TIPC_LINK_DOWN_EVT     = (1 << 1)
+};
 
 /* Starting value for maximum packet size negotiation on unicast links
  * (unless bearer MTU is less)
@@ -106,7 +108,6 @@ struct tipc_stats {
  * @timer: link timer
  * @owner: pointer to peer node
  * @refcnt: reference counter for permanent references (owner node & timer)
- * @flags: execution state flags for link endpoint instance
  * @peer_session: link session # being used by peer end of link
  * @peer_bearer_id: bearer id used by link's peer endpoint
  * @bearer_id: local bearer id used by link
@@ -119,6 +120,7 @@ struct tipc_stats {
  * @pmsg: convenience pointer to "proto_msg" field
  * @priority: current link priority
  * @net_plane: current link network plane ('A' through 'H')
+ * @exec_mode: transmit/receive mode for link endpoint instance
  * @backlog_limit: backlog queue congestion thresholds (indexed by importance)
  * @exp_msg_count: # of tunnelled messages expected during link changeover
  * @reset_rcv_checkpt: seq # of last acknowledged message at time of link reset
@@ -144,12 +146,9 @@ struct tipc_link {
 	u32 addr;
 	char name[TIPC_MAX_LINK_NAME];
 	struct tipc_media_addr media_addr;
-	struct timer_list timer;
 	struct tipc_node *owner;
-	struct kref ref;
 
 	/* Management and link supervision data */
-	unsigned int flags;
 	u32 peer_session;
 	u32 peer_bearer_id;
 	u32 bearer_id;
@@ -165,6 +164,7 @@ struct tipc_link {
 	struct tipc_msg *pmsg;
 	u32 priority;
 	char net_plane;
+	u8 exec_mode;
 	u16 synch_point;
 
 	/* Failover */
@@ -192,8 +192,8 @@ struct tipc_link {
 	u16 rcv_nxt;
 	u32 rcv_unacked;
 	struct sk_buff_head deferdq;
-	struct sk_buff_head inputq;
-	struct sk_buff_head namedq;
+	struct sk_buff_head *inputq;
+	struct sk_buff_head *namedq;
 
 	/* Congestion handling */
 	struct sk_buff_head wakeupq;
@@ -207,9 +207,11 @@ struct tipc_link {
 
 struct tipc_port;
 
-struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
-			      struct tipc_bearer *b_ptr,
-			      const struct tipc_media_addr *media_addr);
+struct tipc_link *tipc_link_create(struct tipc_node *n,
+				   struct tipc_bearer *b,
+				   const struct tipc_media_addr *maddr,
+				   struct sk_buff_head *inputq,
+				   struct sk_buff_head *namedq);
 void tipc_link_delete(struct tipc_link *link);
 void tipc_link_delete_list(struct net *net, unsigned int bearer_id);
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
@@ -221,12 +223,11 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
 void tipc_link_purge_backlog(struct tipc_link *l);
 void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
-		       u32 selector);
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
-		   u32 selector);
+void tipc_link_activate(struct tipc_link *link);
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list);
+int tipc_link_xmit(struct tipc_link *link,	struct sk_buff_head *list,
+		   struct sk_buff_head *xmitq);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority);
 void tipc_link_push_packets(struct tipc_link *l_ptr);
@@ -243,33 +244,12 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
 void link_prepare_wakeup(struct tipc_link *l);
-
+int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
+int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
+		  struct sk_buff_head *xmitq);
 static inline u32 link_own_addr(struct tipc_link *l)
 {
 	return msg_prevnode(l->pmsg);
 }
 
-/*
- * Link status checking routines
- */
-static inline int link_working_working(struct tipc_link *l_ptr)
-{
-	return l_ptr->state == WORKING_WORKING;
-}
-
-static inline int link_working_unknown(struct tipc_link *l_ptr)
-{
-	return l_ptr->state == WORKING_UNKNOWN;
-}
-
-static inline int link_reset_unknown(struct tipc_link *l_ptr)
-{
-	return l_ptr->state == RESET_UNKNOWN;
-}
-
-static inline int link_reset_reset(struct tipc_link *l_ptr)
-{
-	return l_ptr->state == RESET_RESET;
-}
-
 #endif

+ 51 - 2
net/tipc/msg.h

@@ -38,6 +38,7 @@
 #define _TIPC_MSG_H
 
 #include <linux/tipc.h>
+#include "core.h"
 
 /*
  * Constants and routines used to read and write TIPC payload message headers
@@ -658,12 +659,12 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
 /*
  * Word 5
  */
-static inline u32 msg_session(struct tipc_msg *m)
+static inline u16 msg_session(struct tipc_msg *m)
 {
 	return msg_bits(m, 5, 16, 0xffff);
 }
 
-static inline void msg_set_session(struct tipc_msg *m, u32 n)
+static inline void msg_set_session(struct tipc_msg *m, u16 n)
 {
 	msg_set_bits(m, 5, 16, 0xffff, n);
 }
@@ -766,6 +767,22 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
+static inline bool msg_is_traffic(struct tipc_msg *m)
+{
+	if (likely(msg_user(m) != LINK_PROTOCOL))
+		return true;
+	if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG))
+		return false;
+	return true;
+}
+
+static inline bool msg_peer_is_up(struct tipc_msg *m)
+{
+	if (likely(msg_is_traffic(m)))
+		return false;
+	return msg_redundant_link(m);
+}
+
 struct sk_buff *tipc_buf_acquire(u32 size);
 bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
@@ -879,4 +896,36 @@ static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
 	return rv;
 }
 
+/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
+ * @list: list to be appended to
+ * @skb: buffer to add
+ * Returns true if queue should treated further, otherwise false
+ */
+static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list,
+					   struct sk_buff *skb)
+{
+	struct sk_buff *_skb, *tmp;
+	struct tipc_msg *hdr = buf_msg(skb);
+	u16 seqno = msg_seqno(hdr);
+
+	if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) {
+		__skb_queue_head(list, skb);
+		return true;
+	}
+	if (likely(less(seqno, buf_seqno(skb_peek(list))))) {
+		__skb_queue_head(list, skb);
+		return true;
+	}
+	if (!more(seqno, buf_seqno(skb_peek_tail(list)))) {
+		skb_queue_walk_safe(list, _skb, tmp) {
+			if (likely(less(seqno, buf_seqno(_skb)))) {
+				__skb_queue_before(list, _skb, skb);
+				return true;
+			}
+		}
+	}
+	__skb_queue_tail(list, skb);
+	return false;
+}
+
 #endif

+ 3 - 3
net/tipc/name_distr.c

@@ -96,13 +96,13 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
 		dnode = node->addr;
 		if (in_own_node(net, dnode))
 			continue;
-		if (!tipc_node_active_links(node))
+		if (!tipc_node_is_up(node))
 			continue;
 		oskb = pskb_copy(skb, GFP_ATOMIC);
 		if (!oskb)
 			break;
 		msg_set_destnode(buf_msg(oskb), dnode);
-		tipc_link_xmit_skb(net, oskb, dnode, dnode);
+		tipc_node_xmit_skb(net, oskb, dnode, dnode);
 	}
 	rcu_read_unlock();
 
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
 			 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
 	rcu_read_unlock();
 
-	tipc_link_xmit(net, &head, dnode, dnode);
+	tipc_node_xmit(net, &head, dnode, dnode);
 }
 
 static void tipc_publ_subscribe(struct net *net, struct publication *publ,

+ 450 - 99
net/tipc/node.c

@@ -40,10 +40,13 @@
 #include "name_distr.h"
 #include "socket.h"
 #include "bcast.h"
+#include "discover.h"
 
 static void node_lost_contact(struct tipc_node *n_ptr);
 static void node_established_contact(struct tipc_node *n_ptr);
 static void tipc_node_delete(struct tipc_node *node);
+static void tipc_node_timeout(unsigned long data);
+static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
 
 struct tipc_sock_conn {
 	u32 port;
@@ -132,6 +135,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->publ_list);
 	INIT_LIST_HEAD(&n_ptr->conn_sks);
+	skb_queue_head_init(&n_ptr->bclink.namedq);
 	__skb_queue_head_init(&n_ptr->bclink.deferdq);
 	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
 	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
@@ -139,14 +143,32 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 			break;
 	}
 	list_add_tail_rcu(&n_ptr->list, &temp_node->list);
-	n_ptr->action_flags = TIPC_WAIT_PEER_LINKS_DOWN;
+	n_ptr->state = SELF_DOWN_PEER_LEAVING;
 	n_ptr->signature = INVALID_NODE_SIG;
+	n_ptr->active_links[0] = INVALID_BEARER_ID;
+	n_ptr->active_links[1] = INVALID_BEARER_ID;
 	tipc_node_get(n_ptr);
+	setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
+	n_ptr->keepalive_intv = U32_MAX;
 exit:
 	spin_unlock_bh(&tn->node_list_lock);
 	return n_ptr;
 }
 
+static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
+{
+	unsigned long tol = l->tolerance;
+	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
+	unsigned long keepalive_intv = msecs_to_jiffies(intv);
+
+	/* Link with lowest tolerance determines timer interval */
+	if (keepalive_intv < n->keepalive_intv)
+		n->keepalive_intv = keepalive_intv;
+
+	/* Ensure link's abort limit corresponds to current interval */
+	l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv);
+}
+
 static void tipc_node_delete(struct tipc_node *node)
 {
 	list_del_rcu(&node->list);
@@ -160,8 +182,11 @@ void tipc_node_stop(struct net *net)
 	struct tipc_node *node, *t_node;
 
 	spin_lock_bh(&tn->node_list_lock);
-	list_for_each_entry_safe(node, t_node, &tn->node_list, list)
+	list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
+		if (del_timer(&node->timer))
+			tipc_node_put(node);
 		tipc_node_put(node);
+	}
 	spin_unlock_bh(&tn->node_list_lock);
 }
 
@@ -219,131 +244,170 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
 	tipc_node_put(node);
 }
 
+/* tipc_node_timeout - handle expiration of node timer
+ */
+static void tipc_node_timeout(unsigned long data)
+{
+	struct tipc_node *n = (struct tipc_node *)data;
+	struct sk_buff_head xmitq;
+	struct tipc_link *l;
+	struct tipc_media_addr *maddr;
+	int bearer_id;
+	int rc = 0;
+
+	__skb_queue_head_init(&xmitq);
+
+	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
+		tipc_node_lock(n);
+		l = n->links[bearer_id].link;
+		if (l) {
+			/* Link tolerance may change asynchronously: */
+			tipc_node_calculate_timer(n, l);
+			rc = tipc_link_timeout(l, &xmitq);
+			if (rc & TIPC_LINK_DOWN_EVT)
+				tipc_link_reset(l);
+		}
+		tipc_node_unlock(n);
+		maddr = &n->links[bearer_id].maddr;
+		tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
+	}
+	if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
+		tipc_node_get(n);
+	tipc_node_put(n);
+}
+
 /**
  * tipc_node_link_up - handle addition of link
  *
  * Link becomes active (alone or shared) or standby, depending on its priority.
  */
-void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
+void tipc_node_link_up(struct tipc_node *n, int bearer_id)
 {
-	struct tipc_link **active = &n_ptr->active_links[0];
+	int *slot0 = &n->active_links[0];
+	int *slot1 = &n->active_links[1];
+	struct tipc_link_entry *links = n->links;
+	struct tipc_link *l = n->links[bearer_id].link;
 
-	n_ptr->working_links++;
-	n_ptr->action_flags |= TIPC_NOTIFY_LINK_UP;
-	n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id;
+	/* Leave room for tunnel header when returning 'mtu' to users: */
+	links[bearer_id].mtu = l->mtu - INT_H_SIZE;
+
+	n->working_links++;
+	n->action_flags |= TIPC_NOTIFY_LINK_UP;
+	n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
 
 	pr_debug("Established link <%s> on network plane %c\n",
-		 l_ptr->name, l_ptr->net_plane);
+		 l->name, l->net_plane);
 
-	if (!active[0]) {
-		active[0] = active[1] = l_ptr;
-		node_established_contact(n_ptr);
-		goto exit;
+	/* No active links ? => take both active slots */
+	if (*slot0 < 0) {
+		*slot0 = bearer_id;
+		*slot1 = bearer_id;
+		node_established_contact(n);
+		return;
 	}
-	if (l_ptr->priority < active[0]->priority) {
-		pr_debug("New link <%s> becomes standby\n", l_ptr->name);
-		goto exit;
+
+	/* Lower prio than current active ? => no slot */
+	if (l->priority < links[*slot0].link->priority) {
+		pr_debug("New link <%s> becomes standby\n", l->name);
+		return;
 	}
-	tipc_link_dup_queue_xmit(active[0], l_ptr);
-	if (l_ptr->priority == active[0]->priority) {
-		active[0] = l_ptr;
-		goto exit;
+	tipc_link_dup_queue_xmit(links[*slot0].link, l);
+
+	/* Same prio as current active ? => take one slot */
+	if (l->priority == links[*slot0].link->priority) {
+		*slot0 = bearer_id;
+		return;
 	}
-	pr_debug("Old link <%s> becomes standby\n", active[0]->name);
-	if (active[1] != active[0])
-		pr_debug("Old link <%s> becomes standby\n", active[1]->name);
-	active[0] = active[1] = l_ptr;
-exit:
-	/* Leave room for changeover header when returning 'mtu' to users: */
-	n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE;
-	n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE;
+
+	/* Higher prio than current active => take both active slots */
+	pr_debug("Old link <%s> now standby\n", links[*slot0].link->name);
+	*slot0 = bearer_id;
+	*slot1 = bearer_id;
 }
 
 /**
- * node_select_active_links - select active link
+ * tipc_node_link_down - handle loss of link
  */
-static void node_select_active_links(struct tipc_node *n_ptr)
+void tipc_node_link_down(struct tipc_node *n, int bearer_id)
 {
-	struct tipc_link **active = &n_ptr->active_links[0];
-	u32 i;
-	u32 highest_prio = 0;
+	int *slot0 = &n->active_links[0];
+	int *slot1 = &n->active_links[1];
+	int i, highest = 0;
+	struct tipc_link *l, *_l;
 
-	active[0] = active[1] = NULL;
+	l = n->links[bearer_id].link;
+	n->working_links--;
+	n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
+	n->link_id = l->peer_bearer_id << 16 | l->bearer_id;
 
-	for (i = 0; i < MAX_BEARERS; i++) {
-		struct tipc_link *l_ptr = n_ptr->links[i];
+	pr_debug("Lost link <%s> on network plane %c\n",
+		 l->name, l->net_plane);
 
-		if (!l_ptr || !tipc_link_is_up(l_ptr) ||
-		    (l_ptr->priority < highest_prio))
+	/* Select new active link if any available */
+	*slot0 = INVALID_BEARER_ID;
+	*slot1 = INVALID_BEARER_ID;
+	for (i = 0; i < MAX_BEARERS; i++) {
+		_l = n->links[i].link;
+		if (!_l || !tipc_link_is_up(_l))
+			continue;
+		if (_l->priority < highest)
+			continue;
+		if (_l->priority > highest) {
+			highest = _l->priority;
+			*slot0 = i;
+			*slot1 = i;
 			continue;
-
-		if (l_ptr->priority > highest_prio) {
-			highest_prio = l_ptr->priority;
-			active[0] = active[1] = l_ptr;
-		} else {
-			active[1] = l_ptr;
 		}
+		*slot1 = i;
 	}
+	if (tipc_node_is_up(n))
+		tipc_link_failover_send_queue(l);
+	else
+		node_lost_contact(n);
 }
 
-/**
- * tipc_node_link_down - handle loss of link
- */
-void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
+bool tipc_node_is_up(struct tipc_node *n)
 {
-	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
-	struct tipc_link **active;
-
-	n_ptr->working_links--;
-	n_ptr->action_flags |= TIPC_NOTIFY_LINK_DOWN;
-	n_ptr->link_id = l_ptr->peer_bearer_id << 16 | l_ptr->bearer_id;
-
-	if (!tipc_link_is_active(l_ptr)) {
-		pr_debug("Lost standby link <%s> on network plane %c\n",
-			 l_ptr->name, l_ptr->net_plane);
-		return;
-	}
-	pr_debug("Lost link <%s> on network plane %c\n",
-		 l_ptr->name, l_ptr->net_plane);
-
-	active = &n_ptr->active_links[0];
-	if (active[0] == l_ptr)
-		active[0] = active[1];
-	if (active[1] == l_ptr)
-		active[1] = active[0];
-	if (active[0] == l_ptr)
-		node_select_active_links(n_ptr);
-	if (tipc_node_is_up(n_ptr))
-		tipc_link_failover_send_queue(l_ptr);
-	else
-		node_lost_contact(n_ptr);
-
-	/* Leave room for changeover header when returning 'mtu' to users: */
-	if (active[0]) {
-		n_ptr->act_mtus[0] = active[0]->mtu - INT_H_SIZE;
-		n_ptr->act_mtus[1] = active[1]->mtu - INT_H_SIZE;
-		return;
-	}
-	/* Loopback link went down? No fragmentation needed from now on. */
-	if (n_ptr->addr == tn->own_addr) {
-		n_ptr->act_mtus[0] = MAX_MSG_SIZE;
-		n_ptr->act_mtus[1] = MAX_MSG_SIZE;
-	}
+	return n->active_links[0] != INVALID_BEARER_ID;
 }
 
-int tipc_node_active_links(struct tipc_node *n_ptr)
+void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *b,
+			  bool *link_up, bool *addr_match,
+			  struct tipc_media_addr *maddr)
 {
-	return n_ptr->active_links[0] != NULL;
+	struct tipc_link *l = n->links[b->identity].link;
+	struct tipc_media_addr *curr = &n->links[b->identity].maddr;
+
+	*link_up = l && tipc_link_is_up(l);
+	*addr_match = l && !memcmp(curr, maddr, sizeof(*maddr));
 }
 
-int tipc_node_is_up(struct tipc_node *n_ptr)
+bool tipc_node_update_dest(struct tipc_node *n,  struct tipc_bearer *b,
+			   struct tipc_media_addr *maddr)
 {
-	return tipc_node_active_links(n_ptr);
+	struct tipc_link *l = n->links[b->identity].link;
+	struct tipc_media_addr *curr = &n->links[b->identity].maddr;
+	struct sk_buff_head *inputq = &n->links[b->identity].inputq;
+
+	if (!l) {
+		l = tipc_link_create(n, b, maddr, inputq, &n->bclink.namedq);
+		if (!l)
+			return false;
+		tipc_node_calculate_timer(n, l);
+		if (n->link_cnt == 1) {
+			if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
+				tipc_node_get(n);
+		}
+	}
+	memcpy(&l->media_addr, maddr, sizeof(*maddr));
+	memcpy(curr, maddr, sizeof(*maddr));
+	tipc_link_reset(l);
+	return true;
 }
 
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 {
-	n_ptr->links[l_ptr->bearer_id] = l_ptr;
+	n_ptr->links[l_ptr->bearer_id].link = l_ptr;
 	n_ptr->link_cnt++;
 }
 
@@ -352,15 +416,151 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 	int i;
 
 	for (i = 0; i < MAX_BEARERS; i++) {
-		if (l_ptr != n_ptr->links[i])
+		if (l_ptr != n_ptr->links[i].link)
 			continue;
-		n_ptr->links[i] = NULL;
+		n_ptr->links[i].link = NULL;
 		n_ptr->link_cnt--;
 	}
 }
 
+/* tipc_node_fsm_evt - node finite state machine
+ * Determines when contact is allowed with peer node
+ */
+static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
+{
+	int state = n->state;
+
+	switch (state) {
+	case SELF_DOWN_PEER_DOWN:
+		switch (evt) {
+		case SELF_ESTABL_CONTACT_EVT:
+			state = SELF_UP_PEER_COMING;
+			break;
+		case PEER_ESTABL_CONTACT_EVT:
+			state = SELF_COMING_PEER_UP;
+			break;
+		case SELF_LOST_CONTACT_EVT:
+		case PEER_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_UP_PEER_UP:
+		switch (evt) {
+		case SELF_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_LEAVING;
+			break;
+		case PEER_LOST_CONTACT_EVT:
+			state = SELF_LEAVING_PEER_DOWN;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_DOWN_PEER_LEAVING:
+		switch (evt) {
+		case PEER_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_DOWN;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+		case SELF_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_UP_PEER_COMING:
+		switch (evt) {
+		case PEER_ESTABL_CONTACT_EVT:
+			state = SELF_UP_PEER_UP;
+			break;
+		case SELF_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_LEAVING;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_COMING_PEER_UP:
+		switch (evt) {
+		case SELF_ESTABL_CONTACT_EVT:
+			state = SELF_UP_PEER_UP;
+			break;
+		case PEER_LOST_CONTACT_EVT:
+			state = SELF_LEAVING_PEER_DOWN;
+			break;
+		case SELF_LOST_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	case SELF_LEAVING_PEER_DOWN:
+		switch (evt) {
+		case SELF_LOST_CONTACT_EVT:
+			state = SELF_DOWN_PEER_DOWN;
+			break;
+		case SELF_ESTABL_CONTACT_EVT:
+		case PEER_ESTABL_CONTACT_EVT:
+		case PEER_LOST_CONTACT_EVT:
+			break;
+		default:
+			pr_err("Unknown node fsm evt %x/%x\n", state, evt);
+		}
+		break;
+	default:
+		pr_err("Unknown node fsm state %x\n", state);
+		break;
+	}
+
+	n->state = state;
+}
+
+bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l,
+			  struct tipc_msg *hdr)
+{
+	int state = n->state;
+
+	if (likely(state == SELF_UP_PEER_UP))
+		return true;
+
+	if (state == SELF_DOWN_PEER_DOWN)
+		return true;
+
+	if (state == SELF_UP_PEER_COMING) {
+		/* If not traffic msg, peer may still be ESTABLISHING */
+		if (tipc_link_is_up(l) && msg_is_traffic(hdr))
+			tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
+		return true;
+	}
+
+	if (state == SELF_COMING_PEER_UP)
+		return true;
+
+	if (state == SELF_LEAVING_PEER_DOWN)
+		return false;
+
+	if (state == SELF_DOWN_PEER_LEAVING) {
+		if (msg_peer_is_up(hdr))
+			return false;
+		tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
+		return true;
+	}
+	return false;
+}
+
 static void node_established_contact(struct tipc_node *n_ptr)
 {
+	tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
 	n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
 	n_ptr->bclink.oos_state = 0;
 	n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
@@ -396,21 +596,18 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 
 	/* Abort any ongoing link failover */
 	for (i = 0; i < MAX_BEARERS; i++) {
-		struct tipc_link *l_ptr = n_ptr->links[i];
+		struct tipc_link *l_ptr = n_ptr->links[i].link;
 		if (!l_ptr)
 			continue;
-		l_ptr->flags &= ~LINK_FAILINGOVER;
+		l_ptr->exec_mode = TIPC_LINK_OPEN;
 		l_ptr->failover_checkpt = 0;
 		l_ptr->failover_pkts = 0;
 		kfree_skb(l_ptr->failover_skb);
 		l_ptr->failover_skb = NULL;
 		tipc_link_reset_fragments(l_ptr);
 	}
-
-	n_ptr->action_flags &= ~TIPC_WAIT_OWN_LINKS_DOWN;
-
 	/* Prevent re-contact with node until cleanup is done */
-	n_ptr->action_flags |= TIPC_WAIT_PEER_LINKS_DOWN;
+	tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);
 
 	/* Notify publications from this node */
 	n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
@@ -453,7 +650,7 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
 		goto exit;
 
 	tipc_node_lock(node);
-	link = node->links[bearer_id];
+	link = node->links[bearer_id].link;
 	if (link) {
 		strncpy(linkname, link->name, len);
 		err = 0;
@@ -559,6 +756,160 @@ msg_full:
 	return -EMSGSIZE;
 }
 
+static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
+					       int *bearer_id,
+					       struct tipc_media_addr **maddr)
+{
+	int id = n->active_links[sel & 1];
+
+	if (unlikely(id < 0))
+		return NULL;
+
+	*bearer_id = id;
+	*maddr = &n->links[id].maddr;
+	return n->links[id].link;
+}
+
+/**
+ * tipc_node_xmit() is the general link level function for message sending
+ * @net: the applicable net namespace
+ * @list: chain of buffers containing message
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
+		   u32 dnode, int selector)
+{
+	struct tipc_link *l = NULL;
+	struct tipc_node *n;
+	struct sk_buff_head xmitq;
+	struct tipc_media_addr *maddr;
+	int bearer_id;
+	int rc = -EHOSTUNREACH;
+
+	__skb_queue_head_init(&xmitq);
+	n = tipc_node_find(net, dnode);
+	if (likely(n)) {
+		tipc_node_lock(n);
+		l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
+		if (likely(l))
+			rc = tipc_link_xmit(l, list, &xmitq);
+		if (unlikely(rc == -ENOBUFS))
+			tipc_link_reset(l);
+		tipc_node_unlock(n);
+		tipc_node_put(n);
+	}
+	if (likely(!rc)) {
+		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+		return 0;
+	}
+	if (likely(in_own_node(net, dnode))) {
+		tipc_sk_rcv(net, list);
+		return 0;
+	}
+	return rc;
+}
+
+/* tipc_node_xmit_skb(): send single buffer to destination
+ * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
+		       u32 selector)
+{
+	struct sk_buff_head head;
+	int rc;
+
+	skb_queue_head_init(&head);
+	__skb_queue_tail(&head, skb);
+	rc = tipc_node_xmit(net, &head, dnode, selector);
+	if (rc == -ELINKCONG)
+		kfree_skb(skb);
+	return 0;
+}
+
+/**
+ * tipc_rcv - process TIPC packets/messages arriving from off-node
+ * @net: the applicable net namespace
+ * @skb: TIPC packet
+ * @bearer: pointer to bearer message arrived on
+ *
+ * Invoked with no locks held. Bearer pointer must point to a valid bearer
+ * structure (i.e. cannot be NULL), but bearer can be inactive.
+ */
+void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
+{
+	struct sk_buff_head xmitq;
+	struct tipc_node *n;
+	struct tipc_link *l;
+	struct tipc_msg *hdr;
+	struct tipc_media_addr *maddr;
+	int bearer_id = b->identity;
+	int rc = 0;
+
+	__skb_queue_head_init(&xmitq);
+
+	/* Ensure message is well-formed */
+	if (unlikely(!tipc_msg_validate(skb)))
+		goto discard;
+
+	/* Handle arrival of a non-unicast link packet */
+	hdr = buf_msg(skb);
+	if (unlikely(msg_non_seq(hdr))) {
+		if (msg_user(hdr) ==  LINK_CONFIG)
+			tipc_disc_rcv(net, skb, b);
+		else
+			tipc_bclink_rcv(net, skb);
+		return;
+	}
+
+	/* Locate neighboring node that sent packet */
+	n = tipc_node_find(net, msg_prevnode(hdr));
+	if (unlikely(!n))
+		goto discard;
+	tipc_node_lock(n);
+
+	/* Locate link endpoint that should handle packet */
+	l = n->links[bearer_id].link;
+	if (unlikely(!l))
+		goto unlock;
+
+	/* Is reception of this packet permitted at the moment ? */
+	if (unlikely(n->state != SELF_UP_PEER_UP))
+		if (!tipc_node_filter_skb(n, l, hdr))
+			goto unlock;
+
+	if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
+		tipc_bclink_sync_state(n, hdr);
+
+	/* Release acked broadcast messages */
+	if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
+		tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
+
+	/* Check protocol and update link state */
+	rc = tipc_link_rcv(l, skb, &xmitq);
+
+	if (unlikely(rc & TIPC_LINK_UP_EVT))
+		tipc_link_activate(l);
+	if (unlikely(rc & TIPC_LINK_DOWN_EVT))
+		tipc_link_reset(l);
+	skb = NULL;
+unlock:
+	tipc_node_unlock(n);
+	tipc_sk_rcv(net, &n->links[bearer_id].inputq);
+	maddr = &n->links[bearer_id].maddr;
+	tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+	tipc_node_put(n);
+discard:
+	kfree_skb(skb);
+}
+
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	int err;

+ 65 - 28
net/tipc/node.h

@@ -45,6 +45,26 @@
 /* Out-of-range value for node signature */
 #define INVALID_NODE_SIG	0x10000
 
+#define INVALID_BEARER_ID -1
+
+/* Node FSM states and events:
+ */
+enum {
+	SELF_DOWN_PEER_DOWN    = 0xdd,
+	SELF_UP_PEER_UP        = 0xaa,
+	SELF_DOWN_PEER_LEAVING = 0xd1,
+	SELF_UP_PEER_COMING    = 0xac,
+	SELF_COMING_PEER_UP    = 0xca,
+	SELF_LEAVING_PEER_DOWN = 0x1d,
+};
+
+enum {
+	SELF_ESTABL_CONTACT_EVT = 0xec,
+	SELF_LOST_CONTACT_EVT   = 0x1c,
+	PEER_ESTABL_CONTACT_EVT = 0xfec,
+	PEER_LOST_CONTACT_EVT   = 0xf1c
+};
+
 /* Flags used to take different actions according to flag type
  * TIPC_WAIT_PEER_LINKS_DOWN: wait to see that peer's links are down
  * TIPC_WAIT_OWN_LINKS_DOWN: wait until peer node is declared down
@@ -54,8 +74,6 @@
  */
 enum {
 	TIPC_MSG_EVT                    = 1,
-	TIPC_WAIT_PEER_LINKS_DOWN	= (1 << 1),
-	TIPC_WAIT_OWN_LINKS_DOWN	= (1 << 2),
 	TIPC_NOTIFY_NODE_DOWN		= (1 << 3),
 	TIPC_NOTIFY_NODE_UP		= (1 << 4),
 	TIPC_WAKEUP_BCAST_USERS		= (1 << 5),
@@ -85,10 +103,17 @@ struct tipc_node_bclink {
 	u32 deferred_size;
 	struct sk_buff_head deferdq;
 	struct sk_buff *reasm_buf;
-	int inputq_map;
+	struct sk_buff_head namedq;
 	bool recv_permitted;
 };
 
+struct tipc_link_entry {
+	struct tipc_link *link;
+	u32 mtu;
+	struct sk_buff_head inputq;
+	struct tipc_media_addr maddr;
+};
+
 /**
  * struct tipc_node - TIPC node structure
  * @addr: network address of node
@@ -98,9 +123,8 @@ struct tipc_node_bclink {
  * @hash: links to adjacent nodes in unsorted hash chain
  * @inputq: pointer to input queue containing messages for msg event
  * @namedq: pointer to name table input queue with name table messages
- * @curr_link: the link holding the node lock, if any
- * @active_links: pointers to active links to node
- * @links: pointers to all links to node
+ * @active_links: bearer ids of active links, used as index into links[] array
+ * @links: array containing references to all links to node
  * @action_flags: bit mask of different types of node actions
  * @bclink: broadcast-related info
  * @list: links to adjacent nodes in sorted list of cluster's nodes
@@ -120,12 +144,12 @@ struct tipc_node {
 	struct hlist_node hash;
 	struct sk_buff_head *inputq;
 	struct sk_buff_head *namedq;
-	struct tipc_link *active_links[2];
-	u32 act_mtus[2];
-	struct tipc_link *links[MAX_BEARERS];
+	int active_links[2];
+	struct tipc_link_entry links[MAX_BEARERS];
 	int action_flags;
 	struct tipc_node_bclink bclink;
 	struct list_head list;
+	int state;
 	int link_cnt;
 	u16 working_links;
 	u16 capabilities;
@@ -133,6 +157,8 @@ struct tipc_node {
 	u32 link_id;
 	struct list_head publ_list;
 	struct list_head conn_sks;
+	unsigned long keepalive_intv;
+	struct timer_list timer;
 	struct rcu_head rcu;
 };
 
@@ -140,18 +166,25 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr);
 void tipc_node_put(struct tipc_node *node);
 struct tipc_node *tipc_node_create(struct net *net, u32 addr);
 void tipc_node_stop(struct net *net);
+void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *bearer,
+			  bool *link_up, bool *addr_match,
+			  struct tipc_media_addr *maddr);
+bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer,
+			   struct tipc_media_addr *maddr);
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
 void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
-int tipc_node_active_links(struct tipc_node *n_ptr);
-int tipc_node_is_up(struct tipc_node *n_ptr);
+void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id);
+void tipc_node_link_up(struct tipc_node *n_ptr, int bearer_id);
+bool tipc_node_is_up(struct tipc_node *n);
 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
 			   char *linkname, size_t len);
 void tipc_node_unlock(struct tipc_node *node);
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
+		   int selector);
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
+		       u32 selector);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
-
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
 
 static inline void tipc_node_lock(struct tipc_node *node)
@@ -159,26 +192,30 @@ static inline void tipc_node_lock(struct tipc_node *node)
 	spin_lock_bh(&node->lock);
 }
 
-static inline bool tipc_node_blocked(struct tipc_node *node)
+static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
 {
-	return (node->action_flags & (TIPC_WAIT_PEER_LINKS_DOWN |
-		TIPC_NOTIFY_NODE_DOWN | TIPC_WAIT_OWN_LINKS_DOWN));
+	int bearer_id = n->active_links[sel & 1];
+
+	if (unlikely(bearer_id == INVALID_BEARER_ID))
+		return NULL;
+
+	return n->links[bearer_id].link;
 }
 
-static inline uint tipc_node_get_mtu(struct net *net, u32 addr, u32 selector)
+static inline unsigned int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
 {
-	struct tipc_node *node;
-	u32 mtu;
-
-	node = tipc_node_find(net, addr);
+	struct tipc_node *n;
+	int bearer_id;
+	unsigned int mtu = MAX_MSG_SIZE;
 
-	if (likely(node)) {
-		mtu = node->act_mtus[selector & 1];
-		tipc_node_put(node);
-	} else {
-		mtu = MAX_MSG_SIZE;
-	}
+	n = tipc_node_find(net, addr);
+	if (unlikely(!n))
+		return mtu;
 
+	bearer_id = n->active_links[sel & 1];
+	if (likely(bearer_id != INVALID_BEARER_ID))
+		mtu = n->links[bearer_id].mtu;
+	tipc_node_put(n);
 	return mtu;
 }
 

+ 37 - 34
net/tipc/socket.c

@@ -261,7 +261,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
 
 	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
 		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
-			tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
+			tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
 	}
 }
 
@@ -443,7 +443,7 @@ static int tipc_release(struct socket *sock)
 			}
 			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
 					     TIPC_ERR_NO_PORT))
-				tipc_link_xmit_skb(net, skb, dnode, 0);
+				tipc_node_xmit_skb(net, skb, dnode, 0);
 		}
 	}
 
@@ -456,7 +456,7 @@ static int tipc_release(struct socket *sock)
 				      tsk_own_node(tsk), tsk_peer_port(tsk),
 				      tsk->portid, TIPC_ERR_NO_PORT);
 		if (skb)
-			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 		tipc_node_remove_conn(net, dnode, tsk->portid);
 	}
 
@@ -686,21 +686,22 @@ new_mtu:
 
 	do {
 		rc = tipc_bclink_xmit(net, pktchain);
-		if (likely(rc >= 0)) {
-			rc = dsz;
-			break;
+		if (likely(!rc))
+			return dsz;
+
+		if (rc == -ELINKCONG) {
+			tsk->link_cong = 1;
+			rc = tipc_wait_for_sndmsg(sock, &timeo);
+			if (!rc)
+				continue;
 		}
+		__skb_queue_purge(pktchain);
 		if (rc == -EMSGSIZE) {
 			msg->msg_iter = save;
 			goto new_mtu;
 		}
-		if (rc != -ELINKCONG)
-			break;
-		tipc_sk(sk)->link_cong = 1;
-		rc = tipc_wait_for_sndmsg(sock, &timeo);
-		if (rc)
-			__skb_queue_purge(pktchain);
-	} while (!rc);
+		break;
+	} while (1);
 	return rc;
 }
 
@@ -924,24 +925,25 @@ new_mtu:
 	do {
 		skb = skb_peek(pktchain);
 		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-		rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
-		if (likely(rc >= 0)) {
+		rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
+		if (likely(!rc)) {
 			if (sock->state != SS_READY)
 				sock->state = SS_CONNECTING;
-			rc = dsz;
-			break;
+			return dsz;
 		}
+		if (rc == -ELINKCONG) {
+			tsk->link_cong = 1;
+			rc = tipc_wait_for_sndmsg(sock, &timeo);
+			if (!rc)
+				continue;
+		}
+		__skb_queue_purge(pktchain);
 		if (rc == -EMSGSIZE) {
 			m->msg_iter = save;
 			goto new_mtu;
 		}
-		if (rc != -ELINKCONG)
-			break;
-		tsk->link_cong = 1;
-		rc = tipc_wait_for_sndmsg(sock, &timeo);
-		if (rc)
-			__skb_queue_purge(pktchain);
-	} while (!rc);
+		break;
+	} while (1);
 
 	return rc;
 }
@@ -1043,15 +1045,16 @@ next:
 		return rc;
 	do {
 		if (likely(!tsk_conn_cong(tsk))) {
-			rc = tipc_link_xmit(net, pktchain, dnode, portid);
+			rc = tipc_node_xmit(net, pktchain, dnode, portid);
 			if (likely(!rc)) {
 				tsk->sent_unacked++;
 				sent += send;
 				if (sent == dsz)
-					break;
+					return dsz;
 				goto next;
 			}
 			if (rc == -EMSGSIZE) {
+				__skb_queue_purge(pktchain);
 				tsk->max_pkt = tipc_node_get_mtu(net, dnode,
 								 portid);
 				m->msg_iter = save;
@@ -1059,13 +1062,13 @@ next:
 			}
 			if (rc != -ELINKCONG)
 				break;
+
 			tsk->link_cong = 1;
 		}
 		rc = tipc_wait_for_sndpkt(sock, &timeo);
-		if (rc)
-			__skb_queue_purge(pktchain);
 	} while (!rc);
 
+	__skb_queue_purge(pktchain);
 	return sent ? sent : rc;
 }
 
@@ -1221,7 +1224,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 		return;
 	msg = buf_msg(skb);
 	msg_set_msgcnt(msg, ack);
-	tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1700,7 +1703,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 		return 0;
 	}
 	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
-		tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 	return 0;
 }
 
@@ -1796,7 +1799,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
 			continue;
 xmit:
-		tipc_link_xmit_skb(net, skb, dnode, dport);
+		tipc_node_xmit_skb(net, skb, dnode, dport);
 	}
 	return err ? -EHOSTUNREACH : 0;
 }
@@ -2089,7 +2092,7 @@ restart:
 			}
 			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
 					     TIPC_CONN_SHUTDOWN))
-				tipc_link_xmit_skb(net, skb, dnode,
+				tipc_node_xmit_skb(net, skb, dnode,
 						   tsk->portid);
 		} else {
 			dnode = tsk_peer_node(tsk);
@@ -2099,7 +2102,7 @@ restart:
 					      0, dnode, tsk_own_node(tsk),
 					      tsk_peer_port(tsk),
 					      tsk->portid, TIPC_CONN_SHUTDOWN);
-			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 		}
 		tsk->connected = 0;
 		sock->state = SS_DISCONNECTING;
@@ -2161,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)
 	}
 	bh_unlock_sock(sk);
 	if (skb)
-		tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
+		tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
 exit:
 	sock_put(sk);
 }