group.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. /*
  2. * net/tipc/group.c: TIPC group messaging code
  3. *
  4. * Copyright (c) 2017, Ericsson AB
  5. * All rights reserved.
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. *
  10. * 1. Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * 2. Redistributions in binary form must reproduce the above copyright
  13. * notice, this list of conditions and the following disclaimer in the
  14. * documentation and/or other materials provided with the distribution.
  15. * 3. Neither the names of the copyright holders nor the names of its
  16. * contributors may be used to endorse or promote products derived from
  17. * this software without specific prior written permission.
  18. *
  19. * Alternatively, this software may be distributed under the terms of the
  20. * GNU General Public License ("GPL") version 2 as published by the Free
  21. * Software Foundation.
  22. *
  23. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  24. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  27. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  28. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  29. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  30. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  31. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  32. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  33. * POSSIBILITY OF SUCH DAMAGE.
  34. */
  35. #include "core.h"
  36. #include "addr.h"
  37. #include "group.h"
  38. #include "bcast.h"
  39. #include "server.h"
  40. #include "msg.h"
  41. #include "socket.h"
  42. #include "node.h"
  43. #include "name_table.h"
  44. #include "subscr.h"
/* Advertised-window accounting unit: the worst-case number of flow-control
 * blocks a maximum-size message can occupy. An idle member keeps one unit
 * advertised; an active member keeps twelve.
 */
#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
#define ADV_IDLE ADV_UNIT
#define ADV_ACTIVE (ADV_UNIT * 12)
/* Life-cycle states of a group member as tracked by the local socket.
 * A member becomes a valid message source/sink only at >= MBR_JOINED
 * (see tipc_group_is_receiver()).
 */
enum mbr_state {
	MBR_QUARANTINED,	/* created on JOIN msg; not yet published */
	MBR_DISCOVERED,		/* known from name table; no JOIN msg yet */
	MBR_JOINING,		/* JOIN msg seen; awaiting PUBLISH event */
	MBR_PUBLISHED,		/* PUBLISH event held back; awaiting JOIN msg */
	MBR_JOINED,		/* fully established member */
	MBR_LEAVING		/* LEAVE msg or WITHDRAWN event pending */
};
/* State and flow-control bookkeeping for one remote group member */
struct tipc_member {
	struct rb_node tree_node;	/* linkage in group->members tree */
	struct list_head list;
	struct list_head congested;	/* linkage in group->congested list */
	struct sk_buff *event_msg;	/* held-back PUBLISH/WITHDRAWN event */
	struct tipc_group *group;	/* owning group */
	u32 node;			/* member's node address */
	u32 port;			/* member's port */
	u32 instance;			/* published name instance */
	enum mbr_state state;
	u16 advertised;			/* window advertised to member, blocks */
	u16 window;			/* remaining send window toward member */
	u16 bc_rcv_nxt;			/* next expected broadcast seqno */
	bool usr_pending;		/* user blocked, waiting for window */
};
/* Local representation of one communication group */
struct tipc_group {
	struct rb_root members;		/* members, keyed on node:port */
	struct list_head congested;	/* members with low window, ascending */
	struct tipc_nlist dests;	/* nodes hosting at least one member */
	struct net *net;
	int subid;			/* name table subscription id */
	u32 type;			/* group's published name type */
	u32 instance;			/* own published instance */
	u32 domain;
	u32 scope;
	u32 portid;			/* own socket port */
	u16 member_cnt;
	u16 bc_snd_nxt;			/* next broadcast seqno to send */
	bool loopback;			/* deliver own messages back to self */
	bool events;			/* user wants membership events */
};
  87. static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
  88. int mtyp, struct sk_buff_head *xmitq);
  89. static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
  90. {
  91. int mcnt = grp->member_cnt + 1;
  92. /* Scale to bytes, considering worst-case truesize/msgsize ratio */
  93. return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4;
  94. }
/* Return the next broadcast sequence number this group will send with */
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
{
	return grp->bc_snd_nxt;
}
  99. static bool tipc_group_is_enabled(struct tipc_member *m)
  100. {
  101. return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
  102. }
  103. static bool tipc_group_is_receiver(struct tipc_member *m)
  104. {
  105. return m && m->state >= MBR_JOINED;
  106. }
  107. u32 tipc_group_exclude(struct tipc_group *grp)
  108. {
  109. if (!grp->loopback)
  110. return grp->portid;
  111. return 0;
  112. }
