|
@@ -275,7 +275,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
|
|
link_init_max_pkt(l_ptr);
|
|
link_init_max_pkt(l_ptr);
|
|
|
|
|
|
l_ptr->next_out_no = 1;
|
|
l_ptr->next_out_no = 1;
|
|
- INIT_LIST_HEAD(&l_ptr->waiting_ports);
|
|
|
|
|
|
+ __skb_queue_head_init(&l_ptr->waiting_sks);
|
|
|
|
|
|
link_reset_statistics(l_ptr);
|
|
link_reset_statistics(l_ptr);
|
|
|
|
|
|
@@ -322,66 +322,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * link_schedule_port - schedule port for deferred sending
|
|
|
|
- * @l_ptr: pointer to link
|
|
|
|
- * @origport: reference to sending port
|
|
|
|
- * @sz: amount of data to be sent
|
|
|
|
- *
|
|
|
|
- * Schedules port for renewed sending of messages after link congestion
|
|
|
|
- * has abated.
|
|
|
|
|
|
+ * link_schedule_user - schedule user for wakeup after congestion
|
|
|
|
+ * @link: congested link
|
|
|
|
+ * @oport: sending port
|
|
|
|
+ * @chain_sz: size of buffer chain that was attempted sent
|
|
|
|
+ * @imp: importance of message attempted sent
|
|
|
|
+ * Create pseudo msg to send back to user when congestion abates
|
|
*/
|
|
*/
|
|
-static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
|
|
|
|
|
|
+static bool link_schedule_user(struct tipc_link *link, u32 oport,
|
|
|
|
+ uint chain_sz, uint imp)
|
|
{
|
|
{
|
|
- struct tipc_port *p_ptr;
|
|
|
|
- struct tipc_sock *tsk;
|
|
|
|
|
|
+ struct sk_buff *buf;
|
|
|
|
|
|
- spin_lock_bh(&tipc_port_list_lock);
|
|
|
|
- p_ptr = tipc_port_lock(origport);
|
|
|
|
- if (p_ptr) {
|
|
|
|
- if (!list_empty(&p_ptr->wait_list))
|
|
|
|
- goto exit;
|
|
|
|
- tsk = tipc_port_to_sock(p_ptr);
|
|
|
|
- tsk->link_cong = 1;
|
|
|
|
- p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
|
|
|
|
- list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
|
|
|
|
- l_ptr->stats.link_congs++;
|
|
|
|
-exit:
|
|
|
|
- tipc_port_unlock(p_ptr);
|
|
|
|
- }
|
|
|
|
- spin_unlock_bh(&tipc_port_list_lock);
|
|
|
|
- return -ELINKCONG;
|
|
|
|
|
|
+ buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
|
|
|
|
+ tipc_own_addr, oport, 0, 0);
|
|
|
|
+ if (!buf)
|
|
|
|
+ return false;
|
|
|
|
+ TIPC_SKB_CB(buf)->chain_sz = chain_sz;
|
|
|
|
+ TIPC_SKB_CB(buf)->chain_imp = imp;
|
|
|
|
+ __skb_queue_tail(&link->waiting_sks, buf);
|
|
|
|
+ link->stats.link_congs++;
|
|
|
|
+ return true;
|
|
}
|
|
}
|
|
|
|
|
|
-void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
|
|
|
|
|
|
+/**
|
|
|
|
+ * link_prepare_wakeup - prepare users for wakeup after congestion
|
|
|
|
+ * @link: congested link
|
|
|
|
+ * Move a number of waiting users, as permitted by available space in
|
|
|
|
+ * the send queue, from link wait queue to node wait queue for wakeup
|
|
|
|
+ */
|
|
|
|
+static void link_prepare_wakeup(struct tipc_link *link)
|
|
{
|
|
{
|
|
- struct tipc_port *p_ptr;
|
|
|
|
- struct tipc_sock *tsk;
|
|
|
|
- struct tipc_port *temp_p_ptr;
|
|
|
|
- int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
|
|
|
|
-
|
|
|
|
- if (all)
|
|
|
|
- win = 100000;
|
|
|
|
- if (win <= 0)
|
|
|
|
- return;
|
|
|
|
- if (!spin_trylock_bh(&tipc_port_list_lock))
|
|
|
|
- return;
|
|
|
|
- if (link_congested(l_ptr))
|
|
|
|
- goto exit;
|
|
|
|
- list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
|
|
|
|
- wait_list) {
|
|
|
|
- if (win <= 0)
|
|
|
|
|
|
+ struct sk_buff_head *wq = &link->waiting_sks;
|
|
|
|
+ struct sk_buff *buf;
|
|
|
|
+ uint pend_qsz = link->out_queue_size;
|
|
|
|
+
|
|
|
|
+ for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
|
|
|
|
+ if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
|
|
break;
|
|
break;
|
|
- tsk = tipc_port_to_sock(p_ptr);
|
|
|
|
- list_del_init(&p_ptr->wait_list);
|
|
|
|
- spin_lock_bh(p_ptr->lock);
|
|
|
|
- tsk->link_cong = 0;
|
|
|
|
- tipc_sock_wakeup(tsk);
|
|
|
|
- win -= p_ptr->waiting_pkts;
|
|
|
|
- spin_unlock_bh(p_ptr->lock);
|
|
|
|
|
|
+ pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
|
|
|
|
+ __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
|
|
}
|
|
}
|
|
-
|
|
|
|
-exit:
|
|
|
|
- spin_unlock_bh(&tipc_port_list_lock);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -423,6 +404,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
|
|
u32 prev_state = l_ptr->state;
|
|
u32 prev_state = l_ptr->state;
|
|
u32 checkpoint = l_ptr->next_in_no;
|
|
u32 checkpoint = l_ptr->next_in_no;
|
|
int was_active_link = tipc_link_is_active(l_ptr);
|
|
int was_active_link = tipc_link_is_active(l_ptr);
|
|
|
|
+ struct tipc_node *owner = l_ptr->owner;
|
|
|
|
|
|
msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
|
|
msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
|
|
|
|
|
|
@@ -450,9 +432,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
|
|
kfree_skb(l_ptr->proto_msg_queue);
|
|
kfree_skb(l_ptr->proto_msg_queue);
|
|
l_ptr->proto_msg_queue = NULL;
|
|
l_ptr->proto_msg_queue = NULL;
|
|
kfree_skb_list(l_ptr->oldest_deferred_in);
|
|
kfree_skb_list(l_ptr->oldest_deferred_in);
|
|
- if (!list_empty(&l_ptr->waiting_ports))
|
|
|
|
- tipc_link_wakeup_ports(l_ptr, 1);
|
|
|
|
-
|
|
|
|
|
|
+ if (!skb_queue_empty(&l_ptr->waiting_sks)) {
|
|
|
|
+ skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
|
|
|
|
+ owner->action_flags |= TIPC_WAKEUP_USERS;
|
|
|
|
+ }
|
|
l_ptr->retransm_queue_head = 0;
|
|
l_ptr->retransm_queue_head = 0;
|
|
l_ptr->retransm_queue_size = 0;
|
|
l_ptr->retransm_queue_size = 0;
|
|
l_ptr->last_out = NULL;
|
|
l_ptr->last_out = NULL;
|
|
@@ -688,19 +671,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
|
|
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
|
|
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
|
|
{
|
|
{
|
|
struct tipc_msg *msg = buf_msg(buf);
|
|
struct tipc_msg *msg = buf_msg(buf);
|
|
- uint psz = msg_size(msg);
|
|
|
|
uint imp = tipc_msg_tot_importance(msg);
|
|
uint imp = tipc_msg_tot_importance(msg);
|
|
u32 oport = msg_tot_origport(msg);
|
|
u32 oport = msg_tot_origport(msg);
|
|
|
|
|
|
- if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
|
|
|
|
- if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
|
|
|
|
- link_schedule_port(link, oport, psz);
|
|
|
|
- return -ELINKCONG;
|
|
|
|
- }
|
|
|
|
- } else {
|
|
|
|
|
|
+ if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
|
|
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
|
|
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
|
|
tipc_link_reset(link);
|
|
tipc_link_reset(link);
|
|
|
|
+ goto drop;
|
|
}
|
|
}
|
|
|
|
+ if (unlikely(msg_errcode(msg)))
|
|
|
|
+ goto drop;
|
|
|
|
+ if (unlikely(msg_reroute_cnt(msg)))
|
|
|
|
+ goto drop;
|
|
|
|
+ if (TIPC_SKB_CB(buf)->wakeup_pending)
|
|
|
|
+ return -ELINKCONG;
|
|
|
|
+ if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
|
|
|
|
+ return -ELINKCONG;
|
|
|
|
+drop:
|
|
kfree_skb_list(buf);
|
|
kfree_skb_list(buf);
|
|
return -EHOSTUNREACH;
|
|
return -EHOSTUNREACH;
|
|
}
|
|
}
|
|
@@ -1202,8 +1189,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
|
|
if (unlikely(l_ptr->next_out))
|
|
if (unlikely(l_ptr->next_out))
|
|
tipc_link_push_queue(l_ptr);
|
|
tipc_link_push_queue(l_ptr);
|
|
|
|
|
|
- if (unlikely(!list_empty(&l_ptr->waiting_ports)))
|
|
|
|
- tipc_link_wakeup_ports(l_ptr, 0);
|
|
|
|
|
|
+ if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
|
|
|
|
+ link_prepare_wakeup(l_ptr);
|
|
|
|
+ l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
|
|
|
|
+ }
|
|
|
|
|
|
/* Process the incoming packet */
|
|
/* Process the incoming packet */
|
|
if (unlikely(!link_working_working(l_ptr))) {
|
|
if (unlikely(!link_working_working(l_ptr))) {
|