/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT
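
/*
 * svc_rdma_recvfrom.c: the receive side of the server RPC-over-RDMA
 * transport. Inline RPC data arrives in the posted RDMA_RECV buffers;
 * bulk data described by the client's read list is pulled in with
 * RDMA_READ before the assembled request is handed to the RPC layer.
 */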

/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *ctxt,
                               u32 byte_count)
{
        struct page *page;
        u32 bc;
        int sge_no;

        /* Swap the page in the SGE with the page in argpages */
        page = ctxt->pages[0];
        put_page(rqstp->rq_pages[0]);
        rqstp->rq_pages[0] = page;

        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
        rqstp->rq_arg.head[0].iov_len =
                min_t(size_t, byte_count, ctxt->sge[0].length);
        rqstp->rq_arg.len = byte_count;
        rqstp->rq_arg.buflen = byte_count;

        /* Compute bytes past head in the SGL */
        bc = byte_count - rqstp->rq_arg.head[0].iov_len;

        /* If data remains, store it in the pagelist */
        rqstp->rq_arg.page_len = bc;
        rqstp->rq_arg.page_base = 0;
        rqstp->rq_arg.pages = &rqstp->rq_pages[1];
        sge_no = 1;
        while (bc && sge_no < ctxt->count) {
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
                bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
                rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
                sge_no++;
        }
        rqstp->rq_respages = &rqstp->rq_pages[sge_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* We should never run out of SGE because the limit is defined to
         * support the max allowed RPC data length
         */
        BUG_ON(bc && (sge_no == ctxt->count));
        BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
               != byte_count);
        BUG_ON(rqstp->rq_arg.len != byte_count);

        /* If not all pages were used from the SGL, free the remaining ones */
        bc = sge_no;
        while (sge_no < ctxt->count) {
                page = ctxt->pages[sge_no++];
                put_page(page);
        }
        ctxt->count = bc;

        /* Set up tail */
        rqstp->rq_arg.tail[0].iov_base = NULL;
        rqstp->rq_arg.tail[0].iov_len = 0;
}
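
/*
 * Determine how many sge entries a single RDMA_READ may use for the
 * data sink. iWARP devices are limited to one sge per RDMA_READ work
 * request; other transports may use up to the device's advertised
 * maximum, sc_max_sge.
 */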
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
        if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
             RDMA_TRANSPORT_IWARP)
                return 1;
        else
                return min_t(int, sge_count, xprt->sc_max_sge);
}
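
/*
 * Both chunk readers share this signature so that rdma_read_chunks()
 * can pick rdma_read_chunk_frmr() or rdma_read_chunk_lcl() at run time
 * based on the device's fast-registration capability.
 */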
typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
                              struct svc_rqst *rqstp,
                              struct svc_rdma_op_ctxt *head,
                              int *page_no,
                              u32 *page_offset,
                              u32 rs_handle,
                              u32 rs_length,
                              u64 rs_offset,
                              int last);

/* Issue an RDMA_READ using the local lkey to map the data sink */
static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
                               struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *head,
                               int *page_no,
                               u32 *page_offset,
                               u32 rs_handle,
                               u32 rs_length,
                               u64 rs_offset,
                               int last)
{
        struct ib_send_wr read_wr;
        int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        int ret, read, pno;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->read_hdr = head;
        pages_needed =
                min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
        read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

        for (pno = 0; pno < pages_needed; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;
                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
                ctxt->sge[pno].addr =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        head->arg.pages[pg_no], pg_off,
                                        PAGE_SIZE - pg_off,
                                        DMA_FROM_DEVICE);
                ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
                                           ctxt->sge[pno].addr);
                if (ret)
                        goto err;
                atomic_inc(&xprt->sc_dma_used);

                /* The lkey here is either a local dma lkey or a dma_mr lkey */
                ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
                ctxt->sge[pno].length = len;
                ctxt->count++;

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        memset(&read_wr, 0, sizeof(read_wr));
        read_wr.wr_id = (unsigned long)ctxt;
        read_wr.opcode = IB_WR_RDMA_READ;
        ctxt->wr_op = read_wr.opcode;
        read_wr.send_flags = IB_SEND_SIGNALED;
        read_wr.wr.rdma.rkey = rs_handle;
        read_wr.wr.rdma.remote_addr = rs_offset;
        read_wr.sg_list = ctxt->sge;
        read_wr.num_sge = pages_needed;

        ret = svc_rdma_send(xprt, &read_wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return ret;
}

/* Issue an RDMA_READ using an FRMR to map the data sink */
static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
                                struct svc_rqst *rqstp,
                                struct svc_rdma_op_ctxt *head,
                                int *page_no,
                                u32 *page_offset,
                                u32 rs_handle,
                                u32 rs_length,
                                u64 rs_offset,
                                int last)
{
        struct ib_send_wr read_wr;
        struct ib_send_wr inv_wr;
        struct ib_send_wr fastreg_wr;
        u8 key;
        int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
        int ret, read, pno;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        if (IS_ERR(frmr))
                return -ENOMEM;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->frmr = frmr;
        pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
        read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

        frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
        frmr->direction = DMA_FROM_DEVICE;
        frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
        frmr->map_len = pages_needed << PAGE_SHIFT;
        frmr->page_list_len = pages_needed;

        for (pno = 0; pno < pages_needed; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;
                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
                frmr->page_list->page_list[pno] =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        head->arg.pages[pg_no], 0,
                                        PAGE_SIZE, DMA_FROM_DEVICE);
                ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
                                           frmr->page_list->page_list[pno]);
                if (ret)
                        goto err;
                atomic_inc(&xprt->sc_dma_used);

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        /* Bump the key */
        key = (u8)(frmr->mr->lkey & 0x000000FF);
        ib_update_fast_reg_key(frmr->mr, ++key);

        ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
        ctxt->sge[0].lkey = frmr->mr->lkey;
        ctxt->sge[0].length = read;
        ctxt->count = 1;
        ctxt->read_hdr = head;

        /* Prepare FASTREG WR */
        memset(&fastreg_wr, 0, sizeof(fastreg_wr));
        fastreg_wr.opcode = IB_WR_FAST_REG_MR;
        fastreg_wr.send_flags = IB_SEND_SIGNALED;
        fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
        fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
        fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
        fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
        fastreg_wr.wr.fast_reg.length = frmr->map_len;
        fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
        fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
        fastreg_wr.next = &read_wr;

        /* Prepare RDMA_READ */
        memset(&read_wr, 0, sizeof(read_wr));
        read_wr.send_flags = IB_SEND_SIGNALED;
        read_wr.wr.rdma.rkey = rs_handle;
        read_wr.wr.rdma.remote_addr = rs_offset;
        read_wr.sg_list = ctxt->sge;
        read_wr.num_sge = 1;
        if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
                read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
                read_wr.wr_id = (unsigned long)ctxt;
                read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
        } else {
                read_wr.opcode = IB_WR_RDMA_READ;
                read_wr.next = &inv_wr;
                /* Prepare invalidate */
                memset(&inv_wr, 0, sizeof(inv_wr));
                inv_wr.wr_id = (unsigned long)ctxt;
                inv_wr.opcode = IB_WR_LOCAL_INV;
                inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
                inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
        }
        ctxt->wr_op = read_wr.opcode;

        /* Post the chain */
        ret = svc_rdma_send(xprt, &fastreg_wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        svc_rdma_put_frmr(xprt, frmr);
        return ret;
}
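
/*
 * Walk the read list in the RPC-over-RDMA header and post RDMA_READs
 * to pull the chunk data into the pages tracked by the head context.
 * Returns 1 if reads were posted (the request is deferred until the
 * read completions arrive), 0 if there is no read list, or a negative
 * errno if building or posting a read failed.
 */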
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
                            struct rpcrdma_msg *rmsgp,
                            struct svc_rqst *rqstp,
                            struct svc_rdma_op_ctxt *head)
{
        int page_no, ch_count, ret;
        struct rpcrdma_read_chunk *ch;
        u32 page_offset, byte_count;
        u64 rs_offset;
        rdma_reader_fn reader;

        /* If no read list is present, return 0 */
        ch = svc_rdma_get_read_chunk(rmsgp);
        if (!ch)
                return 0;

        svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
        if (ch_count > RPCSVC_MAXPAGES)
                return -EINVAL;

        /* The request is completed when the RDMA_READs complete. The
         * head context keeps all the pages that comprise the
         * request.
         */
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = &head->pages[head->count];
        head->hdr_count = head->count;
        head->arg.page_base = 0;
        head->arg.page_len = 0;
        head->arg.len = rqstp->rq_arg.len;
        head->arg.buflen = rqstp->rq_arg.buflen;

        /* Use FRMR if supported */
        if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
                reader = rdma_read_chunk_frmr;
        else
                reader = rdma_read_chunk_lcl;

        page_no = 0; page_offset = 0;
        for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
             ch->rc_discrim != 0; ch++) {

                xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
                                 &rs_offset);
                byte_count = ntohl(ch->rc_target.rs_length);

                while (byte_count > 0) {
                        ret = reader(xprt, rqstp, head,
                                     &page_no, &page_offset,
                                     ntohl(ch->rc_target.rs_handle),
                                     byte_count, rs_offset,
                                     ((ch+1)->rc_discrim == 0) /* last */);
                        if (ret < 0)
                                goto err;
                        byte_count -= ret;
                        rs_offset += ret;
                        head->arg.buflen += ret;
                }
        }
        ret = 1;
 err:
        /* Detach arg pages. svc_recv will replenish them */
        for (page_no = 0;
             &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
                rqstp->rq_pages[page_no] = NULL;

        return ret;
}
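
/*
 * Called once the RDMA_READs posted by rdma_read_chunks() have all
 * completed: move the pages held by the head context into rq_pages,
 * rebuild rq_arg around them, and return the total length of the
 * assembled RPC message.
 */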
static int rdma_read_complete(struct svc_rqst *rqstp,
                              struct svc_rdma_op_ctxt *head)
{
        int page_no;
        int ret;

        BUG_ON(!head);

        /* Copy RPC pages */
        for (page_no = 0; page_no < head->count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->pages[page_no];
        }
        /* Point rq_arg.pages past header */
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
        rqstp->rq_arg.page_len = head->arg.page_len;
        rqstp->rq_arg.page_base = head->arg.page_base;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->arg.head[0];
        rqstp->rq_arg.tail[0] = head->arg.tail[0];
        rqstp->rq_arg.len = head->arg.len;
        rqstp->rq_arg.buflen = head->arg.buflen;

        /* Free the context */
        svc_rdma_put_context(head, 0);

        /* XXX: What should this be? */
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);

        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);

        return ret;
}

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct rpcrdma_msg *rmsgp;
        int ret = 0;
        int len;

        dprintk("svcrdma: rqstp=%p\n", rqstp);

        spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
                ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
                spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
                return rdma_read_complete(rqstp, ctxt);
        } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
        } else {
                atomic_inc(&rdma_stat_rq_starve);
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                ctxt = NULL;
        }
        spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!ctxt) {
                /* This is the EAGAIN path. The svc_recv routine will
                 * return -EAGAIN, the nfsd thread will call into
                 * svc_recv again, and we shouldn't be on the active
                 * transport list.
                 */
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                        goto close_out;

                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
                ctxt, rdma_xprt, rqstp, ctxt->wc_status);
        BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
        atomic_inc(&rdma_stat_recv);

        /* Build up the XDR from the receive buffers. */
        rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

        /* Decode the RDMA header. */
        len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
        rqstp->rq_xprt_hlen = len;

        /* If the request is invalid, reply with an error */
        if (len < 0) {
                if (len == -ENOSYS)
                        svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
                goto close_out;
        }

        /* Read read-list data. */
        ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
        if (ret > 0) {
                /* read-list posted, defer until data received from client. */
                goto defer;
        } else if (ret < 0) {
                /* Post of read-list failed, free context. */
                svc_rdma_put_context(ctxt, 1);
                return 0;
        }

        ret = rqstp->rq_arg.head[0].iov_len
                + rqstp->rq_arg.page_len
                + rqstp->rq_arg.tail[0].iov_len;
        svc_rdma_put_context(ctxt, 0);
 out:
        dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len,
                rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return ret;

 close_out:
        if (ctxt)
                svc_rdma_put_context(ctxt, 1);
        dprintk("svcrdma: transport %p is closing\n", xprt);
        /*
         * Set the close bit and enqueue it. svc_recv will see the
         * close bit and call svc_xprt_delete
         */
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 defer:
        return 0;
}