/* Return the current number of members in the group */
int tipc_group_size(struct tipc_group *grp)
{
	return grp->member_cnt;
}
  117. struct tipc_group *tipc_group_create(struct net *net, u32 portid,
  118. struct tipc_group_req *mreq)
  119. {
  120. struct tipc_group *grp;
  121. u32 type = mreq->type;
  122. grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
  123. if (!grp)
  124. return NULL;
  125. tipc_nlist_init(&grp->dests, tipc_own_addr(net));
  126. INIT_LIST_HEAD(&grp->congested);
  127. grp->members = RB_ROOT;
  128. grp->net = net;
  129. grp->portid = portid;
  130. grp->domain = addr_domain(net, mreq->scope);
  131. grp->type = type;
  132. grp->instance = mreq->instance;
  133. grp->scope = mreq->scope;
  134. grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
  135. grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
  136. if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
  137. return grp;
  138. kfree(grp);
  139. return NULL;
  140. }
/* tipc_group_delete() - send LEAVE to all members, then tear down the group.
 * Frees all member entries, the destination list, the name table
 * subscription and finally the group itself.
 */
void tipc_group_delete(struct net *net, struct tipc_group *grp)
{
	struct rb_root *tree = &grp->members;
	struct tipc_member *m, *tmp;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	/* Post-order walk is safe while the entries are being freed */
	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
		tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
		list_del(&m->list);
		kfree(m);
	}
	tipc_node_distr_xmit(net, &xmitq);
	tipc_nlist_purge(&grp->dests);
	tipc_topsrv_kern_unsubscr(net, grp->subid);
	kfree(grp);
}
  157. struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
  158. u32 node, u32 port)
  159. {
  160. struct rb_node *n = grp->members.rb_node;
  161. u64 nkey, key = (u64)node << 32 | port;
  162. struct tipc_member *m;
  163. while (n) {
  164. m = container_of(n, struct tipc_member, tree_node);
  165. nkey = (u64)m->node << 32 | m->port;
  166. if (key < nkey)
  167. n = n->rb_left;
  168. else if (key > nkey)
  169. n = n->rb_right;
  170. else
  171. return m;
  172. }
  173. return NULL;
  174. }
  175. static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
  176. u32 node, u32 port)
  177. {
  178. struct tipc_member *m;
  179. m = tipc_group_find_member(grp, node, port);
  180. if (m && tipc_group_is_enabled(m))
  181. return m;
  182. return NULL;
  183. }
  184. static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
  185. u32 node)
  186. {
  187. struct tipc_member *m;
  188. struct rb_node *n;
  189. for (n = rb_first(&grp->members); n; n = rb_next(n)) {
  190. m = container_of(n, struct tipc_member, tree_node);
  191. if (m->node == node)
  192. return m;
  193. }
  194. return NULL;
  195. }
  196. static void tipc_group_add_to_tree(struct tipc_group *grp,
  197. struct tipc_member *m)
  198. {
  199. u64 nkey, key = (u64)m->node << 32 | m->port;
  200. struct rb_node **n, *parent = NULL;
  201. struct tipc_member *tmp;
  202. n = &grp->members.rb_node;
  203. while (*n) {
  204. tmp = container_of(*n, struct tipc_member, tree_node);
  205. parent = *n;
  206. tmp = container_of(parent, struct tipc_member, tree_node);
  207. nkey = (u64)tmp->node << 32 | tmp->port;
  208. if (key < nkey)
  209. n = &(*n)->rb_left;
  210. else if (key > nkey)
  211. n = &(*n)->rb_right;
  212. else
  213. return;
  214. }
  215. rb_link_node(&m->tree_node, parent, n);
  216. rb_insert_color(&m->tree_node, &grp->members);
  217. }
/* tipc_group_create_member() - allocate a member entry in the given initial
 * state and register it in the member tree and destination node list.
 * Returns NULL on allocation failure.
 */
