osd_client.c

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;

static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	int r;

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, objlen);
	if (r < 0)
		return r;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);

	return 0;
}
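
/*
 * Illustrative example (added commentary, not in the original source):
 * with a simple layout of 4 MB objects and no striping (stripe count 1,
 * stripe unit == object size, a common default), a 1 MB read at file
 * offset 5 MB maps to objnum 1, objoff 1 MB, objlen 1 MB.  A 2 MB read
 * at offset 7 MB crosses the object boundary at 8 MB, so *objlen comes
 * back as 1 MB and *plen is trimmed to match; the caller must issue a
 * separate request for the remainder.
 */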

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	int num_pages;
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply) {
		ceph_msg_revoke_incoming(req->r_reply);
		ceph_msg_put(req->r_reply);
	}

	if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES &&
			req->r_data_in.own_pages) {
		num_pages = calc_pages_for((u64)req->r_data_in.alignment,
						(u64)req->r_data_in.length);
		ceph_release_page_vector(req->r_data_in.pages, num_pages);
	}
	if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES &&
			req->r_data_out.own_pages) {
		num_pages = calc_pages_for((u64)req->r_data_out.alignment,
						(u64)req->r_data_out.length);
		ceph_release_page_vector(req->r_data_out.pages, num_pages);
	}

	ceph_put_snap_context(req->r_snapc);
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	size_t msg_size;

	msg_size = 4 + 4 + 8 + 8 + 4 + 8;
	msg_size += 2 + 4 + 8 + 4 + 4;	/* oloc */
	msg_size += 1 + 8 + 4 + 4;	/* pg_t */
	msg_size += 4 + MAX_OBJ_NAME_SIZE;
	msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8;	/* snapid */
	msg_size += 8;	/* snap_seq */
	msg_size += 8 * (snapc ? snapc->num_snaps : 0);	/* snaps */
	msg_size += 4;

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	INIT_LIST_HEAD(&req->r_osd_item);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE;
	req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE;

	/* create request message; allow space for oid */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
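
/*
 * Note (added commentary, not in the original source): the msg_size
 * computed above must cover every field that ceph_osdc_build_request()
 * later encodes into the request front; the BUG_ON() at the end of
 * that function catches any mismatch.  The reply message front, by
 * contrast, has the fixed size OSD_OPREPLY_FRONT_LEN.
 */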

static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_STAT:
	case CEPH_OSD_OP_MAPEXT:
	case CEPH_OSD_OP_MASKTRUNC:
	case CEPH_OSD_OP_SPARSE_READ:
	case CEPH_OSD_OP_NOTIFY:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_ASSERT_VER:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_TRUNCATE:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_DELETE:
	case CEPH_OSD_OP_APPEND:
	case CEPH_OSD_OP_STARTSYNC:
	case CEPH_OSD_OP_SETTRUNC:
	case CEPH_OSD_OP_TRIMTRUNC:
	case CEPH_OSD_OP_TMAPUP:
	case CEPH_OSD_OP_TMAPPUT:
	case CEPH_OSD_OP_TMAPGET:
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_ROLLBACK:
	case CEPH_OSD_OP_WATCH:
	case CEPH_OSD_OP_OMAPGETKEYS:
	case CEPH_OSD_OP_OMAPGETVALS:
	case CEPH_OSD_OP_OMAPGETHEADER:
	case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
	case CEPH_OSD_OP_OMAPSETVALS:
	case CEPH_OSD_OP_OMAPSETHEADER:
	case CEPH_OSD_OP_OMAPCLEAR:
	case CEPH_OSD_OP_OMAPRMKEYS:
	case CEPH_OSD_OP_OMAP_CMP:
	case CEPH_OSD_OP_CLONERANGE:
	case CEPH_OSD_OP_ASSERT_SRC_VERSION:
	case CEPH_OSD_OP_SRC_CMPXATTR:
	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_GETXATTRS:
	case CEPH_OSD_OP_CMPXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_SETXATTRS:
	case CEPH_OSD_OP_RESETXATTRS:
	case CEPH_OSD_OP_RMXATTR:
	case CEPH_OSD_OP_PULL:
	case CEPH_OSD_OP_PUSH:
	case CEPH_OSD_OP_BALANCEREADS:
	case CEPH_OSD_OP_UNBALANCEREADS:
	case CEPH_OSD_OP_SCRUB:
	case CEPH_OSD_OP_SCRUB_RESERVE:
	case CEPH_OSD_OP_SCRUB_UNRESERVE:
	case CEPH_OSD_OP_SCRUB_STOP:
	case CEPH_OSD_OP_SCRUB_MAP:
	case CEPH_OSD_OP_WRLOCK:
	case CEPH_OSD_OP_WRUNLOCK:
	case CEPH_OSD_OP_RDLOCK:
	case CEPH_OSD_OP_RDUNLOCK:
	case CEPH_OSD_OP_UPLOCK:
	case CEPH_OSD_OP_DNLOCK:
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_PGLS:
	case CEPH_OSD_OP_PGLS_FILTER:
		return true;
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode)
{
	BUG_ON(!osd_req_opcode_valid(opcode));
	memset(op, 0, sizeof (*op));
	op->op = opcode;
}
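
/*
 * Initialize a read or write extent op.  (Added commentary, not in the
 * original source:) payload_len only counts outbound data, so it is
 * the extent length for a write and zero for a read, whose payload
 * arrives in the reply instead.
 */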
void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
				u64 offset, u64 length,
				u64 truncate_size, u32 truncate_seq)
{
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);

	osd_req_op_init(op, opcode);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE)
		payload_len += length;

	op->payload_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
			const char *class, const char *method,
			const void *request_data, size_t request_data_size)
{
	size_t payload_len = 0;
	size_t size;

	BUG_ON(opcode != CEPH_OSD_OP_CALL);

	osd_req_op_init(op, opcode);

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	payload_len += size;

	op->cls.indata = request_data;
	BUG_ON(request_data_size > (size_t) U32_MAX);
	op->cls.indata_len = (u32) request_data_size;
	payload_len += request_data_size;

	op->cls.argc = 0;	/* currently unused */
	op->payload_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
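
/*
 * Illustrative sketch (added commentary, not in the original source):
 * a CEPH_OSD_OP_CALL op invokes a named method of a named class loaded
 * on the OSD.  For a hypothetical "lock" class with an "acquire"
 * method taking an opaque argument blob, a caller would do:
 *
 *	struct ceph_osd_req_op op;
 *
 *	osd_req_op_cls_init(&op, CEPH_OSD_OP_CALL, "lock", "acquire",
 *			    arg_buf, arg_len);
 *
 * The class and method names and the input data are appended to a
 * pagelist in osd_req_encode_op() and travel as outbound request data.
 */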

void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
				u64 cookie, u64 version, int flag)
{
	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);

	osd_req_op_init(op, opcode);

	op->watch.cookie = cookie;
	/*
	 * Keep the version in host byte order here; osd_req_encode_op()
	 * does the cpu_to_le64() conversion when the op goes on the
	 * wire.  Converting here as well would double-swap it (XXX 3847).
	 */
	op->watch.ver = version;
	if (opcode == CEPH_OSD_OP_WATCH && flag)
		op->watch.flag = (u8) 1;
}
EXPORT_SYMBOL(osd_req_op_watch_init);
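
/*
 * Encode one op from its host-order, in-memory form (struct
 * ceph_osd_req_op) into the little-endian wire form (struct
 * ceph_osd_op) in the request message, setting up any outbound data
 * (e.g. the class call pagelist) as a side effect.  Returns the number
 * of bytes of outbound request data this op contributes.
 */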
static u64 osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	u64 out_data_len = 0;
	struct ceph_pagelist *pagelist;

	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
		pr_err("unrecognized osd opcode %d\n", src->op);

		return 0;
	}

	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		if (src->op == CEPH_OSD_OP_WRITE)
			out_data_len = src->extent.length;
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		BUG_ON(!pagelist);
		ceph_pagelist_init(pagelist);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		ceph_pagelist_append(pagelist, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(pagelist, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(pagelist, src->cls.indata,
				     src->cls.indata_len);

		req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST;
		req->r_data_out.pagelist = pagelist;
		out_data_len = pagelist->length;
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}
	dst->op = cpu_to_le16(src->op);
	dst->payload_len = cpu_to_le32(src->payload_len);

	return out_data_len;
}

/*
 * build new request AND message
 */
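/*
 * (Added commentary, not in the original source:) the request front is
 * encoded as: client_inc, osdmap epoch, flags, mtime, reassert_version,
 * object locator (pool, preferred, key), raw pgid, object name, the op
 * array, snapid/snap_seq/snaps, and finally the retry attempt count.
 * Several of these fields are only placeholders at build time; pointers
 * to them are stashed in the request (r_request_osdmap_epoch and
 * friends) so that __send_request() can refresh them on every (re)send.
 */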
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, unsigned int num_ops,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc, u64 snap_id,
			     struct timespec *mtime)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_req_op *src_op;
	void *p;
	size_t msg_size;
	int flags = req->r_flags;
	u64 data_len;
	int i;

	req->r_num_ops = num_ops;
	req->r_snapid = snap_id;
	req->r_snapc = ceph_get_snap_context(snapc);

	/* encode request */
	msg->hdr.version = cpu_to_le16(4);

	p = msg->front.iov_base;
	ceph_encode_32(&p, 1);   /* client_inc is always 1 */
	req->r_request_osdmap_epoch = p;
	p += 4;
	req->r_request_flags = p;
	p += 4;
	if (req->r_flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(p, mtime);
	p += sizeof(struct ceph_timespec);
	req->r_request_reassert_version = p;
	p += sizeof(struct ceph_eversion); /* will get filled in */

	/* oloc */
	ceph_encode_8(&p, 4);
	ceph_encode_8(&p, 4);
	ceph_encode_32(&p, 8 + 4 + 4);
	req->r_request_pool = p;
	p += 8;
	ceph_encode_32(&p, -1);  /* preferred */
	ceph_encode_32(&p, 0);   /* key len */

	ceph_encode_8(&p, 1);
	req->r_request_pgid = p;
	p += 8 + 4;
	ceph_encode_32(&p, -1);  /* preferred */

	/* oid */
	ceph_encode_32(&p, req->r_oid_len);
	memcpy(p, req->r_oid, req->r_oid_len);
	dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
	p += req->r_oid_len;

	/* ops--can imply data */
	ceph_encode_16(&p, num_ops);
	src_op = src_ops;
	req->r_request_ops = p;
	data_len = 0;
	for (i = 0; i < num_ops; i++, src_op++) {
		data_len += osd_req_encode_op(req, p, src_op);
		p += sizeof(struct ceph_osd_op);
	}

	/* snaps */
	ceph_encode_64(&p, req->r_snapid);
	ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
	ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
	if (req->r_snapc) {
		for (i = 0; i < snapc->num_snaps; i++) {
			ceph_encode_64(&p, req->r_snapc->snaps[i]);
		}
	}

	req->r_request_attempts = p;
	p += 4;

	/* data */
	if (flags & CEPH_OSD_FLAG_WRITE) {
		u16 data_off;

		/*
		 * The header "data_off" is a hint to the receiver
		 * allowing it to align received data into its
		 * buffers such that there's no need to re-copy
		 * it before writing it to disk (direct I/O).
		 */
		data_off = (u16) (off & 0xffff);
		req->r_request->hdr.data_off = cpu_to_le16(data_off);
	}
	req->r_request->hdr.data_len = cpu_to_le32(data_len);

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);

	dout("build_request msg_size was %d\n", (int)msg_size);
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if the caller supplies a second op slot (num_ops > 1), include a
 * 'startsync' command so that the osd will flush data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen, int num_ops,
					       struct ceph_osd_req_op *ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	u32 object_size;
	u64 object_base;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
					GFP_NOFS);
	if (!req)
		return ERR_PTR(-ENOMEM);
	req->r_flags = flags;

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r < 0) {
		ceph_osdc_put_request(req);
		return ERR_PTR(r);
	}

	object_size = le32_to_cpu(layout->fl_object_size);
	object_base = off - objoff;
	if (truncate_size <= object_base) {
		truncate_size = 0;
	} else {
		truncate_size -= object_base;
		if (truncate_size > object_size)
			truncate_size = object_size;
	}

	osd_req_op_extent_init(&ops[0], opcode, objoff, objlen,
				truncate_size, truncate_seq);
	/*
	 * A second op in the ops array means the caller wants to
	 * also issue a 'startsync' command so that the osd will
	 * flush data quickly.
	 */
	if (num_ops > 1)
		osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC);

	req->r_file_layout = *layout; /* keep a copy */

	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
		vino.ino, objnum);
	req->r_oid_len = strlen(req->r_oid);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
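
/*
 * Illustrative sketch (added commentary, not in the original source):
 * a typical caller reading 'len' bytes at file offset 'off' would do
 * roughly:
 *
 *	struct ceph_osd_req_op ops[2];
 *	u64 plen = len;
 *	struct ceph_osd_request *req;
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &plen, 1,
 *				    ops, CEPH_OSD_OP_READ,
 *				    CEPH_OSD_FLAG_READ, NULL,
 *				    truncate_seq, truncate_size, false);
 *
 * then attach reply pages, call ceph_osdc_build_request() and submit
 * with ceph_osdc_start_request().  On return, plen may have been
 * shortened to the part of the extent that fits in the first object.
 */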

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}
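
/*
 * Find the registered request with the smallest tid that is >= the
 * given tid, or NULL if there is none.  (Added commentary, not in the
 * original source.)
 */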
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req, *nreq;
	LIST_HEAD(resend);
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	if (err)
		return;
	/*
	 * Build up a list of requests to resend by traversing the
	 * osd's list of requests.  Requests for a given object are
	 * sent in tid order, and that is also the order they're
	 * kept on this list.  Therefore all requests that are in
	 * flight will be found first, followed by all requests that
	 * have not yet been sent.  And to resend requests while
	 * preserving this order we will want to put any sent
	 * requests back on the front of the osd client's unsent
	 * list.
	 *
	 * So we build a separate ordered list of already-sent
	 * requests for the affected osd and splice it onto the
	 * front of the osd client's unsent list.  Once we've seen a
	 * request that has not yet been sent we're done.  Those
	 * requests are already sitting right where they belong.
	 */
	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		if (!req->r_sent)
			break;
		list_move_tail(&req->r_req_lru_item, &resend);
		dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		if (!req->r_linger)
			req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}
	list_splice(&resend, &osdc->req_unsent);

	/*
	 * Linger requests are re-registered before sending, which
	 * sets up a new tid for each.  We add them to the unsent
	 * list at the end to keep things in tid order.
	 */
	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
				 r_linger_osd) {
		/*
		 * reregister request prior to unregistering linger so
		 * that r_osd is preserved.
		 */
		BUG_ON(!list_empty(&req->r_req_lru_item));
		__register_request(osdc, req);
		list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
	}
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, osd);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	RB_CLEAR_NODE(&osd->o_node);
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		/*
		 * Free the osd whether or not it holds an authorizer;
		 * gating the kfree() on o_auth.authorizer would leak
		 * the struct when no authorizer was ever set up.
		 */
		if (osd->o_auth.authorizer)
			ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
	dout("%s %p\n", __func__, osdc);
	mutex_lock(&osdc->request_mutex);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd, *nosd;

	dout("%s %p\n", __func__, osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		__remove_osd(osdc, osd);

		return -ENODEV;
	}

	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
			!ceph_con_opened(&osd->o_con)) {
		struct ceph_osd_request *req;

		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	dout("__insert_osd %p osd%d\n", new, new->o_osd);
	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}
	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
			req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_msg_revoke(req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd %p to lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_linger_item))
			req->r_osd = NULL;
	}

	list_del_init(&req->r_req_lru_item);
	ceph_osdc_put_request(req);

	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_msg_revoke(req->r_request);
		req->r_sent = 0;
	}
}
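
/*
 * (Added commentary, not in the original source:) a "linger" request,
 * such as a watch, must stay active across osdmap changes and
 * connection resets.  Lingering requests live on osdc->req_linger (and
 * the owning osd's o_linger_requests) after completing, and are
 * re-registered and resent with a fresh tid whenever their target osd
 * changes or its connection is reset.
 */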
static void __register_linger_request(struct ceph_osd_client *osdc,
				    struct ceph_osd_request *req)
{
	dout("__register_linger_request %p\n", req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	if (req->r_osd)
		list_add_tail(&req->r_linger_osd,
			      &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	dout("__unregister_linger_request %p\n", req);
	list_del_init(&req->r_linger_item);
	if (req->r_osd) {
		list_del_init(&req->r_linger_osd);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd %p to lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
					 struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	if (req->r_linger) {
		__unregister_linger_request(osdc, req);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
		/*
		 * caller is now responsible for calling
		 * unregister_linger_request
		 */
		ceph_osdc_get_request(req);
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate
 * list (unsent, homeless) or leave it on the in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
				ceph_file_layout_pg_pool(req->r_file_layout));
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
	     req->r_tid, pgid.pool, pgid.seed, o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
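/*
 * (Added commentary, not in the original source:) the per-send fields
 * below (osdmap epoch, flags, pgid, attempt count, reassert version)
 * were left as placeholders by ceph_osdc_build_request() and must be
 * refreshed on every send, since a resend may follow a map change.
 * r_sent records the osd's o_incarnation at send time; if the
 * connection is later reset and o_incarnation bumped, __map_request()
 * can tell the request needs to be resent.
 */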
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	void *p;

	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);

	/* fill in message content that changes each time we send it */
	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
	put_unaligned_le32(req->r_flags, req->r_request_flags);
	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
	p = req->r_request_pgid;
	ceph_encode_64(&p, req->r_pgid.pool);
	ceph_encode_32(&p, req->r_pgid.seed);
	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
	       sizeof(req->r_reassert_version));

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void __send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("__send_queued\n");
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
		__send_request(osdc, req);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests that have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in the future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}

	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}
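
/*
 * (Added commentary, not in the original source:) writes get up to two
 * replies from the osd: an "ack" once the write is applied in memory,
 * and a second reply with CEPH_OSD_FLAG_ONDISK set once it is safely
 * committed.  r_completion is completed on the first reply;
 * r_safe_completion (and r_safe_callback) fire only on the ondisk one,
 * which is what fsync-style waiters care about.
 */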
static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	void *p, *end;
	struct ceph_osd_request *req;
	u64 tid;
	int object_len;
	int numops, payload_len, flags;
	s32 result;
	s32 retry_attempt;
	struct ceph_pg pg;
	int err;
	u32 reassert_epoch;
	u64 reassert_version;
	u32 osdmap_epoch;
	int already_completed;
	int i;

	tid = le64_to_cpu(msg->hdr.tid);
	dout("handle_reply %p tid %llu\n", msg, tid);

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_need(&p, end, 4, bad);
	object_len = ceph_decode_32(&p);
	ceph_decode_need(&p, end, object_len, bad);
	p += object_len;

	err = ceph_decode_pgid(&p, end, &pg);
	if (err)
		goto bad;

	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
	flags = ceph_decode_64(&p);
	result = ceph_decode_32(&p);
	reassert_epoch = ceph_decode_32(&p);
	reassert_version = ceph_decode_64(&p);
	osdmap_epoch = ceph_decode_32(&p);

	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		goto bad_mutex;
	}
	ceph_osdc_get_request(req);

	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
	     req, result);

	ceph_decode_need(&p, end, 4, bad);
	numops = ceph_decode_32(&p);
	if (numops > CEPH_OSD_MAX_OP)
		goto bad_put;
	if (numops != req->r_num_ops)
		goto bad_put;
	payload_len = 0;
	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
	for (i = 0; i < numops; i++) {
		struct ceph_osd_op *op = p;
		int len;

		len = le32_to_cpu(op->payload_len);
		req->r_reply_op_len[i] = len;
		dout(" op %d has %d bytes\n", i, len);
		payload_len += len;
		p += sizeof(*op);
	}
	if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
		pr_warning("sum of op payload lens %d != data_len %d\n",
			   payload_len, le32_to_cpu(msg->hdr.data_len));
		goto bad_put;
	}

	ceph_decode_need(&p, end, 4 + numops * 4, bad);
	retry_attempt = ceph_decode_32(&p);
	for (i = 0; i < numops; i++)
		req->r_reply_op_result[i] = ceph_decode_32(&p);

	if (!req->r_got_reply) {
		unsigned int bytes;

		req->r_result = result;
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
		req->r_reassert_version.version = cpu_to_le64(reassert_version);

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	already_completed = req->r_completed;
	req->r_completed = 1;
	mutex_unlock(&osdc->request_mutex);
	if (already_completed)
		goto done;

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK)
		complete_request(req);

done:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;

bad_put:
	ceph_osdc_put_request(req);
bad_mutex:
	mutex_unlock(&osdc->request_mutex);
bad:
	pr_err("corrupt osd_op_reply got %d %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
	ceph_msg_dump(msg);
}
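
/*
 * Close and reopen the connection to any osd that is no longer up or
 * whose address has changed in the new osdmap.  (Added commentary, not
 * in the original source.)
 */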
static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read.
 */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;

	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; ) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		p = rb_next(p);

		/*
		 * For linger requests that have not yet been
		 * registered, move them to the linger list; they'll
		 * be sent to the osd in the loop below.  Unregister
		 * the request before re-registering it as a linger
		 * request to ensure the __map_request() below
		 * will decide it needs to be sent.
		 */
		if (req->r_linger && list_empty(&req->r_linger_item)) {
			dout("%p tid %llu restart on osd%d\n",
			     req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			__unregister_request(osdc, req);
			__register_linger_request(osdc, req);
			continue;
		}

		err = __map_request(osdc, req, force_resend);
		if (err < 0)
			continue;	/* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;	/* request a newer map */
		} else if (err > 0) {
			if (!req->r_linger) {
				dout("%p tid %llu requeued on osd%d\n", req,
				     req->r_tid,
				     req->r_osd ? req->r_osd->o_osd : -1);
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
			}
		}
	}

	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req, force_resend);
		dout("__map_request returned %d\n", err);
		if (err == 0)
			continue;	/* no change and no osd was specified */
		if (err < 0)
			continue;	/* hrm! */
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;	/* request a newer map */
			continue;
		}

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		__register_request(osdc, req);
		__unregister_linger_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
	reset_changed_osds(osdc);
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
        void *p, *end, *next;
        u32 nr_maps, maplen;
        u32 epoch;
        struct ceph_osdmap *newmap = NULL, *oldmap;
        int err;
        struct ceph_fsid fsid;

        dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        /* verify fsid */
        ceph_decode_need(&p, end, sizeof(fsid), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        if (ceph_check_fsid(osdc->client, &fsid) < 0)
                return;

        down_write(&osdc->map_sem);

        /* incremental maps */
        ceph_decode_32_safe(&p, end, nr_maps, bad);
        dout(" %d inc maps\n", nr_maps);
        while (nr_maps > 0) {
                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
                epoch = ceph_decode_32(&p);
                maplen = ceph_decode_32(&p);
                ceph_decode_need(&p, end, maplen, bad);
                next = p + maplen;
                if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
                        dout("applying incremental map %u len %d\n",
                             epoch, maplen);
                        newmap = osdmap_apply_incremental(&p, next,
                                                          osdc->osdmap,
                                                          &osdc->client->msgr);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
                        }
                        BUG_ON(!newmap);
                        if (newmap != osdc->osdmap) {
                                ceph_osdmap_destroy(osdc->osdmap);
                                osdc->osdmap = newmap;
                        }
                        kick_requests(osdc, 0);
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
                }
                p = next;
                nr_maps--;
        }
        if (newmap)
                goto done;

        /* full maps */
        ceph_decode_32_safe(&p, end, nr_maps, bad);
        dout(" %d full maps\n", nr_maps);
        while (nr_maps) {
                ceph_decode_need(&p, end, 2*sizeof(u32), bad);
                epoch = ceph_decode_32(&p);
                maplen = ceph_decode_32(&p);
                ceph_decode_need(&p, end, maplen, bad);
                if (nr_maps > 1) {
                        dout("skipping non-latest full map %u len %d\n",
                             epoch, maplen);
                } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
                        dout("skipping full map %u len %d, "
                             "older than our %u\n", epoch, maplen,
                             osdc->osdmap->epoch);
                } else {
                        int skipped_map = 0;

                        dout("taking full map %u len %d\n", epoch, maplen);
                        newmap = osdmap_decode(&p, p+maplen);
                        if (IS_ERR(newmap)) {
                                err = PTR_ERR(newmap);
                                goto bad;
                        }
                        BUG_ON(!newmap);
                        oldmap = osdc->osdmap;
                        osdc->osdmap = newmap;
                        if (oldmap) {
                                if (oldmap->epoch + 1 < newmap->epoch)
                                        skipped_map = 1;
                                ceph_osdmap_destroy(oldmap);
                        }
                        kick_requests(osdc, skipped_map);
                }
                p += maplen;
                nr_maps--;
        }

done:
        downgrade_write(&osdc->map_sem);
        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

        /*
         * subscribe to subsequent osdmap updates if full to ensure
         * we find out when we are no longer full and stop returning
         * ENOSPC.
         */
        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
                ceph_monc_request_next_osdmap(&osdc->client->monc);

        mutex_lock(&osdc->request_mutex);
        __send_queued(osdc);
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
        wake_up_all(&osdc->client->auth_wq);
        return;

bad:
        pr_err("osdc handle_map corrupt msg\n");
        ceph_msg_dump(msg);
        up_write(&osdc->map_sem);
        return;
}
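
/*
 * For reference, the CEPH_MSG_OSD_MAP front payload consumed above is
 * laid out as follows (integers little-endian, as read by the
 * ceph_decode_* helpers).  This is a descriptive sketch of what the
 * decode loops expect, not an authoritative protocol definition:
 *
 *      struct ceph_fsid fsid;                          // 16 bytes
 *      u32 nr_inc_maps;
 *      { u32 epoch; u32 len; u8 data[len]; }           // x nr_inc_maps
 *      u32 nr_full_maps;
 *      { u32 epoch; u32 len; u8 data[len]; }           // x nr_full_maps
 *
 * Incrementals are applied only if they extend our current epoch by
 * exactly one; of the full maps, only the last (latest) is decoded.
 */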

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
 */
static void __release_event(struct kref *kref)
{
        struct ceph_osd_event *event =
                container_of(kref, struct ceph_osd_event, kref);

        dout("__release_event %p\n", event);
        kfree(event);
}

static void get_event(struct ceph_osd_event *event)
{
        kref_get(&event->kref);
}

void ceph_osdc_put_event(struct ceph_osd_event *event)
{
        kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

static void __insert_event(struct ceph_osd_client *osdc,
                           struct ceph_osd_event *new)
{
        struct rb_node **p = &osdc->event_tree.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd_event *event = NULL;

        while (*p) {
                parent = *p;
                event = rb_entry(parent, struct ceph_osd_event, node);
                if (new->cookie < event->cookie)
                        p = &(*p)->rb_left;
                else if (new->cookie > event->cookie)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->node, parent, p);
        rb_insert_color(&new->node, &osdc->event_tree);
}

static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
                                           u64 cookie)
{
        struct rb_node **p = &osdc->event_tree.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd_event *event = NULL;

        while (*p) {
                parent = *p;
                event = rb_entry(parent, struct ceph_osd_event, node);
                if (cookie < event->cookie)
                        p = &(*p)->rb_left;
                else if (cookie > event->cookie)
                        p = &(*p)->rb_right;
                else
                        return event;
        }
        return NULL;
}

static void __remove_event(struct ceph_osd_event *event)
{
        struct ceph_osd_client *osdc = event->osdc;

        if (!RB_EMPTY_NODE(&event->node)) {
                dout("__remove_event removed %p\n", event);
                rb_erase(&event->node, &osdc->event_tree);
                ceph_osdc_put_event(event);
        } else {
                dout("__remove_event didn't remove %p\n", event);
        }
}
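
/*
 * Event lifetime, as implemented here: ceph_osdc_create_event() below
 * takes one reference for the event tree and one for the caller, so an
 * event that has been cancelled (dropping the tree's ref via
 * __remove_event()) and put by its creator is freed by
 * __release_event().  Lookups in the tree must take their own
 * reference before using the event outside event_lock; see get_event()
 * in handle_watch_notify().
 */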

int ceph_osdc_create_event(struct ceph_osd_client *osdc,
                           void (*event_cb)(u64, u64, u8, void *),
                           void *data, struct ceph_osd_event **pevent)
{
        struct ceph_osd_event *event;

        event = kmalloc(sizeof(*event), GFP_NOIO);
        if (!event)
                return -ENOMEM;

        dout("create_event %p\n", event);
        event->cb = event_cb;
        event->one_shot = 0;
        event->data = data;
        event->osdc = osdc;
        INIT_LIST_HEAD(&event->osd_node);
        RB_CLEAR_NODE(&event->node);
        kref_init(&event->kref);   /* one ref for us */
        kref_get(&event->kref);    /* one ref for the caller */

        spin_lock(&osdc->event_lock);
        event->cookie = ++osdc->event_count;
        __insert_event(osdc, event);
        spin_unlock(&osdc->event_lock);

        *pevent = event;
        return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);

void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
        struct ceph_osd_client *osdc = event->osdc;

        dout("cancel_event %p\n", event);
        spin_lock(&osdc->event_lock);
        __remove_event(event);
        spin_unlock(&osdc->event_lock);
        ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);
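
/*
 * A minimal usage sketch (illustrative only, not code from this file):
 * a user of this API registers a callback, hands the resulting cookie
 * to the OSD in a watch operation, and tears the event down when done.
 * The names my_watch_cb and mydata are hypothetical.
 *
 *      static void my_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 *      {
 *              // runs from the notify workqueue; see do_event_work()
 *      }
 *
 *      struct ceph_osd_event *event;
 *      int ret = ceph_osdc_create_event(osdc, my_watch_cb, mydata, &event);
 *      if (ret)
 *              return ret;
 *      // ... send a watch op carrying event->cookie to the OSD ...
 *      ceph_osdc_cancel_event(event);  // drops the caller's reference
 */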

static void do_event_work(struct work_struct *work)
{
        struct ceph_osd_event_work *event_work =
                container_of(work, struct ceph_osd_event_work, work);
        struct ceph_osd_event *event = event_work->event;
        u64 ver = event_work->ver;
        u64 notify_id = event_work->notify_id;
        u8 opcode = event_work->opcode;

        dout("do_event_work completing %p\n", event);
        event->cb(ver, notify_id, opcode, event->data);
        dout("do_event_work completed %p\n", event);
        ceph_osdc_put_event(event);
        kfree(event_work);
}

/*
 * Process osd watch notifications
 */
static void handle_watch_notify(struct ceph_osd_client *osdc,
                                struct ceph_msg *msg)
{
        void *p, *end;
        u8 proto_ver;
        u64 cookie, ver, notify_id;
        u8 opcode;
        struct ceph_osd_event *event;
        struct ceph_osd_event_work *event_work;

        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        ceph_decode_8_safe(&p, end, proto_ver, bad);
        ceph_decode_8_safe(&p, end, opcode, bad);
        ceph_decode_64_safe(&p, end, cookie, bad);
        ceph_decode_64_safe(&p, end, ver, bad);
        ceph_decode_64_safe(&p, end, notify_id, bad);

        spin_lock(&osdc->event_lock);
        event = __find_event(osdc, cookie);
        if (event) {
                BUG_ON(event->one_shot);
                get_event(event);
        }
        spin_unlock(&osdc->event_lock);
        dout("handle_watch_notify cookie %lld ver %lld event %p\n",
             cookie, ver, event);
        if (event) {
                event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
                if (!event_work) {
                        dout("ERROR: could not allocate event_work\n");
                        goto done_err;
                }
                INIT_WORK(&event_work->work, do_event_work);
                event_work->event = event;
                event_work->ver = ver;
                event_work->notify_id = notify_id;
                event_work->opcode = opcode;
                if (!queue_work(osdc->notify_wq, &event_work->work)) {
                        dout("WARNING: failed to queue notify event work\n");
                        goto done_err;
                }
        }

        return;

done_err:
        ceph_osdc_put_event(event);
        return;

bad:
        pr_err("osdc handle_watch_notify corrupt msg\n");
        return;
}
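
/*
 * Note on the reference flow above: the reference taken by get_event()
 * under event_lock is handed to the queued work item and dropped by
 * do_event_work() once the callback has run, or dropped at done_err:
 * if allocation or queueing fails.  proto_ver is decoded to advance
 * past the message framing but is not otherwise used here.
 */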

static void ceph_osdc_msg_data_set(struct ceph_msg *msg,
                                   struct ceph_osd_data *osd_data)
{
        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
                BUG_ON(osd_data->length > (u64) SIZE_MAX);
                if (osd_data->length)
                        ceph_msg_data_set_pages(msg, osd_data->pages,
                                osd_data->length, osd_data->alignment);
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
                BUG_ON(!osd_data->pagelist->length);
                ceph_msg_data_set_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
                ceph_msg_data_set_bio(msg, osd_data->bio, osd_data->bio_length);
#endif
        } else {
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
        }
}
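
/*
 * The type switch above mirrors the ceph_osd_data variants: page
 * arrays, pagelists, and (with CONFIG_BLOCK) bios; anything else must
 * be CEPH_OSD_DATA_TYPE_NONE.  Callers such as ceph_osdc_readpages()
 * and ceph_osdc_writepages() below fill in r_data_in/r_data_out before
 * ceph_osdc_start_request() attaches them to the messages here.
 */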

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req,
                            bool nofail)
{
        int rc = 0;

        /* Set up response incoming data and request outgoing data fields */
        ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in);
        ceph_osdc_msg_data_set(req->r_request, &req->r_data_out);

        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        __register_request(osdc, req);
        WARN_ON(req->r_sent);
        rc = __map_request(osdc, req, 0);
        if (rc < 0) {
                if (nofail) {
                        dout("osdc_start_request failed map, "
                                " will retry %lld\n", req->r_tid);
                        rc = 0;
                }
                goto out_unlock;
        }
        if (req->r_osd == NULL) {
                dout("send_request %p no up osds in pg\n", req);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        } else {
                __send_queued(osdc);
        }
        rc = 0;
out_unlock:
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
        return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
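
/*
 * A minimal synchronous-submission sketch, mirroring what
 * ceph_osdc_readpages() and ceph_osdc_writepages() below actually do
 * (illustrative only):
 *
 *      rc = ceph_osdc_start_request(osdc, req, false);
 *      if (!rc)
 *              rc = ceph_osdc_wait_request(osdc, req);
 *      ceph_osdc_put_request(req);
 *
 * With nofail == true, a mapping failure is swallowed (rc forced to 0)
 * and the request stays registered to be retried on a later osdmap.
 */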

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
{
        int rc;

        rc = wait_for_completion_interruptible(&req->r_completion);
        if (rc < 0) {
                mutex_lock(&osdc->request_mutex);
                __cancel_request(req);
                __unregister_request(osdc, req);
                mutex_unlock(&osdc->request_mutex);
                complete_request(req);
                dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
                return rc;
        }

        dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
        return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 *
 * Only writes with a tid at or below the last_tid sampled on entry are
 * waited on; resuming the tid-ordered scan at next_tid after each
 * sleep keeps a steady stream of new submissions from starving us.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
        struct ceph_osd_request *req;
        u64 last_tid, next_tid = 0;

        mutex_lock(&osdc->request_mutex);
        last_tid = osdc->last_tid;
        while (1) {
                req = __lookup_request_ge(osdc, next_tid);
                if (!req)
                        break;
                if (req->r_tid > last_tid)
                        break;

                next_tid = req->r_tid + 1;
                if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
                        continue;

                ceph_osdc_get_request(req);
                mutex_unlock(&osdc->request_mutex);
                dout("sync waiting on tid %llu (last is %llu)\n",
                     req->r_tid, last_tid);
                wait_for_completion(&req->r_safe_completion);
                mutex_lock(&osdc->request_mutex);
                ceph_osdc_put_request(req);
        }
        mutex_unlock(&osdc->request_mutex);
        dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
        int err;

        dout("init\n");
        osdc->client = client;
        osdc->osdmap = NULL;
        init_rwsem(&osdc->map_sem);
        init_completion(&osdc->map_waiters);
        osdc->last_requested_map = 0;
        mutex_init(&osdc->request_mutex);
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
        INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
        INIT_LIST_HEAD(&osdc->req_lru);
        INIT_LIST_HEAD(&osdc->req_unsent);
        INIT_LIST_HEAD(&osdc->req_notarget);
        INIT_LIST_HEAD(&osdc->req_linger);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
        spin_lock_init(&osdc->event_lock);
        osdc->event_tree = RB_ROOT;
        osdc->event_count = 0;

        schedule_delayed_work(&osdc->osds_timeout_work,
            round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

        err = -ENOMEM;
        osdc->req_mempool = mempool_create_kmalloc_pool(10,
                                        sizeof(struct ceph_osd_request));
        if (!osdc->req_mempool)
                goto out;

        err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
                                OSD_OP_FRONT_LEN, 10, true,
                                "osd_op");
        if (err < 0)
                goto out_mempool;
        err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
                                OSD_OPREPLY_FRONT_LEN, 10, true,
                                "osd_op_reply");
        if (err < 0)
                goto out_msgpool;

        /*
         * create_singlethread_workqueue() returns NULL on failure, not
         * an ERR_PTR, so checking IS_ERR() here would never fire.
         */
        osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
        if (!osdc->notify_wq) {
                err = -ENOMEM;
                goto out_msgpool;
        }
        return 0;

out_msgpool:
        ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
        mempool_destroy(osdc->req_mempool);
out:
        return err;
}
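
/*
 * Teardown below mirrors the setup above in reverse: flush and destroy
 * the notify workqueue, cancel the delayed work armed in init, then
 * release the osdmap, the tracked osds, the request mempool, and both
 * msgpools.
 */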
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
        flush_workqueue(osdc->notify_wq);
        destroy_workqueue(osdc->notify_wq);
        cancel_delayed_work_sync(&osdc->timeout_work);
        cancel_delayed_work_sync(&osdc->osds_timeout_work);
        if (osdc->osdmap) {
                ceph_osdmap_destroy(osdc->osdmap);
                osdc->osdmap = NULL;
        }
        remove_all_osds(osdc);
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
                        u64 off, u64 *plen,
                        u32 truncate_seq, u64 truncate_size,
                        struct page **pages, int num_pages, int page_align)
{
        struct ceph_osd_request *req;
        struct ceph_osd_data *osd_data;
        struct ceph_osd_req_op op;
        int rc = 0;

        dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
             vino.snap, off, *plen);
        req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, &op,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, truncate_seq, truncate_size,
                                    false);
        if (IS_ERR(req))
                return PTR_ERR(req);

        /* it may be a short read due to an object boundary */
        osd_data = &req->r_data_in;
        osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
        osd_data->pages = pages;
        osd_data->length = *plen;
        osd_data->alignment = page_align;

        dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
             off, *plen, osd_data->length, page_align);

        ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL);

        rc = ceph_osdc_start_request(osdc, req, false);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);

        ceph_osdc_put_request(req);
        dout("readpages result %d\n", rc);
        return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
                         struct ceph_snap_context *snapc,
                         u64 off, u64 len,
                         u32 truncate_seq, u64 truncate_size,
                         struct timespec *mtime,
                         struct page **pages, int num_pages)
{
        struct ceph_osd_request *req;
        struct ceph_osd_data *osd_data;
        struct ceph_osd_req_op op;
        int rc = 0;
        int page_align = off & ~PAGE_MASK;

        BUG_ON(vino.snap != CEPH_NOSNAP);       /* snapshots aren't writeable */
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, &op,
                                    CEPH_OSD_OP_WRITE,
                                    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
                                    snapc, truncate_seq, truncate_size,
                                    true);
        if (IS_ERR(req))
                return PTR_ERR(req);

        /* it may be a short write due to an object boundary */
        osd_data = &req->r_data_out;
        osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
        osd_data->pages = pages;
        osd_data->length = len;
        osd_data->alignment = page_align;
        dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length);

        ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime);

        rc = ceph_osdc_start_request(osdc, req, true);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);

        ceph_osdc_put_request(req);
        if (rc == 0)
                rc = len;
        dout("writepages result %d\n", rc);
        return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
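
/*
 * Worked example of the object-boundary shortening mentioned above,
 * assuming (for illustration only) the common 4 MB object size: a read
 * of length 2 MB at offset 3 MB spans an object boundary, so
 * ceph_osdc_new_request() trims *plen to 1 MB and only the first
 * object is touched; the caller is expected to issue a follow-up
 * request for the remainder.
 */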

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc;
        int type = le16_to_cpu(msg->hdr.type);

        if (!osd)
                goto out;
        osdc = osd->o_osdc;

        switch (type) {
        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(osdc, msg);
                break;
        case CEPH_MSG_OSD_OPREPLY:
                handle_reply(osdc, msg, con);
                break;
        case CEPH_MSG_WATCH_NOTIFY:
                handle_watch_notify(osdc, msg);
                break;

        default:
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
out:
        ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
        struct ceph_msg *m;
        struct ceph_osd_request *req;
        int front = le32_to_cpu(hdr->front_len);
        int data_len = le32_to_cpu(hdr->data_len);
        u64 tid;

        tid = le64_to_cpu(hdr->tid);
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (!req) {
                *skip = 1;
                m = NULL;
                dout("get_reply unknown tid %llu from osd%d\n", tid,
                     osd->o_osd);
                goto out;
        }

        if (req->r_reply->con)
                dout("%s revoking msg %p from old con %p\n", __func__,
                     req->r_reply, req->r_reply->con);
        ceph_msg_revoke_incoming(req->r_reply);

        if (front > req->r_reply->front.iov_len) {
                pr_warning("get_reply front %d > preallocated %d\n",
                           front, (int)req->r_reply->front.iov_len);
                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
                if (!m)
                        goto out;
                ceph_msg_put(req->r_reply);
                req->r_reply = m;
        }
        m = ceph_msg_get(req->r_reply);

        if (data_len > 0) {
                struct ceph_osd_data *osd_data = &req->r_data_in;

                if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
                        if (osd_data->pages &&
                                unlikely(osd_data->length < data_len)) {

                                pr_warning("tid %lld reply has %d bytes "
                                        "we had only %llu bytes ready\n",
                                        tid, data_len, osd_data->length);
                                *skip = 1;
                                ceph_msg_put(m);
                                m = NULL;
                                goto out;
                        }
                }
        }
        *skip = 0;
        dout("get_reply tid %lld %p\n", tid, m);

out:
        mutex_unlock(&osdc->request_mutex);
        return m;
}
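
/*
 * The ceph_msg_revoke_incoming() call above matters when a request has
 * been resent to a different OSD: the preallocated r_reply may still
 * be claimed as the in-flight incoming message on the old connection,
 * and must be reclaimed before being handed out for the new one.
 */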

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
{
        struct ceph_osd *osd = con->private;
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);

        *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY:
                return ceph_msg_new(type, front, GFP_NOFS, false);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);
        default:
                pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
                        osd->o_osd);
                *skip = 1;
                return NULL;
        }
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
        struct ceph_osd *osd = con->private;
        if (get_osd(osd))
                return con;
        return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
        struct ceph_osd *osd = con->private;
        put_osd(osd);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
                                        int *proto, int force_new)
{
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;
        struct ceph_auth_handshake *auth = &o->o_auth;

        if (force_new && auth->authorizer) {
                ceph_auth_destroy_authorizer(ac, auth->authorizer);
                auth->authorizer = NULL;
        }
        if (!auth->authorizer) {
                int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        } else {
                int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
                                                      auth);
                if (ret)
                        return ERR_PTR(ret);
        }
        *proto = ac->protocol;

        return auth;
}

static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;

        return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
        struct ceph_auth_client *ac = osdc->client->monc.auth;

        ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
        return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
        .get = get_osd_con,
        .put = put_osd_con,
        .dispatch = dispatch,
        .get_authorizer = get_authorizer,
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .alloc_msg = alloc_msg,
        .fault = osd_reset,
};