svc_rdma_recvfrom.c

/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
			       struct svc_rdma_op_ctxt *ctxt,
			       u32 byte_count)
{
	struct rpcrdma_msg *rmsgp;
	struct page *page;
	u32 bc;
	int sge_no;

	/* Swap the page in the SGE with the page in argpages */
	page = ctxt->pages[0];
	put_page(rqstp->rq_pages[0]);
	rqstp->rq_pages[0] = page;

	/* Set up the XDR head */
	rqstp->rq_arg.head[0].iov_base = page_address(page);
	rqstp->rq_arg.head[0].iov_len =
		min_t(size_t, byte_count, ctxt->sge[0].length);
	rqstp->rq_arg.len = byte_count;
	rqstp->rq_arg.buflen = byte_count;

	/* Compute bytes past head in the SGL */
	bc = byte_count - rqstp->rq_arg.head[0].iov_len;

	/* If data remains, store it in the pagelist */
	rqstp->rq_arg.page_len = bc;
	rqstp->rq_arg.page_base = 0;

	/* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
	if (be32_to_cpu(rmsgp->rm_type) == RDMA_NOMSG)
		rqstp->rq_arg.pages = &rqstp->rq_pages[0];
	else
		rqstp->rq_arg.pages = &rqstp->rq_pages[1];

	sge_no = 1;
	while (bc && sge_no < ctxt->count) {
		page = ctxt->pages[sge_no];
		put_page(rqstp->rq_pages[sge_no]);
		rqstp->rq_pages[sge_no] = page;
		bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
		sge_no++;
	}
	rqstp->rq_respages = &rqstp->rq_pages[sge_no];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* If not all pages were used from the SGL, free the remaining ones */
	bc = sge_no;
	while (sge_no < ctxt->count) {
		page = ctxt->pages[sge_no++];
		put_page(page);
	}
	ctxt->count = bc;

	/* Set up tail */
	rqstp->rq_arg.tail[0].iov_base = NULL;
	rqstp->rq_arg.tail[0].iov_len = 0;
}
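/* Maximum number of sges a single RDMA_READ may scatter into. An iWARP
 * RDMA Read response targets a single tagged sink buffer, which is
 * presumably why the data sink is limited to one sge on iWARP; other
 * transports may use up to the device's sc_max_sge entries.
 */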
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
	if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
	     RDMA_TRANSPORT_IWARP)
		return 1;
	else
		return min_t(int, sge_count, xprt->sc_max_sge);
}
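/* rdma_read_chunk_lcl() and rdma_read_chunk_frmr() below are the chunk
 * readers invoked through xprt->sc_reader in rdma_read_chunks(); which
 * one a transport uses is decided at transport setup, outside this file.
 * Each posts RDMA_READ work requests to pull part of one read-chunk
 * segment into rqstp->rq_arg.pages starting at *page_no / *page_offset.
 * On success it returns the number of bytes whose read was posted and
 * advances *page_no / *page_offset past the data sink; on failure it
 * returns a negative errno.
 */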
/* Issue an RDMA_READ using the local lkey to map the data sink */
int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
			struct svc_rqst *rqstp,
			struct svc_rdma_op_ctxt *head,
			int *page_no,
			u32 *page_offset,
			u32 rs_handle,
			u32 rs_length,
			u64 rs_offset,
			bool last)
{
	struct ib_send_wr read_wr;
	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
	int ret, read, pno;
	u32 pg_off = *page_offset;
	u32 pg_no = *page_no;

	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->read_hdr = head;
	pages_needed =
		min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

	for (pno = 0; pno < pages_needed; pno++) {
		int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
		head->arg.page_len += len;
		head->arg.len += len;
		if (!pg_off)
			head->count++;
		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
		rqstp->rq_next_page = rqstp->rq_respages + 1;
		ctxt->sge[pno].addr =
			ib_dma_map_page(xprt->sc_cm_id->device,
					head->arg.pages[pg_no], pg_off,
					PAGE_SIZE - pg_off,
					DMA_FROM_DEVICE);
		ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
					   ctxt->sge[pno].addr);
		if (ret)
			goto err;
		atomic_inc(&xprt->sc_dma_used);

		/* The lkey here is either a local dma lkey or a dma_mr lkey */
		ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
		ctxt->sge[pno].length = len;
		ctxt->count++;

		/* adjust offset and wrap to next page if needed */
		pg_off += len;
		if (pg_off == PAGE_SIZE) {
			pg_off = 0;
			pg_no++;
		}
		rs_length -= len;
	}

	if (last && rs_length == 0)
		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
	else
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

	memset(&read_wr, 0, sizeof(read_wr));
	read_wr.wr_id = (unsigned long)ctxt;
	read_wr.opcode = IB_WR_RDMA_READ;
	ctxt->wr_op = read_wr.opcode;
	read_wr.send_flags = IB_SEND_SIGNALED;
	read_wr.wr.rdma.rkey = rs_handle;
	read_wr.wr.rdma.remote_addr = rs_offset;
	read_wr.sg_list = ctxt->sge;
	read_wr.num_sge = pages_needed;

	ret = svc_rdma_send(xprt, &read_wr);
	if (ret) {
		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		goto err;
	}

	/* return current location in page array */
	*page_no = pg_no;
	*page_offset = pg_off;
	ret = read;
	atomic_inc(&rdma_stat_read);
	return ret;
err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
	return ret;
}
/* Issue an RDMA_READ using an FRMR to map the data sink */
int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
			 struct svc_rqst *rqstp,
			 struct svc_rdma_op_ctxt *head,
			 int *page_no,
			 u32 *page_offset,
			 u32 rs_handle,
			 u32 rs_length,
			 u64 rs_offset,
			 bool last)
{
	struct ib_send_wr read_wr;
	struct ib_send_wr inv_wr;
	struct ib_send_wr fastreg_wr;
	u8 key;
	int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
	struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
	struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
	int ret, read, pno;
	u32 pg_off = *page_offset;
	u32 pg_no = *page_no;

	if (IS_ERR(frmr))
		return -ENOMEM;

	ctxt->direction = DMA_FROM_DEVICE;
	ctxt->frmr = frmr;
	pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
	read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

	frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
	frmr->direction = DMA_FROM_DEVICE;
	frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
	frmr->map_len = pages_needed << PAGE_SHIFT;
	frmr->page_list_len = pages_needed;

	for (pno = 0; pno < pages_needed; pno++) {
		int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
		head->arg.page_len += len;
		head->arg.len += len;
		if (!pg_off)
			head->count++;
		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
		rqstp->rq_next_page = rqstp->rq_respages + 1;
		frmr->page_list->page_list[pno] =
			ib_dma_map_page(xprt->sc_cm_id->device,
					head->arg.pages[pg_no], 0,
					PAGE_SIZE, DMA_FROM_DEVICE);
		ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
					   frmr->page_list->page_list[pno]);
		if (ret)
			goto err;
		atomic_inc(&xprt->sc_dma_used);

		/* adjust offset and wrap to next page if needed */
		pg_off += len;
		if (pg_off == PAGE_SIZE) {
			pg_off = 0;
			pg_no++;
		}
		rs_length -= len;
	}

	if (last && rs_length == 0)
		set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
	else
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

	/* Bump the key */
	key = (u8)(frmr->mr->lkey & 0x000000FF);
	ib_update_fast_reg_key(frmr->mr, ++key);

	ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
	ctxt->sge[0].lkey = frmr->mr->lkey;
	ctxt->sge[0].length = read;
	ctxt->count = 1;
	ctxt->read_hdr = head;

	/* Prepare FASTREG WR */
	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
	fastreg_wr.send_flags = IB_SEND_SIGNALED;
	fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
	fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
	fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
	fastreg_wr.wr.fast_reg.length = frmr->map_len;
	fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
	fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
	fastreg_wr.next = &read_wr;

	/* Prepare RDMA_READ */
	memset(&read_wr, 0, sizeof(read_wr));
	read_wr.send_flags = IB_SEND_SIGNALED;
	read_wr.wr.rdma.rkey = rs_handle;
	read_wr.wr.rdma.remote_addr = rs_offset;
	read_wr.sg_list = ctxt->sge;
	read_wr.num_sge = 1;
	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
		read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
		read_wr.wr_id = (unsigned long)ctxt;
		read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
	} else {
		read_wr.opcode = IB_WR_RDMA_READ;
		read_wr.next = &inv_wr;
		/* Prepare invalidate */
		memset(&inv_wr, 0, sizeof(inv_wr));
		inv_wr.wr_id = (unsigned long)ctxt;
		inv_wr.opcode = IB_WR_LOCAL_INV;
		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
	}
	ctxt->wr_op = read_wr.opcode;

	/* Post the chain */
	ret = svc_rdma_send(xprt, &fastreg_wr);
	if (ret) {
		pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		goto err;
	}

	/* return current location in page array */
	*page_no = pg_no;
	*page_offset = pg_off;
	ret = read;
	atomic_inc(&rdma_stat_read);
	return ret;
err:
	svc_rdma_unmap_dma(ctxt);
	svc_rdma_put_context(ctxt, 0);
	svc_rdma_put_frmr(xprt, frmr);
	return ret;
}
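/* Count the entries in a Read list. The list is terminated by an
 * entry whose rc_discrim is xdr_zero.
 */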
static unsigned int
rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
{
	unsigned int count;

	for (count = 0; ch->rc_discrim != xdr_zero; ch++)
		count++;
	return count;
}
/* If there was additional inline content, append it to the end of arg.pages.
 * Tail copy has to be done after the reader function has determined how many
 * pages are needed for RDMA READ.
 */
static int
rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
	       u32 position, u32 byte_count, u32 page_offset, int page_no)
{
	char *srcp, *destp;

	srcp = head->arg.head[0].iov_base + position;
	byte_count = head->arg.head[0].iov_len - position;
	if (byte_count > PAGE_SIZE) {
		dprintk("svcrdma: large tail unsupported\n");
		return 0;
	}

	/* Fit as much of the tail on the current page as possible */
	if (page_offset != PAGE_SIZE) {
		destp = page_address(rqstp->rq_arg.pages[page_no]);
		destp += page_offset;
		while (byte_count--) {
			*destp++ = *srcp++;
			page_offset++;
			if (page_offset == PAGE_SIZE && byte_count)
				goto more;
		}
		goto done;
	}

more:
	/* Fit the rest on the next page */
	page_no++;
	destp = page_address(rqstp->rq_arg.pages[page_no]);
	while (byte_count--)
		*destp++ = *srcp++;

	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

done:
	byte_count = head->arg.head[0].iov_len - position;
	head->arg.page_len += byte_count;
	head->arg.len += byte_count;
	head->arg.buflen += byte_count;
	return 1;
}
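/* Pull the client's Read list, if any, by posting RDMA_READs through
 * xprt->sc_reader. As svc_rdma_recvfrom() interprets the result: a
 * positive return means reads were posted and the request is deferred
 * until they complete, 0 means there was no Read list to pull, and a
 * negative errno means posting failed.
 */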
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
			    struct rpcrdma_msg *rmsgp,
			    struct svc_rqst *rqstp,
			    struct svc_rdma_op_ctxt *head)
{
	int page_no, ret;
	struct rpcrdma_read_chunk *ch;
	u32 handle, page_offset, byte_count;
	u32 position;
	u64 rs_offset;
	bool last;

	/* If no read list is present, return 0 */
	ch = svc_rdma_get_read_chunk(rmsgp);
	if (!ch)
		return 0;

	if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES)
		return -EINVAL;

	/* The request is completed when the RDMA_READs complete. The
	 * head context keeps all the pages that comprise the
	 * request.
	 */
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->hdr_count = head->count;
	head->arg.page_base = 0;
	head->arg.page_len = 0;
	head->arg.len = rqstp->rq_arg.len;
	head->arg.buflen = rqstp->rq_arg.buflen;

	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	position = be32_to_cpu(ch->rc_position);

	/* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
	if (position == 0) {
		head->arg.pages = &head->pages[0];
		page_offset = head->byte_len;
	} else {
		head->arg.pages = &head->pages[head->count];
		page_offset = 0;
	}

	ret = 0;
	page_no = 0;
	for (; ch->rc_discrim != xdr_zero; ch++) {
		if (be32_to_cpu(ch->rc_position) != position)
			goto err;
		handle = be32_to_cpu(ch->rc_target.rs_handle);
		byte_count = be32_to_cpu(ch->rc_target.rs_length);
		xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
				 &rs_offset);

		while (byte_count > 0) {
			last = (ch + 1)->rc_discrim == xdr_zero;
			ret = xprt->sc_reader(xprt, rqstp, head,
					      &page_no, &page_offset,
					      handle, byte_count,
					      rs_offset, last);
			if (ret < 0)
				goto err;
			byte_count -= ret;
			rs_offset += ret;
			head->arg.buflen += ret;
		}
	}

	/* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
	if (page_offset & 3) {
		u32 pad = 4 - (page_offset & 3);

		head->arg.page_len += pad;
		head->arg.len += pad;
		head->arg.buflen += pad;
		page_offset += pad;
	}

	ret = 1;
	if (position && position < head->arg.head[0].iov_len)
		ret = rdma_copy_tail(rqstp, head, position,
				     byte_count, page_offset, page_no);
	head->arg.head[0].iov_len = position;
	head->position = position;

err:
	/* Detach arg pages. svc_recv will replenish them */
	for (page_no = 0;
	     &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
		rqstp->rq_pages[page_no] = NULL;

	return ret;
}
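/* Complete a request that was deferred while its Read list data was
 * pulled in: move the pages saved in the head context back into rqstp,
 * rebuild rq_arg around them, and return the total length of the RPC
 * argument.
 */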
static int rdma_read_complete(struct svc_rqst *rqstp,
			      struct svc_rdma_op_ctxt *head)
{
	int page_no;
	int ret;

	/* Copy RPC pages */
	for (page_no = 0; page_no < head->count; page_no++) {
		put_page(rqstp->rq_pages[page_no]);
		rqstp->rq_pages[page_no] = head->pages[page_no];
	}

	/* Adjustments made for RDMA_NOMSG type requests */
	if (head->position == 0) {
		if (head->arg.len <= head->sge[0].length) {
			head->arg.head[0].iov_len = head->arg.len -
						    head->byte_len;
			head->arg.page_len = 0;
		} else {
			head->arg.head[0].iov_len = head->sge[0].length -
						    head->byte_len;
			head->arg.page_len = head->arg.len -
					     head->sge[0].length;
		}
	}

	/* Point rq_arg.pages past header */
	rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
	rqstp->rq_arg.page_len = head->arg.page_len;
	rqstp->rq_arg.page_base = head->arg.page_base;

	/* rq_respages starts after the last arg page */
	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Rebuild rq_arg head and tail. */
	rqstp->rq_arg.head[0] = head->arg.head[0];
	rqstp->rq_arg.tail[0] = head->arg.tail[0];
	rqstp->rq_arg.len = head->arg.len;
	rqstp->rq_arg.buflen = head->arg.buflen;

	/* Free the context */
	svc_rdma_put_context(head, 0);

	/* XXX: What should this be? */
	rqstp->rq_prot = IPPROTO_MAX;
	svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n",
		ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);

	return ret;
}
/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
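/* Returns the length of the received RPC argument when a complete
 * message is available, or 0 when svc_recv should try again later
 * (nothing was pending, a Read list was posted and the request was
 * deferred, or the transport is closing).
 */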
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma_xprt =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct rpcrdma_msg *rmsgp;
	int ret = 0;
	int len;

	dprintk("svcrdma: rqstp=%p\n", rqstp);

	spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
		spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
		return rdma_read_complete(rqstp, ctxt);
	} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	} else {
		atomic_inc(&rdma_stat_rq_starve);
		clear_bit(XPT_DATA, &xprt->xpt_flags);
		ctxt = NULL;
	}
	spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!ctxt) {
		/* This is the EAGAIN path. The svc_recv routine will
		 * return -EAGAIN, the nfsd thread will call svc_recv
		 * again, and we shouldn't be on the active transport
		 * list.
		 */
		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
			goto close_out;
		goto out;
	}
	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
		ctxt, rdma_xprt, rqstp, ctxt->wc_status);
	atomic_inc(&rdma_stat_recv);

	/* Build up the XDR from the receive buffers. */
	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

	/* Decode the RDMA header. */
	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
	rqstp->rq_xprt_hlen = len;

	/* If the request is invalid, reply with an error */
	if (len < 0) {
		if (len == -ENOSYS)
			svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
		goto close_out;
	}

	/* Read read-list data. */
	ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
	if (ret > 0) {
		/* read-list posted, defer until data received from client. */
		goto defer;
	} else if (ret < 0) {
		/* Post of read-list failed, free context. */
		svc_rdma_put_context(ctxt, 1);
		return 0;
	}

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	svc_rdma_put_context(ctxt, 0);
out:
	dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
		ret, rqstp->rq_arg.len,
		rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);
	rqstp->rq_prot = IPPROTO_MAX;
	svc_xprt_copy_addrs(rqstp, xprt);
	return ret;

close_out:
	if (ctxt)
		svc_rdma_put_context(ctxt, 1);
	dprintk("svcrdma: transport %p is closing\n", xprt);
	/*
	 * Set the close bit and enqueue it. svc_recv will see the
	 * close bit and call svc_xprt_delete.
	 */
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
defer:
	return 0;
}