static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
						    u32 node, u32 port,
						    int state)
{
	struct tipc_member *m;

	m = kzalloc(sizeof(*m), GFP_ATOMIC);
	if (!m)
		return NULL;
	INIT_LIST_HEAD(&m->list);
	INIT_LIST_HEAD(&m->congested);
	m->group = grp;
	m->node = node;
	m->port = port;
	grp->member_cnt++;
	tipc_group_add_to_tree(grp, m);
	/* Make the member's node a broadcast destination */
	tipc_nlist_add(&grp->dests, m->node);
	m->state = state;
	return m;
}
/* Register a member discovered via the name table; it stays in
 * MBR_DISCOVERED until its JOIN message arrives.
 */
void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
{
	tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
}
/* tipc_group_delete_member() - unlink a member from all group structures
 * and free it.
 */
static void tipc_group_delete_member(struct tipc_group *grp,
				     struct tipc_member *m)
{
	rb_erase(&m->tree_node, &grp->members);
	grp->member_cnt--;
	list_del_init(&m->list);
	list_del_init(&m->congested);

	/* If last member on a node, remove node from dest list */
	if (!tipc_group_find_node(grp, m->node))
		tipc_nlist_del(&grp->dests, m->node);

	kfree(m);
}
/* Return the list of nodes hosting at least one group member */
struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
{
	return &grp->dests;
}
  257. void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
  258. int *scope)
  259. {
  260. seq->type = grp->type;
  261. seq->lower = grp->instance;
  262. seq->upper = grp->instance;
  263. *scope = grp->scope;
  264. }
  265. void tipc_group_update_member(struct tipc_member *m, int len)
  266. {
  267. struct tipc_group *grp = m->group;
  268. struct tipc_member *_m, *tmp;
  269. if (!tipc_group_is_enabled(m))
  270. return;
  271. m->window -= len;
  272. if (m->window >= ADV_IDLE)
  273. return;
  274. if (!list_empty(&m->congested))
  275. return;
  276. /* Sort member into congested members' list */
  277. list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
  278. if (m->window > _m->window)
  279. continue;
  280. list_add_tail(&m->congested, &_m->congested);
  281. return;
  282. }
  283. list_add_tail(&m->congested, &grp->congested);
  284. }
  285. void tipc_group_update_bc_members(struct tipc_group *grp, int len)
  286. {
  287. struct tipc_member *m;
  288. struct rb_node *n;
  289. for (n = rb_first(&grp->members); n; n = rb_next(n)) {
  290. m = container_of(n, struct tipc_member, tree_node);
  291. if (tipc_group_is_enabled(m))
  292. tipc_group_update_member(m, len);
  293. }
  294. grp->bc_snd_nxt++;
  295. }
/* tipc_group_cong() - check if a message of 'len' bytes can be sent to the
 * given destination member right now.
 * Returns true if the sender must block (member congested, gone or user
 * already pending), false if the message may be sent. On the blocking path
 * the member is marked usr_pending, and, when appropriate, a fresh
 * advertisement is sent to prevent mutual blocking.
 */
bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
		     int len, struct tipc_member **mbr)
{
	struct sk_buff_head xmitq;
	struct tipc_member *m;
	int adv, state;

	m = tipc_group_find_dest(grp, dnode, dport);
	*mbr = m;
	if (!m)
		return false;
	if (m->usr_pending)
		return true;
	if (m->window >= len)
		return false;
	/* Must be set before the early returns below, so that the next
	 * advertisement from the peer wakes the user up again
	 */
	m->usr_pending = true;

	/* If not fully advertised, do it now to prevent mutual blocking */
	adv = m->advertised;
	state = m->state;
	if (state < MBR_JOINED)
		return true;
	if (state == MBR_JOINED && adv == ADV_IDLE)
		return true;
	skb_queue_head_init(&xmitq);
	tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
	tipc_node_distr_xmit(grp->net, &xmitq);
	return true;
}
  323. bool tipc_group_bc_cong(struct tipc_group *grp, int len)
  324. {
  325. struct tipc_member *m = NULL;
  326. if (list_empty(&grp->congested))
  327. return false;
  328. m = list_first_entry(&grp->congested, struct tipc_member, congested);
  329. if (m->window >= len)
  330. return false;
  331. return tipc_group_cong(grp, m->node, m->port, len, &m);
  332. }
