  1. /*
  2. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
  3. *
  4. * This software is available to you under a choice of one of two
  5. * licenses. You may choose to be licensed under the terms of the GNU
  6. * General Public License (GPL) Version 2, available from the file
  7. * COPYING in the main directory of this source tree, or the BSD-type
  8. * license below:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions
  12. * are met:
  13. *
  14. * Redistributions of source code must retain the above copyright
  15. * notice, this list of conditions and the following disclaimer.
  16. *
  17. * Redistributions in binary form must reproduce the above
  18. * copyright notice, this list of conditions and the following
  19. * disclaimer in the documentation and/or other materials provided
  20. * with the distribution.
  21. *
  22. * Neither the name of the Network Appliance, Inc. nor the names of
  23. * its contributors may be used to endorse or promote products
  24. * derived from this software without specific prior written
  25. * permission.
  26. *
  27. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  28. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  29. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  30. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  31. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  32. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  33. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  34. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  35. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  36. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  37. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  38. */
  39. /*
  40. * rpc_rdma.c
  41. *
  42. * This file contains the guts of the RPC RDMA protocol, and
  43. * does marshaling/unmarshaling, etc. It is also where interfacing
  44. * to the Linux RPC framework lives.
  45. */
  46. #include "xprt_rdma.h"
  47. #include <linux/highmem.h>
  48. #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  49. # define RPCDBG_FACILITY RPCDBG_TRANS
  50. #endif
  51. static const char transfertypes[][12] = {
  52. "inline", /* no chunks */
  53. "read list", /* some argument via rdma read */
  54. "*read list", /* entire request via rdma read */
  55. "write list", /* some result via rdma write */
  56. "reply chunk" /* entire reply via rdma write */
  57. };
  58. /* Returns size of largest RPC-over-RDMA header in a Call message
  59. *
  60. * The largest Call header contains a full-size Read list and a
  61. * minimal Reply chunk.
  62. */
  63. static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
  64. {
  65. unsigned int size;
  66. /* Fixed header fields and list discriminators */
  67. size = RPCRDMA_HDRLEN_MIN;
  68. /* Maximum Read list size */
  69. maxsegs += 2; /* segment for head and tail buffers */
  70. size = maxsegs * sizeof(struct rpcrdma_read_chunk);
  71. /* Minimal Read chunk size */
  72. size += sizeof(__be32); /* segment count */
  73. size += sizeof(struct rpcrdma_segment);
  74. size += sizeof(__be32); /* list discriminator */
  75. dprintk("RPC: %s: max call header size = %u\n",
  76. __func__, size);
  77. return size;
  78. }
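/* Illustrative worked example (not part of the original source): with
 * the usual wire sizes -- 16 bytes for struct rpcrdma_segment and
 * 24 bytes for struct rpcrdma_read_chunk -- and an assumed maxsegs of 8,
 * the computation above yields:
 *
 *   (8 + 2) segments * 24 bytes     = 240 bytes of Read list
 *   + 4 + 16 + 4 bytes              =  24 bytes for the minimal Reply
 *                                      chunk and list discriminator
 *   -----------------------------------------------------------------
 *   returned size                   = 264 bytes
 *
 * Note that the second assignment to "size" replaces, rather than adds
 * to, the RPCRDMA_HDRLEN_MIN value assigned just above it; the figure
 * here follows the code as written.
 */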
  79. /* Returns size of largest RPC-over-RDMA header in a Reply message
  80. *
  81. * There is only one Write list or one Reply chunk per Reply
  82. * message. The larger list is the Write list.
  83. */
  84. static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
  85. {
  86. unsigned int size;
  87. /* Fixed header fields and list discriminators */
  88. size = RPCRDMA_HDRLEN_MIN;
  89. /* Maximum Write list size */
  90. maxsegs += 2; /* segment for head and tail buffers */
  91. size = sizeof(__be32); /* segment count */
  92. size += maxsegs * sizeof(struct rpcrdma_segment);
  93. size += sizeof(__be32); /* list discriminator */
  94. dprintk("RPC: %s: max reply header size = %u\n",
  95. __func__, size);
  96. return size;
  97. }
  98. void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
  99. {
  100. struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
  101. struct rpcrdma_ia *ia = &r_xprt->rx_ia;
  102. unsigned int maxsegs = ia->ri_max_segs;
  103. ia->ri_max_inline_write = cdata->inline_wsize -
  104. rpcrdma_max_call_header_size(maxsegs);
  105. ia->ri_max_inline_read = cdata->inline_rsize -
  106. rpcrdma_max_reply_header_size(maxsegs);
  107. }
  108. /* The client can send a request inline as long as the RPCRDMA header
  109. * plus the RPC call fit under the transport's inline limit. If the
  110. * combined call message size exceeds that limit, the client must use
  111. * a Read chunk for this operation.
  112. *
  113. * A Read chunk is also required if sending the RPC call inline would
  114. * exceed this device's max_sge limit.
  115. */
  116. static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
  117. struct rpc_rqst *rqst)
  118. {
  119. struct xdr_buf *xdr = &rqst->rq_snd_buf;
  120. unsigned int count, remaining, offset;
  121. if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
  122. return false;
  123. if (xdr->page_len) {
  124. remaining = xdr->page_len;
  125. offset = offset_in_page(xdr->page_base);
  126. count = 0;
  127. while (remaining) {
  128. remaining -= min_t(unsigned int,
  129. PAGE_SIZE - offset, remaining);
  130. offset = 0;
  131. if (++count > r_xprt->rx_ia.ri_max_send_sges)
  132. return false;
  133. }
  134. }
  135. return true;
  136. }
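/* Illustrative worked example (not part of the original source),
 * assuming 4KB pages: an 8192-byte page list that begins 100 bytes
 * into its first page spans three pages (3996 + 4096 + 100 bytes) and
 * therefore costs three Send SGEs, while the same payload starting
 * page-aligned costs only two. A Call can thus fit under
 * ri_max_inline_write yet still be forced into a Read chunk by the
 * ri_max_send_sges check above.
 */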
  137. /* The client can't know how large the actual reply will be. Thus it
  138. * plans for the largest possible reply for that particular ULP
  139. * operation. If the maximum combined reply message size exceeds the
  140. * transport's inline threshold, the client must provide a Write list
  141. * or a Reply chunk for this request.
  142. */
  143. static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
  144. struct rpc_rqst *rqst)
  145. {
  146. struct rpcrdma_ia *ia = &r_xprt->rx_ia;
  147. return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
  148. }
  149. /* Split @vec on page boundaries into SGEs. FMR registers pages, not
  150. * a byte range. Other modes coalesce these SGEs into a single MR
  151. * when they can.
  152. *
  153. * Returns pointer to next available SGE, and bumps the total number
  154. * of SGEs consumed.
  155. */
  156. static struct rpcrdma_mr_seg *
  157. rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
  158. unsigned int *n)
  159. {
  160. u32 remaining, page_offset;
  161. char *base;
  162. base = vec->iov_base;
  163. page_offset = offset_in_page(base);
  164. remaining = vec->iov_len;
  165. while (remaining) {
  166. seg->mr_page = NULL;
  167. seg->mr_offset = base;
  168. seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
  169. remaining -= seg->mr_len;
  170. base += seg->mr_len;
  171. ++seg;
  172. ++(*n);
  173. page_offset = 0;
  174. }
  175. return seg;
  176. }
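/* Illustrative sketch (not part of the original source): the same
 * page-splitting rule as the loop above, written as self-contained
 * userspace C. The helper name, EX_PAGE_SIZE, and the #if 0 guard are
 * editorial assumptions, and a 4KB page size is assumed purely for
 * the example.
 */
#if 0
#include <stdio.h>

#define EX_PAGE_SIZE 4096u

/* Print the page-bounded segment lengths produced for a buffer of
 * @len bytes starting @offset bytes into its first page.
 */
static void example_split_kvec(unsigned int offset, unsigned int len)
{
	unsigned int page_offset = offset % EX_PAGE_SIZE;

	while (len) {
		unsigned int seg_len = EX_PAGE_SIZE - page_offset;

		if (seg_len > len)
			seg_len = len;
		printf("segment of %u bytes\n", seg_len);
		len -= seg_len;
		page_offset = 0;
	}
}

int main(void)
{
	/* A 6000-byte kvec starting 3000 bytes into a page splits
	 * into segments of 1096, 4096 and 808 bytes.
	 */
	example_split_kvec(3000, 6000);
	return 0;
}
#endif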
  177. /* Convert @xdrbuf into SGEs no larger than a page each. As they
  178. * are registered, these SGEs are then coalesced into RDMA segments
  179. * when the selected memreg mode supports it.
  180. *
  181. * Returns positive number of SGEs consumed, or a negative errno.
  182. */
  183. static int
  184. rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
  185. unsigned int pos, enum rpcrdma_chunktype type,
  186. struct rpcrdma_mr_seg *seg)
  187. {
  188. unsigned long page_base;
  189. unsigned int len, n;
  190. struct page **ppages;
  191. n = 0;
  192. if (pos == 0)
  193. seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
  194. len = xdrbuf->page_len;
  195. ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
  196. page_base = offset_in_page(xdrbuf->page_base);
  197. while (len) {
  198. if (unlikely(!*ppages)) {
  199. /* XXX: Certain upper layer operations do
  200. * not provide receive buffer pages.
  201. */
  202. *ppages = alloc_page(GFP_ATOMIC);
  203. if (!*ppages)
  204. return -EAGAIN;
  205. }
  206. seg->mr_page = *ppages;
  207. seg->mr_offset = (char *)page_base;
  208. seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
  209. len -= seg->mr_len;
  210. ++ppages;
  211. ++seg;
  212. ++n;
  213. page_base = 0;
  214. }
  215. /* When encoding a Read chunk, the tail iovec contains an
  216. * XDR pad and may be omitted.
  217. */
  218. if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
  219. goto out;
  220. /* When encoding a Write chunk, some servers need to see an
  221. * extra segment for non-XDR-aligned Write chunks. The upper
  222. * layer provides space in the tail iovec that may be used
  223. * for this purpose.
  224. */
  225. if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
  226. goto out;
  227. if (xdrbuf->tail[0].iov_len)
  228. seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
  229. out:
  230. if (unlikely(n > RPCRDMA_MAX_SEGS))
  231. return -EIO;
  232. return n;
  233. }
  234. static inline int
  235. encode_item_present(struct xdr_stream *xdr)
  236. {
  237. __be32 *p;
  238. p = xdr_reserve_space(xdr, sizeof(*p));
  239. if (unlikely(!p))
  240. return -EMSGSIZE;
  241. *p = xdr_one;
  242. return 0;
  243. }
  244. static inline int
  245. encode_item_not_present(struct xdr_stream *xdr)
  246. {
  247. __be32 *p;
  248. p = xdr_reserve_space(xdr, sizeof(*p));
  249. if (unlikely(!p))
  250. return -EMSGSIZE;
  251. *p = xdr_zero;
  252. return 0;
  253. }
  254. static void
  255. xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
  256. {
  257. *iptr++ = cpu_to_be32(mw->mw_handle);
  258. *iptr++ = cpu_to_be32(mw->mw_length);
  259. xdr_encode_hyper(iptr, mw->mw_offset);
  260. }
  261. static int
  262. encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw)
  263. {
  264. __be32 *p;
  265. p = xdr_reserve_space(xdr, 4 * sizeof(*p));
  266. if (unlikely(!p))
  267. return -EMSGSIZE;
  268. xdr_encode_rdma_segment(p, mw);
  269. return 0;
  270. }
  271. static int
  272. encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw,
  273. u32 position)
  274. {
  275. __be32 *p;
  276. p = xdr_reserve_space(xdr, 6 * sizeof(*p));
  277. if (unlikely(!p))
  278. return -EMSGSIZE;
  279. *p++ = xdr_one; /* Item present */
  280. *p++ = cpu_to_be32(position);
  281. xdr_encode_rdma_segment(p, mw);
  282. return 0;
  283. }
  284. /* Register and XDR encode the Read list. Supports encoding a list of read
  285. * segments that belong to a single read chunk.
  286. *
  287. * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  288. *
  289. * Read chunklist (a linked list):
  290. * N elements, position P (same P for all chunks of same arg!):
  291. * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
  292. *
  293. * Returns zero on success, or a negative errno if a failure occurred.
  294. * @xdr is advanced to the next position in the stream.
  295. *
  296. * Only a single @pos value is currently supported.
  297. */
  298. static noinline int
  299. rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
  300. struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
  301. {
  302. struct xdr_stream *xdr = &req->rl_stream;
  303. struct rpcrdma_mr_seg *seg;
  304. struct rpcrdma_mw *mw;
  305. unsigned int pos;
  306. int nsegs;
  307. pos = rqst->rq_snd_buf.head[0].iov_len;
  308. if (rtype == rpcrdma_areadch)
  309. pos = 0;
  310. seg = req->rl_segments;
  311. nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
  312. rtype, seg);
  313. if (nsegs < 0)
  314. return nsegs;
  315. do {
  316. seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
  317. false, &mw);
  318. if (IS_ERR(seg))
  319. return PTR_ERR(seg);
  320. rpcrdma_push_mw(mw, &req->rl_registered);
  321. if (encode_read_segment(xdr, mw, pos) < 0)
  322. return -EMSGSIZE;
  323. dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
  324. rqst->rq_task->tk_pid, __func__, pos,
  325. mw->mw_length, (unsigned long long)mw->mw_offset,
  326. mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
  327. r_xprt->rx_stats.read_chunk_count++;
  328. nsegs -= mw->mw_nents;
  329. } while (nsegs);
  330. return 0;
  331. }
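/* Illustrative sketch (not part of the original source): the XDR words
 * the function above produces for a Read list carrying one
 * single-segment chunk. The helper name and all values are
 * hypothetical, and the terminating zero -- written separately by
 * rpcrdma_marshal_req() in the code below -- is included here so the
 * complete "1 - PHLOO - 0" layout is visible. The #if 0 guard keeps
 * this out of the build.
 */
#if 0
#include <arpa/inet.h>	/* htonl */
#include <stdint.h>
#include <stddef.h>

/* Lay out a one-chunk Read list; returns the number of bytes used. */
static size_t example_encode_read_list(uint32_t *buf, uint32_t position,
				       uint32_t handle, uint32_t length,
				       uint64_t offset)
{
	uint32_t *p = buf;

	*p++ = htonl(1);			/* item present */
	*p++ = htonl(position);			/* P: XDR offset in the Call */
	*p++ = htonl(handle);			/* H: STag/rkey */
	*p++ = htonl(length);			/* L: byte count */
	*p++ = htonl((uint32_t)(offset >> 32));	/* O: offset, high word */
	*p++ = htonl((uint32_t)offset);		/* O: offset, low word */
	*p++ = htonl(0);			/* list terminator */
	return (size_t)(p - buf) * sizeof(*buf);	/* 28 bytes */
}
#endif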
  332. /* Register and XDR encode the Write list. Supports encoding a list
  333. * containing one array of plain segments that belong to a single
  334. * write chunk.
  335. *
  336. * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  337. *
  338. * Write chunklist (a list of (one) counted array):
  339. * N elements:
  340. * 1 - N - HLOO - HLOO - ... - HLOO - 0
  341. *
  342. * Returns zero on success, or a negative errno if a failure occurred.
  343. * @xdr is advanced to the next position in the stream.
  344. *
  345. * Only a single Write chunk is currently supported.
  346. */
  347. static noinline int
  348. rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
  349. struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
  350. {
  351. struct xdr_stream *xdr = &req->rl_stream;
  352. struct rpcrdma_mr_seg *seg;
  353. struct rpcrdma_mw *mw;
  354. int nsegs, nchunks;
  355. __be32 *segcount;
  356. seg = req->rl_segments;
  357. nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
  358. rqst->rq_rcv_buf.head[0].iov_len,
  359. wtype, seg);
  360. if (nsegs < 0)
  361. return nsegs;
  362. if (encode_item_present(xdr) < 0)
  363. return -EMSGSIZE;
  364. segcount = xdr_reserve_space(xdr, sizeof(*segcount));
  365. if (unlikely(!segcount))
  366. return -EMSGSIZE;
  367. /* Actual value encoded below */
  368. nchunks = 0;
  369. do {
  370. seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
  371. true, &mw);
  372. if (IS_ERR(seg))
  373. return PTR_ERR(seg);
  374. rpcrdma_push_mw(mw, &req->rl_registered);
  375. if (encode_rdma_segment(xdr, mw) < 0)
  376. return -EMSGSIZE;
  377. dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
  378. rqst->rq_task->tk_pid, __func__,
  379. mw->mw_length, (unsigned long long)mw->mw_offset,
  380. mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
  381. r_xprt->rx_stats.write_chunk_count++;
  382. r_xprt->rx_stats.total_rdma_request += seg->mr_len;
  383. nchunks++;
  384. nsegs -= mw->mw_nents;
  385. } while (nsegs);
  386. /* Update count of segments in this Write chunk */
  387. *segcount = cpu_to_be32(nchunks);
  388. return 0;
  389. }
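/* Illustrative layout (not part of the original source): for one
 * hypothetical Write chunk with two segments, the function above emits
 *
 *   1                        (chunk present)
 *   2                        (segment count, back-filled via *segcount)
 *   H1 L1 O1-hi O1-lo        (first segment)
 *   H2 L2 O2-hi O2-lo        (second segment)
 *
 * that is, ten XDR words (40 bytes). The xdr_zero that terminates the
 * Write list itself is written afterwards by rpcrdma_marshal_req().
 */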
  390. /* Register and XDR encode the Reply chunk. Supports encoding an array
  391. * of plain segments that belong to a single write (reply) chunk.
  392. *
  393. * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  394. *
  395. * Reply chunk (a counted array):
  396. * N elements:
  397. * 1 - N - HLOO - HLOO - ... - HLOO
  398. *
  399. * Returns zero on success, or a negative errno if a failure occurred.
  400. * @xdr is advanced to the next position in the stream.
  401. */
  402. static noinline int
  403. rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
  404. struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
  405. {
  406. struct xdr_stream *xdr = &req->rl_stream;
  407. struct rpcrdma_mr_seg *seg;
  408. struct rpcrdma_mw *mw;
  409. int nsegs, nchunks;
  410. __be32 *segcount;
  411. seg = req->rl_segments;
  412. nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
  413. if (nsegs < 0)
  414. return nsegs;
  415. if (encode_item_present(xdr) < 0)
  416. return -EMSGSIZE;
  417. segcount = xdr_reserve_space(xdr, sizeof(*segcount));
  418. if (unlikely(!segcount))
  419. return -EMSGSIZE;
  420. /* Actual value encoded below */
  421. nchunks = 0;
  422. do {
  423. seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
  424. true, &mw);
  425. if (IS_ERR(seg))
  426. return PTR_ERR(seg);
  427. rpcrdma_push_mw(mw, &req->rl_registered);
  428. if (encode_rdma_segment(xdr, mw) < 0)
  429. return -EMSGSIZE;
  430. dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
  431. rqst->rq_task->tk_pid, __func__,
  432. mw->mw_length, (unsigned long long)mw->mw_offset,
  433. mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
  434. r_xprt->rx_stats.reply_chunk_count++;
  435. r_xprt->rx_stats.total_rdma_request += seg->mr_len;
  436. nchunks++;
  437. nsegs -= mw->mw_nents;
  438. } while (nsegs);
  439. /* Update count of segments in the Reply chunk */
  440. *segcount = cpu_to_be32(nchunks);
  441. return 0;
  442. }
  443. /**
  444. * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
  445. * @sc: sendctx containing SGEs to unmap
  446. *
  447. */
  448. void
  449. rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
  450. {
  451. struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
  452. struct ib_sge *sge;
  453. unsigned int count;
  454. dprintk("RPC: %s: unmapping %u sges for sc=%p\n",
  455. __func__, sc->sc_unmap_count, sc);
  456. /* The first two SGEs contain the transport header and
  457. * the inline buffer. These are always left mapped so
  458. * they can be cheaply re-used.
  459. */
  460. sge = &sc->sc_sges[2];
  461. for (count = sc->sc_unmap_count; count; ++sge, --count)
  462. ib_dma_unmap_page(ia->ri_device,
  463. sge->addr, sge->length, DMA_TO_DEVICE);
  464. if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
  465. smp_mb__after_atomic();
  466. wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
  467. }
  468. }
  469. /* Prepare an SGE for the RPC-over-RDMA transport header.
  470. */
  471. static bool
  472. rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
  473. u32 len)
  474. {
  475. struct rpcrdma_sendctx *sc = req->rl_sendctx;
  476. struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
  477. struct ib_sge *sge = sc->sc_sges;
  478. if (!rpcrdma_dma_map_regbuf(ia, rb))
  479. goto out_regbuf;
  480. sge->addr = rdmab_addr(rb);
  481. sge->length = len;
  482. sge->lkey = rdmab_lkey(rb);
  483. ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
  484. sge->length, DMA_TO_DEVICE);
  485. sc->sc_wr.num_sge++;
  486. return true;
  487. out_regbuf:
  488. pr_err("rpcrdma: failed to DMA map a Send buffer\n");
  489. return false;
  490. }
  491. /* Prepare the Send SGEs. The head and tail iovec, and each entry
  492. * in the page list, gets its own SGE.
  493. */
  494. static bool
  495. rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
  496. struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
  497. {
  498. struct rpcrdma_sendctx *sc = req->rl_sendctx;
  499. unsigned int sge_no, page_base, len, remaining;
  500. struct rpcrdma_regbuf *rb = req->rl_sendbuf;
  501. struct ib_device *device = ia->ri_device;
  502. struct ib_sge *sge = sc->sc_sges;
  503. u32 lkey = ia->ri_pd->local_dma_lkey;
  504. struct page *page, **ppages;
  505. /* The head iovec is straightforward, as it is already
  506. * DMA-mapped. Sync the content that has changed.
  507. */
  508. if (!rpcrdma_dma_map_regbuf(ia, rb))
  509. goto out_regbuf;
  510. sge_no = 1;
  511. sge[sge_no].addr = rdmab_addr(rb);
  512. sge[sge_no].length = xdr->head[0].iov_len;
  513. sge[sge_no].lkey = rdmab_lkey(rb);
  514. ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
  515. sge[sge_no].length, DMA_TO_DEVICE);
  516. /* If there is a Read chunk, the page list is being handled
  517. * via explicit RDMA, and thus is skipped here. However, the
  518. * tail iovec may include an XDR pad for the page list, as
  519. * well as additional content, and may not reside in the
  520. * same page as the head iovec.
  521. */
  522. if (rtype == rpcrdma_readch) {
  523. len = xdr->tail[0].iov_len;
  524. /* Do not include the tail if it is only an XDR pad */
  525. if (len < 4)
  526. goto out;
  527. page = virt_to_page(xdr->tail[0].iov_base);
  528. page_base = offset_in_page(xdr->tail[0].iov_base);
  529. /* If the content in the page list is an odd length,
  530. * xdr_write_pages() has added a pad at the beginning
  531. * of the tail iovec. Force the tail's non-pad content
  532. * to land at the next XDR position in the Send message.
  533. */
  534. page_base += len & 3;
  535. len -= len & 3;
  536. goto map_tail;
  537. }
  538. /* If there is a page list present, temporarily DMA map
  539. * and prepare an SGE for each page to be sent.
  540. */
  541. if (xdr->page_len) {
  542. ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
  543. page_base = offset_in_page(xdr->page_base);
  544. remaining = xdr->page_len;
  545. while (remaining) {
  546. sge_no++;
  547. if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
  548. goto out_mapping_overflow;
  549. len = min_t(u32, PAGE_SIZE - page_base, remaining);
  550. sge[sge_no].addr = ib_dma_map_page(device, *ppages,
  551. page_base, len,
  552. DMA_TO_DEVICE);
  553. if (ib_dma_mapping_error(device, sge[sge_no].addr))
  554. goto out_mapping_err;
  555. sge[sge_no].length = len;
  556. sge[sge_no].lkey = lkey;
  557. sc->sc_unmap_count++;
  558. ppages++;
  559. remaining -= len;
  560. page_base = 0;
  561. }
  562. }
  563. /* The tail iovec is not always constructed in the same
  564. * page where the head iovec resides (see, for example,
  565. * gss_wrap_req_priv). To neatly accommodate that case,
  566. * DMA map it separately.
  567. */
  568. if (xdr->tail[0].iov_len) {
  569. page = virt_to_page(xdr->tail[0].iov_base);
  570. page_base = offset_in_page(xdr->tail[0].iov_base);
  571. len = xdr->tail[0].iov_len;
  572. map_tail:
  573. sge_no++;
  574. sge[sge_no].addr = ib_dma_map_page(device, page,
  575. page_base, len,
  576. DMA_TO_DEVICE);
  577. if (ib_dma_mapping_error(device, sge[sge_no].addr))
  578. goto out_mapping_err;
  579. sge[sge_no].length = len;
  580. sge[sge_no].lkey = lkey;
  581. sc->sc_unmap_count++;
  582. }
  583. out:
  584. sc->sc_wr.num_sge += sge_no;
  585. if (sc->sc_unmap_count)
  586. __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
  587. return true;
  588. out_regbuf:
  589. pr_err("rpcrdma: failed to DMA map a Send buffer\n");
  590. return false;
  591. out_mapping_overflow:
  592. rpcrdma_unmap_sendctx(sc);
  593. pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
  594. return false;
  595. out_mapping_err:
  596. rpcrdma_unmap_sendctx(sc);
  597. pr_err("rpcrdma: Send mapping error\n");
  598. return false;
  599. }
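/* Illustrative summary (not part of the original source): for a typical
 * inline Call, the Send SGE array built by the helpers above ends up as
 *
 *   sc_sges[0]     transport header       (persistently mapped)
 *   sc_sges[1]     head iovec             (persistently mapped)
 *   sc_sges[2..n]  one SGE per page of the page list (mapped per Send)
 *   sc_sges[n+1]   tail iovec, if present (mapped per Send)
 *
 * which is why sge_no starts at 1 here and why rpcrdma_unmap_sendctx()
 * begins unmapping at &sc->sc_sges[2].
 */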
  600. /**
  601. * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
  602. * @r_xprt: controlling transport
  603. * @req: context of RPC Call being marshalled
  604. * @hdrlen: size of transport header, in bytes
  605. * @xdr: xdr_buf containing RPC Call
  606. * @rtype: chunk type being encoded
  607. *
  608. * Returns 0 on success; otherwise a negative errno is returned.
  609. */
  610. int
  611. rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
  612. struct rpcrdma_req *req, u32 hdrlen,
  613. struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
  614. {
  615. req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
  616. if (!req->rl_sendctx)
  617. return -ENOBUFS;
  618. req->rl_sendctx->sc_wr.num_sge = 0;
  619. req->rl_sendctx->sc_unmap_count = 0;
  620. req->rl_sendctx->sc_req = req;
  621. __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
  622. if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
  623. return -EIO;
  624. if (rtype != rpcrdma_areadch)
  625. if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
  626. return -EIO;
  627. return 0;
  628. }
  629. /**
  630. * rpcrdma_marshal_req - Marshal and send one RPC request
  631. * @r_xprt: controlling transport
  632. * @rqst: RPC request to be marshaled
  633. *
  634. * For the RPC in "rqst", this function:
  635. * - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
  636. * - Registers Read, Write, and Reply chunks
  637. * - Constructs the transport header
  638. * - Posts a Send WR to send the transport header and request
  639. *
  640. * Returns:
  641. * %0 if the RPC was sent successfully,
  642. * %-ENOTCONN if the connection was lost,
  643. * %-EAGAIN if not enough pages are available for on-demand reply buffer,
  644. * %-ENOBUFS if no MRs are available to register chunks,
  645. * %-EMSGSIZE if the transport header is too small,
  646. * %-EIO if a permanent problem occurred while marshaling.
  647. */
  648. int
  649. rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
  650. {
  651. struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
  652. struct xdr_stream *xdr = &req->rl_stream;
  653. enum rpcrdma_chunktype rtype, wtype;
  654. bool ddp_allowed;
  655. __be32 *p;
  656. int ret;
  657. #if defined(CONFIG_SUNRPC_BACKCHANNEL)
  658. if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
  659. return rpcrdma_bc_marshal_reply(rqst);
  660. #endif
  661. rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
  662. xdr_init_encode(xdr, &req->rl_hdrbuf,
  663. req->rl_rdmabuf->rg_base);
  664. /* Fixed header fields */
  665. ret = -EMSGSIZE;
  666. p = xdr_reserve_space(xdr, 4 * sizeof(*p));
  667. if (!p)
  668. goto out_err;
  669. *p++ = rqst->rq_xid;
  670. *p++ = rpcrdma_version;
  671. *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
  672. /* When the ULP employs a GSS flavor that guarantees integrity
  673. * or privacy, direct data placement of individual data items
  674. * is not allowed.
  675. */
  676. ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
  677. RPCAUTH_AUTH_DATATOUCH);
  678. /*
  679. * Chunks needed for results?
  680. *
  681. * o If the expected result is under the inline threshold, all ops
  682. * return as inline.
  683. * o Large read ops return data as write chunk(s), header as
  684. * inline.
  685. * o Large non-read ops return as a single reply chunk.
  686. */
  687. if (rpcrdma_results_inline(r_xprt, rqst))
  688. wtype = rpcrdma_noch;
  689. else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
  690. wtype = rpcrdma_writech;
  691. else
  692. wtype = rpcrdma_replych;
  693. /*
  694. * Chunks needed for arguments?
  695. *
  696. * o If the total request is under the inline threshold, all ops
  697. * are sent as inline.
  698. * o Large write ops transmit data as read chunk(s), header as
  699. * inline.
  700. * o Large non-write ops are sent with the entire message as a
  701. * single read chunk (protocol 0-position special case).
  702. *
  703. * This assumes that the upper layer does not present a request
  704. * that both has a data payload, and whose non-data arguments
  705. * by themselves are larger than the inline threshold.
  706. */
  707. if (rpcrdma_args_inline(r_xprt, rqst)) {
  708. *p++ = rdma_msg;
  709. rtype = rpcrdma_noch;
  710. } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
  711. *p++ = rdma_msg;
  712. rtype = rpcrdma_readch;
  713. } else {
  714. r_xprt->rx_stats.nomsg_call_count++;
  715. *p++ = rdma_nomsg;
  716. rtype = rpcrdma_areadch;
  717. }
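/* Illustrative summary (not part of the original source) of the two
 * selections above:
 *
 *   Call (argument) side                    header      rtype
 *     whole Call fits inline                rdma_msg    rpcrdma_noch
 *     large, DDP-eligible data payload      rdma_msg    rpcrdma_readch
 *     large, no DDP-eligible payload        rdma_nomsg  rpcrdma_areadch
 *
 *   Reply (result) side                                 wtype
 *     expected reply fits inline                        rpcrdma_noch
 *     large, DDP-eligible data item expected            rpcrdma_writech
 *     large, no DDP-eligible item expected              rpcrdma_replych
 */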
  718. /* This implementation supports the following combinations
  719. * of chunk lists in one RPC-over-RDMA Call message:
  720. *
  721. * - Read list
  722. * - Write list
  723. * - Reply chunk
  724. * - Read list + Reply chunk
  725. *
  726. * It might not yet support the following combinations:
  727. *
  728. * - Read list + Write list
  729. *
  730. * It does not support the following combinations:
  731. *
  732. * - Write list + Reply chunk
  733. * - Read list + Write list + Reply chunk
  734. *
  735. * This implementation supports only a single chunk in each
  736. * Read or Write list. Thus for example the client cannot
  737. * send a Call message with a Position Zero Read chunk and a
  738. * regular Read chunk at the same time.
  739. */
  740. if (rtype != rpcrdma_noch) {
  741. ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
  742. if (ret)
  743. goto out_err;
  744. }
  745. ret = encode_item_not_present(xdr);
  746. if (ret)
  747. goto out_err;
  748. if (wtype == rpcrdma_writech) {
  749. ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
  750. if (ret)
  751. goto out_err;
  752. }
  753. ret = encode_item_not_present(xdr);
  754. if (ret)
  755. goto out_err;
  756. if (wtype != rpcrdma_replych)
  757. ret = encode_item_not_present(xdr);
  758. else
  759. ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
  760. if (ret)
  761. goto out_err;
  762. dprintk("RPC: %5u %s: %s/%s: hdrlen %u rpclen\n",
  763. rqst->rq_task->tk_pid, __func__,
  764. transfertypes[rtype], transfertypes[wtype],
  765. xdr_stream_pos(xdr));
  766. ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
  767. &rqst->rq_snd_buf, rtype);
  768. if (ret)
  769. goto out_err;
  770. return 0;
  771. out_err:
  772. if (ret != -ENOBUFS) {
  773. pr_err("rpcrdma: header marshaling failed (%d)\n", ret);
  774. r_xprt->rx_stats.failed_marshal_count++;
  775. }
  776. return ret;
  777. }
  778. /**
  779. * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
  780. * @rqst: controlling RPC request
  781. * @srcp: points to RPC message payload in receive buffer
  782. * @copy_len: remaining length of receive buffer content
  783. * @pad: Write chunk pad bytes needed (zero for pure inline)
  784. *
  785. * The upper layer has set the maximum number of bytes it can
  786. * receive in each component of rq_rcv_buf. These values are set in
  787. * the head.iov_len, page_len, tail.iov_len, and buflen fields.
  788. *
  789. * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
  790. * many cases this function simply updates iov_base pointers in
  791. * rq_rcv_buf to point directly to the received reply data, to
  792. * avoid copying reply data.
  793. *
  794. * Returns the count of bytes which had to be memcopied.
  795. */
  796. static unsigned long
  797. rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
  798. {
  799. unsigned long fixup_copy_count;
  800. int i, npages, curlen;
  801. char *destp;
  802. struct page **ppages;
  803. int page_base;
  804. /* The head iovec is redirected to the RPC reply message
  805. * in the receive buffer, to avoid a memcopy.
  806. */
  807. rqst->rq_rcv_buf.head[0].iov_base = srcp;
  808. rqst->rq_private_buf.head[0].iov_base = srcp;
  809. /* The contents of the receive buffer that follow
  810. * head.iov_len bytes are copied into the page list.
  811. */
  812. curlen = rqst->rq_rcv_buf.head[0].iov_len;
  813. if (curlen > copy_len)
  814. curlen = copy_len;
  815. dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
  816. __func__, srcp, copy_len, curlen);
  817. srcp += curlen;
  818. copy_len -= curlen;
  819. ppages = rqst->rq_rcv_buf.pages +
  820. (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
  821. page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
  822. fixup_copy_count = 0;
  823. if (copy_len && rqst->rq_rcv_buf.page_len) {
  824. int pagelist_len;
  825. pagelist_len = rqst->rq_rcv_buf.page_len;
  826. if (pagelist_len > copy_len)
  827. pagelist_len = copy_len;
  828. npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
  829. for (i = 0; i < npages; i++) {
  830. curlen = PAGE_SIZE - page_base;
  831. if (curlen > pagelist_len)
  832. curlen = pagelist_len;
  833. dprintk("RPC: %s: page %d"
  834. " srcp 0x%p len %d curlen %d\n",
  835. __func__, i, srcp, copy_len, curlen);
  836. destp = kmap_atomic(ppages[i]);
  837. memcpy(destp + page_base, srcp, curlen);
  838. flush_dcache_page(ppages[i]);
  839. kunmap_atomic(destp);
  840. srcp += curlen;
  841. copy_len -= curlen;
  842. fixup_copy_count += curlen;
  843. pagelist_len -= curlen;
  844. if (!pagelist_len)
  845. break;
  846. page_base = 0;
  847. }
  848. /* Implicit padding for the last segment in a Write
  849. * chunk is inserted inline at the front of the tail
  850. * iovec. The upper layer ignores the content of
  851. * the pad. Simply ensure inline content in the tail
  852. * that follows the Write chunk is properly aligned.
  853. */
  854. if (pad)
  855. srcp -= pad;
  856. }
  857. /* The tail iovec is redirected to the remaining data
  858. * in the receive buffer, to avoid a memcopy.
  859. */
  860. if (copy_len || pad) {
  861. rqst->rq_rcv_buf.tail[0].iov_base = srcp;
  862. rqst->rq_private_buf.tail[0].iov_base = srcp;
  863. }
  864. return fixup_copy_count;
  865. }
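/* Illustrative worked example (not part of the original source): for a
 * hypothetical 1000-byte inline reply where rq_rcv_buf has
 * head.iov_len = 120 and page_len = 700, the first 120 bytes are
 * consumed by pointing head.iov_base at the receive buffer (no copy),
 * the next 700 bytes are memcopied into the page list
 * (fixup_copy_count = 700), and the remaining 180 bytes become the
 * tail by pointer redirection.
 */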
  866. /* Caller must guarantee @rep remains stable during this call.
  867. */
  868. static void
  869. rpcrdma_mark_remote_invalidation(struct list_head *mws,
  870. struct rpcrdma_rep *rep)
  871. {
  872. struct rpcrdma_mw *mw;
  873. if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE))
  874. return;
  875. list_for_each_entry(mw, mws, mw_list)
  876. if (mw->mw_handle == rep->rr_inv_rkey) {
  877. mw->mw_flags = RPCRDMA_MW_F_RI;
  878. break; /* only one invalidated MR per RPC */
  879. }
  880. }
  881. /* By convention, backchannel calls arrive via rdma_msg type
  882. * messages, and never populate the chunk lists. This makes
  883. * the RPC/RDMA header small and fixed in size, so it is
  884. * straightforward to check the RPC header's direction field.
  885. */
  886. static bool
  887. rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
  888. #if defined(CONFIG_SUNRPC_BACKCHANNEL)
  889. {
  890. struct xdr_stream *xdr = &rep->rr_stream;
  891. __be32 *p;
  892. if (rep->rr_proc != rdma_msg)
  893. return false;
  894. /* Peek at stream contents without advancing. */
  895. p = xdr_inline_decode(xdr, 0);
  896. /* Chunk lists */
  897. if (*p++ != xdr_zero)
  898. return false;
  899. if (*p++ != xdr_zero)
  900. return false;
  901. if (*p++ != xdr_zero)
  902. return false;
  903. /* RPC header */
  904. if (*p++ != rep->rr_xid)
  905. return false;
  906. if (*p != cpu_to_be32(RPC_CALL))
  907. return false;
  908. /* Now that we are sure this is a backchannel call,
  909. * advance to the RPC header.
  910. */
  911. p = xdr_inline_decode(xdr, 3 * sizeof(*p));
  912. if (unlikely(!p))
  913. goto out_short;
  914. rpcrdma_bc_receive_call(r_xprt, rep);
  915. return true;
  916. out_short:
  917. pr_warn("RPC/RDMA short backward direction call\n");
  918. if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
  919. xprt_disconnect_done(&r_xprt->rx_xprt);
  920. return true;
  921. }
  922. #else /* CONFIG_SUNRPC_BACKCHANNEL */
  923. {
  924. return false;
  925. }
  926. #endif /* CONFIG_SUNRPC_BACKCHANNEL */
  927. static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
  928. {
  929. __be32 *p;
  930. p = xdr_inline_decode(xdr, 4 * sizeof(*p));
  931. if (unlikely(!p))
  932. return -EIO;
  933. ifdebug(FACILITY) {
  934. u64 offset;
  935. u32 handle;
  936. handle = be32_to_cpup(p++);
  937. *length = be32_to_cpup(p++);
  938. xdr_decode_hyper(p, &offset);
  939. dprintk("RPC: %s: segment %u@0x%016llx:0x%08x\n",
  940. __func__, *length, (unsigned long long)offset,
  941. handle);
  942. } else {
  943. *length = be32_to_cpup(p + 1);
  944. }
  945. return 0;
  946. }
  947. static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
  948. {
  949. u32 segcount, seglength;
  950. __be32 *p;
  951. p = xdr_inline_decode(xdr, sizeof(*p));
  952. if (unlikely(!p))
  953. return -EIO;
  954. *length = 0;
  955. segcount = be32_to_cpup(p);
  956. while (segcount--) {
  957. if (decode_rdma_segment(xdr, &seglength))
  958. return -EIO;
  959. *length += seglength;
  960. }
  961. dprintk("RPC: %s: segcount=%u, %u bytes\n",
  962. __func__, be32_to_cpup(p), *length);
  963. return 0;
  964. }
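/* Illustrative sketch (not part of the original source): decoding the
 * same counted array from a plain big-endian word buffer and summing
 * the segment lengths, as decode_write_chunk() does via the xdr_stream.
 * The helper name is hypothetical and the #if 0 guard keeps it out of
 * the build.
 */
#if 0
#include <arpa/inet.h>	/* ntohl */
#include <stdint.h>
#include <stddef.h>

/* Returns the total byte count of a chunk laid out as
 * N - HLOO - HLOO - ..., or -1 if the buffer is too short.
 */
static long example_decode_write_chunk(const uint32_t *p, size_t nwords)
{
	uint32_t segcount;
	long total = 0;

	if (nwords < 1)
		return -1;
	segcount = ntohl(*p++);
	nwords--;

	while (segcount--) {
		if (nwords < 4)		/* handle, length, 2-word offset */
			return -1;
		total += ntohl(p[1]);	/* length is the second word */
		p += 4;
		nwords -= 4;
	}
	return total;
}
#endif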
  965. /* In RPC-over-RDMA Version One replies, a Read list is never
  966. * expected. This decoder is a stub that returns an error if
  967. * a Read list is present.
  968. */
  969. static int decode_read_list(struct xdr_stream *xdr)
  970. {
  971. __be32 *p;
  972. p = xdr_inline_decode(xdr, sizeof(*p));
  973. if (unlikely(!p))
  974. return -EIO;
  975. if (unlikely(*p != xdr_zero))
  976. return -EIO;
  977. return 0;
  978. }
  979. /* Supports only one Write chunk in the Write list
  980. */
  981. static int decode_write_list(struct xdr_stream *xdr, u32 *length)
  982. {
  983. u32 chunklen;
  984. bool first;
  985. __be32 *p;
  986. *length = 0;
  987. first = true;
  988. do {
  989. p = xdr_inline_decode(xdr, sizeof(*p));
  990. if (unlikely(!p))
  991. return -EIO;
  992. if (*p == xdr_zero)
  993. break;
  994. if (!first)
  995. return -EIO;
  996. if (decode_write_chunk(xdr, &chunklen))
  997. return -EIO;
  998. *length += chunklen;
  999. first = false;
  1000. } while (true);
  1001. return 0;
  1002. }
  1003. static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
  1004. {
  1005. __be32 *p;
  1006. p = xdr_inline_decode(xdr, sizeof(*p));
  1007. if (unlikely(!p))
  1008. return -EIO;
  1009. *length = 0;
  1010. if (*p != xdr_zero)
  1011. if (decode_write_chunk(xdr, length))
  1012. return -EIO;
  1013. return 0;
  1014. }
  1015. static int
  1016. rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
  1017. struct rpc_rqst *rqst)
  1018. {
  1019. struct xdr_stream *xdr = &rep->rr_stream;
  1020. u32 writelist, replychunk, rpclen;
  1021. char *base;
  1022. /* Decode the chunk lists */
  1023. if (decode_read_list(xdr))
  1024. return -EIO;
  1025. if (decode_write_list(xdr, &writelist))
  1026. return -EIO;
  1027. if (decode_reply_chunk(xdr, &replychunk))
  1028. return -EIO;
  1029. /* RDMA_MSG sanity checks */
  1030. if (unlikely(replychunk))
  1031. return -EIO;
  1032. /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
  1033. base = (char *)xdr_inline_decode(xdr, 0);
  1034. rpclen = xdr_stream_remaining(xdr);
  1035. r_xprt->rx_stats.fixup_copy_count +=
  1036. rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
  1037. r_xprt->rx_stats.total_rdma_reply += writelist;
  1038. return rpclen + xdr_align_size(writelist);
  1039. }
  1040. static noinline int
  1041. rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
  1042. {
  1043. struct xdr_stream *xdr = &rep->rr_stream;
  1044. u32 writelist, replychunk;
  1045. /* Decode the chunk lists */
  1046. if (decode_read_list(xdr))
  1047. return -EIO;
  1048. if (decode_write_list(xdr, &writelist))
  1049. return -EIO;
  1050. if (decode_reply_chunk(xdr, &replychunk))
  1051. return -EIO;
  1052. /* RDMA_NOMSG sanity checks */
  1053. if (unlikely(writelist))
  1054. return -EIO;
  1055. if (unlikely(!replychunk))
  1056. return -EIO;
  1057. /* Reply chunk buffer already is the reply vector */
  1058. r_xprt->rx_stats.total_rdma_reply += replychunk;
  1059. return replychunk;
  1060. }
  1061. static noinline int
  1062. rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
  1063. struct rpc_rqst *rqst)
  1064. {
  1065. struct xdr_stream *xdr = &rep->rr_stream;
  1066. __be32 *p;
  1067. p = xdr_inline_decode(xdr, sizeof(*p));
  1068. if (unlikely(!p))
  1069. return -EIO;
  1070. switch (*p) {
  1071. case err_vers:
  1072. p = xdr_inline_decode(xdr, 2 * sizeof(*p));
  1073. if (!p)
  1074. break;
  1075. dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
  1076. rqst->rq_task->tk_pid, __func__,
  1077. be32_to_cpup(p), be32_to_cpu(*(p + 1)));
  1078. break;
  1079. case err_chunk:
  1080. dprintk("RPC: %5u: %s: server reports header decoding error\n",
  1081. rqst->rq_task->tk_pid, __func__);
  1082. break;
  1083. default:
  1084. dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
  1085. rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
  1086. }
  1087. r_xprt->rx_stats.bad_reply_count++;
  1088. return -EREMOTEIO;
  1089. }
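/* Illustrative note (not part of the original source): an err_vers body,
 * as decoded above, carries two further words giving the lowest and
 * highest RPC-over-RDMA versions the responder supports, e.g.
 *
 *   rdma_error | err_vers | 1 | 1
 *
 * for a hypothetical responder that implements only Version One.
 */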
  1090. /* Perform XID lookup, reconstruction of the RPC reply, and
  1091. * RPC completion while holding the transport lock to ensure
  1092. * the rep, rqst, and rq_task pointers remain stable.
  1093. */
  1094. void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
  1095. {
  1096. struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
  1097. struct rpc_xprt *xprt = &r_xprt->rx_xprt;
  1098. struct rpc_rqst *rqst = rep->rr_rqst;
  1099. unsigned long cwnd;
  1100. int status;
  1101. xprt->reestablish_timeout = 0;
  1102. switch (rep->rr_proc) {
  1103. case rdma_msg:
  1104. status = rpcrdma_decode_msg(r_xprt, rep, rqst);
  1105. break;
  1106. case rdma_nomsg:
  1107. status = rpcrdma_decode_nomsg(r_xprt, rep);
  1108. break;
  1109. case rdma_error:
  1110. status = rpcrdma_decode_error(r_xprt, rep, rqst);
  1111. break;
  1112. default:
  1113. status = -EIO;
  1114. }
  1115. if (status < 0)
  1116. goto out_badheader;
  1117. out:
  1118. spin_lock(&xprt->recv_lock);
  1119. cwnd = xprt->cwnd;
  1120. xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
  1121. if (xprt->cwnd > cwnd)
  1122. xprt_release_rqst_cong(rqst->rq_task);
  1123. xprt_complete_rqst(rqst->rq_task, status);
  1124. xprt_unpin_rqst(rqst);
  1125. spin_unlock(&xprt->recv_lock);
  1126. return;
  1127. /* If the incoming reply terminated a pending RPC, the next
  1128. * RPC call will post a replacement receive buffer as it is
  1129. * being marshaled.
  1130. */
  1131. out_badheader:
  1132. dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
  1133. rqst->rq_task->tk_pid, __func__, be32_to_cpu(rep->rr_proc));
  1134. r_xprt->rx_stats.bad_reply_count++;
  1135. status = -EIO;
  1136. goto out;
  1137. }
  1138. void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
  1139. {
  1140. /* Invalidate and unmap the data payloads before waking
  1141. * the waiting application. This guarantees the memory
  1142. * regions are properly fenced from the server before the
  1143. * application accesses the data. It also ensures proper
  1144. * send flow control: waking the next RPC waits until this
  1145. * RPC has relinquished all its Send Queue entries.
  1146. */
  1147. if (!list_empty(&req->rl_registered))
  1148. r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
  1149. &req->rl_registered);
  1150. /* Ensure that any DMA mapped pages associated with
  1151. * the Send of the RPC Call have been unmapped before
  1152. * allowing the RPC to complete. This protects argument
  1153. * memory not controlled by the RPC client from being
  1154. * re-used before we're done with it.
  1155. */
  1156. if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
  1157. r_xprt->rx_stats.reply_waits_for_send++;
  1158. out_of_line_wait_on_bit(&req->rl_flags,
  1159. RPCRDMA_REQ_F_TX_RESOURCES,
  1160. bit_wait,
  1161. TASK_UNINTERRUPTIBLE);
  1162. }
  1163. }
  1164. /* Reply handling runs in the poll worker thread. Anything that
  1165. * might wait is deferred to a separate workqueue.
  1166. */
  1167. void rpcrdma_deferred_completion(struct work_struct *work)
  1168. {
  1169. struct rpcrdma_rep *rep =
  1170. container_of(work, struct rpcrdma_rep, rr_work);
  1171. struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
  1172. rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
  1173. rpcrdma_release_rqst(rep->rr_rxprt, req);
  1174. rpcrdma_complete_rqst(rep);
  1175. }
  1176. /* Process received RPC/RDMA messages.
  1177. *
  1178. * Errors must result in the RPC task either being awakened, or
  1179. * allowed to timeout, to discover the errors at that time.
  1180. */
  1181. void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
  1182. {
  1183. struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
  1184. struct rpc_xprt *xprt = &r_xprt->rx_xprt;
  1185. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  1186. struct rpcrdma_req *req;
  1187. struct rpc_rqst *rqst;
  1188. u32 credits;
  1189. __be32 *p;
  1190. dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
  1191. if (rep->rr_hdrbuf.head[0].iov_len == 0)
  1192. goto out_badstatus;
  1193. xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
  1194. rep->rr_hdrbuf.head[0].iov_base);
  1195. /* Fixed transport header fields */
  1196. p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
  1197. if (unlikely(!p))
  1198. goto out_shortreply;
  1199. rep->rr_xid = *p++;
  1200. rep->rr_vers = *p++;
  1201. credits = be32_to_cpu(*p++);
  1202. rep->rr_proc = *p++;
  1203. if (rep->rr_vers != rpcrdma_version)
  1204. goto out_badversion;
  1205. if (rpcrdma_is_bcall(r_xprt, rep))
  1206. return;
  1207. /* Match incoming rpcrdma_rep to an rpcrdma_req to
  1208. * get context for handling any incoming chunks.
  1209. */
  1210. spin_lock(&xprt->recv_lock);
  1211. rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
  1212. if (!rqst)
  1213. goto out_norqst;
  1214. xprt_pin_rqst(rqst);
  1215. if (credits == 0)
  1216. credits = 1; /* don't deadlock */
  1217. else if (credits > buf->rb_max_requests)
  1218. credits = buf->rb_max_requests;
  1219. buf->rb_credits = credits;
  1220. spin_unlock(&xprt->recv_lock);
  1221. req = rpcr_to_rdmar(rqst);
  1222. req->rl_reply = rep;
  1223. rep->rr_rqst = rqst;
  1224. clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
  1225. dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
  1226. __func__, rep, req, be32_to_cpu(rep->rr_xid));
  1227. if (list_empty(&req->rl_registered) &&
  1228. !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
  1229. rpcrdma_complete_rqst(rep);
  1230. else
  1231. queue_work(rpcrdma_receive_wq, &rep->rr_work);
  1232. return;
  1233. out_badstatus:
  1234. rpcrdma_recv_buffer_put(rep);
  1235. if (r_xprt->rx_ep.rep_connected == 1) {
  1236. r_xprt->rx_ep.rep_connected = -EIO;
  1237. rpcrdma_conn_func(&r_xprt->rx_ep);
  1238. }
  1239. return;
  1240. out_badversion:
  1241. dprintk("RPC: %s: invalid version %d\n",
  1242. __func__, be32_to_cpu(rep->rr_vers));
  1243. goto repost;
  1244. /* The RPC transaction has already been terminated, or the header
  1245. * is corrupt.
  1246. */
  1247. out_norqst:
  1248. spin_unlock(&xprt->recv_lock);
  1249. dprintk("RPC: %s: no match for incoming xid 0x%08x\n",
  1250. __func__, be32_to_cpu(rep->rr_xid));
  1251. goto repost;
  1252. out_shortreply:
  1253. dprintk("RPC: %s: short/invalid reply\n", __func__);
  1254. /* If no pending RPC transaction was matched, post a replacement
  1255. * receive buffer before returning.
  1256. */
  1257. repost:
  1258. r_xprt->rx_stats.bad_reply_count++;
  1259. if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
  1260. rpcrdma_recv_buffer_put(rep);
  1261. }