  1. /*
  2. * linux/net/sunrpc/xprt.c
  3. *
  4. * This is a generic RPC call interface supporting congestion avoidance,
  5. * and asynchronous calls.
  6. *
  7. * The interface works like this:
  8. *
  9. * - When a process places a call, it allocates a request slot if
  10. * one is available. Otherwise, it sleeps on the backlog queue
  11. * (xprt_reserve).
  12. * - Next, the caller puts together the RPC message, stuffs it into
  13. * the request struct, and calls xprt_transmit().
  14. * - xprt_transmit sends the message and installs the caller on the
  15. * transport's wait list. At the same time, if a reply is expected,
  16. * it installs a timer that is run after the packet's timeout has
  17. * expired.
  18. * - When a packet arrives, the data_ready handler walks the list of
  19. * pending requests for that transport. If a matching XID is found, the
  20. * caller is woken up, and the timer removed.
  21. * - When no reply arrives within the timeout interval, the timer is
  22. * fired by the kernel and runs xprt_timer(). It either adjusts the
  23. * timeout values (minor timeout) or wakes up the caller with a status
  24. * of -ETIMEDOUT.
  25. * - When the caller receives a notification from RPC that a reply arrived,
  26. * it should release the RPC slot, and process the reply.
  27. * If the call timed out, it may choose to retry the operation by
  28. * adjusting the initial timeout value, and simply calling rpc_call
  29. * again.
  30. *
  31. * Support for async RPC is done through a set of RPC-specific scheduling
  32. * primitives that `transparently' work for processes as well as async
  33. * tasks that rely on callbacks.
  34. *
  35. * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
  36. *
  37. * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
  38. */
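/*
 * Rough caller-side sketch of the flow above (editorial, illustrative only;
 * the rpc_call_sync() state machine in net/sunrpc/clnt.c drives these steps):
 *
 *	xprt_reserve()              - obtain an rpc_rqst slot, or sleep on
 *	                              the backlog queue
 *	xprt_prepare_transmit()     - take the transport write lock
 *	xprt_transmit()             - send the request and queue it for the
 *	                              matching reply
 *	xprt_request_wait_receive() - sleep on xprt->pending until the reply
 *	                              arrives or xprt_timer() fires
 *	xprt_release()              - on completion, return the slot
 */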
  39. #include <linux/module.h>
  40. #include <linux/types.h>
  41. #include <linux/interrupt.h>
  42. #include <linux/workqueue.h>
  43. #include <linux/net.h>
  44. #include <linux/ktime.h>
  45. #include <linux/sunrpc/clnt.h>
  46. #include <linux/sunrpc/metrics.h>
  47. #include <linux/sunrpc/bc_xprt.h>
  48. #include <linux/rcupdate.h>
  49. #include <trace/events/sunrpc.h>
  50. #include "sunrpc.h"
  51. /*
  52. * Local variables
  53. */
  54. #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  55. # define RPCDBG_FACILITY RPCDBG_XPRT
  56. #endif
  57. /*
  58. * Local functions
  59. */
  60. static void xprt_init(struct rpc_xprt *xprt, struct net *net);
  61. static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
  62. static void xprt_connect_status(struct rpc_task *task);
  63. static void xprt_destroy(struct rpc_xprt *xprt);
  64. static DEFINE_SPINLOCK(xprt_list_lock);
  65. static LIST_HEAD(xprt_list);
  66. /**
  67. * xprt_register_transport - register a transport implementation
  68. * @transport: transport to register
  69. *
  70. * If a transport implementation is loaded as a kernel module, it can
  71. * call this interface to make itself known to the RPC client.
  72. *
  73. * Returns:
  74. * 0: transport successfully registered
  75. * -EEXIST: transport already registered
  76. * -EINVAL: transport module being unloaded
  77. */
  78. int xprt_register_transport(struct xprt_class *transport)
  79. {
  80. struct xprt_class *t;
  81. int result;
  82. result = -EEXIST;
  83. spin_lock(&xprt_list_lock);
  84. list_for_each_entry(t, &xprt_list, list) {
  85. /* don't register the same transport class twice */
  86. if (t->ident == transport->ident)
  87. goto out;
  88. }
  89. list_add_tail(&transport->list, &xprt_list);
  90. printk(KERN_INFO "RPC: Registered %s transport module.\n",
  91. transport->name);
  92. result = 0;
  93. out:
  94. spin_unlock(&xprt_list_lock);
  95. return result;
  96. }
  97. EXPORT_SYMBOL_GPL(xprt_register_transport);
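/*
 * Illustrative sketch (editorial, not part of this file): a loadable
 * transport module typically defines an xprt_class and registers it from
 * its module init, in the style of the in-tree socket transports. All
 * example_*() names are hypothetical; only xprt_register_transport() and
 * xprt_unregister_transport() come from this file.
 */
#if 0	/* example only */
static struct rpc_xprt *example_setup(struct xprt_create *args);

static struct xprt_class example_transport = {
	.list	= LIST_HEAD_INIT(example_transport.list),
	.name	= "example",
	.owner	= THIS_MODULE,
	.ident	= XPRT_TRANSPORT_TCP,	/* a real module defines its own ident */
	.setup	= example_setup,
};

static int __init example_module_init(void)
{
	return xprt_register_transport(&example_transport);
}

static void __exit example_module_exit(void)
{
	xprt_unregister_transport(&example_transport);
}
#endif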
  98. /**
  99. * xprt_unregister_transport - unregister a transport implementation
  100. * @transport: transport to unregister
  101. *
  102. * Returns:
  103. * 0: transport successfully unregistered
  104. * -ENOENT: transport never registered
  105. */
  106. int xprt_unregister_transport(struct xprt_class *transport)
  107. {
  108. struct xprt_class *t;
  109. int result;
  110. result = 0;
  111. spin_lock(&xprt_list_lock);
  112. list_for_each_entry(t, &xprt_list, list) {
  113. if (t == transport) {
  114. printk(KERN_INFO
  115. "RPC: Unregistered %s transport module.\n",
  116. transport->name);
  117. list_del_init(&transport->list);
  118. goto out;
  119. }
  120. }
  121. result = -ENOENT;
  122. out:
  123. spin_unlock(&xprt_list_lock);
  124. return result;
  125. }
  126. EXPORT_SYMBOL_GPL(xprt_unregister_transport);
  127. /**
  128. * xprt_load_transport - load a transport implementation
  129. * @transport_name: transport to load
  130. *
  131. * Returns:
  132. * 0: transport successfully loaded
  133. * -ENOENT: transport module not available
  134. */
  135. int xprt_load_transport(const char *transport_name)
  136. {
  137. struct xprt_class *t;
  138. int result;
  139. result = 0;
  140. spin_lock(&xprt_list_lock);
  141. list_for_each_entry(t, &xprt_list, list) {
  142. if (strcmp(t->name, transport_name) == 0) {
  143. spin_unlock(&xprt_list_lock);
  144. goto out;
  145. }
  146. }
  147. spin_unlock(&xprt_list_lock);
  148. result = request_module("xprt%s", transport_name);
  149. out:
  150. return result;
  151. }
  152. EXPORT_SYMBOL_GPL(xprt_load_transport);
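/*
 * Editorial note: request_module("xprt%s", ...) above means a transport
 * module named "example" is only auto-loadable if it declares a matching
 * MODULE_ALIAS("xprtexample"); the in-tree RDMA transport does this with
 * MODULE_ALIAS("xprtrdma").
 */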
  153. static void xprt_clear_locked(struct rpc_xprt *xprt)
  154. {
  155. xprt->snd_task = NULL;
  156. if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
  157. smp_mb__before_atomic();
  158. clear_bit(XPRT_LOCKED, &xprt->state);
  159. smp_mb__after_atomic();
  160. } else
  161. queue_work(xprtiod_workqueue, &xprt->task_cleanup);
  162. }
  163. /**
  164. * xprt_reserve_xprt - serialize write access to transports
  165. * @task: task that is requesting access to the transport
  166. * @xprt: pointer to the target transport
  167. *
  168. * This prevents mixing the payload of separate requests, and prevents
  169. * transport connects from colliding with writes. No congestion control
  170. * is provided.
  171. */
  172. int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
  173. {
  174. struct rpc_rqst *req = task->tk_rqstp;
  175. if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
  176. if (task == xprt->snd_task)
  177. return 1;
  178. goto out_sleep;
  179. }
  180. if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
  181. goto out_unlock;
  182. xprt->snd_task = task;
  183. return 1;
  184. out_unlock:
  185. xprt_clear_locked(xprt);
  186. out_sleep:
  187. dprintk("RPC: %5u failed to lock transport %p\n",
  188. task->tk_pid, xprt);
  189. task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
  190. task->tk_status = -EAGAIN;
  191. rpc_sleep_on(&xprt->sending, task, NULL);
  192. return 0;
  193. }
  194. EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
  195. static bool
  196. xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
  197. {
  198. return test_bit(XPRT_CWND_WAIT, &xprt->state);
  199. }
  200. static void
  201. xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
  202. {
  203. if (!list_empty(&xprt->xmit_queue)) {
  204. /* Peek at head of queue to see if it can make progress */
  205. if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
  206. rq_xmit)->rq_cong)
  207. return;
  208. }
  209. set_bit(XPRT_CWND_WAIT, &xprt->state);
  210. }
  211. static void
  212. xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
  213. {
  214. if (!RPCXPRT_CONGESTED(xprt))
  215. clear_bit(XPRT_CWND_WAIT, &xprt->state);
  216. }
  217. /*
  218. * xprt_reserve_xprt_cong - serialize write access to transports
  219. * @task: task that is requesting access to the transport
  220. *
  221. * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
  222. * integrated into the decision of whether a request is allowed to be
  223. * woken up and given access to the transport.
  224. * Note that the lock is only granted if we know there are free slots.
  225. */
  226. int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
  227. {
  228. struct rpc_rqst *req = task->tk_rqstp;
  229. if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
  230. if (task == xprt->snd_task)
  231. return 1;
  232. goto out_sleep;
  233. }
  234. if (req == NULL) {
  235. xprt->snd_task = task;
  236. return 1;
  237. }
  238. if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
  239. goto out_unlock;
  240. if (!xprt_need_congestion_window_wait(xprt)) {
  241. xprt->snd_task = task;
  242. return 1;
  243. }
  244. out_unlock:
  245. xprt_clear_locked(xprt);
  246. out_sleep:
  247. dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
  248. task->tk_timeout = RPC_IS_SOFT(task) ? req->rq_timeout : 0;
  249. task->tk_status = -EAGAIN;
  250. rpc_sleep_on(&xprt->sending, task, NULL);
  251. return 0;
  252. }
  253. EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
  254. static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
  255. {
  256. int retval;
  257. if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
  258. return 1;
  259. spin_lock_bh(&xprt->transport_lock);
  260. retval = xprt->ops->reserve_xprt(xprt, task);
  261. spin_unlock_bh(&xprt->transport_lock);
  262. return retval;
  263. }
  264. static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
  265. {
  266. struct rpc_xprt *xprt = data;
  267. xprt->snd_task = task;
  268. return true;
  269. }
  270. static void __xprt_lock_write_next(struct rpc_xprt *xprt)
  271. {
  272. if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
  273. return;
  274. if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
  275. goto out_unlock;
  276. if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
  277. __xprt_lock_write_func, xprt))
  278. return;
  279. out_unlock:
  280. xprt_clear_locked(xprt);
  281. }
  282. static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
  283. {
  284. if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
  285. return;
  286. if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
  287. goto out_unlock;
  288. if (xprt_need_congestion_window_wait(xprt))
  289. goto out_unlock;
  290. if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
  291. __xprt_lock_write_func, xprt))
  292. return;
  293. out_unlock:
  294. xprt_clear_locked(xprt);
  295. }
  296. /**
  297. * xprt_release_xprt - allow other requests to use a transport
  298. * @xprt: transport with other tasks potentially waiting
  299. * @task: task that is releasing access to the transport
  300. *
  301. * Note that "task" can be NULL. No congestion control is provided.
  302. */
  303. void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
  304. {
  305. if (xprt->snd_task == task) {
  306. xprt_clear_locked(xprt);
  307. __xprt_lock_write_next(xprt);
  308. }
  309. }
  310. EXPORT_SYMBOL_GPL(xprt_release_xprt);
  311. /**
  312. * xprt_release_xprt_cong - allow other requests to use a transport
  313. * @xprt: transport with other tasks potentially waiting
  314. * @task: task that is releasing access to the transport
  315. *
  316. * Note that "task" can be NULL. Another task is awoken to use the
  317. * transport if the transport's congestion window allows it.
  318. */
  319. void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
  320. {
  321. if (xprt->snd_task == task) {
  322. xprt_clear_locked(xprt);
  323. __xprt_lock_write_next_cong(xprt);
  324. }
  325. }
  326. EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
  327. static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
  328. {
  329. if (xprt->snd_task != task)
  330. return;
  331. spin_lock_bh(&xprt->transport_lock);
  332. xprt->ops->release_xprt(xprt, task);
  333. spin_unlock_bh(&xprt->transport_lock);
  334. }
  335. /*
  336. * Van Jacobson congestion avoidance. Check if the congestion window
  337. * overflowed. Put the task to sleep if this is the case.
  338. */
  339. static int
  340. __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
  341. {
  342. if (req->rq_cong)
  343. return 1;
  344. dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
  345. req->rq_task->tk_pid, xprt->cong, xprt->cwnd);
  346. if (RPCXPRT_CONGESTED(xprt)) {
  347. xprt_set_congestion_window_wait(xprt);
  348. return 0;
  349. }
  350. req->rq_cong = 1;
  351. xprt->cong += RPC_CWNDSCALE;
  352. return 1;
  353. }
  354. /*
  355. * Adjust the congestion window, and wake up the next task
  356. * that has been sleeping due to congestion
  357. */
  358. static void
  359. __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
  360. {
  361. if (!req->rq_cong)
  362. return;
  363. req->rq_cong = 0;
  364. xprt->cong -= RPC_CWNDSCALE;
  365. xprt_test_and_clear_congestion_window_wait(xprt);
  366. __xprt_lock_write_next_cong(xprt);
  367. }
  368. /**
  369. * xprt_request_get_cong - Request congestion control credits
  370. * @xprt: pointer to transport
  371. * @req: pointer to RPC request
  372. *
  373. * Useful for transports that require congestion control.
  374. */
  375. bool
  376. xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
  377. {
  378. bool ret = false;
  379. if (req->rq_cong)
  380. return true;
  381. spin_lock_bh(&xprt->transport_lock);
  382. ret = __xprt_get_cong(xprt, req) != 0;
  383. spin_unlock_bh(&xprt->transport_lock);
  384. return ret;
  385. }
  386. EXPORT_SYMBOL_GPL(xprt_request_get_cong);
  387. /**
  388. * xprt_release_rqst_cong - housekeeping when request is complete
  389. * @task: RPC request that recently completed
  390. *
  391. * Useful for transports that require congestion control.
  392. */
  393. void xprt_release_rqst_cong(struct rpc_task *task)
  394. {
  395. struct rpc_rqst *req = task->tk_rqstp;
  396. __xprt_put_cong(req->rq_xprt, req);
  397. }
  398. EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
  399. /*
  400. * Clear the congestion window wait flag and wake up the next
  401. * entry on xprt->sending
  402. */
  403. static void
  404. xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
  405. {
  406. if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
  407. spin_lock_bh(&xprt->transport_lock);
  408. __xprt_lock_write_next_cong(xprt);
  409. spin_unlock_bh(&xprt->transport_lock);
  410. }
  411. }
  412. /**
  413. * xprt_adjust_cwnd - adjust transport congestion window
  414. * @xprt: pointer to xprt
  415. * @task: recently completed RPC request used to adjust window
  416. * @result: result code of completed RPC request
  417. *
  418. * The transport code maintains an estimate on the maximum number of out-
  419. * standing RPC requests, using a smoothed version of the congestion
  420. * avoidance implemented in 44BSD. This is basically the Van Jacobson
  421. * congestion algorithm: If a retransmit occurs, the congestion window is
  422. * halved; otherwise, it is incremented by 1/cwnd when
  423. *
  424. * - a reply is received and
  425. * - a full number of requests are outstanding and
  426. * - the congestion window hasn't been updated recently.
  427. */
  428. void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
  429. {
  430. struct rpc_rqst *req = task->tk_rqstp;
  431. unsigned long cwnd = xprt->cwnd;
  432. if (result >= 0 && cwnd <= xprt->cong) {
  433. /* The (cwnd >> 1) term makes sure
  434. * the result gets rounded properly. */
  435. cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
  436. if (cwnd > RPC_MAXCWND(xprt))
  437. cwnd = RPC_MAXCWND(xprt);
  438. __xprt_lock_write_next_cong(xprt);
  439. } else if (result == -ETIMEDOUT) {
  440. cwnd >>= 1;
  441. if (cwnd < RPC_CWNDSCALE)
  442. cwnd = RPC_CWNDSCALE;
  443. }
  444. dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
  445. xprt->cong, xprt->cwnd, cwnd);
  446. xprt->cwnd = cwnd;
  447. __xprt_put_cong(xprt, req);
  448. }
  449. EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
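/*
 * Worked example (editorial, assuming RPC_CWNDSCALE is 1 << 8 = 256 as in
 * the mainline header): with cwnd at four "slots" (1024 units) and all four
 * in use, each successful reply adds (256 * 256 + 512) / 1024 = 64 units,
 * i.e. a quarter of a slot, so roughly four round trips grow the window by
 * one request. An -ETIMEDOUT result instead halves cwnd to 512 units (two
 * slots), never dropping below RPC_CWNDSCALE (one slot).
 */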
  450. /**
  451. * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
  452. * @xprt: transport with waiting tasks
  453. * @status: result code to plant in each task before waking it
  454. *
  455. */
  456. void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
  457. {
  458. if (status < 0)
  459. rpc_wake_up_status(&xprt->pending, status);
  460. else
  461. rpc_wake_up(&xprt->pending);
  462. }
  463. EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
  464. /**
  465. * xprt_wait_for_buffer_space - wait for transport output buffer to clear
  466. * @xprt: transport
  467. *
  468. * Note that we only set the timer for the case of RPC_IS_SOFT(), since
  469. * we don't in general want to force a socket disconnection due to
  470. * an incomplete RPC call transmission.
  471. */
  472. void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
  473. {
  474. set_bit(XPRT_WRITE_SPACE, &xprt->state);
  475. }
  476. EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
  477. static bool
  478. xprt_clear_write_space_locked(struct rpc_xprt *xprt)
  479. {
  480. if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
  481. __xprt_lock_write_next(xprt);
  482. dprintk("RPC: write space: waking waiting task on "
  483. "xprt %p\n", xprt);
  484. return true;
  485. }
  486. return false;
  487. }
  488. /**
  489. * xprt_write_space - wake the task waiting for transport output buffer space
  490. * @xprt: transport with waiting tasks
  491. *
  492. * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
  493. */
  494. bool xprt_write_space(struct rpc_xprt *xprt)
  495. {
  496. bool ret;
  497. if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
  498. return false;
  499. spin_lock_bh(&xprt->transport_lock);
  500. ret = xprt_clear_write_space_locked(xprt);
  501. spin_unlock_bh(&xprt->transport_lock);
  502. return ret;
  503. }
  504. EXPORT_SYMBOL_GPL(xprt_write_space);
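/*
 * Illustrative pairing (editorial sketch): a stream transport that cannot
 * write to its socket typically sets the flag from its ->send_request()
 * error path via xprt_wait_for_buffer_space(), and wakes the sender from
 * its socket write-space callback via xprt_write_space(). The example_*()
 * names below are hypothetical.
 */
#if 0	/* example only */
static int example_send_request(struct rpc_rqst *req)
{
	int err = example_try_to_send(req);	/* hypothetical helper */

	if (err == -EAGAIN)			/* no socket buffer space left */
		xprt_wait_for_buffer_space(req->rq_xprt);
	return err;
}

static void example_sock_write_space(struct rpc_xprt *xprt)
{
	/* wakes the next waiting sender iff XPRT_WRITE_SPACE was set above */
	xprt_write_space(xprt);
}
#endif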
  505. /**
  506. * xprt_set_retrans_timeout_def - set a request's retransmit timeout
  507. * @task: task whose timeout is to be set
  508. *
  509. * Set a request's retransmit timeout based on the transport's
  510. * default timeout parameters. Used by transports that don't adjust
  511. * the retransmit timeout based on round-trip time estimation.
  512. */
  513. void xprt_set_retrans_timeout_def(struct rpc_task *task)
  514. {
  515. task->tk_timeout = task->tk_rqstp->rq_timeout;
  516. }
  517. EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
  518. /**
  519. * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
  520. * @task: task whose timeout is to be set
  521. *
  522. * Set a request's retransmit timeout using the RTT estimator.
  523. */
  524. void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
  525. {
  526. int timer = task->tk_msg.rpc_proc->p_timer;
  527. struct rpc_clnt *clnt = task->tk_client;
  528. struct rpc_rtt *rtt = clnt->cl_rtt;
  529. struct rpc_rqst *req = task->tk_rqstp;
  530. unsigned long max_timeout = clnt->cl_timeout->to_maxval;
  531. task->tk_timeout = rpc_calc_rto(rtt, timer);
  532. task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
  533. if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
  534. task->tk_timeout = max_timeout;
  535. }
  536. EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
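/*
 * Worked example (editorial): if the RTT estimator yields an RTO of 200ms
 * for this procedure's timer bucket, the bucket has recorded one prior
 * timeout (rpc_ntimeo() == 1) and the request has been retried once, the
 * task timeout becomes 200ms << (1 + 1) = 800ms, clamped to
 * cl_timeout->to_maxval (which is also used if the shift wraps to zero).
 */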
  537. static void xprt_reset_majortimeo(struct rpc_rqst *req)
  538. {
  539. const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
  540. req->rq_majortimeo = req->rq_timeout;
  541. if (to->to_exponential)
  542. req->rq_majortimeo <<= to->to_retries;
  543. else
  544. req->rq_majortimeo += to->to_increment * to->to_retries;
  545. if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
  546. req->rq_majortimeo = to->to_maxval;
  547. req->rq_majortimeo += jiffies;
  548. }
  549. /**
  550. * xprt_adjust_timeout - adjust timeout values for next retransmit
  551. * @req: RPC request containing parameters to use for the adjustment
  552. *
  553. */
  554. int xprt_adjust_timeout(struct rpc_rqst *req)
  555. {
  556. struct rpc_xprt *xprt = req->rq_xprt;
  557. const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
  558. int status = 0;
  559. if (time_before(jiffies, req->rq_majortimeo)) {
  560. if (to->to_exponential)
  561. req->rq_timeout <<= 1;
  562. else
  563. req->rq_timeout += to->to_increment;
  564. if (to->to_maxval && req->rq_timeout >= to->to_maxval)
  565. req->rq_timeout = to->to_maxval;
  566. req->rq_retries++;
  567. } else {
  568. req->rq_timeout = to->to_initval;
  569. req->rq_retries = 0;
  570. xprt_reset_majortimeo(req);
  571. /* Reset the RTT counters == "slow start" */
  572. spin_lock_bh(&xprt->transport_lock);
  573. rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
  574. spin_unlock_bh(&xprt->transport_lock);
  575. status = -ETIMEDOUT;
  576. }
  577. if (req->rq_timeout == 0) {
  578. printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
  579. req->rq_timeout = 5 * HZ;
  580. }
  581. return status;
  582. }
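/*
 * Worked example (editorial, hypothetical values): with an exponential
 * rpc_timeout of to_initval = 2 * HZ, to_retries = 3 and to_maxval = 30 * HZ,
 * xprt_reset_majortimeo() arms a major timeout 2s << 3 = 16s from now. Each
 * minor timeout before that deadline doubles rq_timeout (2s, 4s, 8s, capped
 * at to_maxval) and bumps rq_retries; once the major timeout has passed,
 * rq_timeout is reset to to_initval, the RTT estimator is reinitialized,
 * and -ETIMEDOUT is returned to signal a major timeout to the caller.
 */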
  583. static void xprt_autoclose(struct work_struct *work)
  584. {
  585. struct rpc_xprt *xprt =
  586. container_of(work, struct rpc_xprt, task_cleanup);
  587. clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
  588. xprt->ops->close(xprt);
  589. xprt_release_write(xprt, NULL);
  590. wake_up_bit(&xprt->state, XPRT_LOCKED);
  591. }
  592. /**
  593. * xprt_disconnect_done - mark a transport as disconnected
  594. * @xprt: transport to flag for disconnect
  595. *
  596. */
  597. void xprt_disconnect_done(struct rpc_xprt *xprt)
  598. {
  599. dprintk("RPC: disconnected transport %p\n", xprt);
  600. spin_lock_bh(&xprt->transport_lock);
  601. xprt_clear_connected(xprt);
  602. xprt_clear_write_space_locked(xprt);
  603. xprt_wake_pending_tasks(xprt, -EAGAIN);
  604. spin_unlock_bh(&xprt->transport_lock);
  605. }
  606. EXPORT_SYMBOL_GPL(xprt_disconnect_done);
  607. /**
  608. * xprt_force_disconnect - force a transport to disconnect
  609. * @xprt: transport to disconnect
  610. *
  611. */
  612. void xprt_force_disconnect(struct rpc_xprt *xprt)
  613. {
  614. /* Don't race with the test_bit() in xprt_clear_locked() */
  615. spin_lock_bh(&xprt->transport_lock);
  616. set_bit(XPRT_CLOSE_WAIT, &xprt->state);
  617. /* Try to schedule an autoclose RPC call */
  618. if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
  619. queue_work(xprtiod_workqueue, &xprt->task_cleanup);
  620. xprt_wake_pending_tasks(xprt, -EAGAIN);
  621. spin_unlock_bh(&xprt->transport_lock);
  622. }
  623. EXPORT_SYMBOL_GPL(xprt_force_disconnect);
  624. static unsigned int
  625. xprt_connect_cookie(struct rpc_xprt *xprt)
  626. {
  627. return READ_ONCE(xprt->connect_cookie);
  628. }
  629. static bool
  630. xprt_request_retransmit_after_disconnect(struct rpc_task *task)
  631. {
  632. struct rpc_rqst *req = task->tk_rqstp;
  633. struct rpc_xprt *xprt = req->rq_xprt;
  634. return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
  635. !xprt_connected(xprt);
  636. }
  637. /**
  638. * xprt_conditional_disconnect - force a transport to disconnect
  639. * @xprt: transport to disconnect
  640. * @cookie: 'connection cookie'
  641. *
  642. * This attempts to break the connection if and only if 'cookie' matches
  643. * the current transport 'connection cookie'. It ensures that we don't
  644. * try to break the connection more than once when we need to retransmit
  645. * a batch of RPC requests.
  646. *
  647. */
  648. void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
  649. {
  650. /* Don't race with the test_bit() in xprt_clear_locked() */
  651. spin_lock_bh(&xprt->transport_lock);
  652. if (cookie != xprt->connect_cookie)
  653. goto out;
  654. if (test_bit(XPRT_CLOSING, &xprt->state))
  655. goto out;
  656. set_bit(XPRT_CLOSE_WAIT, &xprt->state);
  657. /* Try to schedule an autoclose RPC call */
  658. if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
  659. queue_work(xprtiod_workqueue, &xprt->task_cleanup);
  660. xprt_wake_pending_tasks(xprt, -EAGAIN);
  661. out:
  662. spin_unlock_bh(&xprt->transport_lock);
  663. }
  664. static bool
  665. xprt_has_timer(const struct rpc_xprt *xprt)
  666. {
  667. return xprt->idle_timeout != 0;
  668. }
  669. static void
  670. xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
  671. __must_hold(&xprt->transport_lock)
  672. {
  673. if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
  674. mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
  675. }
  676. static void
  677. xprt_init_autodisconnect(struct timer_list *t)
  678. {
  679. struct rpc_xprt *xprt = from_timer(xprt, t, timer);
  680. spin_lock(&xprt->transport_lock);
  681. if (!RB_EMPTY_ROOT(&xprt->recv_queue))
  682. goto out_abort;
  683. /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
  684. xprt->last_used = jiffies;
  685. if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
  686. goto out_abort;
  687. spin_unlock(&xprt->transport_lock);
  688. queue_work(xprtiod_workqueue, &xprt->task_cleanup);
  689. return;
  690. out_abort:
  691. spin_unlock(&xprt->transport_lock);
  692. }
  693. bool xprt_lock_connect(struct rpc_xprt *xprt,
  694. struct rpc_task *task,
  695. void *cookie)
  696. {
  697. bool ret = false;
  698. spin_lock_bh(&xprt->transport_lock);
  699. if (!test_bit(XPRT_LOCKED, &xprt->state))
  700. goto out;
  701. if (xprt->snd_task != task)
  702. goto out;
  703. xprt->snd_task = cookie;
  704. ret = true;
  705. out:
  706. spin_unlock_bh(&xprt->transport_lock);
  707. return ret;
  708. }
  709. void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
  710. {
  711. spin_lock_bh(&xprt->transport_lock);
  712. if (xprt->snd_task != cookie)
  713. goto out;
  714. if (!test_bit(XPRT_LOCKED, &xprt->state))
  715. goto out;
716. xprt->snd_task = NULL;
  717. xprt->ops->release_xprt(xprt, NULL);
  718. xprt_schedule_autodisconnect(xprt);
  719. out:
  720. spin_unlock_bh(&xprt->transport_lock);
  721. wake_up_bit(&xprt->state, XPRT_LOCKED);
  722. }
  723. /**
  724. * xprt_connect - schedule a transport connect operation
  725. * @task: RPC task that is requesting the connect
  726. *
  727. */
  728. void xprt_connect(struct rpc_task *task)
  729. {
  730. struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
  731. dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
  732. xprt, (xprt_connected(xprt) ? "is" : "is not"));
  733. if (!xprt_bound(xprt)) {
  734. task->tk_status = -EAGAIN;
  735. return;
  736. }
  737. if (!xprt_lock_write(xprt, task))
  738. return;
  739. if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
  740. xprt->ops->close(xprt);
  741. if (!xprt_connected(xprt)) {
  742. task->tk_timeout = task->tk_rqstp->rq_timeout;
  743. task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
  744. rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
  745. if (test_bit(XPRT_CLOSING, &xprt->state))
  746. return;
  747. if (xprt_test_and_set_connecting(xprt))
  748. return;
  749. /* Race breaker */
  750. if (!xprt_connected(xprt)) {
  751. xprt->stat.connect_start = jiffies;
  752. xprt->ops->connect(xprt, task);
  753. } else {
  754. xprt_clear_connecting(xprt);
  755. task->tk_status = 0;
  756. rpc_wake_up_queued_task(&xprt->pending, task);
  757. }
  758. }
  759. xprt_release_write(xprt, task);
  760. }
  761. static void xprt_connect_status(struct rpc_task *task)
  762. {
  763. switch (task->tk_status) {
  764. case 0:
  765. dprintk("RPC: %5u xprt_connect_status: connection established\n",
  766. task->tk_pid);
  767. break;
  768. case -ECONNREFUSED:
  769. case -ECONNRESET:
  770. case -ECONNABORTED:
  771. case -ENETUNREACH:
  772. case -EHOSTUNREACH:
  773. case -EPIPE:
  774. case -EAGAIN:
  775. dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
  776. break;
  777. case -ETIMEDOUT:
  778. dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
  779. "out\n", task->tk_pid);
  780. break;
  781. default:
  782. dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
  783. "server %s\n", task->tk_pid, -task->tk_status,
  784. task->tk_rqstp->rq_xprt->servername);
  785. task->tk_status = -EIO;
  786. }
  787. }
  788. enum xprt_xid_rb_cmp {
  789. XID_RB_EQUAL,
  790. XID_RB_LEFT,
  791. XID_RB_RIGHT,
  792. };
  793. static enum xprt_xid_rb_cmp
  794. xprt_xid_cmp(__be32 xid1, __be32 xid2)
  795. {
  796. if (xid1 == xid2)
  797. return XID_RB_EQUAL;
  798. if ((__force u32)xid1 < (__force u32)xid2)
  799. return XID_RB_LEFT;
  800. return XID_RB_RIGHT;
  801. }
  802. static struct rpc_rqst *
  803. xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
  804. {
  805. struct rb_node *n = xprt->recv_queue.rb_node;
  806. struct rpc_rqst *req;
  807. while (n != NULL) {
  808. req = rb_entry(n, struct rpc_rqst, rq_recv);
  809. switch (xprt_xid_cmp(xid, req->rq_xid)) {
  810. case XID_RB_LEFT:
  811. n = n->rb_left;
  812. break;
  813. case XID_RB_RIGHT:
  814. n = n->rb_right;
  815. break;
  816. case XID_RB_EQUAL:
  817. return req;
  818. }
  819. }
  820. return NULL;
  821. }
  822. static void
  823. xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
  824. {
  825. struct rb_node **p = &xprt->recv_queue.rb_node;
  826. struct rb_node *n = NULL;
  827. struct rpc_rqst *req;
  828. while (*p != NULL) {
  829. n = *p;
  830. req = rb_entry(n, struct rpc_rqst, rq_recv);
831. switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
  832. case XID_RB_LEFT:
  833. p = &n->rb_left;
  834. break;
  835. case XID_RB_RIGHT:
  836. p = &n->rb_right;
  837. break;
  838. case XID_RB_EQUAL:
  839. WARN_ON_ONCE(new != req);
  840. return;
  841. }
  842. }
  843. rb_link_node(&new->rq_recv, n, p);
  844. rb_insert_color(&new->rq_recv, &xprt->recv_queue);
  845. }
  846. static void
  847. xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
  848. {
  849. rb_erase(&req->rq_recv, &xprt->recv_queue);
  850. }
  851. /**
  852. * xprt_lookup_rqst - find an RPC request corresponding to an XID
  853. * @xprt: transport on which the original request was transmitted
  854. * @xid: RPC XID of incoming reply
  855. *
  856. * Caller holds xprt->queue_lock.
  857. */
  858. struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
  859. {
  860. struct rpc_rqst *entry;
  861. entry = xprt_request_rb_find(xprt, xid);
  862. if (entry != NULL) {
  863. trace_xprt_lookup_rqst(xprt, xid, 0);
  864. entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
  865. return entry;
  866. }
  867. dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
  868. ntohl(xid));
  869. trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
  870. xprt->stat.bad_xids++;
  871. return NULL;
  872. }
  873. EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
  874. static bool
  875. xprt_is_pinned_rqst(struct rpc_rqst *req)
  876. {
  877. return atomic_read(&req->rq_pin) != 0;
  878. }
  879. /**
  880. * xprt_pin_rqst - Pin a request on the transport receive list
  881. * @req: Request to pin
  882. *
  883. * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
  884. * so should be holding the xprt receive lock.
  885. */
  886. void xprt_pin_rqst(struct rpc_rqst *req)
  887. {
  888. atomic_inc(&req->rq_pin);
  889. }
  890. EXPORT_SYMBOL_GPL(xprt_pin_rqst);
  891. /**
  892. * xprt_unpin_rqst - Unpin a request on the transport receive list
893. * @req: Request to unpin
  894. *
  895. * Caller should be holding the xprt receive lock.
  896. */
  897. void xprt_unpin_rqst(struct rpc_rqst *req)
  898. {
  899. if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
  900. atomic_dec(&req->rq_pin);
  901. return;
  902. }
  903. if (atomic_dec_and_test(&req->rq_pin))
  904. wake_up_var(&req->rq_pin);
  905. }
  906. EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
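/*
 * Illustrative receive-path sketch (editorial, not part of this file): a
 * transport's reply handler typically pins the request so it can drop
 * xprt->queue_lock while copying reply data, roughly as the socket
 * transports do. The example_*() name is hypothetical.
 */
#if 0	/* example only */
static void example_complete_reply(struct rpc_xprt *xprt, __be32 xid, int copied)
{
	struct rpc_rqst *req;

	spin_lock(&xprt->queue_lock);
	req = xprt_lookup_rqst(xprt, xid);
	if (!req) {
		spin_unlock(&xprt->queue_lock);
		return;
	}
	xprt_pin_rqst(req);
	spin_unlock(&xprt->queue_lock);

	/* copy the reply into req->rq_private_buf without holding the lock */

	spin_lock(&xprt->queue_lock);
	xprt_complete_rqst(req->rq_task, copied);
	xprt_unpin_rqst(req);
	spin_unlock(&xprt->queue_lock);
}
#endif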
  907. static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
  908. {
  909. wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
  910. }
  911. static bool
  912. xprt_request_data_received(struct rpc_task *task)
  913. {
  914. return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
  915. READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
  916. }
  917. static bool
  918. xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
  919. {
  920. return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
  921. READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
  922. }
  923. /**
924. * xprt_request_enqueue_receive - Add a request to the receive queue
  925. * @task: RPC task
  926. *
  927. */
  928. void
  929. xprt_request_enqueue_receive(struct rpc_task *task)
  930. {
  931. struct rpc_rqst *req = task->tk_rqstp;
  932. struct rpc_xprt *xprt = req->rq_xprt;
  933. if (!xprt_request_need_enqueue_receive(task, req))
  934. return;
  935. spin_lock(&xprt->queue_lock);
  936. /* Update the softirq receive buffer */
  937. memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
  938. sizeof(req->rq_private_buf));
  939. /* Add request to the receive list */
  940. xprt_request_rb_insert(xprt, req);
  941. set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
  942. spin_unlock(&xprt->queue_lock);
  943. xprt_reset_majortimeo(req);
  944. /* Turn off autodisconnect */
  945. del_singleshot_timer_sync(&xprt->timer);
  946. }
  947. /**
  948. * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
  949. * @task: RPC task
  950. *
  951. * Caller must hold xprt->queue_lock.
  952. */
  953. static void
  954. xprt_request_dequeue_receive_locked(struct rpc_task *task)
  955. {
  956. struct rpc_rqst *req = task->tk_rqstp;
  957. if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
  958. xprt_request_rb_remove(req->rq_xprt, req);
  959. }
  960. /**
  961. * xprt_update_rtt - Update RPC RTT statistics
  962. * @task: RPC request that recently completed
  963. *
  964. * Caller holds xprt->queue_lock.
  965. */
  966. void xprt_update_rtt(struct rpc_task *task)
  967. {
  968. struct rpc_rqst *req = task->tk_rqstp;
  969. struct rpc_rtt *rtt = task->tk_client->cl_rtt;
  970. unsigned int timer = task->tk_msg.rpc_proc->p_timer;
  971. long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
  972. if (timer) {
  973. if (req->rq_ntrans == 1)
  974. rpc_update_rtt(rtt, timer, m);
  975. rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
  976. }
  977. }
  978. EXPORT_SYMBOL_GPL(xprt_update_rtt);
  979. /**
  980. * xprt_complete_rqst - called when reply processing is complete
  981. * @task: RPC request that recently completed
  982. * @copied: actual number of bytes received from the transport
  983. *
  984. * Caller holds xprt->queue_lock.
  985. */
  986. void xprt_complete_rqst(struct rpc_task *task, int copied)
  987. {
  988. struct rpc_rqst *req = task->tk_rqstp;
  989. struct rpc_xprt *xprt = req->rq_xprt;
  990. dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
  991. task->tk_pid, ntohl(req->rq_xid), copied);
  992. trace_xprt_complete_rqst(xprt, req->rq_xid, copied);
  993. xprt->stat.recvs++;
  994. req->rq_private_buf.len = copied;
  995. /* Ensure all writes are done before we update */
  996. /* req->rq_reply_bytes_recvd */
  997. smp_wmb();
  998. req->rq_reply_bytes_recvd = copied;
  999. xprt_request_dequeue_receive_locked(task);
  1000. rpc_wake_up_queued_task(&xprt->pending, task);
  1001. }
  1002. EXPORT_SYMBOL_GPL(xprt_complete_rqst);
  1003. static void xprt_timer(struct rpc_task *task)
  1004. {
  1005. struct rpc_rqst *req = task->tk_rqstp;
  1006. struct rpc_xprt *xprt = req->rq_xprt;
  1007. if (task->tk_status != -ETIMEDOUT)
  1008. return;
  1009. trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
  1010. if (!req->rq_reply_bytes_recvd) {
  1011. if (xprt->ops->timer)
  1012. xprt->ops->timer(xprt, task);
  1013. } else
  1014. task->tk_status = 0;
  1015. }
  1016. /**
  1017. * xprt_request_wait_receive - wait for the reply to an RPC request
  1018. * @task: RPC task about to send a request
  1019. *
  1020. */
  1021. void xprt_request_wait_receive(struct rpc_task *task)
  1022. {
  1023. struct rpc_rqst *req = task->tk_rqstp;
  1024. struct rpc_xprt *xprt = req->rq_xprt;
  1025. if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
  1026. return;
  1027. /*
  1028. * Sleep on the pending queue if we're expecting a reply.
  1029. * The spinlock ensures atomicity between the test of
  1030. * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
  1031. */
  1032. spin_lock(&xprt->queue_lock);
  1033. if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
  1034. xprt->ops->set_retrans_timeout(task);
  1035. rpc_sleep_on(&xprt->pending, task, xprt_timer);
  1036. /*
  1037. * Send an extra queue wakeup call if the
  1038. * connection was dropped in case the call to
  1039. * rpc_sleep_on() raced.
  1040. */
  1041. if (xprt_request_retransmit_after_disconnect(task))
  1042. rpc_wake_up_queued_task_set_status(&xprt->pending,
  1043. task, -ENOTCONN);
  1044. }
  1045. spin_unlock(&xprt->queue_lock);
  1046. }
  1047. static bool
  1048. xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
  1049. {
  1050. return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
  1051. }
  1052. /**
  1053. * xprt_request_enqueue_transmit - queue a task for transmission
  1054. * @task: pointer to rpc_task
  1055. *
  1056. * Add a task to the transmission queue.
  1057. */
  1058. void
  1059. xprt_request_enqueue_transmit(struct rpc_task *task)
  1060. {
  1061. struct rpc_rqst *pos, *req = task->tk_rqstp;
  1062. struct rpc_xprt *xprt = req->rq_xprt;
  1063. if (xprt_request_need_enqueue_transmit(task, req)) {
  1064. spin_lock(&xprt->queue_lock);
  1065. /*
  1066. * Requests that carry congestion control credits are added
  1067. * to the head of the list to avoid starvation issues.
  1068. */
  1069. if (req->rq_cong) {
  1070. xprt_clear_congestion_window_wait(xprt);
  1071. list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
  1072. if (pos->rq_cong)
  1073. continue;
  1074. /* Note: req is added _before_ pos */
  1075. list_add_tail(&req->rq_xmit, &pos->rq_xmit);
  1076. INIT_LIST_HEAD(&req->rq_xmit2);
  1077. goto out;
  1078. }
  1079. } else if (RPC_IS_SWAPPER(task)) {
  1080. list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
  1081. if (pos->rq_cong || pos->rq_bytes_sent)
  1082. continue;
  1083. if (RPC_IS_SWAPPER(pos->rq_task))
  1084. continue;
  1085. /* Note: req is added _before_ pos */
  1086. list_add_tail(&req->rq_xmit, &pos->rq_xmit);
  1087. INIT_LIST_HEAD(&req->rq_xmit2);
  1088. goto out;
  1089. }
  1090. } else {
  1091. list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
  1092. if (pos->rq_task->tk_owner != task->tk_owner)
  1093. continue;
  1094. list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
  1095. INIT_LIST_HEAD(&req->rq_xmit);
  1096. goto out;
  1097. }
  1098. }
  1099. list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
  1100. INIT_LIST_HEAD(&req->rq_xmit2);
  1101. out:
  1102. set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
  1103. spin_unlock(&xprt->queue_lock);
  1104. }
  1105. }
  1106. /**
  1107. * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
  1108. * @task: pointer to rpc_task
  1109. *
  1110. * Remove a task from the transmission queue
  1111. * Caller must hold xprt->queue_lock
  1112. */
  1113. static void
  1114. xprt_request_dequeue_transmit_locked(struct rpc_task *task)
  1115. {
  1116. struct rpc_rqst *req = task->tk_rqstp;
  1117. if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
  1118. return;
  1119. if (!list_empty(&req->rq_xmit)) {
  1120. list_del(&req->rq_xmit);
  1121. if (!list_empty(&req->rq_xmit2)) {
  1122. struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
  1123. struct rpc_rqst, rq_xmit2);
  1124. list_del(&req->rq_xmit2);
  1125. list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
  1126. }
  1127. } else
  1128. list_del(&req->rq_xmit2);
  1129. }
  1130. /**
  1131. * xprt_request_dequeue_transmit - remove a task from the transmission queue
  1132. * @task: pointer to rpc_task
  1133. *
  1134. * Remove a task from the transmission queue
  1135. */
  1136. static void
  1137. xprt_request_dequeue_transmit(struct rpc_task *task)
  1138. {
  1139. struct rpc_rqst *req = task->tk_rqstp;
  1140. struct rpc_xprt *xprt = req->rq_xprt;
  1141. spin_lock(&xprt->queue_lock);
  1142. xprt_request_dequeue_transmit_locked(task);
  1143. spin_unlock(&xprt->queue_lock);
  1144. }
  1145. /**
  1146. * xprt_request_prepare - prepare an encoded request for transport
  1147. * @req: pointer to rpc_rqst
  1148. *
  1149. * Calls into the transport layer to do whatever is needed to prepare
  1150. * the request for transmission or receive.
  1151. */
  1152. void
  1153. xprt_request_prepare(struct rpc_rqst *req)
  1154. {
  1155. struct rpc_xprt *xprt = req->rq_xprt;
  1156. if (xprt->ops->prepare_request)
  1157. xprt->ops->prepare_request(req);
  1158. }
  1159. /**
  1160. * xprt_request_need_retransmit - Test if a task needs retransmission
  1161. * @task: pointer to rpc_task
  1162. *
  1163. * Test for whether a connection breakage requires the task to retransmit
  1164. */
  1165. bool
  1166. xprt_request_need_retransmit(struct rpc_task *task)
  1167. {
  1168. return xprt_request_retransmit_after_disconnect(task);
  1169. }
  1170. /**
  1171. * xprt_prepare_transmit - reserve the transport before sending a request
  1172. * @task: RPC task about to send a request
  1173. *
  1174. */
  1175. bool xprt_prepare_transmit(struct rpc_task *task)
  1176. {
  1177. struct rpc_rqst *req = task->tk_rqstp;
  1178. struct rpc_xprt *xprt = req->rq_xprt;
  1179. dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
  1180. if (!xprt_lock_write(xprt, task)) {
  1181. /* Race breaker: someone may have transmitted us */
  1182. if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
  1183. rpc_wake_up_queued_task_set_status(&xprt->sending,
  1184. task, 0);
  1185. return false;
  1186. }
  1187. return true;
  1188. }
  1189. void xprt_end_transmit(struct rpc_task *task)
  1190. {
  1191. xprt_release_write(task->tk_rqstp->rq_xprt, task);
  1192. }
  1193. /**
  1194. * xprt_request_transmit - send an RPC request on a transport
  1195. * @req: pointer to request to transmit
  1196. * @snd_task: RPC task that owns the transport lock
  1197. *
  1198. * This performs the transmission of a single request.
  1199. * Note that if the request is not the same as snd_task, then it
  1200. * does need to be pinned.
  1201. * Returns '0' on success.
  1202. */
  1203. static int
  1204. xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
  1205. {
  1206. struct rpc_xprt *xprt = req->rq_xprt;
  1207. struct rpc_task *task = req->rq_task;
  1208. unsigned int connect_cookie;
  1209. int is_retrans = RPC_WAS_SENT(task);
  1210. int status;
  1211. dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
  1212. if (!req->rq_bytes_sent) {
  1213. if (xprt_request_data_received(task)) {
  1214. status = 0;
  1215. goto out_dequeue;
  1216. }
  1217. /* Verify that our message lies in the RPCSEC_GSS window */
  1218. if (rpcauth_xmit_need_reencode(task)) {
  1219. status = -EBADMSG;
  1220. goto out_dequeue;
  1221. }
  1222. }
  1223. /*
  1224. * Update req->rq_ntrans before transmitting to avoid races with
  1225. * xprt_update_rtt(), which needs to know that it is recording a
  1226. * reply to the first transmission.
  1227. */
  1228. req->rq_ntrans++;
  1229. connect_cookie = xprt->connect_cookie;
  1230. status = xprt->ops->send_request(req);
  1231. trace_xprt_transmit(xprt, req->rq_xid, status);
  1232. if (status != 0) {
  1233. req->rq_ntrans--;
  1234. return status;
  1235. }
  1236. if (is_retrans)
  1237. task->tk_client->cl_stats->rpcretrans++;
  1238. xprt_inject_disconnect(xprt);
  1239. dprintk("RPC: %5u xmit complete\n", task->tk_pid);
  1240. task->tk_flags |= RPC_TASK_SENT;
  1241. spin_lock_bh(&xprt->transport_lock);
  1242. xprt->stat.sends++;
  1243. xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
  1244. xprt->stat.bklog_u += xprt->backlog.qlen;
  1245. xprt->stat.sending_u += xprt->sending.qlen;
  1246. xprt->stat.pending_u += xprt->pending.qlen;
  1247. spin_unlock_bh(&xprt->transport_lock);
  1248. req->rq_connect_cookie = connect_cookie;
  1249. out_dequeue:
  1250. xprt_request_dequeue_transmit(task);
  1251. rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
  1252. return status;
  1253. }
  1254. /**
  1255. * xprt_transmit - send an RPC request on a transport
  1256. * @task: controlling RPC task
  1257. *
  1258. * Attempts to drain the transmit queue. On exit, either the transport
  1259. * signalled an error that needs to be handled before transmission can
  1260. * resume, or @task finished transmitting, and detected that it already
  1261. * received a reply.
  1262. */
  1263. void
  1264. xprt_transmit(struct rpc_task *task)
  1265. {
  1266. struct rpc_rqst *next, *req = task->tk_rqstp;
  1267. struct rpc_xprt *xprt = req->rq_xprt;
  1268. int status;
  1269. spin_lock(&xprt->queue_lock);
  1270. while (!list_empty(&xprt->xmit_queue)) {
  1271. next = list_first_entry(&xprt->xmit_queue,
  1272. struct rpc_rqst, rq_xmit);
  1273. xprt_pin_rqst(next);
  1274. spin_unlock(&xprt->queue_lock);
  1275. status = xprt_request_transmit(next, task);
  1276. if (status == -EBADMSG && next != req)
  1277. status = 0;
  1278. cond_resched();
  1279. spin_lock(&xprt->queue_lock);
  1280. xprt_unpin_rqst(next);
  1281. if (status == 0) {
  1282. if (!xprt_request_data_received(task) ||
  1283. test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
  1284. continue;
  1285. } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
  1286. task->tk_status = status;
  1287. break;
  1288. }
  1289. spin_unlock(&xprt->queue_lock);
  1290. }
  1291. static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
  1292. {
  1293. set_bit(XPRT_CONGESTED, &xprt->state);
  1294. rpc_sleep_on(&xprt->backlog, task, NULL);
  1295. }
  1296. static void xprt_wake_up_backlog(struct rpc_xprt *xprt)
  1297. {
  1298. if (rpc_wake_up_next(&xprt->backlog) == NULL)
  1299. clear_bit(XPRT_CONGESTED, &xprt->state);
  1300. }
  1301. static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
  1302. {
  1303. bool ret = false;
  1304. if (!test_bit(XPRT_CONGESTED, &xprt->state))
  1305. goto out;
  1306. spin_lock(&xprt->reserve_lock);
  1307. if (test_bit(XPRT_CONGESTED, &xprt->state)) {
  1308. rpc_sleep_on(&xprt->backlog, task, NULL);
  1309. ret = true;
  1310. }
  1311. spin_unlock(&xprt->reserve_lock);
  1312. out:
  1313. return ret;
  1314. }
  1315. static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
  1316. {
  1317. struct rpc_rqst *req = ERR_PTR(-EAGAIN);
  1318. if (xprt->num_reqs >= xprt->max_reqs)
  1319. goto out;
  1320. ++xprt->num_reqs;
  1321. spin_unlock(&xprt->reserve_lock);
  1322. req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
  1323. spin_lock(&xprt->reserve_lock);
  1324. if (req != NULL)
  1325. goto out;
  1326. --xprt->num_reqs;
  1327. req = ERR_PTR(-ENOMEM);
  1328. out:
  1329. return req;
  1330. }
  1331. static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
  1332. {
  1333. if (xprt->num_reqs > xprt->min_reqs) {
  1334. --xprt->num_reqs;
  1335. kfree(req);
  1336. return true;
  1337. }
  1338. return false;
  1339. }
  1340. void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
  1341. {
  1342. struct rpc_rqst *req;
  1343. spin_lock(&xprt->reserve_lock);
  1344. if (!list_empty(&xprt->free)) {
  1345. req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
  1346. list_del(&req->rq_list);
  1347. goto out_init_req;
  1348. }
  1349. req = xprt_dynamic_alloc_slot(xprt);
  1350. if (!IS_ERR(req))
  1351. goto out_init_req;
  1352. switch (PTR_ERR(req)) {
  1353. case -ENOMEM:
  1354. dprintk("RPC: dynamic allocation of request slot "
  1355. "failed! Retrying\n");
  1356. task->tk_status = -ENOMEM;
  1357. break;
  1358. case -EAGAIN:
  1359. xprt_add_backlog(xprt, task);
  1360. dprintk("RPC: waiting for request slot\n");
  1361. /* fall through */
  1362. default:
  1363. task->tk_status = -EAGAIN;
  1364. }
  1365. spin_unlock(&xprt->reserve_lock);
  1366. return;
  1367. out_init_req:
  1368. xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
  1369. xprt->num_reqs);
  1370. spin_unlock(&xprt->reserve_lock);
  1371. task->tk_status = 0;
  1372. task->tk_rqstp = req;
  1373. }
  1374. EXPORT_SYMBOL_GPL(xprt_alloc_slot);
  1375. void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
  1376. {
  1377. spin_lock(&xprt->reserve_lock);
  1378. if (!xprt_dynamic_free_slot(xprt, req)) {
  1379. memset(req, 0, sizeof(*req)); /* mark unused */
  1380. list_add(&req->rq_list, &xprt->free);
  1381. }
  1382. xprt_wake_up_backlog(xprt);
  1383. spin_unlock(&xprt->reserve_lock);
  1384. }
  1385. EXPORT_SYMBOL_GPL(xprt_free_slot);
  1386. static void xprt_free_all_slots(struct rpc_xprt *xprt)
  1387. {
  1388. struct rpc_rqst *req;
  1389. while (!list_empty(&xprt->free)) {
  1390. req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
  1391. list_del(&req->rq_list);
  1392. kfree(req);
  1393. }
  1394. }
struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
		unsigned int num_prealloc,
		unsigned int max_alloc)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	xprt = kzalloc(size, GFP_KERNEL);
	if (xprt == NULL)
		goto out;

	xprt_init(xprt, net);

	for (i = 0; i < num_prealloc; i++) {
		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
		if (!req)
			goto out_free;
		list_add(&req->rq_list, &xprt->free);
	}
	if (max_alloc > num_prealloc)
		xprt->max_reqs = max_alloc;
	else
		xprt->max_reqs = num_prealloc;
	xprt->min_reqs = num_prealloc;
	xprt->num_reqs = num_prealloc;

	return xprt;

out_free:
	xprt_free(xprt);
out:
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_alloc);
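
/**
 * xprt_free - free an rpc_xprt structure allocated with xprt_alloc
 * @xprt: transport to free
 *
 * Drops the network namespace reference and releases all request slots
 * before freeing the structure itself.
 */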
void xprt_free(struct rpc_xprt *xprt)
{
	put_net(xprt->xprt_net);
	xprt_free_all_slots(xprt);
	kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);

static void
xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
{
	req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
}

static __be32
xprt_alloc_xid(struct rpc_xprt *xprt)
{
	__be32 xid;

	spin_lock(&xprt->reserve_lock);
	xid = (__force __be32)xprt->xid++;
	spin_unlock(&xprt->reserve_lock);
	return xid;
}

static void
xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = prandom_u32();
}
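
/*
 * Initialize a freshly allocated request slot for @task: set up the
 * timeout, XID and connect cookie, and reset the send and receive
 * buffers before the request is used.
 */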
static void
xprt_request_init(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_buffer = NULL;
	req->rq_xid = xprt_alloc_xid(xprt);
	xprt_init_connect_cookie(req, xprt);
	req->rq_bytes_sent = 0;
	req->rq_snd_buf.len = 0;
	req->rq_snd_buf.buflen = 0;
	req->rq_rcv_buf.len = 0;
	req->rq_rcv_buf.buflen = 0;
	req->rq_snd_buf.bvec = NULL;
	req->rq_rcv_buf.bvec = NULL;
	req->rq_release_snd_buf = NULL;
	xprt_reset_majortimeo(req);
	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}

static void
xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
{
	xprt->ops->alloc_slot(xprt, task);
	if (task->tk_rqstp != NULL)
		xprt_request_init(task);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (!xprt_throttle_congested(xprt, task))
		xprt_do_reserve(xprt, task);
}

/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference from xprt_reserve is that here we
 * ignore the value of the XPRT_CONGESTED flag.
 */
void xprt_retry_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	xprt_do_reserve(xprt, task);
}
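
/*
 * Remove the request from the transmit and receive queues, and wait for
 * any receive in progress that has pinned it, before the slot is
 * released.
 */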
static void
xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
	    xprt_is_pinned_rqst(req)) {
		spin_lock(&xprt->queue_lock);
		xprt_request_dequeue_transmit_locked(task);
		xprt_request_dequeue_receive_locked(task);
		while (xprt_is_pinned_rqst(req)) {
			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
			spin_unlock(&xprt->queue_lock);
			xprt_wait_on_pinned_rqst(req);
			spin_lock(&xprt->queue_lock);
			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
		}
		spin_unlock(&xprt->queue_lock);
	}
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (req == NULL) {
		if (task->tk_client) {
			xprt = task->tk_xprt;
			xprt_release_write(xprt, task);
		}
		return;
	}

	xprt = req->rq_xprt;
	if (task->tk_ops->rpc_count_stats != NULL)
		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
	else if (task->tk_client)
		rpc_count_iostats(task, task->tk_client->cl_metrics);
	xprt_request_dequeue_all(task, req);
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	xprt->last_used = jiffies;
	xprt_schedule_autodisconnect(xprt);
	spin_unlock_bh(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(task);
	xprt_inject_disconnect(xprt);
	xdr_free_bvec(&req->rq_rcv_buf);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	task->tk_rqstp = NULL;
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
	if (likely(!bc_prealloc(req)))
		xprt->ops->free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

#ifdef CONFIG_SUNRPC_BACKCHANNEL
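/*
 * Attach a preallocated backchannel request to @task and compute the
 * total length of its already XDR-encoded send buffer.
 */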
void
xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
{
	struct xdr_buf *xbufp = &req->rq_snd_buf;

	task->tk_rqstp = req;
	req->rq_task = task;
	xprt_init_connect_cookie(req, req->rq_xprt);
	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
		xbufp->tail[0].iov_len;
	req->rq_bytes_sent = 0;
}
#endif
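
/*
 * Common initialization of a newly allocated rpc_xprt: locks, request
 * and receive queues, RPC wait queues, the initial congestion window,
 * and the starting XID.
 */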
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	kref_init(&xprt->kref);
	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);
	spin_lock_init(&xprt->queue_lock);

	INIT_LIST_HEAD(&xprt->free);
	xprt->recv_queue = RB_ROOT;
	INIT_LIST_HEAD(&xprt->xmit_queue);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
	INIT_LIST_HEAD(&xprt->xprt_switch);

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t->ident == args->ident) {
			spin_unlock(&xprt_list_lock);
			goto found;
		}
	}
	spin_unlock(&xprt_list_lock);
	dprintk("RPC: transport (%d) not supported\n", args->ident);
	return ERR_PTR(-EIO);

found:
	xprt = t->setup(args);
	if (IS_ERR(xprt)) {
		dprintk("RPC: xprt_create_transport: failed, %ld\n",
				-PTR_ERR(xprt));
		goto out;
	}
	if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
		xprt->idle_timeout = 0;
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	if (xprt_has_timer(xprt))
		timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
	else
		timer_setup(&xprt->timer, NULL, 0);

	if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
		xprt_destroy(xprt);
		return ERR_PTR(-EINVAL);
	}
	xprt->servername = kstrdup(args->servername, GFP_KERNEL);
	if (xprt->servername == NULL) {
		xprt_destroy(xprt);
		return ERR_PTR(-ENOMEM);
	}

	rpc_xprt_debugfs_register(xprt);

	dprintk("RPC: created transport %p with %u slots\n", xprt,
			xprt->max_reqs);
out:
	return xprt;
}
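
/*
 * Workqueue callback that finishes destroying a transport: unregister
 * the debugfs entries, tear down the RPC wait queues, free the server
 * name, and finally hand the structure to the transport's ->destroy
 * method.
 */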
static void xprt_destroy_cb(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	rpc_xprt_debugfs_unregister(xprt);
	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->backlog);
	kfree(xprt->servername);
	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);

	/*
	 * Exclude transport connect/disconnect handlers and autoclose
	 */
	wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

	del_timer_sync(&xprt->timer);

	/*
	 * Destroy sockets etc from the system workqueue so they can
	 * safely flush receive work running on rpciod.
	 */
	INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
	schedule_work(&xprt->task_cleanup);
}

static void xprt_destroy_kref(struct kref *kref)
{
	xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
		return xprt;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	if (xprt != NULL)
		kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);