svc_rdma_recvfrom.c

/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <linux/highmem.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
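/*
 * The first SGE becomes the xdr_buf head; any receive bytes beyond it
 * are exposed through the pagelist. Receive pages that carried no data
 * are released, and the tail is left empty.
 */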
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *ctxt,
                               u32 byte_count)
{
        struct page *page;
        u32 bc;
        int sge_no;

        /* Swap the page in the SGE with the page in argpages */
        page = ctxt->pages[0];
        put_page(rqstp->rq_pages[0]);
        rqstp->rq_pages[0] = page;

        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
        rqstp->rq_arg.head[0].iov_len =
                min_t(size_t, byte_count, ctxt->sge[0].length);
        rqstp->rq_arg.len = byte_count;
        rqstp->rq_arg.buflen = byte_count;

        /* Compute bytes past head in the SGL */
        bc = byte_count - rqstp->rq_arg.head[0].iov_len;

        /* If data remains, store it in the pagelist */
        rqstp->rq_arg.page_len = bc;
        rqstp->rq_arg.page_base = 0;
        rqstp->rq_arg.pages = &rqstp->rq_pages[1];
        sge_no = 1;
        while (bc && sge_no < ctxt->count) {
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
                bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
                rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
                sge_no++;
        }
        rqstp->rq_respages = &rqstp->rq_pages[sge_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* We should never run out of SGE because the limit is defined to
         * support the max allowed RPC data length
         */
        BUG_ON(bc && (sge_no == ctxt->count));
        BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
               != byte_count);
        BUG_ON(rqstp->rq_arg.len != byte_count);

        /* If not all pages were used from the SGL, free the remaining ones */
        bc = sge_no;
        while (sge_no < ctxt->count) {
                page = ctxt->pages[sge_no++];
                put_page(page);
        }
        ctxt->count = bc;

        /* Set up tail */
        rqstp->rq_arg.tail[0].iov_base = NULL;
        rqstp->rq_arg.tail[0].iov_len = 0;
}

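/*
 * Upper bound on the number of SGEs used for a single RDMA_READ:
 * iWARP transports are limited to one SGE per READ here, while other
 * transports may use up to the device's sc_max_sge.
 */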
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
        if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
             RDMA_TRANSPORT_IWARP)
                return 1;
        else
                return min_t(int, sge_count, xprt->sc_max_sge);
}

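/*
 * A "reader" maps part of a read chunk into the rqstp page list and
 * posts the RDMA_READ for it, returning the number of bytes covered or
 * an error. rdma_read_chunks() selects the FRMR or local-lkey variant
 * based on the device capabilities.
 */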
typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
                              struct svc_rqst *rqstp,
                              struct svc_rdma_op_ctxt *head,
                              int *page_no,
                              u32 *page_offset,
                              u32 rs_handle,
                              u32 rs_length,
                              u64 rs_offset,
                              int last);

/* Issue an RDMA_READ using the local lkey to map the data sink */
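/*
 * On success the READ has been posted, *page_no and *page_offset are
 * advanced past the sink pages just mapped, and the number of bytes
 * covered by the READ is returned.
 */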
static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
                               struct svc_rqst *rqstp,
                               struct svc_rdma_op_ctxt *head,
                               int *page_no,
                               u32 *page_offset,
                               u32 rs_handle,
                               u32 rs_length,
                               u64 rs_offset,
                               int last)
{
        struct ib_send_wr read_wr;
        int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        int ret, read, pno;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->read_hdr = head;
        pages_needed =
                min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
        read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

        for (pno = 0; pno < pages_needed; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;
                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
                ctxt->sge[pno].addr =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        head->arg.pages[pg_no], pg_off,
                                        PAGE_SIZE - pg_off,
                                        DMA_FROM_DEVICE);
                ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
                                           ctxt->sge[pno].addr);
                if (ret)
                        goto err;
                atomic_inc(&xprt->sc_dma_used);

                /* The lkey here is either a local dma lkey or a dma_mr lkey */
                ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
                ctxt->sge[pno].length = len;
                ctxt->count++;

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        memset(&read_wr, 0, sizeof(read_wr));
        read_wr.wr_id = (unsigned long)ctxt;
        read_wr.opcode = IB_WR_RDMA_READ;
        ctxt->wr_op = read_wr.opcode;
        read_wr.send_flags = IB_SEND_SIGNALED;
        read_wr.wr.rdma.rkey = rs_handle;
        read_wr.wr.rdma.remote_addr = rs_offset;
        read_wr.sg_list = ctxt->sge;
        read_wr.num_sge = pages_needed;

        ret = svc_rdma_send(xprt, &read_wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return ret;
}

/* Issue an RDMA_READ using an FRMR to map the data sink */
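/*
 * The data sink is registered with a fast-register WR and the READ is
 * chained behind it. If the device cannot invalidate on READ
 * (READ_W_INV), a LOCAL_INV WR is chained after the READ instead.
 */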
static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
                                struct svc_rqst *rqstp,
                                struct svc_rdma_op_ctxt *head,
                                int *page_no,
                                u32 *page_offset,
                                u32 rs_handle,
                                u32 rs_length,
                                u64 rs_offset,
                                int last)
{
        struct ib_send_wr read_wr;
        struct ib_send_wr inv_wr;
        struct ib_send_wr fastreg_wr;
        u8 key;
        int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
        struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
        struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
        int ret, read, pno;
        u32 pg_off = *page_offset;
        u32 pg_no = *page_no;

        if (IS_ERR(frmr))
                return -ENOMEM;

        ctxt->direction = DMA_FROM_DEVICE;
        ctxt->frmr = frmr;
        pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
        read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);

        frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
        frmr->direction = DMA_FROM_DEVICE;
        frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
        frmr->map_len = pages_needed << PAGE_SHIFT;
        frmr->page_list_len = pages_needed;

        for (pno = 0; pno < pages_needed; pno++) {
                int len = min_t(int, rs_length, PAGE_SIZE - pg_off);

                head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
                head->arg.page_len += len;
                head->arg.len += len;
                if (!pg_off)
                        head->count++;
                rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
                frmr->page_list->page_list[pno] =
                        ib_dma_map_page(xprt->sc_cm_id->device,
                                        head->arg.pages[pg_no], 0,
                                        PAGE_SIZE, DMA_FROM_DEVICE);
                ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
                                           frmr->page_list->page_list[pno]);
                if (ret)
                        goto err;
                atomic_inc(&xprt->sc_dma_used);

                /* adjust offset and wrap to next page if needed */
                pg_off += len;
                if (pg_off == PAGE_SIZE) {
                        pg_off = 0;
                        pg_no++;
                }
                rs_length -= len;
        }

        if (last && rs_length == 0)
                set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
        else
                clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

        /* Bump the key */
        key = (u8)(frmr->mr->lkey & 0x000000FF);
        ib_update_fast_reg_key(frmr->mr, ++key);

        ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
        ctxt->sge[0].lkey = frmr->mr->lkey;
        ctxt->sge[0].length = read;
        ctxt->count = 1;
        ctxt->read_hdr = head;

        /* Prepare FASTREG WR */
        memset(&fastreg_wr, 0, sizeof(fastreg_wr));
        fastreg_wr.opcode = IB_WR_FAST_REG_MR;
        fastreg_wr.send_flags = IB_SEND_SIGNALED;
        fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
        fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
        fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
        fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
        fastreg_wr.wr.fast_reg.length = frmr->map_len;
        fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
        fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
        fastreg_wr.next = &read_wr;

        /* Prepare RDMA_READ */
        memset(&read_wr, 0, sizeof(read_wr));
        read_wr.send_flags = IB_SEND_SIGNALED;
        read_wr.wr.rdma.rkey = rs_handle;
        read_wr.wr.rdma.remote_addr = rs_offset;
        read_wr.sg_list = ctxt->sge;
        read_wr.num_sge = 1;
        if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
                read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
                read_wr.wr_id = (unsigned long)ctxt;
                read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
        } else {
                read_wr.opcode = IB_WR_RDMA_READ;
                read_wr.next = &inv_wr;
                /* Prepare invalidate */
                memset(&inv_wr, 0, sizeof(inv_wr));
                inv_wr.wr_id = (unsigned long)ctxt;
                inv_wr.opcode = IB_WR_LOCAL_INV;
                inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
                inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
        }
        ctxt->wr_op = read_wr.opcode;

        /* Post the chain */
        ret = svc_rdma_send(xprt, &fastreg_wr);
        if (ret) {
                pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
                set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
                goto err;
        }

        /* return current location in page array */
        *page_no = pg_no;
        *page_offset = pg_off;
        ret = read;
        atomic_inc(&rdma_stat_read);
        return ret;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        svc_rdma_put_frmr(xprt, frmr);
        return ret;
}

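/*
 * Walk the read list in the RPC-over-RDMA header and post RDMA_READs
 * to pull the chunk data into the pages saved in "head". Returns 1 if
 * READs were posted (the request completes later), 0 if there is no
 * read list, or a negative errno.
 */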
static int rdma_read_chunks(struct svcxprt_rdma *xprt,
                            struct rpcrdma_msg *rmsgp,
                            struct svc_rqst *rqstp,
                            struct svc_rdma_op_ctxt *head)
{
        int page_no, ch_count, ret;
        struct rpcrdma_read_chunk *ch;
        u32 page_offset, byte_count;
        u64 rs_offset;
        rdma_reader_fn reader;

        /* If no read list is present, return 0 */
        ch = svc_rdma_get_read_chunk(rmsgp);
        if (!ch)
                return 0;

        svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
        if (ch_count > RPCSVC_MAXPAGES)
                return -EINVAL;

        /* The request is completed when the RDMA_READs complete. The
         * head context keeps all the pages that comprise the
         * request.
         */
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = &head->pages[head->count];
        head->hdr_count = head->count;
        head->arg.page_base = 0;
        head->arg.page_len = 0;
        head->arg.len = rqstp->rq_arg.len;
        head->arg.buflen = rqstp->rq_arg.buflen;

        /* Use FRMR if supported */
        if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
                reader = rdma_read_chunk_frmr;
        else
                reader = rdma_read_chunk_lcl;

        page_no = 0; page_offset = 0;
        for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
             ch->rc_discrim != 0; ch++) {

                xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
                                 &rs_offset);
                byte_count = ntohl(ch->rc_target.rs_length);

                while (byte_count > 0) {
                        ret = reader(xprt, rqstp, head,
                                     &page_no, &page_offset,
                                     ntohl(ch->rc_target.rs_handle),
                                     byte_count, rs_offset,
                                     ((ch+1)->rc_discrim == 0) /* last */
                                     );
                        if (ret < 0)
                                goto err;
                        byte_count -= ret;
                        rs_offset += ret;
                        head->arg.buflen += ret;
                }
        }
        ret = 1;
 err:
        /* Detach arg pages. svc_recv will replenish them */
        for (page_no = 0;
             &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
                rqstp->rq_pages[page_no] = NULL;

        return ret;
}

/*
 * To avoid a separate RDMA READ just for a handful of zero bytes,
 * RFC 5666 section 3.7 allows the client to omit the XDR zero pad
 * in chunk lists.
 */
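/* Zero-fill the omitted pad bytes at the end of the page data and
 * account for them in page_len, buflen and len.
 */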
static void
rdma_fix_xdr_pad(struct xdr_buf *buf)
{
        unsigned int page_len = buf->page_len;
        unsigned int size = (XDR_QUADLEN(page_len) << 2) - page_len;
        unsigned int offset, pg_no;
        char *p;

        if (size == 0)
                return;

        pg_no = page_len >> PAGE_SHIFT;
        offset = page_len & ~PAGE_MASK;
        p = page_address(buf->pages[pg_no]);
        memset(p + offset, 0, size);

        buf->page_len += size;
        buf->buflen += size;
        buf->len += size;
}

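/*
 * Finish a request whose read-list data has arrived: move the pages
 * saved in the head context into the svc_rqst and rebuild rq_arg
 * around them. Returns the number of argument bytes now available.
 */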
static int rdma_read_complete(struct svc_rqst *rqstp,
                              struct svc_rdma_op_ctxt *head)
{
        int page_no;
        int ret;

        BUG_ON(!head);

        /* Copy RPC pages */
        for (page_no = 0; page_no < head->count; page_no++) {
                put_page(rqstp->rq_pages[page_no]);
                rqstp->rq_pages[page_no] = head->pages[page_no];
        }

        /* Point rq_arg.pages past header */
        rdma_fix_xdr_pad(&head->arg);
        rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
        rqstp->rq_arg.page_len = head->arg.page_len;
        rqstp->rq_arg.page_base = head->arg.page_base;

        /* rq_respages starts after the last arg page */
        rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        /* Rebuild rq_arg head and tail. */
        rqstp->rq_arg.head[0] = head->arg.head[0];
        rqstp->rq_arg.tail[0] = head->arg.tail[0];
        rqstp->rq_arg.len = head->arg.len;
        rqstp->rq_arg.buflen = head->arg.buflen;

        /* Free the context */
        svc_rdma_put_context(head, 0);

        /* XXX: What should this be? */
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);

        ret = rqstp->rq_arg.head[0].iov_len
              + rqstp->rq_arg.page_len
              + rqstp->rq_arg.tail[0].iov_len;
        dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);

        return ret;
}

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
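/*
 * Returns the number of bytes in rq_arg when a complete request is
 * ready, or 0 when the caller should try again later (nothing queued,
 * a read list was posted, or the transport is closing).
 */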
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_op_ctxt *ctxt = NULL;
        struct rpcrdma_msg *rmsgp;
        int ret = 0;
        int len;

        dprintk("svcrdma: rqstp=%p\n", rqstp);

        spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
                ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
                spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
                return rdma_read_complete(rqstp, ctxt);
        } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
        } else {
                atomic_inc(&rdma_stat_rq_starve);
                clear_bit(XPT_DATA, &xprt->xpt_flags);
                ctxt = NULL;
        }
        spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
        if (!ctxt) {
                /* This is the EAGAIN path. The svc_recv routine will
                 * return -EAGAIN, the nfsd thread will call into
                 * svc_recv again and we shouldn't be on the active
                 * transport list
                 */
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                        goto close_out;
                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
                ctxt, rdma_xprt, rqstp, ctxt->wc_status);
        BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
        atomic_inc(&rdma_stat_recv);

        /* Build up the XDR from the receive buffers. */
        rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

        /* Decode the RDMA header. */
        len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
        rqstp->rq_xprt_hlen = len;

        /* If the request is invalid, reply with an error */
        if (len < 0) {
                if (len == -ENOSYS)
                        svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
                goto close_out;
        }

        /* Read read-list data. */
        ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
        if (ret > 0) {
                /* read-list posted, defer until data received from client. */
                goto defer;
        } else if (ret < 0) {
                /* Post of read-list failed, free context. */
                svc_rdma_put_context(ctxt, 1);
                return 0;
        }

        ret = rqstp->rq_arg.head[0].iov_len
              + rqstp->rq_arg.page_len
              + rqstp->rq_arg.tail[0].iov_len;
        svc_rdma_put_context(ctxt, 0);
 out:
        dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
                "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
                ret, rqstp->rq_arg.len,
                rqstp->rq_arg.head[0].iov_base,
                rqstp->rq_arg.head[0].iov_len);
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return ret;

 close_out:
        if (ctxt)
                svc_rdma_put_context(ctxt, 1);
        dprintk("svcrdma: transport %p is closing\n", xprt);
        /*
         * Set the close bit and enqueue it. svc_recv will see the
         * close bit and call svc_xprt_delete
         */
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
 defer:
        return 0;
}