smc_tx.c

/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/sched/signal.h>

#include <net/sock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

/***************************** sndbuf producer *******************************/
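
/* Overview of the transmit path below: the local send buffer (sndbuf) and
 * the peer's RMB element (RMBE) are ring buffers addressed by wrapping
 * cursors (wrap + count).  tx_curs_prep marks how far user data has been
 * copied into the sndbuf, tx_curs_sent how far sndbuf data has been
 * RDMA-written towards the peer.  sndbuf_space and peer_rmbe_space count the
 * free bytes in the local sndbuf and in the peer RMBE and serve as the
 * flow-control limits for producer and consumer.
 */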

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
        struct socket *sock = sk->sk_socket;
        struct smc_sock *smc = smc_sk(sk);
        struct socket_wq *wq;

        /* similar to sk_stream_write_space */
        if (atomic_read(&smc->conn.sndbuf_space) && sock) {
                clear_bit(SOCK_NOSPACE, &sock->flags);
                rcu_read_lock();
                wq = rcu_dereference(sk->sk_wq);
                if (skwq_has_sleeper(wq))
                        wake_up_interruptible_poll(&wq->wait,
                                                   POLLOUT | POLLWRNORM |
                                                   POLLWRBAND);
                if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
                        sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
                rcu_read_unlock();
        }
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
        if (smc->sk.sk_socket &&
            test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
                smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available */
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
        DEFINE_WAIT_FUNC(wait, woken_wake_function);
        struct smc_connection *conn = &smc->conn;
        struct sock *sk = &smc->sk;
        bool noblock;
        long timeo;
        int rc = 0;

        /* similar to sk_stream_wait_memory */
        timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
        noblock = timeo ? false : true;
        add_wait_queue(sk_sleep(sk), &wait);
        while (1) {
                sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                if (sk->sk_err ||
                    (sk->sk_shutdown & SEND_SHUTDOWN) ||
                    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
                        rc = -EPIPE;
                        break;
                }
                if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
                        rc = -ECONNRESET;
                        break;
                }
                if (!timeo) {
                        if (noblock)
                                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        rc = -EAGAIN;
                        break;
                }
                if (signal_pending(current)) {
                        rc = sock_intr_errno(timeo);
                        break;
                }
                sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                if (atomic_read(&conn->sndbuf_space))
                        break; /* at least 1 byte of free space available */
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                sk->sk_write_pending++;
                sk_wait_event(sk, &timeo,
                              sk->sk_err ||
                              (sk->sk_shutdown & SEND_SHUTDOWN) ||
                              smc_cdc_rxed_any_close_or_senddone(conn) ||
                              atomic_read(&conn->sndbuf_space),
                              &wait);
                sk->sk_write_pending--;
        }
        remove_wait_queue(sk_sleep(sk), &wait);
        return rc;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
        size_t copylen, send_done = 0, send_remaining = len;
        size_t chunk_len, chunk_off, chunk_len_sum;
        struct smc_connection *conn = &smc->conn;
        union smc_host_cursor prep;
        struct sock *sk = &smc->sk;
        char *sndbuf_base;
        int tx_cnt_prep;
        int writespace;
        int rc, chunk;

        /* This should be in poll */
        sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

        if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
                rc = -EPIPE;
                goto out_err;
        }

        while (msg_data_left(msg)) {
                if (sk->sk_state == SMC_INIT)
                        return -ENOTCONN;
                if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
                    (smc->sk.sk_err == ECONNABORTED) ||
                    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
                        return -EPIPE;
                if (smc_cdc_rxed_any_close(conn))
                        return send_done ?: -ECONNRESET;
                if (!atomic_read(&conn->sndbuf_space)) {
                        rc = smc_tx_wait_memory(smc, msg->msg_flags);
                        if (rc) {
                                if (send_done)
                                        return send_done;
                                goto out_err;
                        }
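                        /* sndbuf space was freed up (or the connection state
                         * changed); re-check the state before copying data
                         */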
                        continue;
                }

                /* initialize variables for 1st iteration of subsequent loop */
                /* could be just 1 byte, even after smc_tx_wait_memory above */
                writespace = atomic_read(&conn->sndbuf_space);
                /* not more than what user space asked for */
                copylen = min_t(size_t, send_remaining, writespace);
                /* determine start of sndbuf */
                sndbuf_base = conn->sndbuf_desc->cpu_addr;
                smc_curs_write(&prep,
                               smc_curs_read(&conn->tx_curs_prep, conn),
                               conn);
                tx_cnt_prep = prep.count;
                /* determine chunks where to write into sndbuf */
                /* either unwrapped case, or 1st chunk of wrapped case */
                chunk_len = min_t(size_t,
                                  copylen, conn->sndbuf_size - tx_cnt_prep);
                chunk_len_sum = chunk_len;
                chunk_off = tx_cnt_prep;
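                /* illustrative example (hypothetical numbers): with
                 * sndbuf_size = 16KB, tx_cnt_prep = 15KB and copylen = 3KB,
                 * the 1st chunk copies 1KB at offset 15KB and the 2nd chunk
                 * copies the remaining 2KB at offset 0 after the wrap
                 */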
                smc_sndbuf_sync_sg_for_cpu(conn);
                for (chunk = 0; chunk < 2; chunk++) {
                        rc = memcpy_from_msg(sndbuf_base + chunk_off,
                                             msg, chunk_len);
                        if (rc) {
                                smc_sndbuf_sync_sg_for_device(conn);
                                if (send_done)
                                        return send_done;
                                goto out_err;
                        }
                        send_done += chunk_len;
                        send_remaining -= chunk_len;

                        if (chunk_len_sum == copylen)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        chunk_len = copylen - chunk_len; /* remainder */
                        chunk_len_sum += chunk_len;
                        chunk_off = 0; /* modulo offset in send ring buffer */
                }
                smc_sndbuf_sync_sg_for_device(conn);

                /* update cursors */
                smc_curs_add(conn->sndbuf_size, &prep, copylen);
                smc_curs_write(&conn->tx_curs_prep,
                               smc_curs_read(&prep, conn),
                               conn);
                /* increased in send tasklet smc_cdc_tx_handler() */
                smp_mb__before_atomic();
                atomic_sub(copylen, &conn->sndbuf_space);
                /* guarantee 0 <= sndbuf_space <= sndbuf_size */
                smp_mb__after_atomic();

                /* since we just produced more new data into sndbuf,
                 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
                 */
                smc_tx_sndbuf_nonempty(conn);
        } /* while (msg_data_left(msg)) */

        return send_done;

out_err:
        rc = sk_stream_error(sk, msg->msg_flags, rc);
        /* make sure we wake any epoll edge trigger waiter */
        if (unlikely(rc == -EAGAIN))
                sk->sk_write_space(sk);
        return rc;
}

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
                             int num_sges, struct ib_sge sges[])
{
        struct smc_link_group *lgr = conn->lgr;
        struct ib_send_wr *failed_wr = NULL;
        struct ib_rdma_wr rdma_wr;
        struct smc_link *link;
        int rc;

        memset(&rdma_wr, 0, sizeof(rdma_wr));
        link = &lgr->lnk[SMC_SINGLE_LINK];
        rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
        rdma_wr.wr.sg_list = sges;
        rdma_wr.wr.num_sge = num_sges;
        rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
        rdma_wr.remote_addr =
                lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
                /* RMBE within RMB */
                ((conn->peer_conn_idx - 1) * conn->peer_rmbe_size) +
                /* offset within RMBE */
                peer_rmbe_offset;
        rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
        rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
        if (rc)
                conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
        return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
                                          union smc_host_cursor *prod,
                                          union smc_host_cursor *sent,
                                          size_t len)
{
        smc_curs_add(conn->peer_rmbe_size, prod, len);
        /* increased in recv tasklet smc_cdc_msg_rcv() */
        smp_mb__before_atomic();
        /* data in flight reduces usable snd_wnd */
        atomic_sub(len, &conn->peer_rmbe_space);
        /* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
        smp_mb__after_atomic();
        smc_curs_add(conn->sndbuf_size, sent, len);
}

/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
        size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
        size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
        union smc_host_cursor sent, prep, prod, cons;
        struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
        struct smc_link_group *lgr = conn->lgr;
        int to_send, rmbespace;
        struct smc_link *link;
        dma_addr_t dma_addr;
        int num_sges;
        int rc;

        /* source: sndbuf */
        smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
        smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
        /* cf. wmem_alloc - (snd_max - snd_una) */
        to_send = smc_curs_diff(conn->sndbuf_size, &sent, &prep);
        if (to_send <= 0)
                return 0;

        /* destination: RMBE */
        /* cf. snd_wnd */
        rmbespace = atomic_read(&conn->peer_rmbe_space);
        if (rmbespace <= 0)
                return 0;
        smc_curs_write(&prod,
                       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
                       conn);
        smc_curs_write(&cons,
                       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
                       conn);

        /* if usable snd_wnd closes ask peer to advertise once it opens again */
        conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
        /* cf. usable snd_wnd */
        len = min(to_send, rmbespace);
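        /* illustrative example (hypothetical numbers): with 5000 bytes
         * produced but not yet sent and 3000 bytes free in the peer RMBE,
         * len = 3000 is transmitted now and write_blocked is set so that
         * the peer advertises again once its RMBE space reopens
         */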

        /* initialize variables for first iteration of subsequent nested loop */
        link = &lgr->lnk[SMC_SINGLE_LINK];
        dst_off = prod.count;
        if (prod.wrap == cons.wrap) {
                /* the filled destination area is unwrapped,
                 * hence the available free destination space is wrapped
                 * and we need 2 destination chunks of sum len; start with 1st
                 * which is limited by what's available in sndbuf
                 */
                dst_len = min_t(size_t,
                                conn->peer_rmbe_size - prod.count, len);
        } else {
                /* the filled destination area is wrapped,
                 * hence the available free destination space is unwrapped
                 * and we need a single destination chunk of entire len
                 */
                dst_len = len;
        }
        dst_len_sum = dst_len;
        src_off = sent.count;
        /* dst_len determines the maximum src_len */
        if (sent.count + dst_len <= conn->sndbuf_size) {
                /* unwrapped src case: single chunk of entire dst_len */
                src_len = dst_len;
        } else {
                /* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
                src_len = conn->sndbuf_size - sent.count;
        }
        src_len_sum = src_len;
        dma_addr = sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl);
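        /* the nested loops below cover at most 2 destination chunks (RMBE
         * wrap) with at most 2 source chunks each (sndbuf wrap), i.e. at
         * most 2 RDMA writes per call, each with up to 2 scatter/gather
         * entries
         */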
        for (dstchunk = 0; dstchunk < 2; dstchunk++) {
                num_sges = 0;
                for (srcchunk = 0; srcchunk < 2; srcchunk++) {
                        sges[srcchunk].addr = dma_addr + src_off;
                        sges[srcchunk].length = src_len;
                        sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
                        num_sges++;
                        src_off += src_len;
                        if (src_off >= conn->sndbuf_size)
                                src_off -= conn->sndbuf_size;
                                        /* modulo in send ring */
                        if (src_len_sum == dst_len)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        src_len = dst_len - src_len; /* remainder */
                        src_len_sum += src_len;
                }
                rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
                if (rc)
                        return rc;
                if (dst_len_sum == len)
                        break; /* either on 1st or 2nd iteration */
                /* prepare next (== 2nd) iteration */
                dst_off = 0; /* modulo offset in RMBE ring buffer */
                dst_len = len - dst_len; /* remainder */
                dst_len_sum += dst_len;
                src_len = min_t(int,
                                dst_len, conn->sndbuf_size - sent.count);
                src_len_sum = src_len;
        }

        smc_tx_advance_cursors(conn, &prod, &sent, len);
        /* update connection's cursors with advanced local cursors */
        smc_curs_write(&conn->local_tx_ctrl.prod,
                       smc_curs_read(&prod, conn),
                       conn);
                        /* dst: peer RMBE */
        smc_curs_write(&conn->tx_curs_sent,
                       smc_curs_read(&sent, conn),
                       conn);
                        /* src: local sndbuf */

        return 0;
}

/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
        int rc;

        spin_lock_bh(&conn->send_lock);
        rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], &wr_buf,
                                   &pend);
        if (rc < 0) {
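                /* transient failure (e.g. no free CDC send slot right now):
                 * unless the connection was aborted, defer the transmit to
                 * the tx worker and retry from process context
                 */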
                if (rc == -EBUSY) {
                        struct smc_sock *smc =
                                container_of(conn, struct smc_sock, conn);

                        if (smc->sk.sk_err == ECONNABORTED) {
                                rc = sock_error(&smc->sk);
                                goto out_unlock;
                        }
                        rc = 0;
                        schedule_work(&conn->tx_work);
                }
                goto out_unlock;
        }
        rc = smc_tx_rdma_writes(conn);
        if (rc) {
                smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
                                   (struct smc_wr_tx_pend_priv *)pend);
                goto out_unlock;
        }
        rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
        spin_unlock_bh(&conn->send_lock);
        return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit
 */
static void smc_tx_work(struct work_struct *work)
{
        struct smc_connection *conn = container_of(work,
                                                   struct smc_connection,
                                                   tx_work);
        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
        int rc;

        lock_sock(&smc->sk);
        rc = smc_tx_sndbuf_nonempty(conn);
        if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
            !atomic_read(&conn->bytes_to_rcv))
                conn->local_rx_ctrl.prod_flags.write_blocked = 0;
        release_sock(&smc->sk);
}

void smc_tx_consumer_update(struct smc_connection *conn)
{
        union smc_host_cursor cfed, cons;
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
        int to_confirm, rc;

        smc_curs_write(&cons,
                       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
                       conn);
        smc_curs_write(&cfed,
                       smc_curs_read(&conn->rx_curs_confirmed, conn),
                       conn);
        to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons);
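        /* send a consumer cursor update to the peer if it explicitly
         * requested one, or if enough bytes have been consumed locally:
         * more than rmbe_update_limit and either more than half the RMBE
         * or with the peer's producer marked write_blocked
         */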
        if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
            ((to_confirm > conn->rmbe_update_limit) &&
             ((to_confirm > (conn->rmbe_size / 2)) ||
              conn->local_rx_ctrl.prod_flags.write_blocked))) {
                rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
                                           &wr_buf, &pend);
                if (!rc)
                        rc = smc_cdc_msg_send(conn, wr_buf, pend);
                if (rc < 0) {
                        schedule_work(&conn->tx_work);
                        return;
                }
                smc_curs_write(&conn->rx_curs_confirmed,
                               smc_curs_read(&conn->local_tx_ctrl.cons, conn),
                               conn);
                conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
        }
        if (conn->local_rx_ctrl.prod_flags.write_blocked &&
            !atomic_read(&conn->bytes_to_rcv))
                conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
        smc->sk.sk_write_space = smc_tx_write_space;
        INIT_WORK(&smc->conn.tx_work, smc_tx_work);
        spin_lock_init(&smc->conn.send_lock);
}