
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511
6611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following
 * disclaimer in the documentation and/or other materials provided
 * with the distribution.
 *
 * Neither the name of the Network Appliance, Inc. nor the names of
 * its contributors may be used to endorse or promote products
 * derived from this software without specific prior written
 * permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"

/*
 * Globals/Macros
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

/*
 * internal functions
 */
static void rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);

struct workqueue_struct *rpcrdma_receive_wq __read_mostly;

int
rpcrdma_alloc_wq(void)
{
	struct workqueue_struct *recv_wq;

	recv_wq = alloc_workqueue("xprtrdma_receive",
				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
				  0);
	if (!recv_wq)
		return -ENOMEM;

	rpcrdma_receive_wq = recv_wq;
	return 0;
}

void
rpcrdma_destroy_wq(void)
{
	struct workqueue_struct *wq;

	if (rpcrdma_receive_wq) {
		wq = rpcrdma_receive_wq;
		rpcrdma_receive_wq = NULL;
		destroy_workqueue(wq);
	}
}

static void
rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
{
	struct rpcrdma_ep *ep = context;

	pr_err("rpcrdma: %s on device %s ep %p\n",
	       ib_event_msg(event->event), event->device->name, context);

	if (ep->rep_connected == 1) {
		ep->rep_connected = -EIO;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
	}
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);

	rpcrdma_sendctx_put_locked(sc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue (ignored)
 * @wc: completed WR
 *
 */
static void
rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);

	/* WARNING: Only wr_cqe and status are reliable at this point */
	if (wc->status != IB_WC_SUCCESS)
		goto out_fail;

	/* status == SUCCESS means all fields in wc are trustworthy */
	dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
		__func__, rep, wc->byte_len);
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

out_schedule:
	rpcrdma_reply_handler(rep);
	return;

out_fail:
	if (wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
		       ib_wc_status_msg(wc->status),
		       wc->status, wc->vendor_err);
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
	goto out_schedule;
}

static void
rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
			       struct rdma_conn_param *param)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	r_xprt->rx_ia.ri_reminv_expected = false;
	r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		r_xprt->rx_ia.ri_reminv_expected = true;
		r_xprt->rx_ia.ri_implicit_roundup = true;
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < cdata->inline_rsize)
		cdata->inline_rsize = rsize;
	if (wsize < cdata->inline_wsize)
		cdata->inline_wsize = wsize;
	dprintk("RPC: %s: max send %u, max recv %u\n",
		__func__, cdata->inline_wsize, cdata->inline_rsize);
	rpcrdma_set_max_header_sizes(r_xprt);
}
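
/* Worked example (illustrative, with hypothetical numbers): if the decoded
 * cp_send_size is 16384 and the decoded cp_recv_size is 8192 while
 * inline_rsize and inline_wsize were provisioned at 4096, neither inline
 * size changes, because the code above only adopts values that are smaller
 * than what was configured. A decoded value of 1024 would shrink the
 * corresponding inline size to 1024.
 */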

static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
#endif
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
		pr_info("rpcrdma: removing device %s for %pIS:%u\n",
			ia->ri_device->name,
			sap, rpc_get_port(sap));
#endif
		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
		ep->rep_connected = -ENODEV;
		xprt_force_disconnect(&xprt->rx_xprt);
		wait_for_completion(&ia->ri_remove_done);

		ia->ri_id = NULL;
		ia->ri_pd = NULL;
		ia->ri_device = NULL;
		/* Return 1 to ensure the core destroys the id. */
		return 1;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		rpcrdma_update_connect_private(xprt, &event->param.conn);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		dprintk("rpcrdma: connection to %pIS:%u rejected: %s\n",
			sap, rpc_get_port(sap),
			rdma_reject_msg(id, event->status));
		connstate = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			connstate = -EAGAIN;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
connected:
		xprt->rx_buf.rb_credits = 1;
		ep->rep_connected = connstate;
		rpcrdma_conn_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		/*FALLTHROUGH*/
	default:
		dprintk("RPC: %s: %pIS:%u on %s/%s (ep 0x%p): %s\n",
			__func__, sap, rpc_get_port(sap),
			ia->ri_device->name, ia->ri_ops->ro_displayname,
			ep, rdma_event_msg(event->event));
		break;
	}

	return 0;
}

static struct rdma_cm_id *
rpcrdma_create_id(struct rpcrdma_xprt *xprt,
		  struct rpcrdma_ia *ia, struct sockaddr *addr)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ia->ri_done);
	init_completion(&ia->ri_remove_done);

	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
			    IB_QPT_RC);
	if (IS_ERR(id)) {
		rc = PTR_ERR(id);
		dprintk("RPC: %s: rdma_create_id() failed %i\n",
			__func__, rc);
		return id;
	}

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto out;
	}

	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	ia->ri_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc) {
		dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
			__func__, rc);
		goto out;
	}
	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0) {
		dprintk("RPC: %s: wait() exited: %i\n",
			__func__, rc);
		goto out;
	}
	rc = ia->ri_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

/*
 * Exported functions.
 */

/**
 * rpcrdma_ia_open - Open and initialize an Interface Adapter.
 * @xprt: controlling transport
 * @addr: IP address of remote peer
 *
 * Returns 0 on success, negative errno if an appropriate
 * Interface Adapter could not be found and opened.
 */
int
rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr)
{
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	int rc;

	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
	if (IS_ERR(ia->ri_id)) {
		rc = PTR_ERR(ia->ri_id);
		goto out_err;
	}
	ia->ri_device = ia->ri_id->device;

	ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
	if (IS_ERR(ia->ri_pd)) {
		rc = PTR_ERR(ia->ri_pd);
		pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
		goto out_err;
	}

	switch (xprt_rdma_memreg_strategy) {
	case RPCRDMA_FRMR:
		if (frwr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_frwr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	case RPCRDMA_MTHCAFMR:
		if (fmr_is_supported(ia)) {
			ia->ri_ops = &rpcrdma_fmr_memreg_ops;
			break;
		}
		/*FALLTHROUGH*/
	default:
		pr_err("rpcrdma: Device %s does not support memreg mode %d\n",
		       ia->ri_device->name, xprt_rdma_memreg_strategy);
		rc = -EINVAL;
		goto out_err;
	}

	return 0;

out_err:
	rpcrdma_ia_close(ia);
	return rc;
}
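
/* Example (illustrative only): one possible calling order for the helpers
 * in this file when bringing up a transport. The function name and the bare
 * sockaddr argument are hypothetical; rpcrdma_ep_recreate_xprt() below
 * follows a similar order when rebuilding after device removal. Error
 * unwinding mirrors the order in which resources were acquired.
 */
static int __maybe_unused
rpcrdma_example_transport_setup(struct rpcrdma_xprt *r_xprt,
				struct sockaddr *sap)
{
	int rc;

	/* 1. Resolve the address and open an interface adapter (PD, memreg ops) */
	rc = rpcrdma_ia_open(r_xprt, sap);
	if (rc)
		return rc;

	/* 2. Create the unconnected endpoint: CQs, QP attributes, CM private data */
	rc = rpcrdma_ep_create(&r_xprt->rx_ep, &r_xprt->rx_ia, &r_xprt->rx_data);
	if (rc)
		goto out_close_ia;

	/* 3. Create the QP and connect; this waits for the CM upcall to settle */
	rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
	if (rc)
		goto out_destroy_ep;

	return 0;

out_destroy_ep:
	rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
out_close_ia:
	rpcrdma_ia_close(&r_xprt->rx_ia);
	return rc;
}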

/**
 * rpcrdma_ia_remove - Handle device driver unload
 * @ia: interface adapter being removed
 *
 * Divest transport H/W resources associated with this adapter,
 * but allow it to be restored later.
 */
void
rpcrdma_ia_remove(struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpcrdma_rep *rep;

	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	/* This is similar to rpcrdma_ep_destroy, but:
	 * - Don't cancel the connect worker.
	 * - Don't call rpcrdma_ep_disconnect, which waits
	 *   for another conn upcall, which will deadlock.
	 * - rdma_disconnect is unneeded, the underlying
	 *   connection is already gone.
	 */
	if (ia->ri_id->qp) {
		ib_drain_qp(ia->ri_id->qp);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}
	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);

	/* The ULP is responsible for ensuring all DMA
	 * mappings and MRs are gone.
	 */
	list_for_each_entry(rep, &buf->rb_recv_bufs, rr_list)
		rpcrdma_dma_unmap_regbuf(rep->rr_rdmabuf);
	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rpcrdma_dma_unmap_regbuf(req->rl_rdmabuf);
		rpcrdma_dma_unmap_regbuf(req->rl_sendbuf);
		rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
	}
	rpcrdma_destroy_mrs(buf);

	/* Allow waiters to continue */
	complete(&ia->ri_remove_done);
}

/**
 * rpcrdma_ia_close - Clean up/close an IA.
 * @ia: interface adapter to close
 *
 */
void
rpcrdma_ia_close(struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering\n", __func__);
	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
		if (ia->ri_id->qp)
			rdma_destroy_qp(ia->ri_id);
		rdma_destroy_id(ia->ri_id);
	}
	ia->ri_id = NULL;
	ia->ri_device = NULL;

	/* If the pd is still busy, xprtrdma missed freeing a resource */
	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
		ib_dealloc_pd(ia->ri_pd);
	ia->ri_pd = NULL;
}

/*
 * Create unconnected endpoint.
 */
int
rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
		  struct rpcrdma_create_data_internal *cdata)
{
	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
	unsigned int max_qp_wr, max_sge;
	struct ib_cq *sendcq, *recvcq;
	int rc;

	max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
			RPCRDMA_MAX_SEND_SGES);
	if (max_sge < RPCRDMA_MIN_SEND_SGES) {
		pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
		return -ENOMEM;
	}
	ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;

	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
		dprintk("RPC: %s: insufficient wqe's available\n",
			__func__);
		return -ENOMEM;
	}
	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;

	/* check provider's send/recv wr limits */
	if (cdata->max_requests > max_qp_wr)
		cdata->max_requests = max_qp_wr;

	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
	ep->rep_attr.qp_context = ep;
	ep->rep_attr.srq = NULL;
	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
	rc = ia->ri_ops->ro_open(ia, ep, cdata);
	if (rc)
		return rc;
	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
	ep->rep_attr.cap.max_send_sge = max_sge;
	ep->rep_attr.cap.max_recv_sge = 1;
	ep->rep_attr.cap.max_inline_data = 0;
	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->rep_attr.qp_type = IB_QPT_RC;
	ep->rep_attr.port_num = ~0;

	dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
		"iovs: send %d recv %d\n",
		__func__,
		ep->rep_attr.cap.max_send_wr,
		ep->rep_attr.cap.max_recv_wr,
		ep->rep_attr.cap.max_send_sge,
		ep->rep_attr.cap.max_recv_sge);

	/* set trigger for requesting send completion */
	ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
				   cdata->max_requests >> 2);
	ep->rep_send_count = ep->rep_send_batch;
	init_waitqueue_head(&ep->rep_connect_wait);
	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);

	sendcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_send_wr + 1,
			     1, IB_POLL_WORKQUEUE);
	if (IS_ERR(sendcq)) {
		rc = PTR_ERR(sendcq);
		dprintk("RPC: %s: failed to create send CQ: %i\n",
			__func__, rc);
		goto out1;
	}

	recvcq = ib_alloc_cq(ia->ri_device, NULL,
			     ep->rep_attr.cap.max_recv_wr + 1,
			     0, IB_POLL_WORKQUEUE);
	if (IS_ERR(recvcq)) {
		rc = PTR_ERR(recvcq);
		dprintk("RPC: %s: failed to create recv CQ: %i\n",
			__func__, rc);
		goto out2;
	}

	ep->rep_attr.send_cq = sendcq;
	ep->rep_attr.recv_cq = recvcq;

	/* Initialize cma parameters */
	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
	ep->rep_remote_cma.private_data = pmsg;
	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->rep_remote_cma.initiator_depth = 0;
	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
		ep->rep_remote_cma.responder_resources = 32;
	else
		ep->rep_remote_cma.responder_resources =
			ia->ri_device->attrs.max_qp_rd_atom;

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->rep_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->rep_remote_cma.flow_control = 0;
	ep->rep_remote_cma.rnr_retry_count = 0;

	return 0;

out2:
	ib_free_cq(sendcq);
out1:
	return rc;
}

/*
 * rpcrdma_ep_destroy
 *
 * Disconnect and destroy endpoint. After this, the only
 * valid operations on the ep are to free it (if dynamically
 * allocated) or re-create it.
 */
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	dprintk("RPC: %s: entering, connected is %d\n",
		__func__, ep->rep_connected);

	cancel_delayed_work_sync(&ep->rep_connect_worker);

	if (ia->ri_id->qp) {
		rpcrdma_ep_disconnect(ep, ia);
		rdma_destroy_qp(ia->ri_id);
		ia->ri_id->qp = NULL;
	}

	ib_free_cq(ep->rep_attr.recv_cq);
	ib_free_cq(ep->rep_attr.send_cq);
}

/* Re-establish a connection after a device removal event.
 * Unlike a normal reconnection, a fresh PD and a new set
 * of MRs and buffers is needed.
 */
static int
rpcrdma_ep_recreate_xprt(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	int rc, err;

	pr_info("%s: r_xprt = %p\n", __func__, r_xprt);

	rc = -EHOSTUNREACH;
	if (rpcrdma_ia_open(r_xprt, sap))
		goto out1;

	rc = -ENOMEM;
	err = rpcrdma_ep_create(ep, ia, &r_xprt->rx_data);
	if (err) {
		pr_err("rpcrdma: rpcrdma_ep_create returned %d\n", err);
		goto out2;
	}

	rc = -ENETUNREACH;
	err = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		pr_err("rpcrdma: rdma_create_qp returned %d\n", err);
		goto out3;
	}

	rpcrdma_create_mrs(r_xprt);
	return 0;

out3:
	rpcrdma_ep_destroy(ep, ia);
out2:
	rpcrdma_ia_close(ia);
out1:
	return rc;
}

static int
rpcrdma_ep_reconnect(struct rpcrdma_xprt *r_xprt, struct rpcrdma_ep *ep,
		     struct rpcrdma_ia *ia)
{
	struct sockaddr *sap = (struct sockaddr *)&r_xprt->rx_data.addr;
	struct rdma_cm_id *id, *old;
	int err, rc;

	dprintk("RPC: %s: reconnecting...\n", __func__);

	rpcrdma_ep_disconnect(ep, ia);

	rc = -EHOSTUNREACH;
	id = rpcrdma_create_id(r_xprt, ia, sap);
	if (IS_ERR(id))
		goto out;

	/* As long as the new ID points to the same device as the
	 * old ID, we can reuse the transport's existing PD and all
	 * previously allocated MRs. Also, the same device means
	 * the transport's previous DMA mappings are still valid.
	 *
	 * This is a sanity check only. There should be no way these
	 * point to two different devices here.
	 */
	old = id;
	rc = -ENETUNREACH;
	if (ia->ri_device != id->device) {
		pr_err("rpcrdma: can't reconnect on different device!\n");
		goto out_destroy;
	}

	err = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
	if (err) {
		dprintk("RPC: %s: rdma_create_qp returned %d\n",
			__func__, err);
		goto out_destroy;
	}

	/* Atomically replace the transport's ID and QP. */
	rc = 0;
	old = ia->ri_id;
	ia->ri_id = id;
	rdma_destroy_qp(old);

out_destroy:
	rdma_destroy_id(old);
out:
	return rc;
}

/*
 * Connect unconnected endpoint.
 */
int
rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
						   rx_ia);
	unsigned int extras;
	int rc;

retry:
	switch (ep->rep_connected) {
	case 0:
		dprintk("RPC: %s: connecting...\n", __func__);
		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
		if (rc) {
			dprintk("RPC: %s: rdma_create_qp failed %i\n",
				__func__, rc);
			rc = -ENETUNREACH;
			goto out_noupdate;
		}
		break;
	case -ENODEV:
		rc = rpcrdma_ep_recreate_xprt(r_xprt, ep, ia);
		if (rc)
			goto out_noupdate;
		break;
	default:
		rc = rpcrdma_ep_reconnect(r_xprt, ep, ia);
		if (rc)
			goto out;
	}

	ep->rep_connected = 0;

	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
	if (rc) {
		dprintk("RPC: %s: rdma_connect() failed with %i\n",
			__func__, rc);
		goto out;
	}

	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
	if (ep->rep_connected <= 0) {
		if (ep->rep_connected == -EAGAIN)
			goto retry;
		rc = ep->rep_connected;
		goto out;
	}

	dprintk("RPC: %s: connected\n", __func__);
	extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
	if (extras)
		rpcrdma_ep_post_extra_recv(r_xprt, extras);

out:
	if (rc)
		ep->rep_connected = rc;

out_noupdate:
	return rc;
}

/*
 * rpcrdma_ep_disconnect
 *
 * This is separate from destroy to facilitate the ability
 * to reconnect without recreating the endpoint.
 *
 * This call is not reentrant, and must not be made in parallel
 * on the same endpoint.
 */
void
rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
	int rc;

	rc = rdma_disconnect(ia->ri_id);
	if (!rc) {
		/* returns without wait if not connected */
		wait_event_interruptible(ep->rep_connect_wait,
					 ep->rep_connected != 1);
		dprintk("RPC: %s: after wait, %sconnected\n", __func__,
			(ep->rep_connected == 1) ? "still " : "dis");
	} else {
		dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
		ep->rep_connected = rc;
	}

	ib_drain_qp(ia->ri_id->qp);
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and ib_drain_qp has flushed all remaining Send
 * requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
{
	unsigned long i;

	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
}

static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(sizeof(*sc) +
		     ia->ri_max_send_sges * sizeof(struct ib_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_wr.wr_cqe = &sc->sc_cqe;
	sc->sc_wr.sg_list = sc->sc_sges;
	sc->sc_wr.opcode = IB_WR_SEND;
	sc->sc_cqe.done = rpcrdma_wc_send;
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	dprintk("RPC: %s: allocating %lu send_ctxs\n", __func__, i);
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
		if (!sc)
			goto out_destroy;

		sc->sc_xprt = r_xprt;
		buf->rb_sc_ctxs[i] = sc;
	}

	return 0;

out_destroy:
	rpcrdma_sendctxs_destroy(buf);
	return -ENOMEM;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}
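
/* Worked example (illustrative): with rb_sc_last == 3 (a four-entry queue),
 * rpcrdma_sendctx_next() yields 0 -> 1 -> 2 -> 3 -> 0, wrapping without a
 * modulus operation. rpcrdma_sendctx_get_locked() below treats the queue as
 * exhausted when advancing rb_sc_head would land on rb_sc_tail.
 */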

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @buf: transport buffers from which to acquire an unused context
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer),
 * and provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	dprintk("RPC: %s: empty sendctx queue\n", __func__);
	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per rpcrdma_buffer).
 */
void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);
}
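
/* Usage sketch (illustrative): the Send path pairs these helpers roughly as
 * follows. The consumer, serialized by the RPC transport lock, pulls a
 * context before building a Send WR:
 *
 *	sc = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
 *	if (!sc)
 *		return -EAGAIN;		(Send Queue backing up; pause, retry)
 *
 * The SGE array sc->sc_sges is then filled in and sc->sc_wr is posted. The
 * producer side is rpcrdma_wc_send(), which hands the context back through
 * rpcrdma_sendctx_put_locked() once the Send completes. The -EAGAIN policy
 * shown above is only an assumption for illustration; the real caller lives
 * outside this file.
 */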

static void
rpcrdma_mr_recovery_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_recovery_worker.work);
	struct rpcrdma_mw *mw;

	spin_lock(&buf->rb_recovery_lock);
	while (!list_empty(&buf->rb_stale_mrs)) {
		mw = rpcrdma_pop_mw(&buf->rb_stale_mrs);
		spin_unlock(&buf->rb_recovery_lock);

		dprintk("RPC: %s: recovering MR %p\n", __func__, mw);
		mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);

		spin_lock(&buf->rb_recovery_lock);
	}
	spin_unlock(&buf->rb_recovery_lock);
}

void
rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
{
	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_recovery_lock);
	rpcrdma_push_mw(mw, &buf->rb_stale_mrs);
	spin_unlock(&buf->rb_recovery_lock);

	schedule_delayed_work(&buf->rb_recovery_worker, 0);
}

static void
rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	unsigned int count;
	LIST_HEAD(free);
	LIST_HEAD(all);

	for (count = 0; count < 32; count++) {
		struct rpcrdma_mw *mw;
		int rc;

		mw = kzalloc(sizeof(*mw), GFP_KERNEL);
		if (!mw)
			break;

		rc = ia->ri_ops->ro_init_mr(ia, mw);
		if (rc) {
			kfree(mw);
			break;
		}

		mw->mw_xprt = r_xprt;

		list_add(&mw->mw_list, &free);
		list_add(&mw->mw_all, &all);
	}

	spin_lock(&buf->rb_mwlock);
	list_splice(&free, &buf->rb_mws);
	list_splice(&all, &buf->rb_all);
	r_xprt->rx_stats.mrs_allocated += count;
	spin_unlock(&buf->rb_mwlock);

	dprintk("RPC: %s: created %u MRs\n", __func__, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker.work);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_create_mrs(r_xprt);
}

struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), GFP_KERNEL);
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	spin_lock(&buffer->rb_reqslock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_reqslock);
	req->rl_buffer = &r_xprt->rx_buf;
	INIT_LIST_HEAD(&req->rl_registered);
	return req;
}

struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	struct rpcrdma_rep *rep;
	int rc;

	rc = -ENOMEM;
	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (IS_ERR(rep->rr_rdmabuf)) {
		rc = PTR_ERR(rep->rr_rdmabuf);
		goto out_free;
	}
	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	return rep;

out_free:
	kfree(rep);
out:
	return ERR_PTR(rc);
}

int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_max_requests = r_xprt->rx_data.max_requests;
	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_mwlock);
	spin_lock_init(&buf->rb_lock);
	spin_lock_init(&buf->rb_recovery_lock);
	INIT_LIST_HEAD(&buf->rb_mws);
	INIT_LIST_HEAD(&buf->rb_all);
	INIT_LIST_HEAD(&buf->rb_stale_mrs);
	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
			  rpcrdma_mr_refresh_worker);
	INIT_DELAYED_WORK(&buf->rb_recovery_worker,
			  rpcrdma_mr_recovery_worker);

	rpcrdma_create_mrs(r_xprt);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	spin_lock_init(&buf->rb_reqslock);
	for (i = 0; i < buf->rb_max_requests; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req)) {
			dprintk("RPC: %s: request buffer %d alloc"
				" failed\n", __func__, i);
			rc = PTR_ERR(req);
			goto out;
		}
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	INIT_LIST_HEAD(&buf->rb_recv_bufs);
	for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			dprintk("RPC: %s: reply buffer %d alloc failed\n",
				__func__, i);
			rc = PTR_ERR(rep);
			goto out;
		}
		list_add(&rep->rr_list, &buf->rb_recv_bufs);
	}

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc)
		goto out;

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

static struct rpcrdma_req *
rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_req *req;

	req = list_first_entry(&buf->rb_send_bufs,
			       struct rpcrdma_req, rl_list);
	list_del_init(&req->rl_list);
	return req;
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	rep = list_first_entry(&buf->rb_recv_bufs,
			       struct rpcrdma_rep, rr_list);
	list_del(&rep->rr_list);
	return rep;
}

static void
rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
{
	rpcrdma_free_regbuf(rep->rr_rdmabuf);
	kfree(rep);
}

void
rpcrdma_destroy_req(struct rpcrdma_req *req)
{
	rpcrdma_free_regbuf(req->rl_recvbuf);
	rpcrdma_free_regbuf(req->rl_sendbuf);
	rpcrdma_free_regbuf(req->rl_rdmabuf);
	kfree(req);
}

static void
rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);
	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
	struct rpcrdma_mw *mw;
	unsigned int count;

	count = 0;
	spin_lock(&buf->rb_mwlock);
	while (!list_empty(&buf->rb_all)) {
		mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
		list_del(&mw->mw_all);

		spin_unlock(&buf->rb_mwlock);
		ia->ri_ops->ro_release_mr(mw);
		count++;
		spin_lock(&buf->rb_mwlock);
	}
	spin_unlock(&buf->rb_mwlock);
	r_xprt->rx_stats.mrs_allocated = 0;

	dprintk("RPC: %s: released %u MRs\n", __func__, count);
}

void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	cancel_delayed_work_sync(&buf->rb_recovery_worker);
	cancel_delayed_work_sync(&buf->rb_refresh_worker);

	rpcrdma_sendctxs_destroy(buf);

	while (!list_empty(&buf->rb_recv_bufs)) {
		struct rpcrdma_rep *rep;

		rep = rpcrdma_buffer_get_rep_locked(buf);
		rpcrdma_destroy_rep(rep);
	}
	buf->rb_send_count = 0;

	spin_lock(&buf->rb_reqslock);
	while (!list_empty(&buf->rb_allreqs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_allreqs,
				       struct rpcrdma_req, rl_all);
		list_del(&req->rl_all);

		spin_unlock(&buf->rb_reqslock);
		rpcrdma_destroy_req(req);
		spin_lock(&buf->rb_reqslock);
	}
	spin_unlock(&buf->rb_reqslock);
	buf->rb_recv_count = 0;

	rpcrdma_destroy_mrs(buf);
}

struct rpcrdma_mw *
rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mw *mw = NULL;

	spin_lock(&buf->rb_mwlock);
	if (!list_empty(&buf->rb_mws))
		mw = rpcrdma_pop_mw(&buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);

	if (!mw)
		goto out_nomws;
	mw->mw_flags = 0;
	return mw;

out_nomws:
	dprintk("RPC: %s: no MWs available\n", __func__);
	if (r_xprt->rx_ep.rep_connected != -ENODEV)
		schedule_delayed_work(&buf->rb_refresh_worker, 0);

	/* Allow the reply handler and refresh worker to run */
	cond_resched();

	return NULL;
}

void
rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;

	spin_lock(&buf->rb_mwlock);
	rpcrdma_push_mw(mw, &buf->rb_mws);
	spin_unlock(&buf->rb_mwlock);
}

static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
{
	/* If an RPC previously completed without a reply (say, a
	 * credential problem or a soft timeout occurs) then hold off
	 * on supplying more Receive buffers until the number of new
	 * pending RPCs catches up to the number of posted Receives.
	 */
	if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
		return NULL;
	if (unlikely(list_empty(&buffers->rb_recv_bufs)))
		return NULL;
	buffers->rb_recv_count++;
	return rpcrdma_buffer_get_rep_locked(buffers);
}

/*
 * Get a set of request/reply buffers.
 *
 * Reply buffer (if available) is attached to send buffer upon return.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	if (list_empty(&buffers->rb_send_bufs))
		goto out_reqbuf;
	buffers->rb_send_count++;
	req = rpcrdma_buffer_get_req_locked(buffers);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);

	return req;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("RPC: %s: out of request buffers\n", __func__);
	return NULL;
}

/*
 * Put request/reply buffers back into pool.
 * Pre-decrement counter/array index.
 */
void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;
	struct rpcrdma_rep *rep = req->rl_reply;

	req->rl_reply = NULL;

	spin_lock(&buffers->rb_lock);
	buffers->rb_send_count--;
	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
	if (rep) {
		buffers->rb_recv_count--;
		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	}
	spin_unlock(&buffers->rb_lock);
}
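
/* Example (illustrative only; the function name is hypothetical): a caller
 * checks out a request buffer, with a reply buffer attached when one is
 * available, and returns both by putting the request back.
 */
static void __maybe_unused
rpcrdma_example_buffer_cycle(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	req = rpcrdma_buffer_get(buffers);
	if (!req)
		return;		/* pool exhausted; caller must back off */

	/* ... marshal an RPC into req and post it ... */

	/* Returning req also returns req->rl_reply, if one was attached */
	rpcrdma_buffer_put(req);
}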

/*
 * Recover reply buffers from pool.
 * This happens when recovering from disconnect.
 */
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
	struct rpcrdma_buffer *buffers = req->rl_buffer;

	spin_lock(&buffers->rb_lock);
	req->rl_reply = rpcrdma_buffer_get_rep(buffers);
	spin_unlock(&buffers->rb_lock);
}

/*
 * Put reply buffers back into pool when not attached to
 * request. This happens in error conditions.
 */
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;

	spin_lock(&buffers->rb_lock);
	buffers->rb_recv_count--;
	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
	spin_unlock(&buffers->rb_lock);
}

/**
 * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
 * @size: size of buffer to be allocated, in bytes
 * @direction: direction of data movement
 * @flags: GFP flags
 *
 * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
 * can be persistently DMA-mapped for I/O.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via ro_map.
 */
struct rpcrdma_regbuf *
rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb) + size, flags);
	if (rb == NULL)
		return ERR_PTR(-ENOMEM);

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;

	return rb;
}

/**
 * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
 * @ia: controlling rpcrdma_ia
 * @rb: regbuf to be mapped
 */
bool
__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = ia->ri_device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device,
					    (void *)rb->rg_base,
					    rdmab_length(rb),
					    rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb)))
		return false;

	rb->rg_device = device;
	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
	return true;
}

static void
rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
			    rdmab_length(rb), rb->rg_direction);
	rb->rg_device = NULL;
}

/**
 * rpcrdma_free_regbuf - deregister and free registered buffer
 * @rb: regbuf to be deregistered and freed
 */
void
rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	rpcrdma_dma_unmap_regbuf(rb);
	kfree(rb);
}
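
/* Example (illustrative only; the function name is hypothetical): the regbuf
 * lifecycle as used elsewhere in this file: allocate, lazily DMA-map before
 * first use, and free (which also unmaps).
 */
static int __maybe_unused
rpcrdma_example_regbuf_cycle(struct rpcrdma_ia *ia)
{
	struct rpcrdma_regbuf *rb;

	/* Allocation does not map; rg_device stays NULL until mapping */
	rb = rpcrdma_alloc_regbuf(PAGE_SIZE, DMA_TO_DEVICE, GFP_KERNEL);
	if (IS_ERR(rb))
		return PTR_ERR(rb);

	/* Map on first use; fails if the direction is DMA_NONE or the
	 * device cannot map the address.
	 */
	if (!rpcrdma_dma_map_regbuf(ia, rb)) {
		rpcrdma_free_regbuf(rb);
		return -EIO;
	}

	/* ... post rb->rg_iov as an SGE ... */

	/* Freeing unmaps the buffer if it is still mapped */
	rpcrdma_free_regbuf(rb);
	return 0;
}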

/*
 * Prepost any receive buffer, then post send.
 *
 * Receive buffer is donated to hardware, reclaimed upon recv completion.
 */
int
rpcrdma_ep_post(struct rpcrdma_ia *ia,
		struct rpcrdma_ep *ep,
		struct rpcrdma_req *req)
{
	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
	struct ib_send_wr *send_wr_fail;
	int rc;

	if (req->rl_reply) {
		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
		if (rc)
			return rc;
		req->rl_reply = NULL;
	}

	dprintk("RPC: %s: posting %d s/g entries\n",
		__func__, send_wr->num_sge);

	if (!ep->rep_send_count ||
	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
		send_wr->send_flags |= IB_SEND_SIGNALED;
		ep->rep_send_count = ep->rep_send_batch;
	} else {
		send_wr->send_flags &= ~IB_SEND_SIGNALED;
		--ep->rep_send_count;
	}

	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
	if (rc)
		goto out_postsend_err;
	return 0;

out_postsend_err:
	pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
	return -ENOTCONN;
}
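
/* Worked example (illustrative): rpcrdma_ep_create() sets rep_send_batch to
 * cdata->max_requests >> 2, capped by RPCRDMA_MAX_SEND_BATCH. If that works
 * out to 32, the counter above lets 32 Sends go out unsignaled, then the
 * next one is posted with IB_SEND_SIGNALED and the counter resets. That
 * signaled Send's completion drives rpcrdma_wc_send(), which recycles its
 * sendctx along with the unsignaled ones queued before it. A Send with
 * RPCRDMA_REQ_F_TX_RESOURCES set is always signaled. The value 32 is only
 * an example.
 */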

int
rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
		     struct rpcrdma_rep *rep)
{
	struct ib_recv_wr *recv_wr_fail;
	int rc;

	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
		goto out_map;
	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
	if (rc)
		goto out_postrecv;
	return 0;

out_map:
	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
	return -EIO;

out_postrecv:
	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
	return -ENOTCONN;
}

/**
 * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
 * @r_xprt: transport associated with these backchannel resources
 * @count: minimum number of incoming requests expected
 *
 * Returns zero if all requested buffers were posted, or a negative errno.
 */
int
rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_rep *rep;
	int rc;

	while (count--) {
		spin_lock(&buffers->rb_lock);
		if (list_empty(&buffers->rb_recv_bufs))
			goto out_reqbuf;
		rep = rpcrdma_buffer_get_rep_locked(buffers);
		spin_unlock(&buffers->rb_lock);

		rc = rpcrdma_ep_post_recv(ia, rep);
		if (rc)
			goto out_rc;
	}

	return 0;

out_reqbuf:
	spin_unlock(&buffers->rb_lock);
	pr_warn("%s: no extra receive buffers\n", __func__);
	return -ENOMEM;

out_rc:
	rpcrdma_recv_buffer_put(rep);
	return rc;
}