smc_tx.c

/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/sched/signal.h>

#include <net/sock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

/***************************** sndbuf producer *******************************/
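
/* The sndbuf is a ring buffer addressed by wrapping cursors: tx_curs_prep
 * marks how far the producer has copied in user data, tx_curs_sent marks
 * how far the consumer has transferred that data towards the peer's RMBE,
 * and the atomic counter sndbuf_space holds the bytes still free for the
 * producer. The producer side below fills the ring and kicks the consumer.
 */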

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   POLLOUT | POLLWRNORM |
						   POLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available */
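/* Returns 0 once sndbuf_space is non-zero again; returns -EPIPE on local
 * error/shutdown or when the peer is done writing, -ECONNRESET on peer
 * connection abort, -EAGAIN for non-blocking sockets or an expired send
 * timeout, and sock_intr_errno() if a signal is pending.
 */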
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool noblock;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	noblock = timeo ? false : true;
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			if (noblock)
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space))
			break; /* at least 1 byte of free space available */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk->sk_write_pending++;
		sk_wait_event(sk, &timeo,
			      sk->sk_err ||
			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close_or_senddone(conn) ||
			      atomic_read(&conn->sndbuf_space),
			      &wait);
		sk->sk_write_pending--;
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
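/* Loop until all of msg is copied in: wait for sndbuf space if necessary,
 * then copy up to sndbuf_space bytes in at most two chunks (the second one
 * only when the copy wraps around the end of the ring), advance
 * tx_curs_prep, shrink sndbuf_space and kick the consumer via
 * smc_tx_sndbuf_nonempty(). Returns the bytes copied or a negative error.
 */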
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

	while (msg_data_left(msg)) {
		if (sk->sk_state == SMC_INIT)
			return -ENOTCONN;
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    (smc->sk.sk_err == ECONNABORTED) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;
		if (!atomic_read(&conn->sndbuf_space)) {
			rc = smc_tx_wait_memory(smc, msg->msg_flags);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			continue;
		}

		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait_memory above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
		smc_curs_write(&prep,
			       smc_curs_read(&conn->tx_curs_prep, conn),
			       conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t,
				  copylen, conn->sndbuf_size - tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		/* update cursors */
		smc_curs_add(conn->sndbuf_size, &prep, copylen);
		smc_curs_write(&conn->tx_curs_prep,
			       smc_curs_read(&prep, conn),
			       conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_size */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/
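
/* Consumer path: smc_tx_sndbuf_nonempty() takes the connection's send_lock,
 * reserves a CDC send slot, lets smc_tx_rdma_writes() RDMA-write the pending
 * sndbuf data into the peer's RMBE via smc_tx_rdma_write(), and finally
 * sends a CDC message to the peer with smc_cdc_msg_send().
 */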

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_sge sges[])
{
	struct smc_link_group *lgr = conn->lgr;
	struct ib_send_wr *failed_wr = NULL;
	struct ib_rdma_wr rdma_wr;
	struct smc_link *link;
	int rc;

	memset(&rdma_wr, 0, sizeof(rdma_wr));
	link = &lgr->lnk[SMC_SINGLE_LINK];
	rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr.wr.sg_list = sges;
	rdma_wr.wr.num_sge = num_sges;
	rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
	rdma_wr.remote_addr =
		lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
		/* RMBE within RMB */
		((conn->peer_conn_idx - 1) * conn->peer_rmbe_size) +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
	rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
	if (rc)
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	return rc;
}

/* sndbuf consumer */
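/* Advance the producer cursor into the peer RMBE (prod) and the local
 * sndbuf sent cursor (sent) by len bytes, and shrink peer_rmbe_space
 * accordingly, since data in flight reduces the usable send window.
 */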
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_size, sent, len);
}

/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
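/* to_send is the data already prepared in the sndbuf but not yet sent
 * (tx_curs_prep - tx_curs_sent); rmbespace is the free space left in the
 * peer RMBE; their minimum, len, is transferred. Because both source and
 * destination are ring buffers, the transfer is split into up to two
 * destination chunks, each gathered from up to two source SGEs.
 */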
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
	size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
	size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
	union smc_host_cursor sent, prep, prod, cons;
	struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
	struct smc_link_group *lgr = conn->lgr;
	int to_send, rmbespace;
	struct smc_link *link;
	int num_sges;
	int rc;

	/* source: sndbuf */
	smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
	smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_size, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0)
		return 0;
	smc_curs_write(&prod,
		       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
		       conn);
	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
		       conn);

	/* if usable snd_wnd closes ask peer to advertise once it opens again */
	conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	link = &lgr->lnk[SMC_SINGLE_LINK];
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
	dst_len_sum = dst_len;
	src_off = sent.count;
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_size) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_size - sent.count;
	}
	src_len_sum = src_len;
	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sges[srcchunk].addr =
				conn->sndbuf_desc->dma_addr[SMC_SINGLE_LINK] +
				src_off;
			sges[srcchunk].length = src_len;
			sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
			num_sges++;
			src_off += src_len;
			if (src_off >= conn->sndbuf_size)
				src_off -= conn->sndbuf_size;
						/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int,
				dst_len, conn->sndbuf_size - sent.count);
		src_len_sum = src_len;
	}

	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_write(&conn->local_tx_ctrl.prod,
		       smc_curs_read(&prod, conn),
		       conn);
						/* dst: peer RMBE */
	smc_curs_write(&conn->tx_curs_sent,
		       smc_curs_read(&sent, conn),
		       conn);
						/* src: local sndbuf */

	return 0;
}

/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
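/* Serialized by conn->send_lock (taken with spin_lock_bh()). If no CDC send
 * slot is free (-EBUSY), the transmit is retried later from process context
 * via the tx_work worker instead of failing the caller, unless the socket
 * has already been aborted.
 */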
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

	spin_lock_bh(&conn->send_lock);
	rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], &wr_buf,
				   &pend);
	if (rc < 0) {
		if (rc == -EBUSY) {
			struct smc_sock *smc =
				container_of(conn, struct smc_sock, conn);

			if (smc->sk.sk_err == ECONNABORTED) {
				rc = sock_error(&smc->sk);
				goto out_unlock;
			}
			rc = 0;
			schedule_work(&conn->tx_work);
		}
		goto out_unlock;
	}

	rc = smc_tx_rdma_writes(conn);
	if (rc) {
		smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
				   (struct smc_wr_tx_pend_priv *)pend);
		goto out_unlock;
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit
 */
static void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(work,
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

	lock_sock(&smc->sk);
	smc_tx_sndbuf_nonempty(conn);
	release_sock(&smc->sk);
}
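
/* Decide whether the peer needs a consumer cursor update for the local RMBE.
 * One is sent if the peer explicitly requested it (cons_curs_upd_req), or if
 * the not-yet-confirmed consumed bytes exceed rmbe_update_limit and either
 * amount to more than half of the RMBE or the peer reported write_blocked.
 * If no CDC send slot is available, the update is retried via tx_work.
 */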
void smc_tx_consumer_update(struct smc_connection *conn)
{
	union smc_host_cursor cfed, cons;
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int to_confirm, rc;

	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
		       conn);
	smc_curs_write(&cfed,
		       smc_curs_read(&conn->rx_curs_confirmed, conn),
		       conn);
	to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons);

	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    ((to_confirm > conn->rmbe_update_limit) &&
	     ((to_confirm > (conn->rmbe_size / 2)) ||
	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
		rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
					   &wr_buf, &pend);
		if (!rc)
			rc = smc_cdc_msg_send(conn, wr_buf, pend);
		if (rc < 0) {
			schedule_work(&conn->tx_work);
			return;
		}
		smc_curs_write(&conn->rx_curs_confirmed,
			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
			       conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	}
	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
	INIT_WORK(&smc->conn.tx_work, smc_tx_work);
	spin_lock_init(&smc->conn.send_lock);
}