/* tipc_group_filter_msg() - determine if we should accept arriving message
 * Accepted messages are re-queued on 'inputq'; rejected ones are freed.
 */
void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
			   struct sk_buff_head *xmitq)
{
	struct sk_buff *skb = __skb_dequeue(inputq);
	struct tipc_member *m;
	struct tipc_msg *hdr;
	u32 node, port;
	int mtyp;

	if (!skb)
		return;

	hdr = buf_msg(skb);
	mtyp = msg_type(hdr);
	node = msg_orignode(hdr);
	port = msg_origport(hdr);

	if (!msg_in_group(hdr))
		goto drop;

	if (mtyp == TIPC_GRP_MEMBER_EVT) {
		/* Deliver membership events only if the user asked for them */
		if (!grp->events)
			goto drop;
		__skb_queue_tail(inputq, skb);
		return;
	}

	/* Accept data only from fully joined members */
	m = tipc_group_find_member(grp, node, port);
	if (!tipc_group_is_receiver(m))
		goto drop;

	TIPC_SKB_CB(skb)->orig_member = m->instance;
	__skb_queue_tail(inputq, skb);

	m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1;
	return;
drop:
	kfree_skb(skb);
}
  367. void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
  368. u32 port, struct sk_buff_head *xmitq)
  369. {
  370. struct tipc_member *m;
  371. m = tipc_group_find_member(grp, node, port);
  372. if (!m)
  373. return;
  374. m->advertised -= blks;
  375. switch (m->state) {
  376. case MBR_JOINED:
  377. if (m->advertised <= (ADV_ACTIVE - ADV_UNIT))
  378. tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
  379. break;
  380. case MBR_DISCOVERED:
  381. case MBR_JOINING:
  382. case MBR_LEAVING:
  383. default:
  384. break;
  385. }
  386. }
/* tipc_group_proto_xmit() - build a group protocol message (JOIN/LEAVE/ADV)
 * for the given member and append it to 'xmitq'. JOIN and ADV messages
 * carry a window advertisement topping the member up to ADV_ACTIVE; JOIN
 * also carries the broadcast synchronization point.
 */
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
				  int mtyp, struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr;
	struct sk_buff *skb;
	int adv = 0;

	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
			      m->node, tipc_own_addr(grp->net),
			      m->port, grp->portid, 0);
	if (!skb)
		return;

	/* Only joined members get (re-)advertised up to the active window */
	if (m->state == MBR_JOINED)
		adv = ADV_ACTIVE - m->advertised;

	hdr = buf_msg(skb);
	if (mtyp == GRP_JOIN_MSG) {
		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
		msg_set_adv_win(hdr, adv);
		m->advertised += adv;
	} else if (mtyp == GRP_ADV_MSG) {
		msg_set_adv_win(hdr, adv);
		m->advertised += adv;
	}
	__skb_queue_tail(xmitq, skb);
}
/* tipc_group_proto_rcv() - handle an incoming group protocol message
 * (JOIN/LEAVE/ADV) from a peer member. May create or delete the member,
 * deliver a held-back membership event on 'inputq', queue protocol
 * responses on 'xmitq', and request a user wakeup via '*usr_wakeup'.
 */
void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
			  struct tipc_msg *hdr, struct sk_buff_head *inputq,
			  struct sk_buff_head *xmitq)
{
	u32 node = msg_orignode(hdr);
	u32 port = msg_origport(hdr);
	struct tipc_member *m;

	if (!grp)
		return;

	m = tipc_group_find_member(grp, node, port);

	switch (msg_type(hdr)) {
	case GRP_JOIN_MSG:
		/* JOIN before PUBLISH event: create as quarantined */
		if (!m)
			m = tipc_group_create_member(grp, node, port,
						     MBR_QUARANTINED);
		if (!m)
			return;
		m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
		m->window += msg_adv_win(hdr);

		/* Wait until PUBLISH event is received */
		if (m->state == MBR_DISCOVERED) {
			m->state = MBR_JOINING;
		} else if (m->state == MBR_PUBLISHED) {
			/* PUBLISH event was held back; deliver it now */
			m->state = MBR_JOINED;
			*usr_wakeup = true;
			m->usr_pending = false;
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
			__skb_queue_tail(inputq, m->event_msg);
		}
		/* Keep the congested list consistent with the new window */
		if (m->window < ADV_IDLE)
			tipc_group_update_member(m, 0);
		else
			list_del_init(&m->congested);
		return;
	case GRP_LEAVE_MSG:
		if (!m)
			return;

		/* Wait until WITHDRAW event is received */
		if (m->state != MBR_LEAVING) {
			m->state = MBR_LEAVING;
			return;
		}
		/* Otherwise deliver already received WITHDRAW event */
		__skb_queue_tail(inputq, m->event_msg);
		*usr_wakeup = m->usr_pending;
		tipc_group_delete_member(grp, m);
		list_del_init(&m->congested);
		return;
	case GRP_ADV_MSG:
		if (!m)
			return;
		/* Peer granted more window; unblock a waiting user */
		m->window += msg_adv_win(hdr);
		*usr_wakeup = m->usr_pending;
		m->usr_pending = false;
		list_del_init(&m->congested);
		return;
	default:
		pr_warn("Received unknown GROUP_PROTO message\n");
	}
}
/* tipc_group_member_evt() - receive and handle a member up/down event
 * Converts the topology event into a TIPC_GRP_MEMBER_EVT message in place.
 * PUBLISHED events may be held back until the member's JOIN message
 * arrives; WITHDRAWN events may be held back until the peer's LEAVE
 * message arrives (see tipc_group_proto_rcv()). Also refreshes the
 * socket's receive buffer limit via '*sk_rcvbuf'.
 */
void tipc_group_member_evt(struct tipc_group *grp,
			   bool *usr_wakeup,
			   int *sk_rcvbuf,
			   struct sk_buff *skb,
			   struct sk_buff_head *inputq,
			   struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb);
	struct tipc_event *evt = (void *)msg_data(hdr);
	u32 instance = evt->found_lower;
	u32 node = evt->port.node;
	u32 port = evt->port.ref;
	int event = evt->event;
	struct tipc_member *m;
	struct net *net;
	u32 self;

	if (!grp)
		goto drop;

	net = grp->net;
	self = tipc_own_addr(net);
	/* Ignore our own publication unless loopback was requested */
	if (!grp->loopback && node == self && port == grp->portid)
		goto drop;

	/* Convert message before delivery to user */
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
	msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
	msg_set_origport(hdr, port);
	msg_set_orignode(hdr, node);
	msg_set_nametype(hdr, grp->type);
	msg_set_grp_evt(hdr, event);

	m = tipc_group_find_member(grp, node, port);

	if (event == TIPC_PUBLISHED) {
		if (!m)
			m = tipc_group_create_member(grp, node, port,
						     MBR_DISCOVERED);
		if (!m)
			goto drop;

		/* Hold back event if JOIN message not yet received */
		if (m->state == MBR_DISCOVERED) {
			m->event_msg = skb;
			m->state = MBR_PUBLISHED;
		} else {
			__skb_queue_tail(inputq, skb);
			m->state = MBR_JOINED;
			*usr_wakeup = true;
			m->usr_pending = false;
		}
		m->instance = instance;
		TIPC_SKB_CB(skb)->orig_member = m->instance;
		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
		/* Keep the congested list consistent with the window */
		if (m->window < ADV_IDLE)
			tipc_group_update_member(m, 0);
		else
			list_del_init(&m->congested);
	} else if (event == TIPC_WITHDRAWN) {
		if (!m)
			goto drop;

		TIPC_SKB_CB(skb)->orig_member = m->instance;

		*usr_wakeup = m->usr_pending;
		m->usr_pending = false;

		/* Hold back event if more messages might be expected */
		if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
			m->event_msg = skb;
			m->state = MBR_LEAVING;
		} else {
			__skb_queue_tail(inputq, skb);
			tipc_group_delete_member(grp, m);
		}
		list_del_init(&m->congested);
	}
	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
	return;
drop:
	kfree_skb(skb);
}