/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never waits.
 */
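
/*
 * A rough sketch of the event flow implemented below (added for
 * orientation; all names are the ones used in this file):
 *
 *   receive:  sk_data_ready -> lowcomms_data_ready()
 *               -> queue_work(recv_workqueue, &con->rwork)
 *               -> process_recv_sockets() -> con->rx_action()
 *                  (receive_from_sock() or accept_from_sock())
 *                  -> dlm_process_incoming_buffer()
 *
 *   send:     dlm_lowcomms_get_buffer() / dlm_lowcomms_commit_buffer()
 *               -> queue_work(send_workqueue, &con->swork)
 *               -> process_send_sockets()
 *                  -> connect_to_sock() / send_to_sock()
 */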
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

#define NODE_INCREMENT 32

static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size - 1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
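
/*
 * A minimal worked example of the cbuf arithmetic above (illustrative
 * only, not part of the driver): with a power-of-two size the mask lets
 * indices wrap without any division.
 *
 *	struct cbuf cb;
 *	cbuf_init(&cb, 4096);	// base = 0, len = 0, mask = 0xfff
 *	cbuf_add(&cb, 100);	// 100 bytes received; next free slot is
 *				// cbuf_data(&cb) == (0 + 100) & 0xfff == 100
 *	cbuf_eat(&cb, 60);	// 60 bytes consumed; base = 60, len = 40,
 *				// cbuf_data(&cb) is still (60 + 40) & 0xfff
 */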
/* Maximum number of incoming messages to process before
   doing a cond_resched()
*/
#define MAX_RX_MSG_COUNT 25

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct rw_semaphore sock_sem;	/* Stop connect races */
	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	struct list_head listenlist;	/* List of allocated listening sockets */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
	atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
	struct connection *othercon;
	struct work_struct rwork;	/* Receive workqueue */
	struct work_struct swork;	/* Send workqueue */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

static struct sockaddr_storage dlm_local_addr;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static DECLARE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;
static int conn_array_size;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con = NULL;

	down(&connections_lock);
	if (nodeid >= conn_array_size) {
		int new_size = nodeid + NODE_INCREMENT;
		struct connection **new_conns;

		new_conns = kzalloc(sizeof(struct connection *) *
				    new_size, allocation);
		if (!new_conns)
			goto finish;

		memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
		conn_array_size = new_size;
		kfree(connections);
		connections = new_conns;
	}

	con = connections[nodeid];
	if (con == NULL && allocation) {
		con = kmem_cache_zalloc(con_cache, allocation);
		if (!con)
			goto finish;

		con->nodeid = nodeid;
		init_rwsem(&con->sock_sem);
		INIT_LIST_HEAD(&con->writequeue);
		spin_lock_init(&con->writequeue_lock);
		INIT_WORK(&con->swork, process_send_sockets);
		INIT_WORK(&con->rwork, process_recv_sockets);

		connections[nodeid] = con;
	}

finish:
	up(&connections_lock);
	return con;
}
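
/*
 * Worked example of the growth arithmetic above (illustrative numbers
 * only): with NODE_INCREMENT 32, looking up nodeid 40 against a
 * 32-entry array reallocates to 40 + 32 = 72 entries, so a run of
 * nearby nodeids does not trigger a reallocation each time.
 */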
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);

	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_state == TCP_ESTABLISHED)
		lowcomms_write_space(sk);
}

/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;

	return 0;
}
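
/*
 * Note: add_sock() relies on sk_user_data already pointing at the
 * connection (that is what sock2con() reads in the callbacks above),
 * so every caller in this file pairs the two, e.g.:
 *
 *	sock->sk->sk_user_data = con;
 *	con->rx_action = receive_from_sock;
 *	add_sock(sock, con);
 */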
/* Add the port number to an IP6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr.ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
}
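
/*
 * A minimal usage sketch (both connect and bind paths below use it
 * exactly like this, with the configured DLM TCP port):
 *
 *	struct sockaddr_storage saddr;
 *	int addr_len;
 *
 *	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
 *	// saddr now carries the port in network byte order and addr_len
 *	// is sizeof(struct sockaddr_in) or sizeof(struct sockaddr_in6).
 */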
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	down_write(&con->sock_sem);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}
	con->retries = 0;
	up_write(&con->sock_sem);
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg;
	struct iovec iov[2];
	mm_segment_t fs;
	unsigned len;
	int r;
	int call_again_soon = 0;

	down_read(&con->sock_sem);

	if (con->sock == NULL)
		goto out;
	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_iovlen = 1;
	msg.msg_iov = iov;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_flags = 0;

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		msg.msg_iovlen = 2;
	}
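
	/*
	 * Worked example of the split above (illustrative numbers only):
	 * with a 4096-byte page, base = 100 and len = 50, cbuf_data() is
	 * 150 >= base, so iov[0] covers bytes 150..4095 and iov[1] covers
	 * bytes 0..99, i.e. everything except the 50 queued bytes. Only
	 * when the data index has wrapped below base does the single
	 * iov[0] = base - cbuf_data() case apply.
	 */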
	len = iov[0].iov_len + iov[1].iov_len;

	fs = get_fs();
	set_fs(get_ds());
	r = ret = sock_recvmsg(con->sock, &msg, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(fs);

	if (ret <= 0)
		goto out_close;
	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
		       "iov_len=%u, iov_base[0]=%p, read=%d\n",
		       page_address(con->rx_page), con->cb.base, con->cb.len,
		       len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

out:
	if (call_again_soon)
		goto out_resched;
	up_read(&con->sock_sem);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	up_read(&con->sock_sem);
	cond_resched();
	return 0;

out_close:
	up_read(&con->sock_sem);
	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}

	return ret;
}
/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	down_read_nested(&con->sock_sem, 0);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
		printk("dlm: connect from non cluster node\n");
		sock_release(newsock);
		up_read(&con->sock_sem);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * TEMPORARY FIX:
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_KERNEL);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	down_write_nested(&newcon->sock_sem, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
			if (!othercon) {
				printk("dlm: failed to allocate incoming socket\n");
				up_write(&newcon->sock_sem);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			init_rwsem(&othercon->sock_sem);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
		}
		othercon->sock = newsock;
		newsock->sk->sk_user_data = othercon;
		add_sock(newsock, othercon);
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
	}

	up_write(&newcon->sock_sem);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags))
		queue_work(recv_workqueue, &newcon->rwork);
	up_read(&con->sock_sem);

	return 0;

accept_err:
	up_read(&con->sock_sem);
	sock_release(newsock);

	if (result != -EAGAIN)
		printk("dlm: error accepting connection from node: %d\n", result);

	return result;
}
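
/*
 * The crossed-connection scenario handled above, spelled out
 * (orientation only): if node A and node B call connect_to_sock() at
 * roughly the same time, each node ends up holding its own outgoing
 * socket on newcon->sock plus the peer's incoming socket. The incoming
 * one is parked on newcon->othercon (flagged CF_IS_OTHERCON), so both
 * sockets are read from, but writes only ever go through the main
 * connection's writequeue.
 */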
/* Connect a new socket to its peer */
static void connect_to_sock(struct connection *con)
{
	int result = -EHOSTUNREACH;
	struct sockaddr_storage saddr;
	int addr_len;
	struct socket *sock;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	down_write(&con->sock_sem);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock) {
		result = 0;
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
		goto out_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	add_sock(sock, con);

	log_print("connecting to %d", con->nodeid);
	result =
		sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
	    result != -ENETDOWN && result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		lowcomms_connect_sock(con);
		result = 0;
	}
out:
	up_write(&con->sock_sem);
	return;
}
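
/*
 * How the non-blocking connect above completes (orientation only):
 * O_NONBLOCK makes ->connect() return -EINPROGRESS, which is treated
 * as success. Once the TCP handshake finishes, lowcomms_state_change()
 * sees TCP_ESTABLISHED and calls lowcomms_write_space(), which queues
 * swork, so any pending writequeue entries are sent without polling.
 */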
static struct socket *create_listen_sock(struct connection *con,
					 struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	mm_segment_t fs;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr.ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		printk("dlm: Can't create listening comms socket\n");
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());
	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
		       result);
	}
	sock->sk->sk_user_data = con;
	con->rx_action = accept_from_sock;
	con->sock = sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());

	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Set keepalive failed: %d\n", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		printk("dlm: Can't listen on port %d\n",
		       dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Listen on all interfaces */
static int listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_KERNEL);
	int result = -EINVAL;

	/* We don't support multi-homed hosts */
	set_bit(CF_IS_OTHERCON, &con->flags);

	sock = create_listen_sock(con, &dlm_local_addr);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else {
		result = -EADDRINUSE;
	}

	return result;
}
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len,
			      gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;
	int users = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		if (users == 0)
			kmap(e->page);
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		users = e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}
void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	kunmap(e->page);
	spin_unlock(&con->writequeue_lock);

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
		queue_work(send_workqueue, &con->swork);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
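
/*
 * The intended caller pattern for the pair above, as used from the
 * mid-level comms layer (a sketch; msg/msglen stand in for whatever
 * the caller is sending):
 *
 *	char *buf;
 *	void *mh;
 *
 *	mh = dlm_lowcomms_get_buffer(nodeid, msglen, GFP_KERNEL, &buf);
 *	if (!mh)
 *		return -ENOMEM;
 *	memcpy(buf, msg, msglen);
 *	dlm_lowcomms_commit_buffer(mh);
 *
 * The send work is only queued once the last concurrent user of an
 * entry commits, i.e. when its users count drops to zero.
 */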
static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;

	down_read(&con->sock_sem);
	if (con->sock == NULL)
		goto out_connect;

	sendpage = con->sock->ops->sendpage;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);
		kmap(e->page);

		ret = 0;
		if (len) {
			ret = sendpage(con->sock, e->page, offset, len,
				       msg_flags);
			if (ret == -EAGAIN || ret == 0)
				goto out;
			if (ret <= 0)
				goto send_error;
		}
		else {
			/* Don't starve people filling buffers */
			cond_resched();
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			kunmap(e->page);
			free_entry(e);
			continue;
		}
	}
	spin_unlock(&con->writequeue_lock);
out:
	up_read(&con->sock_sem);
	return;

send_error:
	up_read(&con->sock_sem);
	close_connection(con, false);
	lowcomms_connect_sock(con);
	return;

out_connect:
	up_read(&con->sock_sem);
	lowcomms_connect_sock(con);
	return;
}
static void clean_one_writequeue(struct connection *con)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock(&con->writequeue_lock);
	list_for_each_safe(list, temp, &con->writequeue) {
		struct writequeue_entry *e =
			list_entry(list, struct writequeue_entry, list);
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;

	if (!connections)
		goto out;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clean_one_writequeue(con);
		close_connection(con, true);
		atomic_set(&con->waiting_requests, 0);
	}
	return 0;

out:
	return -1;
}

/* Look for activity on active sockets */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
		connect_to_sock(con);
	}

	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) {
		send_to_sock(con);
	}
}
/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	int nodeid;

	for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
		struct connection *con = nodeid2con(nodeid, 0);

		if (con)
			clean_one_writequeue(con);
	}
}

static void work_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	int error;

	recv_workqueue = create_workqueue("dlm_recv");
	error = IS_ERR(recv_workqueue);
	if (error) {
		log_print("can't start dlm_recv %d", error);
		return error;
	}

	send_workqueue = create_singlethread_workqueue("dlm_send");
	error = IS_ERR(send_workqueue);
	if (error) {
		log_print("can't start dlm_send %d", error);
		destroy_workqueue(recv_workqueue);
		return error;
	}

	return 0;
}

void dlm_lowcomms_stop(void)
{
	int i;

	/* Set all the flags to prevent any
	   socket activity.
	*/
	for (i = 0; i < conn_array_size; i++) {
		if (connections[i])
			connections[i]->flags |= 0xFF;
	}

	work_stop();
	clean_writequeues();

	for (i = 0; i < conn_array_size; i++) {
		if (connections[i]) {
			close_connection(connections[i], true);
			if (connections[i]->othercon)
				kmem_cache_free(con_cache, connections[i]->othercon);
			kmem_cache_free(con_cache, connections[i]);
		}
	}

	kfree(connections);
	connections = NULL;

	kmem_cache_destroy(con_cache);
}
/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
	int error = 0;

	error = -ENOMEM;
	connections = kzalloc(sizeof(struct connection *) *
			      NODE_INCREMENT, GFP_KERNEL);
	if (!connections)
		goto out;

	conn_array_size = NODE_INCREMENT;

	if (dlm_our_addr(&dlm_local_addr, 0)) {
		log_print("no local IP address has been set");
		goto fail_free_conn;
	}
	if (!dlm_our_addr(&dlm_local_addr, 1)) {
		log_print("This dlm comms module does not support multi-homed clustering");
		goto fail_free_conn;
	}

	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL, NULL);
	if (!con_cache)
		goto fail_free_conn;

	/* Start listening */
	error = listen_for_all();
	if (error)
		goto fail_unlisten;

	error = work_start();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	close_connection(connections[0], false);
	kmem_cache_free(con_cache, connections[0]);
	kmem_cache_destroy(con_cache);

fail_free_conn:
	kfree(connections);

out:
	return error;
}
/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only. This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */