|
@@ -139,6 +139,13 @@ static void tipc_link_put(struct tipc_link *l_ptr)
|
|
|
kref_put(&l_ptr->ref, tipc_link_release);
|
|
|
}
|
|
|
|
|
|
+static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
|
|
|
+{
|
|
|
+ if (l->owner->active_links[0] != l)
|
|
|
+ return l->owner->active_links[0];
|
|
|
+ return l->owner->active_links[1];
|
|
|
+}
|
|
|
+
|
|
|
static void link_init_max_pkt(struct tipc_link *l_ptr)
|
|
|
{
|
|
|
struct tipc_node *node = l_ptr->owner;
|
|
@@ -1026,6 +1033,32 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/* link_synch(): check if all packets arrived before the synch
|
|
|
+ * point have been consumed
|
|
|
+ * Returns true if the parallel links are synched, otherwise false
|
|
|
+ */
|
|
|
+static bool link_synch(struct tipc_link *l)
|
|
|
+{
|
|
|
+ unsigned int post_synch;
|
|
|
+ struct tipc_link *pl;
|
|
|
+
|
|
|
+ pl = tipc_parallel_link(l);
|
|
|
+ if (pl == l)
|
|
|
+ goto synched;
|
|
|
+
|
|
|
+ /* Was last pre-synch packet added to input queue ? */
|
|
|
+ if (less_eq(pl->next_in_no, l->synch_point))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ /* Is it still in the input queue ? */
|
|
|
+ post_synch = mod(pl->next_in_no - l->synch_point) - 1;
|
|
|
+ if (skb_queue_len(&pl->inputq) > post_synch)
|
|
|
+ return false;
|
|
|
+synched:
|
|
|
+ l->flags &= ~LINK_SYNCHING;
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
static void link_retrieve_defq(struct tipc_link *link,
|
|
|
struct sk_buff_head *list)
|
|
|
{
|
|
@@ -1156,6 +1189,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
|
|
|
skb = NULL;
|
|
|
goto unlock;
|
|
|
}
|
|
|
+ /* Synchronize with parallel link if applicable */
|
|
|
+ if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
|
|
|
+ link_handle_out_of_seq_msg(l_ptr, skb);
|
|
|
+ if (link_synch(l_ptr))
|
|
|
+ link_retrieve_defq(l_ptr, &head);
|
|
|
+ skb = NULL;
|
|
|
+ goto unlock;
|
|
|
+ }
|
|
|
l_ptr->next_in_no++;
|
|
|
if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
|
|
|
link_retrieve_defq(l_ptr, &head);
|
|
@@ -1231,6 +1272,10 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
|
|
|
|
|
|
switch (msg_user(msg)) {
|
|
|
case CHANGEOVER_PROTOCOL:
|
|
|
+ if (msg_dup(msg)) {
|
|
|
+ link->flags |= LINK_SYNCHING;
|
|
|
+ link->synch_point = msg_seqno(msg_get_wrapped(msg));
|
|
|
+ }
|
|
|
if (!tipc_link_tunnel_rcv(node, &skb))
|
|
|
break;
|
|
|
if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
|