|
@@ -46,6 +46,7 @@
|
|
|
|
|
|
#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
|
|
|
#define ADV_IDLE ADV_UNIT
|
|
|
+#define ADV_ACTIVE (ADV_UNIT * 12)
|
|
|
|
|
|
enum mbr_state {
|
|
|
MBR_QUARANTINED,
|
|
@@ -59,16 +60,22 @@ enum mbr_state {
|
|
|
struct tipc_member {
|
|
|
struct rb_node tree_node;
|
|
|
struct list_head list;
|
|
|
+ struct list_head congested;
|
|
|
struct sk_buff *event_msg;
|
|
|
+ struct tipc_group *group;
|
|
|
u32 node;
|
|
|
u32 port;
|
|
|
u32 instance;
|
|
|
enum mbr_state state;
|
|
|
+ u16 advertised;
|
|
|
+ u16 window;
|
|
|
u16 bc_rcv_nxt;
|
|
|
+ bool usr_pending;
|
|
|
};
|
|
|
|
|
|
struct tipc_group {
|
|
|
struct rb_root members;
|
|
|
+ struct list_head congested;
|
|
|
struct tipc_nlist dests;
|
|
|
struct net *net;
|
|
|
int subid;
|
|
@@ -86,11 +93,24 @@ struct tipc_group {
|
|
|
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
|
|
|
int mtyp, struct sk_buff_head *xmitq);
|
|
|
|
|
|
+static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
|
|
|
+{
|
|
|
+ int mcnt = grp->member_cnt + 1;
|
|
|
+
|
|
|
+ /* Scale to bytes, considering worst-case truesize/msgsize ratio */
|
|
|
+ return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
|
|
|
+}
|
|
|
+
|
|
|
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
|
|
|
{
|
|
|
return grp->bc_snd_nxt;
|
|
|
}
|
|
|
|
|
|
+static bool tipc_group_is_enabled(struct tipc_member *m)
|
|
|
+{
|
|
|
+ return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
|
|
|
+}
|
|
|
+
|
|
|
static bool tipc_group_is_receiver(struct tipc_member *m)
|
|
|
{
|
|
|
return m && m->state >= MBR_JOINED;
|
|
@@ -111,6 +131,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
|
|
|
if (!grp)
|
|
|
return NULL;
|
|
|
tipc_nlist_init(&grp->dests, tipc_own_addr(net));
|
|
|
+ INIT_LIST_HEAD(&grp->congested);
|
|
|
grp->members = RB_ROOT;
|
|
|
grp->net = net;
|
|
|
grp->portid = portid;
|
|
@@ -213,6 +234,8 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
|
|
|
if (!m)
|
|
|
return NULL;
|
|
|
INIT_LIST_HEAD(&m->list);
|
|
|
+ INIT_LIST_HEAD(&m->congested);
|
|
|
+ m->group = grp;
|
|
|
m->node = node;
|
|
|
m->port = port;
|
|
|
grp->member_cnt++;
|
|
@@ -233,6 +256,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
|
|
|
rb_erase(&m->tree_node, &grp->members);
|
|
|
grp->member_cnt--;
|
|
|
list_del_init(&m->list);
|
|
|
+ list_del_init(&m->congested);
|
|
|
|
|
|
/* If last member on a node, remove node from dest list */
|
|
|
if (!tipc_group_find_node(grp, m->node))
|
|
@@ -255,11 +279,59 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
|
|
|
*scope = grp->scope;
|
|
|
}
|
|
|
|
|
|
-void tipc_group_update_bc_members(struct tipc_group *grp)
|
|
|
+void tipc_group_update_member(struct tipc_member *m, int len)
|
|
|
+{
|
|
|
+ struct tipc_group *grp = m->group;
|
|
|
+ struct tipc_member *_m, *tmp;
|
|
|
+
|
|
|
+ if (!tipc_group_is_enabled(m))
|
|
|
+ return;
|
|
|
+
|
|
|
+ m->window -= len;
|
|
|
+
|
|
|
+ if (m->window >= ADV_IDLE)
|
|
|
+ return;
|
|
|
+
|
|
|
+ if (!list_empty(&m->congested))
|
|
|
+ return;
|
|
|
+
|
|
|
+ /* Sort member into congested members' list */
|
|
|
+ list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
|
|
|
+ if (m->window > _m->window)
|
|
|
+ continue;
|
|
|
+ list_add_tail(&m->congested, &_m->congested);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ list_add_tail(&m->congested, &grp->congested);
|
|
|
+}
|
|
|
+
|
|
|
+void tipc_group_update_bc_members(struct tipc_group *grp, int len)
|
|
|
{
|
|
|
+ struct tipc_member *m;
|
|
|
+ struct rb_node *n;
|
|
|
+
|
|
|
+ for (n = rb_first(&grp->members); n; n = rb_next(n)) {
|
|
|
+ m = container_of(n, struct tipc_member, tree_node);
|
|
|
+ if (tipc_group_is_enabled(m))
|
|
|
+ tipc_group_update_member(m, len);
|
|
|
+ }
|
|
|
grp->bc_snd_nxt++;
|
|
|
}
|
|
|
|
|
|
+bool tipc_group_bc_cong(struct tipc_group *grp, int len)
|
|
|
+{
|
|
|
+ struct tipc_member *m;
|
|
|
+
|
|
|
+ if (list_empty(&grp->congested))
|
|
|
+ return false;
|
|
|
+
|
|
|
+ m = list_first_entry(&grp->congested, struct tipc_member, congested);
|
|
|
+ if (m->window >= len)
|
|
|
+ return false;
|
|
|
+
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
/* tipc_group_filter_msg() - determine if we should accept arriving message
|
|
|
*/
|
|
|
void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
|
|
@@ -302,11 +374,36 @@ drop:
|
|
|
kfree_skb(skb);
|
|
|
}
|
|
|
|
|
|
+void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
|
|
|
+ u32 port, struct sk_buff_head *xmitq)
|
|
|
+{
|
|
|
+ struct tipc_member *m;
|
|
|
+
|
|
|
+ m = tipc_group_find_member(grp, node, port);
|
|
|
+ if (!m)
|
|
|
+ return;
|
|
|
+
|
|
|
+ m->advertised -= blks;
|
|
|
+
|
|
|
+ switch (m->state) {
|
|
|
+ case MBR_JOINED:
|
|
|
+ if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
|
|
|
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
|
|
|
+ break;
|
|
|
+ case MBR_DISCOVERED:
|
|
|
+ case MBR_JOINING:
|
|
|
+ case MBR_LEAVING:
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
|
|
|
int mtyp, struct sk_buff_head *xmitq)
|
|
|
{
|
|
|
struct tipc_msg *hdr;
|
|
|
struct sk_buff *skb;
|
|
|
+ int adv = 0;
|
|
|
|
|
|
skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
|
|
|
m->node, tipc_own_addr(grp->net),
|
|
@@ -314,14 +411,24 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
|
|
|
if (!skb)
|
|
|
return;
|
|
|
|
|
|
+ if (m->state == MBR_JOINED)
|
|
|
+ adv = ADV_ACTIVE - m->advertised;
|
|
|
+
|
|
|
hdr = buf_msg(skb);
|
|
|
- if (mtyp == GRP_JOIN_MSG)
|
|
|
+
|
|
|
+ if (mtyp == GRP_JOIN_MSG) {
|
|
|
msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
|
|
|
+ msg_set_adv_win(hdr, adv);
|
|
|
+ m->advertised += adv;
|
|
|
+ } else if (mtyp == GRP_ADV_MSG) {
|
|
|
+ msg_set_adv_win(hdr, adv);
|
|
|
+ m->advertised += adv;
|
|
|
+ }
|
|
|
__skb_queue_tail(xmitq, skb);
|
|
|
}
|
|
|
|
|
|
-void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
|
|
|
- struct sk_buff_head *inputq,
|
|
|
+void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
|
|
|
+ struct tipc_msg *hdr, struct sk_buff_head *inputq,
|
|
|
struct sk_buff_head *xmitq)
|
|
|
{
|
|
|
u32 node = msg_orignode(hdr);
|
|
@@ -341,14 +448,22 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
|
|
|
if (!m)
|
|
|
return;
|
|
|
m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
|
|
|
+ m->window += msg_adv_win(hdr);
|
|
|
|
|
|
/* Wait until PUBLISH event is received */
|
|
|
if (m->state == MBR_DISCOVERED) {
|
|
|
m->state = MBR_JOINING;
|
|
|
} else if (m->state == MBR_PUBLISHED) {
|
|
|
m->state = MBR_JOINED;
|
|
|
+ *usr_wakeup = true;
|
|
|
+ m->usr_pending = false;
|
|
|
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
|
|
|
__skb_queue_tail(inputq, m->event_msg);
|
|
|
}
|
|
|
+ if (m->window < ADV_IDLE)
|
|
|
+ tipc_group_update_member(m, 0);
|
|
|
+ else
|
|
|
+ list_del_init(&m->congested);
|
|
|
return;
|
|
|
case GRP_LEAVE_MSG:
|
|
|
if (!m)
|
|
@@ -361,14 +476,28 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr,
|
|
|
}
|
|
|
/* Otherwise deliver already received WITHDRAW event */
|
|
|
__skb_queue_tail(inputq, m->event_msg);
|
|
|
+ *usr_wakeup = m->usr_pending;
|
|
|
tipc_group_delete_member(grp, m);
|
|
|
+ list_del_init(&m->congested);
|
|
|
+ return;
|
|
|
+ case GRP_ADV_MSG:
|
|
|
+ if (!m)
|
|
|
+ return;
|
|
|
+ m->window += msg_adv_win(hdr);
|
|
|
+ *usr_wakeup = m->usr_pending;
|
|
|
+ m->usr_pending = false;
|
|
|
+ list_del_init(&m->congested);
|
|
|
return;
|
|
|
default:
|
|
|
pr_warn("Received unknown GROUP_PROTO message\n");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/* tipc_group_member_evt() - receive and handle a member up/down event
|
|
|
+ */
|
|
|
void tipc_group_member_evt(struct tipc_group *grp,
|
|
|
+ bool *usr_wakeup,
|
|
|
+ int *sk_rcvbuf,
|
|
|
struct sk_buff *skb,
|
|
|
struct sk_buff_head *inputq,
|
|
|
struct sk_buff_head *xmitq)
|
|
@@ -416,16 +545,25 @@ void tipc_group_member_evt(struct tipc_group *grp,
|
|
|
} else {
|
|
|
__skb_queue_tail(inputq, skb);
|
|
|
m->state = MBR_JOINED;
|
|
|
+ *usr_wakeup = true;
|
|
|
+ m->usr_pending = false;
|
|
|
}
|
|
|
m->instance = instance;
|
|
|
TIPC_SKB_CB(skb)->orig_member = m->instance;
|
|
|
tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
|
|
|
+ if (m->window < ADV_IDLE)
|
|
|
+ tipc_group_update_member(m, 0);
|
|
|
+ else
|
|
|
+ list_del_init(&m->congested);
|
|
|
} else if (event == TIPC_WITHDRAWN) {
|
|
|
if (!m)
|
|
|
goto drop;
|
|
|
|
|
|
TIPC_SKB_CB(skb)->orig_member = m->instance;
|
|
|
|
|
|
+ *usr_wakeup = m->usr_pending;
|
|
|
+ m->usr_pending = false;
|
|
|
+
|
|
|
/* Hold back event if more messages might be expected */
|
|
|
if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
|
|
|
m->event_msg = skb;
|
|
@@ -434,7 +572,9 @@ void tipc_group_member_evt(struct tipc_group *grp,
|
|
|
__skb_queue_tail(inputq, skb);
|
|
|
tipc_group_delete_member(grp, m);
|
|
|
}
|
|
|
+ list_del_init(&m->congested);
|
|
|
}
|
|
|
+ *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
|
|
|
return;
|
|
|
drop:
|
|
|
kfree_skb(skb);
|