svc_rdma_rw.c

/*
 * Copyright (c) 2016 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/debug.h>

#include <rdma/rw.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
        struct list_head rw_list;
        struct rdma_rw_ctx rw_ctx;
        int rw_nents;
        struct sg_table rw_sg_table;
        struct scatterlist rw_first_sgl[0];
};
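
/* Return the first R/W context on @list, or NULL if the list
 * is empty.
 */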
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
                                        rw_list);
}
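
/* Acquire an R/W context that can map up to @sges scatterlist
 * entries. A cached context is reused when one is available;
 * otherwise a new one is allocated.
 *
 * Returns NULL if a context or its scatter/gather table cannot
 * be allocated.
 */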
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
        struct svc_rdma_rw_ctxt *ctxt;

        spin_lock(&rdma->sc_rw_ctxt_lock);

        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
        if (ctxt) {
                list_del(&ctxt->rw_list);
                spin_unlock(&rdma->sc_rw_ctxt_lock);
        } else {
                spin_unlock(&rdma->sc_rw_ctxt_lock);
                ctxt = kmalloc(sizeof(*ctxt) +
                               SG_CHUNK_SIZE * sizeof(struct scatterlist),
                               GFP_KERNEL);
                if (!ctxt)
                        goto out;
                INIT_LIST_HEAD(&ctxt->rw_list);
        }

        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
                                   ctxt->rw_sg_table.sgl)) {
                kfree(ctxt);
                ctxt = NULL;
        }
out:
        return ctxt;
}
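
/* Release the scatter/gather table and return @ctxt to the
 * transport's free list for reuse.
 */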
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
                                 struct svc_rdma_rw_ctxt *ctxt)
{
        sg_free_table_chained(&ctxt->rw_sg_table, true);

        spin_lock(&rdma->sc_rw_ctxt_lock);
        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
        spin_unlock(&rdma->sc_rw_ctxt_lock);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
                list_del(&ctxt->rw_list);
                kfree(ctxt);
        }
}

/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
        struct ib_cqe cc_cqe;
        struct svcxprt_rdma *cc_rdma;
        struct list_head cc_rwctxts;
        int cc_sqecount;
};
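
/* Initialize a chunk context and take a reference on the transport
 * so it cannot be destroyed while chunk I/O is in flight.
 */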
static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
                             struct svc_rdma_chunk_ctxt *cc)
{
        cc->cc_rdma = rdma;
        svc_xprt_get(&rdma->sc_xprt);

        INIT_LIST_HEAD(&cc->cc_rwctxts);
        cc->cc_sqecount = 0;
}
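
/* Unmap and release all R/W contexts attached to @cc, then drop
 * the transport reference taken in svc_rdma_cc_init().
 */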
static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
                                enum dma_data_direction dir)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
                list_del(&ctxt->rw_list);

                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
                                    ctxt->rw_nents, dir);
                svc_rdma_put_rw_ctxt(rdma, ctxt);
        }
        svc_xprt_put(&rdma->sc_xprt);
}

/* State for sending a Write or Reply chunk.
 *  - Tracks progress of writing one chunk over all its segments
 *  - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
        /* write state of this chunk */
        unsigned int wi_seg_off;
        unsigned int wi_seg_no;
        unsigned int wi_nsegs;
        __be32 *wi_segs;

        /* SGL constructor arguments */
        struct xdr_buf *wi_xdr;
        unsigned char *wi_base;
        unsigned int wi_next_off;

        struct svc_rdma_chunk_ctxt wi_cc;
};
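
/* Allocate and initialize state for writing one Write or Reply
 * chunk. The segment count and segment array are taken from the
 * words following @chunk in the transport header.
 *
 * Returns NULL if no memory is available.
 */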
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
{
        struct svc_rdma_write_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        info->wi_seg_off = 0;
        info->wi_seg_no = 0;
        info->wi_nsegs = be32_to_cpup(++chunk);
        info->wi_segs = ++chunk;
        svc_rdma_cc_init(rdma, &info->wi_cc);
        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
        return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
        kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_write_info *info =
                        container_of(cc, struct svc_rdma_write_info, wi_cc);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                if (wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
                               ib_wc_status_msg(wc->status),
                               wc->status, wc->vendor_err);
        }

        svc_rdma_write_info_free(info);
}

/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
        struct svc_rdma_op_ctxt *ri_readctxt;
        unsigned int ri_position;
        unsigned int ri_pageno;
        unsigned int ri_pageoff;
        unsigned int ri_chunklen;

        struct svc_rdma_chunk_ctxt ri_cc;
};
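
/* Allocate and initialize per-chunk state for pulling a Read chunk.
 *
 * Returns NULL if no memory is available.
 */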
static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_read_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        svc_rdma_cc_init(rdma, &info->ri_cc);
        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
        return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
        kfree(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_read_info *info =
                        container_of(cc, struct svc_rdma_read_info, ri_cc);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                if (wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
                               ib_wc_status_msg(wc->status),
                               wc->status, wc->vendor_err);
                svc_rdma_put_context(info->ri_readctxt, 1);
        } else {
                spin_lock(&rdma->sc_rq_dto_lock);
                list_add_tail(&info->ri_readctxt->list,
                              &rdma->sc_read_complete_q);
                spin_unlock(&rdma->sc_rq_dto_lock);

                set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
                svc_xprt_enqueue(&rdma->sc_xprt);
        }

        svc_rdma_read_info_free(info);
}

/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_xprt *xprt = &rdma->sc_xprt;
        struct ib_send_wr *first_wr, *bad_wr;
        struct list_head *tmp;
        struct ib_cqe *cqe;
        int ret;

        if (cc->cc_sqecount > rdma->sc_sq_depth)
                return -EINVAL;

        first_wr = NULL;
        cqe = &cc->cc_cqe;
        list_for_each(tmp, &cc->cc_rwctxts) {
                struct svc_rdma_rw_ctxt *ctxt;

                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
                                           rdma->sc_port_num, cqe, first_wr);
                cqe = NULL;
        }

        do {
                if (atomic_sub_return(cc->cc_sqecount,
                                      &rdma->sc_sq_avail) > 0) {
                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
                        if (ret)
                                break;
                        return 0;
                }

                atomic_inc(&rdma_stat_sq_starve);
                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
                wait_event(rdma->sc_send_wait,
                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
        } while (1);

        pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
        set_bit(XPT_CLOSE, &xprt->xpt_flags);

        /* If even one was posted, there will be a completion. */
        if (bad_wr != first_wr)
                return 0;

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);
        return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
                               unsigned int len,
                               struct svc_rdma_rw_ctxt *ctxt)
{
        struct scatterlist *sg = ctxt->rw_sg_table.sgl;

        sg_set_buf(&sg[0], info->wi_base, len);
        info->wi_base += len;

        ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
                                    unsigned int remaining,
                                    struct svc_rdma_rw_ctxt *ctxt)
{
        unsigned int sge_no, sge_bytes, page_off, page_no;
        struct xdr_buf *xdr = info->wi_xdr;
        struct scatterlist *sg;
        struct page **page;

        page_off = info->wi_next_off + xdr->page_base;
        page_no = page_off >> PAGE_SHIFT;
        page_off = offset_in_page(page_off);
        page = xdr->pages + page_no;
        info->wi_next_off += remaining;
        sg = ctxt->rw_sg_table.sgl;
        sge_no = 0;
        do {
                sge_bytes = min_t(unsigned int, remaining,
                                  PAGE_SIZE - page_off);
                sg_set_page(sg, *page, sge_bytes, page_off);

                remaining -= sge_bytes;
                sg = sg_next(sg);
                page_off = 0;
                sge_no++;
                page++;
        } while (remaining);

        ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
                      void (*constructor)(struct svc_rdma_write_info *info,
                                          unsigned int len,
                                          struct svc_rdma_rw_ctxt *ctxt),
                      unsigned int remaining)
{
        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_rw_ctxt *ctxt;
        __be32 *seg;
        int ret;

        seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
        do {
                unsigned int write_len;
                u32 seg_length, seg_handle;
                u64 seg_offset;

                if (info->wi_seg_no >= info->wi_nsegs)
                        goto out_overflow;

                seg_handle = be32_to_cpup(seg);
                seg_length = be32_to_cpup(seg + 1);
                xdr_decode_hyper(seg + 2, &seg_offset);
                seg_offset += info->wi_seg_off;

                write_len = min(remaining, seg_length - info->wi_seg_off);
                ctxt = svc_rdma_get_rw_ctxt(rdma,
                                            (write_len >> PAGE_SHIFT) + 2);
                if (!ctxt)
                        goto out_noctx;

                constructor(info, write_len, ctxt);
                ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
                                       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
                                       ctxt->rw_nents, 0, seg_offset,
                                       seg_handle, DMA_TO_DEVICE);
                if (ret < 0)
                        goto out_initerr;

                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
                cc->cc_sqecount += ret;
                if (write_len == seg_length - info->wi_seg_off) {
                        seg += 4;
                        info->wi_seg_no++;
                        info->wi_seg_off = 0;
                } else {
                        info->wi_seg_off += write_len;
                }

                remaining -= write_len;
        } while (remaining);

        return 0;

out_overflow:
        dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
                info->wi_nsegs);
        return -E2BIG;

out_noctx:
        dprintk("svcrdma: no R/W ctxs available\n");
        return -ENOMEM;

out_initerr:
        svc_rdma_put_rw_ctxt(rdma, ctxt);
        pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
        return -EIO;
}

/* Send one of an xdr_buf's kvecs by itself. To send a Reply
 * chunk, the whole RPC Reply is written back to the client.
 * This function writes either the head or tail of the xdr_buf
 * containing the Reply.
 */
static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
                                  struct kvec *vec)
{
        info->wi_base = vec->iov_base;
        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
                                     vec->iov_len);
}

/* Send an xdr_buf's page list by itself. A Write chunk is
 * just the page list. A Reply chunk is the head, page list,
 * and tail. This function is shared between the two types
 * of chunk.
 */
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
                                      struct xdr_buf *xdr)
{
        info->wi_xdr = xdr;
        info->wi_next_off = 0;
        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
                                     xdr->page_len);
}

/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @wr_ch: Write chunk provided by client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
                              struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        int ret;

        if (!xdr->page_len)
                return 0;

        info = svc_rdma_write_info_alloc(rdma, wr_ch);
        if (!info)
                return -ENOMEM;

        ret = svc_rdma_send_xdr_pagelist(info, xdr);
        if (ret < 0)
                goto out_err;

        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
        if (ret < 0)
                goto out_err;
        return xdr->page_len;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rp_ch: Reply chunk provided by client
 * @writelist: true if client provided a Write list
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
                              bool writelist, struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        int consumed, ret;

        info = svc_rdma_write_info_alloc(rdma, rp_ch);
        if (!info)
                return -ENOMEM;

        ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
        if (ret < 0)
                goto out_err;
        consumed = xdr->head[0].iov_len;

        /* Send the page list in the Reply chunk only if the
         * client did not provide Write chunks.
         */
        if (!writelist && xdr->page_len) {
                ret = svc_rdma_send_xdr_pagelist(info, xdr);
                if (ret < 0)
                        goto out_err;
                consumed += xdr->page_len;
        }

        if (xdr->tail[0].iov_len) {
                ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
                if (ret < 0)
                        goto out_err;
                consumed += xdr->tail[0].iov_len;
        }

        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
        if (ret < 0)
                goto out_err;
        return consumed;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}
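
/* Build an SGL over pages from rqstp->rq_pages and initialize an
 * rdma_rw context to Read one segment (@rkey, @len, @offset) from
 * the client into those pages. The pages are also recorded in
 * head->arg.pages so the upper layer can find the payload later.
 */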
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
                                       struct svc_rqst *rqstp,
                                       u32 rkey, u32 len, u64 offset)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
        struct svc_rdma_rw_ctxt *ctxt;
        unsigned int sge_no, seg_len;
        struct scatterlist *sg;
        int ret;

        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
        if (!ctxt)
                goto out_noctx;
        ctxt->rw_nents = sge_no;

        dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
                len, offset, rkey, sge_no);

        sg = ctxt->rw_sg_table.sgl;
        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
                seg_len = min_t(unsigned int, len,
                                PAGE_SIZE - info->ri_pageoff);

                head->arg.pages[info->ri_pageno] =
                        rqstp->rq_pages[info->ri_pageno];
                if (!info->ri_pageoff)
                        head->count++;

                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
                            seg_len, info->ri_pageoff);
                sg = sg_next(sg);

                info->ri_pageoff += seg_len;
                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
                len -= seg_len;

                /* Safety check */
                if (len &&
                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
                        goto out_overrun;
        }

        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
                               cc->cc_rdma->sc_port_num,
                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
                               0, offset, rkey, DMA_FROM_DEVICE);
        if (ret < 0)
                goto out_initerr;

        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
        cc->cc_sqecount += ret;
        return 0;

out_noctx:
        dprintk("svcrdma: no R/W ctxs available\n");
        return -ENOMEM;

out_overrun:
        dprintk("svcrdma: request overruns rq_pages\n");
        return -EINVAL;

out_initerr:
        svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
        pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
        return -EIO;
}
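
/* Walk the segment list at @p and set up RDMA Reads for each
 * segment that carries the same Position value as
 * info->ri_position. The total payload length is accumulated
 * in info->ri_chunklen.
 */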
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
                                     struct svc_rdma_read_info *info,
                                     __be32 *p)
{
        int ret;

        info->ri_chunklen = 0;
        while (*p++ != xdr_zero) {
                u32 rs_handle, rs_length;
                u64 rs_offset;

                if (be32_to_cpup(p++) != info->ri_position)
                        break;
                rs_handle = be32_to_cpup(p++);
                rs_length = be32_to_cpup(p++);
                p = xdr_decode_hyper(p, &rs_offset);

                ret = svc_rdma_build_read_segment(info, rqstp,
                                                  rs_handle, rs_length,
                                                  rs_offset);
                if (ret < 0)
                        break;

                info->ri_chunklen += rs_length;
        }

        return ret;
}

/* If there is inline content following the Read chunk, append it to
 * the page list immediately following the data payload. This has to
 * be done after the reader function has determined how many pages
 * were consumed for RDMA Read.
 *
 * On entry, ri_pageno and ri_pageoff point directly to the end of the
 * page list. On exit, both have been updated to the new "next byte".
 *
 * Assumptions:
 *	- Inline content fits entirely in rq_pages[0]
 *	- Trailing content is only a handful of bytes
 */
static int svc_rdma_copy_tail(struct svc_rqst *rqstp,
                              struct svc_rdma_read_info *info)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        unsigned int tail_length, remaining;
        u8 *srcp, *destp;

        /* Assert that all inline content fits in page 0. This is an
         * implementation limit, not a protocol limit.
         */
        if (head->arg.head[0].iov_len > PAGE_SIZE) {
                pr_warn_once("svcrdma: too much trailing inline content\n");
                return -EINVAL;
        }

        srcp = head->arg.head[0].iov_base;
        srcp += info->ri_position;
        tail_length = head->arg.head[0].iov_len - info->ri_position;
        remaining = tail_length;

        /* If there is room on the last page in the page list, try to
         * fit the trailing content there.
         */
        if (info->ri_pageoff > 0) {
                unsigned int len;

                len = min_t(unsigned int, remaining,
                            PAGE_SIZE - info->ri_pageoff);
                destp = page_address(rqstp->rq_pages[info->ri_pageno]);
                destp += info->ri_pageoff;

                memcpy(destp, srcp, len);
                srcp += len;
                destp += len;
                info->ri_pageoff += len;
                remaining -= len;

                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
        }

        /* Otherwise, a fresh page is needed. */
        if (remaining) {
                head->arg.pages[info->ri_pageno] =
                                rqstp->rq_pages[info->ri_pageno];
                head->count++;

                destp = page_address(rqstp->rq_pages[info->ri_pageno]);
                memcpy(destp, srcp, remaining);
                info->ri_pageoff += remaining;
        }

        head->arg.page_len += tail_length;
        head->arg.len += tail_length;
        head->arg.buflen += tail_length;
        return 0;
}

/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
 * data lands in the page list of head->arg.pages.
 *
 * Currently NFSD does not look at the head->arg.tail[0] iovec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 */
static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
                                            struct svc_rdma_read_info *info,
                                            __be32 *p)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        int ret;

        dprintk("svcrdma: Reading Read chunk at position %u\n",
                info->ri_position);
        info->ri_pageno = head->hdr_count;
        info->ri_pageoff = 0;

        ret = svc_rdma_build_read_chunk(rqstp, info, p);
        if (ret < 0)
                goto out;

        /* Read chunk may need XDR round-up (see RFC 5666, s. 3.7).
         */
        if (info->ri_chunklen & 3) {
                u32 padlen = 4 - (info->ri_chunklen & 3);

                info->ri_chunklen += padlen;

                /* NB: data payload always starts on XDR alignment,
                 * thus the pad can never contain a page boundary.
                 */
                info->ri_pageoff += padlen;
                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
        }

        head->arg.page_len = info->ri_chunklen;
        head->arg.len += info->ri_chunklen;
        head->arg.buflen += info->ri_chunklen;

        if (info->ri_position < head->arg.head[0].iov_len) {
                ret = svc_rdma_copy_tail(rqstp, info);
                if (ret < 0)
                        goto out;
        }
        head->arg.head[0].iov_len = info->ri_position;

out:
        return ret;
}

/* Construct RDMA Reads to pull over a Position Zero Read chunk.
 * The start of the data lands in the first page just after
 * the Transport header, and the rest lands in the page list of
 * head->arg.pages.
 *
 * Assumptions:
 *	- A PZRC has an XDR-aligned length (no implicit round-up).
 *	- There can be no trailing inline content (IOW, we assume
 *	  a PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec).
 */
static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
                                        struct svc_rdma_read_info *info,
                                        __be32 *p)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        int ret;

        dprintk("svcrdma: Reading Position Zero Read chunk\n");
        info->ri_pageno = head->hdr_count - 1;
        info->ri_pageoff = offset_in_page(head->byte_len);

        ret = svc_rdma_build_read_chunk(rqstp, info, p);
        if (ret < 0)
                goto out;

        head->arg.len += info->ri_chunklen;
        head->arg.buflen += info->ri_chunklen;

        if (head->arg.buflen <= head->sge[0].length) {
                /* Transport header and RPC message fit entirely
                 * in page where head iovec resides.
                 */
                head->arg.head[0].iov_len = info->ri_chunklen;
        } else {
                /* Transport header and part of RPC message reside
                 * in the head iovec's page.
                 */
                head->arg.head[0].iov_len =
                        head->sge[0].length - head->byte_len;
                head->arg.page_len =
                        info->ri_chunklen - head->arg.head[0].iov_len;
        }

out:
        return ret;
}

/**
 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 * @p: pointer to start of Read chunk
 *
 * Returns:
 *	%0 if all needed RDMA Reads were posted successfully,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 *
 * Assumptions:
 * - All Read segments in @p have the same Position value.
 */
int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
                             struct svc_rdma_op_ctxt *head, __be32 *p)
{
        struct svc_rdma_read_info *info;
        struct page **page;
        int ret;

        /* The request (with page list) is constructed in
         * head->arg. Pages involved with RDMA Read I/O are
         * transferred there.
         */
        head->hdr_count = head->count;
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = head->pages;
        head->arg.page_base = 0;
        head->arg.page_len = 0;
        head->arg.len = rqstp->rq_arg.len;
        head->arg.buflen = rqstp->rq_arg.buflen;

        info = svc_rdma_read_info_alloc(rdma);
        if (!info)
                return -ENOMEM;
        info->ri_readctxt = head;

        info->ri_position = be32_to_cpup(p + 1);
        if (info->ri_position)
                ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
        else
                ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);

        /* Mark the start of the pages that can be used for the reply */
        if (info->ri_pageoff > 0)
                info->ri_pageno++;
        rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        if (ret < 0)
                goto out;

        ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);

out:
        /* Read sink pages have been moved from rqstp->rq_pages to
         * head->arg.pages. Force svc_recv to refill those slots
         * in rq_pages.
         */
        for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
                *page = NULL;

        if (ret < 0)
                svc_rdma_read_info_free(info);

        return ret;
}