osd_client.c

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */
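/*
 * Rough sketch of a typical caller's flow (illustrative only; error
 * handling is elided, and the start/wait helpers are provided elsewhere
 * in this file):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, 0, false);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 *	ceph_osdc_start_request(osdc, req, false);
 *	ret = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */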
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq);

#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
	bool wrlocked = true;

	if (unlikely(down_read_trylock(sem))) {
		wrlocked = false;
		up_read(sem);
	}

	return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	WARN_ON(!(mutex_is_locked(&osd->lock) &&
		  rwsem_is_locked(&osdc->lock)) &&
		!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
	WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly. shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
		       u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	int r;

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, objlen);
	if (r < 0)
		return r;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
	return 0;
}
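/*
 * Worked example (assuming a simple layout: 4 MiB objects, stripe count
 * of 1): a file extent off=3 MiB, *plen=4 MiB crosses the first object
 * boundary, so calc_layout() returns objnum=0, objoff=3 MiB, objlen=1 MiB
 * and shortens *plen to 1 MiB; the caller then issues the remainder
 * against the next object separately.
 */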
static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
			struct bio *bio, size_t bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio = bio;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

#define osd_req_op_data(oreq, whch, typ, fld)				\
({									\
	struct ceph_osd_request *__oreq = (oreq);			\
	unsigned int __whch = (whch);					\
	BUG_ON(__whch >= __oreq->r_num_ops);				\
	&__oreq->r_ops[__whch].typ.fld;					\
})

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			   unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				 pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				 pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
			unsigned int which, struct bio *bio, size_t bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				 pages_from_pool, own_pages);
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				 pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	}
	ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
				    unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		ceph_osd_data_release(&op->list_watchers.response_data);
		break;
	default:
		break;
	}
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}

static void target_copy(struct ceph_osd_request_target *dest,
			const struct ceph_osd_request_target *src)
{
	ceph_oid_copy(&dest->base_oid, &src->base_oid);
	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
	ceph_oid_copy(&dest->target_oid, &src->target_oid);
	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

	dest->pgid = src->pgid; /* struct */
	dest->pg_num = src->pg_num;
	dest->pg_num_mask = src->pg_num_mask;
	ceph_osds_copy(&dest->acting, &src->acting);
	ceph_osds_copy(&dest->up, &src->up);
	dest->size = src->size;
	dest->min_size = src->min_size;
	dest->sort_bitwise = src->sort_bitwise;

	dest->flags = src->flags;
	dest->paused = src->paused;

	dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oloc_destroy(&t->base_oloc);
	ceph_oid_destroy(&t->target_oid);
	ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
	WARN_ON(!list_empty(&req->r_unsafe_item));
	WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	request_release_checks(req);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     atomic_read(&req->r_kref.refcount));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     atomic_read(&req->r_kref.refcount));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);
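/*
 * Lifecycle sketch: a request is created with a single reference
 * (kref_init() in request_init() below); anyone who needs to hold the
 * pointer across a blocking call takes an extra reference with
 * ceph_osdc_get_request() and drops it with ceph_osdc_put_request(),
 * which frees the request once the last reference is gone.
 */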
static void request_init(struct ceph_osd_request *req)
{
	/* req only, each op is zeroed in _osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_done_completion);
	RB_CLEAR_NODE(&req->r_node);
	RB_CLEAR_NODE(&req->r_mc_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	target_init(&req->r_t);
}

/*
 * This is ugly, but it allows us to reuse linger registration and ping
 * requests, keeping the structure of the code around send_linger{_ping}()
 * reasonable. Setting up a min_nr=2 mempool for each linger request
 * and dealing with copying ops (this blasts req only, watch op remains
 * intact) isn't any better.
 */
static void request_reinit(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	bool mempool = req->r_mempool;
	unsigned int num_ops = req->r_num_ops;
	u64 snapid = req->r_snapid;
	struct ceph_snap_context *snapc = req->r_snapc;
	bool linger = req->r_linger;
	struct ceph_msg *request_msg = req->r_request;
	struct ceph_msg *reply_msg = req->r_reply;

	dout("%s req %p\n", __func__, req);
	WARN_ON(atomic_read(&req->r_kref.refcount) != 1);
	request_release_checks(req);

	WARN_ON(atomic_read(&request_msg->kref.refcount) != 1);
	WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1);
	target_destroy(&req->r_t);

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = snapid;
	req->r_snapc = snapc;
	req->r_linger = linger;
	req->r_request = request_msg;
	req->r_reply = reply_msg;
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
			      gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
{
	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(ceph_oid_empty(&req->r_base_oid));
	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

	/* create request message */
	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
	msg_size += 1 + 8 + 4 + 4; /* pgid */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4; /* retry_attempt */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);
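/*
 * Minimal usage sketch for the low-level constructors (illustrative only;
 * "oloc" and the object name are made up, error handling is elided):
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOFS);
 *	osd_req_op_init(req, 0, CEPH_OSD_OP_STAT, 0);
 *	ceph_oid_printf(&req->r_base_oid, "%s", "some_object");
 *	ceph_oloc_copy(&req->r_base_oloc, &oloc);
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
 *
 * The base oid/oloc must be filled in before ceph_osdc_alloc_messages(),
 * which sizes the request message from them (note the WARN_ONs above).
 */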
static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them. It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
		 u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}

void osd_req_op_init(struct ceph_osd_request *osd_req,
		     unsigned int which, u16 opcode, u32 flags)
{
	(void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
			    unsigned int which, u16 opcode,
			    u64 offset, u64 length,
			    u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
			      unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
				unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			 u16 opcode, const char *class, const char *method)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;

	BUG_ON(opcode != CEPH_OSD_OP_CALL);

	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
	BUG_ON(!pagelist);
	ceph_pagelist_init(pagelist);

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	ceph_pagelist_append(pagelist, class, size);
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	ceph_pagelist_append(pagelist, method, size);
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
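/*
 * Illustrative example of an object class call (the class/method names
 * and data buffers here are made up):
 *
 *	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, "lock", "lock");
 *	osd_req_op_cls_request_data_pagelist(req, 0, in_pagelist);
 *	osd_req_op_cls_response_data_pages(req, 0, pages, PAGE_SIZE, 0,
 *					   false, false);
 *
 * The class/method names land in the request_info pagelist built above,
 * while input and output payloads ride in request_data/response_data.
 */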
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			  u16 opcode, const char *name, const void *value,
			  size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	ceph_pagelist_init(pagelist);

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ceph_pagelist_append(pagelist, name, payload_len);

	op->xattr.value_len = size;
	ceph_pagelist_append(pagelist, value, size);
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
				  u64 cookie, u8 watch_opcode)
{
	struct ceph_osd_req_op *op;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
	op->watch.cookie = cookie;
	op->watch.op = watch_opcode;
	op->watch.gen = 0;
}

void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
				unsigned int which,
				u64 expected_object_size,
				u64 expected_write_size)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      CEPH_OSD_OP_SETALLOCHINT,
						      0);

	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit. Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				   struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, osd_data->bio, length);
#endif
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
		pr_err("unrecognized osd opcode %d\n", src->op);

		return 0;
	}

	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(0);
		dst->watch.op = src->watch.op;
		dst->watch.gen = cpu_to_le32(src->watch.gen);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
		    cpu_to_le64(src->alloc_hint.expected_write_size);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately. (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       unsigned int which, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
				      GFP_NOFS);
	if (!req) {
		r = -ENOMEM;
		goto fail;
	}

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r)
		goto fail;

	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
		osd_req_op_init(req, which, opcode, 0);
	} else {
		u32 object_size = layout->object_size;
		u32 object_base = off - objoff;
		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
			if (truncate_size <= object_base) {
				truncate_size = 0;
			} else {
				truncate_size -= object_base;
				if (truncate_size > object_size)
					truncate_size = object_size;
			}
		}
		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
				       truncate_size, truncate_seq);
	}

	req->r_flags = flags;
	req->r_base_oloc.pool = layout->pool_id;
	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);

	req->r_snapid = vino.snap;
	if (flags & CEPH_OSD_FLAG_WRITE)
		req->r_data_offset = off;

	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
	if (r)
		goto fail;

	return req;

fail:
	ceph_osdc_put_request(req);
	return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);
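/*
 * Truncation example (numbers assumed for illustration, 4 MiB objects):
 * for an op against object 1 (object_base = 4 MiB) with truncate_size =
 * 5 MiB, the per-object truncate_size passed to osd_req_op_extent_init()
 * becomes 5 MiB - 4 MiB = 1 MiB; had truncate_size been at or below
 * 4 MiB it would have been clamped to 0, and anything beyond the object
 * is clamped to object_size.
 */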
/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
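/*
 * DEFINE_RB_FUNCS() (from linux/ceph/libceph.h) expands to the
 * insert_request()/erase_request()/lookup_request() helpers used below,
 * and likewise the *_request_mc() variants for the tree keyed on
 * r_mc_node that the map check code uses.
 */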
static bool osd_homeless(struct ceph_osd *osd)
{
	return osd->o_osd == CEPH_HOMELESS_OSD;
}

static bool osd_registered(struct ceph_osd *osd)
{
	verify_osdc_locked(osd->o_osdc);

	return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */
static void osd_init(struct ceph_osd *osd)
{
	atomic_set(&osd->o_ref, 1);
	RB_CLEAR_NODE(&osd->o_node);
	osd->o_requests = RB_ROOT;
	osd->o_linger_requests = RB_ROOT;
	INIT_LIST_HEAD(&osd->o_osd_lru);
	INIT_LIST_HEAD(&osd->o_keepalive_item);
	osd->o_incarnation = 1;
	mutex_init(&osd->lock);
}

static void osd_cleanup(struct ceph_osd *osd)
{
	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
	WARN_ON(!list_empty(&osd->o_osd_lru));
	WARN_ON(!list_empty(&osd->o_keepalive_item));

	if (osd->o_auth.authorizer) {
		WARN_ON(osd_homeless(osd));
		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
	}
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	WARN_ON(onum == CEPH_HOMELESS_OSD);

	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
	osd_init(osd);
	osd->o_osdc = osdc;
	osd->o_osd = onum;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		osd_cleanup(osd);
		kfree(osd);
	}
}

DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

static void __move_osd_to_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	spin_lock(&osdc->osd_lru_lock);
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	spin_unlock(&osdc->osd_lru_lock);

	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests))
		__move_osd_to_lru(osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	spin_lock(&osdc->osd_lru_lock);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
	spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 */
static void close_osd(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;

	verify_osdc_wrlocked(osdc);
	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	ceph_con_close(&osd->o_con);

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* unlink_request() */

		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
		unlink_request(osd, req);
		link_request(&osdc->homeless_osd, req);
	}
	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);

		n = rb_next(n); /* unlink_linger() */

		dout(" reassigning lreq %p linger_id %llu\n", lreq,
		     lreq->linger_id);
		unlink_linger(osd, lreq);
		link_linger(&osdc->homeless_osd, lreq);
	}

	__remove_osd_from_lru(osd);
	erase_osd(&osdc->osds, osd);
	put_osd(osd);
}

/*
 * reset osd connect
 */
static int reopen_osd(struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
		close_osd(osd);
		return -ENODEV;
	}

	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
	    !ceph_con_opened(&osd->o_con)) {
		struct rb_node *n;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
  995. for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
  996. struct ceph_osd_request *req =
  997. rb_entry(n, struct ceph_osd_request, r_node);
  998. req->r_stamp = jiffies;
  999. }
  1000. return -EAGAIN;
  1001. }
  1002. ceph_con_close(&osd->o_con);
  1003. ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
  1004. osd->o_incarnation++;
  1005. return 0;
  1006. }
  1007. static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
  1008. bool wrlocked)
  1009. {
  1010. struct ceph_osd *osd;
  1011. if (wrlocked)
  1012. verify_osdc_wrlocked(osdc);
  1013. else
  1014. verify_osdc_locked(osdc);
  1015. if (o != CEPH_HOMELESS_OSD)
  1016. osd = lookup_osd(&osdc->osds, o);
  1017. else
  1018. osd = &osdc->homeless_osd;
  1019. if (!osd) {
  1020. if (!wrlocked)
  1021. return ERR_PTR(-EAGAIN);
  1022. osd = create_osd(osdc, o);
  1023. insert_osd(&osdc->osds, osd);
  1024. ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
  1025. &osdc->osdmap->osd_addr[osd->o_osd]);
  1026. }
  1027. dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
  1028. return osd;
  1029. }
  1030. /*
  1031. * Create request <-> OSD session relation.
  1032. *
  1033. * @req has to be assigned a tid, @osd may be homeless.
  1034. */
  1035. static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
  1036. {
  1037. verify_osd_locked(osd);
  1038. WARN_ON(!req->r_tid || req->r_osd);
  1039. dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
  1040. req, req->r_tid);
  1041. if (!osd_homeless(osd))
  1042. __remove_osd_from_lru(osd);
  1043. else
  1044. atomic_inc(&osd->o_osdc->num_homeless);
  1045. get_osd(osd);
  1046. insert_request(&osd->o_requests, req);
  1047. req->r_osd = osd;
  1048. }
  1049. static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
  1050. {
  1051. verify_osd_locked(osd);
  1052. WARN_ON(req->r_osd != osd);
  1053. dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
  1054. req, req->r_tid);
  1055. req->r_osd = NULL;
  1056. erase_request(&osd->o_requests, req);
  1057. put_osd(osd);
  1058. if (!osd_homeless(osd))
  1059. maybe_move_osd_to_lru(osd);
  1060. else
  1061. atomic_dec(&osd->o_osdc->num_homeless);
  1062. }
  1063. static bool __pool_full(struct ceph_pg_pool_info *pi)
  1064. {
  1065. return pi->flags & CEPH_POOL_FLAG_FULL;
  1066. }
  1067. static bool have_pool_full(struct ceph_osd_client *osdc)
  1068. {
  1069. struct rb_node *n;
  1070. for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
  1071. struct ceph_pg_pool_info *pi =
  1072. rb_entry(n, struct ceph_pg_pool_info, node);
  1073. if (__pool_full(pi))
  1074. return true;
  1075. }
  1076. return false;
  1077. }
  1078. static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
  1079. {
  1080. struct ceph_pg_pool_info *pi;
  1081. pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
  1082. if (!pi)
  1083. return false;
  1084. return __pool_full(pi);
  1085. }
  1086. /*
  1087. * Returns whether a request should be blocked from being sent
  1088. * based on the current osdmap and osd_client settings.
  1089. */
  1090. static bool target_should_be_paused(struct ceph_osd_client *osdc,
  1091. const struct ceph_osd_request_target *t,
  1092. struct ceph_pg_pool_info *pi)
  1093. {
  1094. bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
  1095. bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
  1096. ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
  1097. __pool_full(pi);
  1098. WARN_ON(pi->id != t->base_oloc.pool);
  1099. return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
  1100. (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
  1101. }
  1102. enum calc_target_result {
  1103. CALC_TARGET_NO_ACTION = 0,
  1104. CALC_TARGET_NEED_RESEND,
  1105. CALC_TARGET_POOL_DNE,
  1106. };
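/*
 * Map a request target onto the current osdmap.
 *
 * In short: honour any forced resend recorded for the pool, refresh
 * target_oid/target_oloc from the base values, apply cache-tier read/write
 * overlays unless IGNORE_OVERLAY is set, compute the PG and its up/acting
 * sets, and report one of the results above: NO_ACTION if nothing that
 * matters changed, NEED_RESEND if the mapping (or pause state) changed,
 * or POOL_DNE if the pool is gone from the map.
 */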
  1107. static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
  1108. struct ceph_osd_request_target *t,
  1109. u32 *last_force_resend,
  1110. bool any_change)
  1111. {
  1112. struct ceph_pg_pool_info *pi;
  1113. struct ceph_pg pgid, last_pgid;
  1114. struct ceph_osds up, acting;
  1115. bool force_resend = false;
  1116. bool need_check_tiering = false;
  1117. bool need_resend = false;
  1118. bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
  1119. enum calc_target_result ct_res;
  1120. int ret;
  1121. pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
  1122. if (!pi) {
  1123. t->osd = CEPH_HOMELESS_OSD;
  1124. ct_res = CALC_TARGET_POOL_DNE;
  1125. goto out;
  1126. }
  1127. if (osdc->osdmap->epoch == pi->last_force_request_resend) {
  1128. if (last_force_resend &&
  1129. *last_force_resend < pi->last_force_request_resend) {
  1130. *last_force_resend = pi->last_force_request_resend;
  1131. force_resend = true;
  1132. } else if (!last_force_resend) {
  1133. force_resend = true;
  1134. }
  1135. }
  1136. if (ceph_oid_empty(&t->target_oid) || force_resend) {
  1137. ceph_oid_copy(&t->target_oid, &t->base_oid);
  1138. need_check_tiering = true;
  1139. }
  1140. if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
  1141. ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
  1142. need_check_tiering = true;
  1143. }
  1144. if (need_check_tiering &&
  1145. (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
  1146. if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
  1147. t->target_oloc.pool = pi->read_tier;
  1148. if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
  1149. t->target_oloc.pool = pi->write_tier;
  1150. }
  1151. ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid,
  1152. &t->target_oloc, &pgid);
  1153. if (ret) {
  1154. WARN_ON(ret != -ENOENT);
  1155. t->osd = CEPH_HOMELESS_OSD;
  1156. ct_res = CALC_TARGET_POOL_DNE;
  1157. goto out;
  1158. }
  1159. last_pgid.pool = pgid.pool;
  1160. last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
  1161. ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
  1162. if (any_change &&
  1163. ceph_is_new_interval(&t->acting,
  1164. &acting,
  1165. &t->up,
  1166. &up,
  1167. t->size,
  1168. pi->size,
  1169. t->min_size,
  1170. pi->min_size,
  1171. t->pg_num,
  1172. pi->pg_num,
  1173. t->sort_bitwise,
  1174. sort_bitwise,
  1175. &last_pgid))
  1176. force_resend = true;
  1177. if (t->paused && !target_should_be_paused(osdc, t, pi)) {
  1178. t->paused = false;
  1179. need_resend = true;
  1180. }
  1181. if (ceph_pg_compare(&t->pgid, &pgid) ||
  1182. ceph_osds_changed(&t->acting, &acting, any_change) ||
  1183. force_resend) {
  1184. t->pgid = pgid; /* struct */
  1185. ceph_osds_copy(&t->acting, &acting);
  1186. ceph_osds_copy(&t->up, &up);
  1187. t->size = pi->size;
  1188. t->min_size = pi->min_size;
  1189. t->pg_num = pi->pg_num;
  1190. t->pg_num_mask = pi->pg_num_mask;
  1191. t->sort_bitwise = sort_bitwise;
  1192. t->osd = acting.primary;
  1193. need_resend = true;
  1194. }
  1195. ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION;
  1196. out:
  1197. dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
  1198. return ct_res;
  1199. }
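/*
 * Attach op data items to the right message: request-side data (write
 * payloads, xattrs, notify/notify_ack blobs, cls call input) goes on the
 * outgoing @msg, reply-side data (read buffers, stat, list_watchers, cls
 * and notify responses) goes on req->r_reply.  The summed indata_len is
 * cross-checked against msg->data_length at the end.
 */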
  1200. static void setup_request_data(struct ceph_osd_request *req,
  1201. struct ceph_msg *msg)
  1202. {
  1203. u32 data_len = 0;
  1204. int i;
  1205. if (!list_empty(&msg->data))
  1206. return;
  1207. WARN_ON(msg->data_length);
  1208. for (i = 0; i < req->r_num_ops; i++) {
  1209. struct ceph_osd_req_op *op = &req->r_ops[i];
  1210. switch (op->op) {
  1211. /* request */
  1212. case CEPH_OSD_OP_WRITE:
  1213. case CEPH_OSD_OP_WRITEFULL:
  1214. WARN_ON(op->indata_len != op->extent.length);
  1215. ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
  1216. break;
  1217. case CEPH_OSD_OP_SETXATTR:
  1218. case CEPH_OSD_OP_CMPXATTR:
  1219. WARN_ON(op->indata_len != op->xattr.name_len +
  1220. op->xattr.value_len);
  1221. ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
  1222. break;
  1223. case CEPH_OSD_OP_NOTIFY_ACK:
  1224. ceph_osdc_msg_data_add(msg,
  1225. &op->notify_ack.request_data);
  1226. break;
  1227. /* reply */
  1228. case CEPH_OSD_OP_STAT:
  1229. ceph_osdc_msg_data_add(req->r_reply,
  1230. &op->raw_data_in);
  1231. break;
  1232. case CEPH_OSD_OP_READ:
  1233. ceph_osdc_msg_data_add(req->r_reply,
  1234. &op->extent.osd_data);
  1235. break;
  1236. case CEPH_OSD_OP_LIST_WATCHERS:
  1237. ceph_osdc_msg_data_add(req->r_reply,
  1238. &op->list_watchers.response_data);
  1239. break;
  1240. /* both */
  1241. case CEPH_OSD_OP_CALL:
  1242. WARN_ON(op->indata_len != op->cls.class_len +
  1243. op->cls.method_len +
  1244. op->cls.indata_len);
  1245. ceph_osdc_msg_data_add(msg, &op->cls.request_info);
  1246. /* optional, can be NONE */
  1247. ceph_osdc_msg_data_add(msg, &op->cls.request_data);
  1248. /* optional, can be NONE */
  1249. ceph_osdc_msg_data_add(req->r_reply,
  1250. &op->cls.response_data);
  1251. break;
  1252. case CEPH_OSD_OP_NOTIFY:
  1253. ceph_osdc_msg_data_add(msg,
  1254. &op->notify.request_data);
  1255. ceph_osdc_msg_data_add(req->r_reply,
  1256. &op->notify.response_data);
  1257. break;
  1258. }
  1259. data_len += op->indata_len;
  1260. }
  1261. WARN_ON(data_len != msg->data_length);
  1262. }
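/*
 * Encode the MOSDOp v4 front.  Roughly, the layout produced below is:
 *
 *   u32 client_inc (always 1)
 *   u32 osdmap epoch
 *   u32 flags
 *   ceph_timespec mtime
 *   ceph_eversion reassert_version
 *   object_locator (versioned: pool, preferred = -1, empty key, pool_ns)
 *   raw pgid (u8 1, pool, seed, preferred = -1)
 *   oid (length-prefixed name)
 *   u16 num_ops, followed by num_ops * ceph_osd_op
 *   u64 snapid, u64 snap_seq, u32 num_snaps + snaps
 *   u32 retry_attempt
 */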
  1263. static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
  1264. {
  1265. void *p = msg->front.iov_base;
  1266. void *const end = p + msg->front_alloc_len;
  1267. u32 data_len = 0;
  1268. int i;
  1269. if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
  1270. /* snapshots aren't writeable */
  1271. WARN_ON(req->r_snapid != CEPH_NOSNAP);
  1272. } else {
  1273. WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
  1274. req->r_data_offset || req->r_snapc);
  1275. }
  1276. setup_request_data(req, msg);
  1277. ceph_encode_32(&p, 1); /* client_inc, always 1 */
  1278. ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
  1279. ceph_encode_32(&p, req->r_flags);
  1280. ceph_encode_timespec(p, &req->r_mtime);
  1281. p += sizeof(struct ceph_timespec);
  1282. /* aka reassert_version */
  1283. memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
  1284. p += sizeof(req->r_replay_version);
  1285. /* oloc */
  1286. ceph_start_encoding(&p, 5, 4,
  1287. ceph_oloc_encoding_size(&req->r_t.target_oloc));
  1288. ceph_encode_64(&p, req->r_t.target_oloc.pool);
  1289. ceph_encode_32(&p, -1); /* preferred */
  1290. ceph_encode_32(&p, 0); /* key len */
  1291. if (req->r_t.target_oloc.pool_ns)
  1292. ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
  1293. req->r_t.target_oloc.pool_ns->len);
  1294. else
  1295. ceph_encode_32(&p, 0);
  1296. /* pgid */
  1297. ceph_encode_8(&p, 1);
  1298. ceph_encode_64(&p, req->r_t.pgid.pool);
  1299. ceph_encode_32(&p, req->r_t.pgid.seed);
  1300. ceph_encode_32(&p, -1); /* preferred */
  1301. /* oid */
  1302. ceph_encode_32(&p, req->r_t.target_oid.name_len);
  1303. memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
  1304. p += req->r_t.target_oid.name_len;
  1305. /* ops, can imply data */
  1306. ceph_encode_16(&p, req->r_num_ops);
  1307. for (i = 0; i < req->r_num_ops; i++) {
  1308. data_len += osd_req_encode_op(p, &req->r_ops[i]);
  1309. p += sizeof(struct ceph_osd_op);
  1310. }
  1311. ceph_encode_64(&p, req->r_snapid); /* snapid */
  1312. if (req->r_snapc) {
  1313. ceph_encode_64(&p, req->r_snapc->seq);
  1314. ceph_encode_32(&p, req->r_snapc->num_snaps);
  1315. for (i = 0; i < req->r_snapc->num_snaps; i++)
  1316. ceph_encode_64(&p, req->r_snapc->snaps[i]);
  1317. } else {
  1318. ceph_encode_64(&p, 0); /* snap_seq */
  1319. ceph_encode_32(&p, 0); /* snaps len */
  1320. }
  1321. ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
  1322. BUG_ON(p > end);
  1323. msg->front.iov_len = p - msg->front.iov_base;
  1324. msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
  1325. msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
  1326. msg->hdr.data_len = cpu_to_le32(data_len);
  1327. /*
  1328. * The header "data_off" is a hint to the receiver allowing it
  1329. * to align received data into its buffers such that there's no
  1330. * need to re-copy it before writing it to disk (direct I/O).
  1331. */
  1332. msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
  1333. dout("%s req %p oid %s oid_len %d front %zu data %u\n", __func__,
  1334. req, req->r_t.target_oid.name, req->r_t.target_oid.name_len,
  1335. msg->front.iov_len, data_len);
  1336. }
  1337. /*
  1338. * @req has to be assigned a tid and registered.
  1339. */
  1340. static void send_request(struct ceph_osd_request *req)
  1341. {
  1342. struct ceph_osd *osd = req->r_osd;
  1343. verify_osd_locked(osd);
  1344. WARN_ON(osd->o_osd != req->r_t.osd);
  1345. /*
  1346. * We may have a previously queued request message hanging
  1347. * around. Cancel it to avoid corrupting the msgr.
  1348. */
  1349. if (req->r_sent)
  1350. ceph_msg_revoke(req->r_request);
  1351. req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
  1352. if (req->r_attempts)
  1353. req->r_flags |= CEPH_OSD_FLAG_RETRY;
  1354. else
  1355. WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
  1356. encode_request(req, req->r_request);
  1357. dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
  1358. __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
  1359. req->r_t.osd, req->r_flags, req->r_attempts);
  1360. req->r_t.paused = false;
  1361. req->r_stamp = jiffies;
  1362. req->r_attempts++;
  1363. req->r_sent = osd->o_incarnation;
  1364. req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
  1365. ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
  1366. }
  1367. static void maybe_request_map(struct ceph_osd_client *osdc)
  1368. {
  1369. bool continuous = false;
  1370. verify_osdc_locked(osdc);
  1371. WARN_ON(!osdc->osdmap->epoch);
  1372. if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
  1373. ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
  1374. ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
  1375. dout("%s osdc %p continuous\n", __func__, osdc);
  1376. continuous = true;
  1377. } else {
  1378. dout("%s osdc %p onetime\n", __func__, osdc);
  1379. }
  1380. if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
  1381. osdc->osdmap->epoch + 1, continuous))
  1382. ceph_monc_renew_subs(&osdc->client->monc);
  1383. }
  1384. static void send_map_check(struct ceph_osd_request *req);
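/*
 * Calculate the target and hand the request to its OSD session.
 *
 * Starts out assuming only the read lock is held; if the pool turns out
 * not to exist or a new OSD session has to be created, the lock is
 * promoted to a write lock (promote: label) and the whole thing is
 * retried.  Requests that can't go out right now (pausewr, pauserd,
 * cluster or pool full) are linked but left unsent with r_t.paused set,
 * and an osdmap is requested so we notice when the condition clears.
 */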
  1385. static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
  1386. {
  1387. struct ceph_osd_client *osdc = req->r_osdc;
  1388. struct ceph_osd *osd;
  1389. enum calc_target_result ct_res;
  1390. bool need_send = false;
  1391. bool promoted = false;
  1392. WARN_ON(req->r_tid || req->r_got_reply);
  1393. dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
  1394. again:
  1395. ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
  1396. if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
  1397. goto promote;
  1398. osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
  1399. if (IS_ERR(osd)) {
  1400. WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
  1401. goto promote;
  1402. }
  1403. if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
  1404. ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
  1405. dout("req %p pausewr\n", req);
  1406. req->r_t.paused = true;
  1407. maybe_request_map(osdc);
  1408. } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
  1409. ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
  1410. dout("req %p pauserd\n", req);
  1411. req->r_t.paused = true;
  1412. maybe_request_map(osdc);
  1413. } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
  1414. !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
  1415. CEPH_OSD_FLAG_FULL_FORCE)) &&
  1416. (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
  1417. pool_full(osdc, req->r_t.base_oloc.pool))) {
  1418. dout("req %p full/pool_full\n", req);
  1419. pr_warn_ratelimited("FULL or reached pool quota\n");
  1420. req->r_t.paused = true;
  1421. maybe_request_map(osdc);
  1422. } else if (!osd_homeless(osd)) {
  1423. need_send = true;
  1424. } else {
  1425. maybe_request_map(osdc);
  1426. }
  1427. mutex_lock(&osd->lock);
  1428. /*
  1429. * Assign the tid atomically with send_request() to protect
  1430. * multiple writes to the same object from racing with each
  1431. * other, resulting in out of order ops on the OSDs.
  1432. */
  1433. req->r_tid = atomic64_inc_return(&osdc->last_tid);
  1434. link_request(osd, req);
  1435. if (need_send)
  1436. send_request(req);
  1437. mutex_unlock(&osd->lock);
  1438. if (ct_res == CALC_TARGET_POOL_DNE)
  1439. send_map_check(req);
  1440. if (promoted)
  1441. downgrade_write(&osdc->lock);
  1442. return;
  1443. promote:
  1444. up_read(&osdc->lock);
  1445. down_write(&osdc->lock);
  1446. wrlocked = true;
  1447. promoted = true;
  1448. goto again;
  1449. }
  1450. static void account_request(struct ceph_osd_request *req)
  1451. {
  1452. unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
  1453. if (req->r_flags & CEPH_OSD_FLAG_READ) {
  1454. WARN_ON(req->r_flags & mask);
  1455. req->r_flags |= CEPH_OSD_FLAG_ACK;
  1456. } else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
  1457. WARN_ON(!(req->r_flags & mask));
  1458. else
  1459. WARN_ON(1);
  1460. WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
  1461. atomic_inc(&req->r_osdc->num_requests);
  1462. }
  1463. static void submit_request(struct ceph_osd_request *req, bool wrlocked)
  1464. {
  1465. ceph_osdc_get_request(req);
  1466. account_request(req);
  1467. __submit_request(req, wrlocked);
  1468. }
  1469. static void finish_request(struct ceph_osd_request *req)
  1470. {
  1471. struct ceph_osd_client *osdc = req->r_osdc;
  1472. struct ceph_osd *osd = req->r_osd;
  1473. verify_osd_locked(osd);
  1474. dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
  1475. WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
  1476. unlink_request(osd, req);
  1477. atomic_dec(&osdc->num_requests);
  1478. /*
  1479. * If an OSD has failed or returned and a request has been sent
  1480. * twice, it's possible to get a reply and end up here while the
  1481. * request message is queued for delivery. We will ignore the
  1482. * reply, so not a big deal, but better to try and catch it.
  1483. */
  1484. ceph_msg_revoke(req->r_request);
  1485. ceph_msg_revoke_incoming(req->r_reply);
  1486. }
  1487. static void __complete_request(struct ceph_osd_request *req)
  1488. {
  1489. if (req->r_callback)
  1490. req->r_callback(req);
  1491. else
  1492. complete_all(&req->r_completion);
  1493. }
  1494. /*
  1495. * Note that this is open-coded in handle_reply(), which has to deal
  1496. * with ack vs commit, dup acks, etc.
  1497. */
  1498. static void complete_request(struct ceph_osd_request *req, int err)
  1499. {
  1500. dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
  1501. req->r_result = err;
  1502. finish_request(req);
  1503. __complete_request(req);
  1504. complete_all(&req->r_done_completion);
  1505. ceph_osdc_put_request(req);
  1506. }
  1507. static void cancel_map_check(struct ceph_osd_request *req)
  1508. {
  1509. struct ceph_osd_client *osdc = req->r_osdc;
  1510. struct ceph_osd_request *lookup_req;
  1511. verify_osdc_wrlocked(osdc);
  1512. lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
  1513. if (!lookup_req)
  1514. return;
  1515. WARN_ON(lookup_req != req);
  1516. erase_request_mc(&osdc->map_checks, req);
  1517. ceph_osdc_put_request(req);
  1518. }
  1519. static void cancel_request(struct ceph_osd_request *req)
  1520. {
  1521. dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
  1522. cancel_map_check(req);
  1523. finish_request(req);
  1524. complete_all(&req->r_done_completion);
  1525. ceph_osdc_put_request(req);
  1526. }
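/*
 * Pool-existence ("pool DNE") handling.  If calc_target() can't find the
 * pool, we ask the monitor for the newest osdmap epoch via
 * ceph_monc_get_version_async("osdmap") and record it in r_map_dne_bound;
 * once our map is at least that new and the pool is still missing, the
 * request is completed with -ENOENT.  Pending checks live in
 * &osdc->map_checks, keyed by tid.
 */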
  1527. static void check_pool_dne(struct ceph_osd_request *req)
  1528. {
  1529. struct ceph_osd_client *osdc = req->r_osdc;
  1530. struct ceph_osdmap *map = osdc->osdmap;
  1531. verify_osdc_wrlocked(osdc);
  1532. WARN_ON(!map->epoch);
  1533. if (req->r_attempts) {
  1534. /*
  1535. * We sent a request earlier, which means that
  1536. * previously the pool existed, and now it does not
  1537. * (i.e., it was deleted).
  1538. */
  1539. req->r_map_dne_bound = map->epoch;
  1540. dout("%s req %p tid %llu pool disappeared\n", __func__, req,
  1541. req->r_tid);
  1542. } else {
  1543. dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
  1544. req, req->r_tid, req->r_map_dne_bound, map->epoch);
  1545. }
  1546. if (req->r_map_dne_bound) {
  1547. if (map->epoch >= req->r_map_dne_bound) {
  1548. /* we had a new enough map */
  1549. pr_info_ratelimited("tid %llu pool does not exist\n",
  1550. req->r_tid);
  1551. complete_request(req, -ENOENT);
  1552. }
  1553. } else {
  1554. send_map_check(req);
  1555. }
  1556. }
  1557. static void map_check_cb(struct ceph_mon_generic_request *greq)
  1558. {
  1559. struct ceph_osd_client *osdc = &greq->monc->client->osdc;
  1560. struct ceph_osd_request *req;
  1561. u64 tid = greq->private_data;
  1562. WARN_ON(greq->result || !greq->u.newest);
  1563. down_write(&osdc->lock);
  1564. req = lookup_request_mc(&osdc->map_checks, tid);
  1565. if (!req) {
  1566. dout("%s tid %llu dne\n", __func__, tid);
  1567. goto out_unlock;
  1568. }
  1569. dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
  1570. req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
  1571. if (!req->r_map_dne_bound)
  1572. req->r_map_dne_bound = greq->u.newest;
  1573. erase_request_mc(&osdc->map_checks, req);
  1574. check_pool_dne(req);
  1575. ceph_osdc_put_request(req);
  1576. out_unlock:
  1577. up_write(&osdc->lock);
  1578. }
  1579. static void send_map_check(struct ceph_osd_request *req)
  1580. {
  1581. struct ceph_osd_client *osdc = req->r_osdc;
  1582. struct ceph_osd_request *lookup_req;
  1583. int ret;
  1584. verify_osdc_wrlocked(osdc);
  1585. lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
  1586. if (lookup_req) {
  1587. WARN_ON(lookup_req != req);
  1588. return;
  1589. }
  1590. ceph_osdc_get_request(req);
  1591. insert_request_mc(&osdc->map_checks, req);
  1592. ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
  1593. map_check_cb, req->r_tid);
  1594. WARN_ON(ret);
  1595. }
  1596. /*
  1597. * lingering requests, watch/notify v2 infrastructure
  1598. */
  1599. static void linger_release(struct kref *kref)
  1600. {
  1601. struct ceph_osd_linger_request *lreq =
  1602. container_of(kref, struct ceph_osd_linger_request, kref);
  1603. dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
  1604. lreq->reg_req, lreq->ping_req);
  1605. WARN_ON(!RB_EMPTY_NODE(&lreq->node));
  1606. WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
  1607. WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
  1608. WARN_ON(!list_empty(&lreq->scan_item));
  1609. WARN_ON(!list_empty(&lreq->pending_lworks));
  1610. WARN_ON(lreq->osd);
  1611. if (lreq->reg_req)
  1612. ceph_osdc_put_request(lreq->reg_req);
  1613. if (lreq->ping_req)
  1614. ceph_osdc_put_request(lreq->ping_req);
  1615. target_destroy(&lreq->t);
  1616. kfree(lreq);
  1617. }
  1618. static void linger_put(struct ceph_osd_linger_request *lreq)
  1619. {
  1620. if (lreq)
  1621. kref_put(&lreq->kref, linger_release);
  1622. }
  1623. static struct ceph_osd_linger_request *
  1624. linger_get(struct ceph_osd_linger_request *lreq)
  1625. {
  1626. kref_get(&lreq->kref);
  1627. return lreq;
  1628. }
  1629. static struct ceph_osd_linger_request *
  1630. linger_alloc(struct ceph_osd_client *osdc)
  1631. {
  1632. struct ceph_osd_linger_request *lreq;
  1633. lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
  1634. if (!lreq)
  1635. return NULL;
  1636. kref_init(&lreq->kref);
  1637. mutex_init(&lreq->lock);
  1638. RB_CLEAR_NODE(&lreq->node);
  1639. RB_CLEAR_NODE(&lreq->osdc_node);
  1640. RB_CLEAR_NODE(&lreq->mc_node);
  1641. INIT_LIST_HEAD(&lreq->scan_item);
  1642. INIT_LIST_HEAD(&lreq->pending_lworks);
  1643. init_completion(&lreq->reg_commit_wait);
  1644. init_completion(&lreq->notify_finish_wait);
  1645. lreq->osdc = osdc;
  1646. target_init(&lreq->t);
  1647. dout("%s lreq %p\n", __func__, lreq);
  1648. return lreq;
  1649. }
  1650. DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
  1651. DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
  1652. DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
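/*
 * A linger request is refcounted (kref) and can sit on up to three
 * rb-trees at once: the owning session's o_linger_requests (node), the
 * client-wide linger_requests (osdc_node, keyed by the linger_id assigned
 * in linger_register()) and linger_map_checks (mc_node) while a pool
 * existence check is in flight.
 */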
  1653. /*
  1654. * Create linger request <-> OSD session relation.
  1655. *
  1656. * @lreq has to be registered, @osd may be homeless.
  1657. */
  1658. static void link_linger(struct ceph_osd *osd,
  1659. struct ceph_osd_linger_request *lreq)
  1660. {
  1661. verify_osd_locked(osd);
  1662. WARN_ON(!lreq->linger_id || lreq->osd);
  1663. dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
  1664. osd->o_osd, lreq, lreq->linger_id);
  1665. if (!osd_homeless(osd))
  1666. __remove_osd_from_lru(osd);
  1667. else
  1668. atomic_inc(&osd->o_osdc->num_homeless);
  1669. get_osd(osd);
  1670. insert_linger(&osd->o_linger_requests, lreq);
  1671. lreq->osd = osd;
  1672. }
  1673. static void unlink_linger(struct ceph_osd *osd,
  1674. struct ceph_osd_linger_request *lreq)
  1675. {
  1676. verify_osd_locked(osd);
  1677. WARN_ON(lreq->osd != osd);
  1678. dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
  1679. osd->o_osd, lreq, lreq->linger_id);
  1680. lreq->osd = NULL;
  1681. erase_linger(&osd->o_linger_requests, lreq);
  1682. put_osd(osd);
  1683. if (!osd_homeless(osd))
  1684. maybe_move_osd_to_lru(osd);
  1685. else
  1686. atomic_dec(&osd->o_osdc->num_homeless);
  1687. }
  1688. static bool __linger_registered(struct ceph_osd_linger_request *lreq)
  1689. {
  1690. verify_osdc_locked(lreq->osdc);
  1691. return !RB_EMPTY_NODE(&lreq->osdc_node);
  1692. }
  1693. static bool linger_registered(struct ceph_osd_linger_request *lreq)
  1694. {
  1695. struct ceph_osd_client *osdc = lreq->osdc;
  1696. bool registered;
  1697. down_read(&osdc->lock);
  1698. registered = __linger_registered(lreq);
  1699. up_read(&osdc->lock);
  1700. return registered;
  1701. }
  1702. static void linger_register(struct ceph_osd_linger_request *lreq)
  1703. {
  1704. struct ceph_osd_client *osdc = lreq->osdc;
  1705. verify_osdc_wrlocked(osdc);
  1706. WARN_ON(lreq->linger_id);
  1707. linger_get(lreq);
  1708. lreq->linger_id = ++osdc->last_linger_id;
  1709. insert_linger_osdc(&osdc->linger_requests, lreq);
  1710. }
  1711. static void linger_unregister(struct ceph_osd_linger_request *lreq)
  1712. {
  1713. struct ceph_osd_client *osdc = lreq->osdc;
  1714. verify_osdc_wrlocked(osdc);
  1715. erase_linger_osdc(&osdc->linger_requests, lreq);
  1716. linger_put(lreq);
  1717. }
  1718. static void cancel_linger_request(struct ceph_osd_request *req)
  1719. {
  1720. struct ceph_osd_linger_request *lreq = req->r_priv;
  1721. WARN_ON(!req->r_linger);
  1722. cancel_request(req);
  1723. linger_put(lreq);
  1724. }
  1725. struct linger_work {
  1726. struct work_struct work;
  1727. struct ceph_osd_linger_request *lreq;
  1728. struct list_head pending_item;
  1729. unsigned long queued_stamp;
  1730. union {
  1731. struct {
  1732. u64 notify_id;
  1733. u64 notifier_id;
  1734. void *payload; /* points into @msg front */
  1735. size_t payload_len;
  1736. struct ceph_msg *msg; /* for ceph_msg_put() */
  1737. } notify;
  1738. struct {
  1739. int err;
  1740. } error;
  1741. };
  1742. };
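/*
 * Notify and watch-error events are bounced to process context: each
 * event is packaged into a linger_work, queued on lreq->pending_lworks
 * and run from osdc->notify_wq (see lwork_queue()).  Each lwork holds a
 * reference on its lreq until lwork_free().
 */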
  1743. static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
  1744. work_func_t workfn)
  1745. {
  1746. struct linger_work *lwork;
  1747. lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
  1748. if (!lwork)
  1749. return NULL;
  1750. INIT_WORK(&lwork->work, workfn);
  1751. INIT_LIST_HEAD(&lwork->pending_item);
  1752. lwork->lreq = linger_get(lreq);
  1753. return lwork;
  1754. }
  1755. static void lwork_free(struct linger_work *lwork)
  1756. {
  1757. struct ceph_osd_linger_request *lreq = lwork->lreq;
  1758. mutex_lock(&lreq->lock);
  1759. list_del(&lwork->pending_item);
  1760. mutex_unlock(&lreq->lock);
  1761. linger_put(lreq);
  1762. kfree(lwork);
  1763. }
  1764. static void lwork_queue(struct linger_work *lwork)
  1765. {
  1766. struct ceph_osd_linger_request *lreq = lwork->lreq;
  1767. struct ceph_osd_client *osdc = lreq->osdc;
  1768. verify_lreq_locked(lreq);
  1769. WARN_ON(!list_empty(&lwork->pending_item));
  1770. lwork->queued_stamp = jiffies;
  1771. list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
  1772. queue_work(osdc->notify_wq, &lwork->work);
  1773. }
  1774. static void do_watch_notify(struct work_struct *w)
  1775. {
  1776. struct linger_work *lwork = container_of(w, struct linger_work, work);
  1777. struct ceph_osd_linger_request *lreq = lwork->lreq;
  1778. if (!linger_registered(lreq)) {
  1779. dout("%s lreq %p not registered\n", __func__, lreq);
  1780. goto out;
  1781. }
  1782. WARN_ON(!lreq->is_watch);
  1783. dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
  1784. __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
  1785. lwork->notify.payload_len);
  1786. lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
  1787. lwork->notify.notifier_id, lwork->notify.payload,
  1788. lwork->notify.payload_len);
  1789. out:
  1790. ceph_msg_put(lwork->notify.msg);
  1791. lwork_free(lwork);
  1792. }
  1793. static void do_watch_error(struct work_struct *w)
  1794. {
  1795. struct linger_work *lwork = container_of(w, struct linger_work, work);
  1796. struct ceph_osd_linger_request *lreq = lwork->lreq;
  1797. if (!linger_registered(lreq)) {
  1798. dout("%s lreq %p not registered\n", __func__, lreq);
  1799. goto out;
  1800. }
  1801. dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
  1802. lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
  1803. out:
  1804. lwork_free(lwork);
  1805. }
  1806. static void queue_watch_error(struct ceph_osd_linger_request *lreq)
  1807. {
  1808. struct linger_work *lwork;
  1809. lwork = lwork_alloc(lreq, do_watch_error);
  1810. if (!lwork) {
  1811. pr_err("failed to allocate error-lwork\n");
  1812. return;
  1813. }
  1814. lwork->error.err = lreq->last_error;
  1815. lwork_queue(lwork);
  1816. }
  1817. static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
  1818. int result)
  1819. {
  1820. if (!completion_done(&lreq->reg_commit_wait)) {
  1821. lreq->reg_commit_error = (result <= 0 ? result : 0);
  1822. complete_all(&lreq->reg_commit_wait);
  1823. }
  1824. }
  1825. static void linger_commit_cb(struct ceph_osd_request *req)
  1826. {
  1827. struct ceph_osd_linger_request *lreq = req->r_priv;
  1828. mutex_lock(&lreq->lock);
  1829. dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
  1830. lreq->linger_id, req->r_result);
  1831. WARN_ON(!__linger_registered(lreq));
  1832. linger_reg_commit_complete(lreq, req->r_result);
  1833. lreq->committed = true;
  1834. if (!lreq->is_watch) {
  1835. struct ceph_osd_data *osd_data =
  1836. osd_req_op_data(req, 0, notify, response_data);
  1837. void *p = page_address(osd_data->pages[0]);
  1838. WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
  1839. osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
  1840. /* make note of the notify_id */
  1841. if (req->r_ops[0].outdata_len >= sizeof(u64)) {
  1842. lreq->notify_id = ceph_decode_64(&p);
  1843. dout("lreq %p notify_id %llu\n", lreq,
  1844. lreq->notify_id);
  1845. } else {
  1846. dout("lreq %p no notify_id\n", lreq);
  1847. }
  1848. }
  1849. mutex_unlock(&lreq->lock);
  1850. linger_put(lreq);
  1851. }
  1852. static int normalize_watch_error(int err)
  1853. {
  1854. /*
  1855. * Translate ENOENT -> ENOTCONN so that a delete->disconnection
  1856. * notification and a failure to reconnect because we raced with
  1857. * the delete appear the same to the user.
  1858. */
  1859. if (err == -ENOENT)
  1860. err = -ENOTCONN;
  1861. return err;
  1862. }
  1863. static void linger_reconnect_cb(struct ceph_osd_request *req)
  1864. {
  1865. struct ceph_osd_linger_request *lreq = req->r_priv;
  1866. mutex_lock(&lreq->lock);
  1867. dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
  1868. lreq, lreq->linger_id, req->r_result, lreq->last_error);
  1869. if (req->r_result < 0) {
  1870. if (!lreq->last_error) {
  1871. lreq->last_error = normalize_watch_error(req->r_result);
  1872. queue_watch_error(lreq);
  1873. }
  1874. }
  1875. mutex_unlock(&lreq->lock);
  1876. linger_put(lreq);
  1877. }
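/*
 * (Re)send the registration request for a linger.  reg_req is recycled
 * with request_reinit(); if this is a watch that has already committed,
 * the op is switched to CEPH_OSD_WATCH_OP_RECONNECT with a bumped
 * register_gen, otherwise the original watch/notify registration is sent
 * and completion is reported through linger_commit_cb().
 */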
  1878. static void send_linger(struct ceph_osd_linger_request *lreq)
  1879. {
  1880. struct ceph_osd_request *req = lreq->reg_req;
  1881. struct ceph_osd_req_op *op = &req->r_ops[0];
  1882. verify_osdc_wrlocked(req->r_osdc);
  1883. dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
  1884. if (req->r_osd)
  1885. cancel_linger_request(req);
  1886. request_reinit(req);
  1887. ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
  1888. ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
  1889. req->r_flags = lreq->t.flags;
  1890. req->r_mtime = lreq->mtime;
  1891. mutex_lock(&lreq->lock);
  1892. if (lreq->is_watch && lreq->committed) {
  1893. WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
  1894. op->watch.cookie != lreq->linger_id);
  1895. op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
  1896. op->watch.gen = ++lreq->register_gen;
  1897. dout("lreq %p reconnect register_gen %u\n", lreq,
  1898. op->watch.gen);
  1899. req->r_callback = linger_reconnect_cb;
  1900. } else {
  1901. if (!lreq->is_watch)
  1902. lreq->notify_id = 0;
  1903. else
  1904. WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
  1905. dout("lreq %p register\n", lreq);
  1906. req->r_callback = linger_commit_cb;
  1907. }
  1908. mutex_unlock(&lreq->lock);
  1909. req->r_priv = linger_get(lreq);
  1910. req->r_linger = true;
  1911. submit_request(req, true);
  1912. }
  1913. static void linger_ping_cb(struct ceph_osd_request *req)
  1914. {
  1915. struct ceph_osd_linger_request *lreq = req->r_priv;
  1916. mutex_lock(&lreq->lock);
  1917. dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
  1918. __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
  1919. lreq->last_error);
  1920. if (lreq->register_gen == req->r_ops[0].watch.gen) {
  1921. if (!req->r_result) {
  1922. lreq->watch_valid_thru = lreq->ping_sent;
  1923. } else if (!lreq->last_error) {
  1924. lreq->last_error = normalize_watch_error(req->r_result);
  1925. queue_watch_error(lreq);
  1926. }
  1927. } else {
  1928. dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
  1929. lreq->register_gen, req->r_ops[0].watch.gen);
  1930. }
  1931. mutex_unlock(&lreq->lock);
  1932. linger_put(lreq);
  1933. }
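/*
 * Periodic watch ping, driven from handle_timeout().  ping_req is
 * recycled and sent with the current register_gen so that
 * linger_ping_cb() can ignore pongs from a previous registration; a
 * successful pong advances watch_valid_thru to the time the ping was
 * sent.
 */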
  1934. static void send_linger_ping(struct ceph_osd_linger_request *lreq)
  1935. {
  1936. struct ceph_osd_client *osdc = lreq->osdc;
  1937. struct ceph_osd_request *req = lreq->ping_req;
  1938. struct ceph_osd_req_op *op = &req->r_ops[0];
  1939. if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
  1940. dout("%s PAUSERD\n", __func__);
  1941. return;
  1942. }
  1943. lreq->ping_sent = jiffies;
  1944. dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
  1945. __func__, lreq, lreq->linger_id, lreq->ping_sent,
  1946. lreq->register_gen);
  1947. if (req->r_osd)
  1948. cancel_linger_request(req);
  1949. request_reinit(req);
  1950. target_copy(&req->r_t, &lreq->t);
  1951. WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
  1952. op->watch.cookie != lreq->linger_id ||
  1953. op->watch.op != CEPH_OSD_WATCH_OP_PING);
  1954. op->watch.gen = lreq->register_gen;
  1955. req->r_callback = linger_ping_cb;
  1956. req->r_priv = linger_get(lreq);
  1957. req->r_linger = true;
  1958. ceph_osdc_get_request(req);
  1959. account_request(req);
  1960. req->r_tid = atomic64_inc_return(&osdc->last_tid);
  1961. link_request(lreq->osd, req);
  1962. send_request(req);
  1963. }
  1964. static void linger_submit(struct ceph_osd_linger_request *lreq)
  1965. {
  1966. struct ceph_osd_client *osdc = lreq->osdc;
  1967. struct ceph_osd *osd;
  1968. calc_target(osdc, &lreq->t, &lreq->last_force_resend, false);
  1969. osd = lookup_create_osd(osdc, lreq->t.osd, true);
  1970. link_linger(osd, lreq);
  1971. send_linger(lreq);
  1972. }
  1973. static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
  1974. {
  1975. struct ceph_osd_client *osdc = lreq->osdc;
  1976. struct ceph_osd_linger_request *lookup_lreq;
  1977. verify_osdc_wrlocked(osdc);
  1978. lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
  1979. lreq->linger_id);
  1980. if (!lookup_lreq)
  1981. return;
  1982. WARN_ON(lookup_lreq != lreq);
  1983. erase_linger_mc(&osdc->linger_map_checks, lreq);
  1984. linger_put(lreq);
  1985. }
  1986. /*
  1987. * @lreq has to be both registered and linked.
  1988. */
  1989. static void __linger_cancel(struct ceph_osd_linger_request *lreq)
  1990. {
  1991. if (lreq->is_watch && lreq->ping_req->r_osd)
  1992. cancel_linger_request(lreq->ping_req);
  1993. if (lreq->reg_req->r_osd)
  1994. cancel_linger_request(lreq->reg_req);
  1995. cancel_linger_map_check(lreq);
  1996. unlink_linger(lreq->osd, lreq);
  1997. linger_unregister(lreq);
  1998. }
  1999. static void linger_cancel(struct ceph_osd_linger_request *lreq)
  2000. {
  2001. struct ceph_osd_client *osdc = lreq->osdc;
  2002. down_write(&osdc->lock);
  2003. if (__linger_registered(lreq))
  2004. __linger_cancel(lreq);
  2005. up_write(&osdc->lock);
  2006. }
  2007. static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
  2008. static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
  2009. {
  2010. struct ceph_osd_client *osdc = lreq->osdc;
  2011. struct ceph_osdmap *map = osdc->osdmap;
  2012. verify_osdc_wrlocked(osdc);
  2013. WARN_ON(!map->epoch);
  2014. if (lreq->register_gen) {
  2015. lreq->map_dne_bound = map->epoch;
  2016. dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
  2017. lreq, lreq->linger_id);
  2018. } else {
  2019. dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
  2020. __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
  2021. map->epoch);
  2022. }
  2023. if (lreq->map_dne_bound) {
  2024. if (map->epoch >= lreq->map_dne_bound) {
  2025. /* we had a new enough map */
  2026. pr_info("linger_id %llu pool does not exist\n",
  2027. lreq->linger_id);
  2028. linger_reg_commit_complete(lreq, -ENOENT);
  2029. __linger_cancel(lreq);
  2030. }
  2031. } else {
  2032. send_linger_map_check(lreq);
  2033. }
  2034. }
  2035. static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
  2036. {
  2037. struct ceph_osd_client *osdc = &greq->monc->client->osdc;
  2038. struct ceph_osd_linger_request *lreq;
  2039. u64 linger_id = greq->private_data;
  2040. WARN_ON(greq->result || !greq->u.newest);
  2041. down_write(&osdc->lock);
  2042. lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
  2043. if (!lreq) {
  2044. dout("%s linger_id %llu dne\n", __func__, linger_id);
  2045. goto out_unlock;
  2046. }
  2047. dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
  2048. __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
  2049. greq->u.newest);
  2050. if (!lreq->map_dne_bound)
  2051. lreq->map_dne_bound = greq->u.newest;
  2052. erase_linger_mc(&osdc->linger_map_checks, lreq);
  2053. check_linger_pool_dne(lreq);
  2054. linger_put(lreq);
  2055. out_unlock:
  2056. up_write(&osdc->lock);
  2057. }
  2058. static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
  2059. {
  2060. struct ceph_osd_client *osdc = lreq->osdc;
  2061. struct ceph_osd_linger_request *lookup_lreq;
  2062. int ret;
  2063. verify_osdc_wrlocked(osdc);
  2064. lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
  2065. lreq->linger_id);
  2066. if (lookup_lreq) {
  2067. WARN_ON(lookup_lreq != lreq);
  2068. return;
  2069. }
  2070. linger_get(lreq);
  2071. insert_linger_mc(&osdc->linger_map_checks, lreq);
  2072. ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
  2073. linger_map_check_cb, lreq->linger_id);
  2074. WARN_ON(ret);
  2075. }
  2076. static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
  2077. {
  2078. int ret;
  2079. dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
  2080. ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
  2081. return ret ?: lreq->reg_commit_error;
  2082. }
  2083. static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
  2084. {
  2085. int ret;
  2086. dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
  2087. ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
  2088. return ret ?: lreq->notify_finish_error;
  2089. }
  2090. /*
2091. * Timeout callback, called every N seconds. When one or more OSD
2092. * requests have been active for more than N seconds, we send a keepalive
2093. * (tag + timestamp) to their OSDs to ensure any communications channel
  2094. * reset is detected.
  2095. */
  2096. static void handle_timeout(struct work_struct *work)
  2097. {
  2098. struct ceph_osd_client *osdc =
  2099. container_of(work, struct ceph_osd_client, timeout_work.work);
  2100. struct ceph_options *opts = osdc->client->options;
  2101. unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
  2102. LIST_HEAD(slow_osds);
  2103. struct rb_node *n, *p;
  2104. dout("%s osdc %p\n", __func__, osdc);
  2105. down_write(&osdc->lock);
  2106. /*
  2107. * ping osds that are a bit slow. this ensures that if there
  2108. * is a break in the TCP connection we will notice, and reopen
  2109. * a connection with that osd (from the fault callback).
  2110. */
  2111. for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
  2112. struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
  2113. bool found = false;
  2114. for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
  2115. struct ceph_osd_request *req =
  2116. rb_entry(p, struct ceph_osd_request, r_node);
  2117. if (time_before(req->r_stamp, cutoff)) {
  2118. dout(" req %p tid %llu on osd%d is laggy\n",
  2119. req, req->r_tid, osd->o_osd);
  2120. found = true;
  2121. }
  2122. }
  2123. for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
  2124. struct ceph_osd_linger_request *lreq =
  2125. rb_entry(p, struct ceph_osd_linger_request, node);
  2126. dout(" lreq %p linger_id %llu is served by osd%d\n",
  2127. lreq, lreq->linger_id, osd->o_osd);
  2128. found = true;
  2129. mutex_lock(&lreq->lock);
  2130. if (lreq->is_watch && lreq->committed && !lreq->last_error)
  2131. send_linger_ping(lreq);
  2132. mutex_unlock(&lreq->lock);
  2133. }
  2134. if (found)
  2135. list_move_tail(&osd->o_keepalive_item, &slow_osds);
  2136. }
  2137. if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
  2138. maybe_request_map(osdc);
  2139. while (!list_empty(&slow_osds)) {
  2140. struct ceph_osd *osd = list_first_entry(&slow_osds,
  2141. struct ceph_osd,
  2142. o_keepalive_item);
  2143. list_del_init(&osd->o_keepalive_item);
  2144. ceph_con_keepalive(&osd->o_con);
  2145. }
  2146. up_write(&osdc->lock);
  2147. schedule_delayed_work(&osdc->timeout_work,
  2148. osdc->client->options->osd_keepalive_timeout);
  2149. }
  2150. static void handle_osds_timeout(struct work_struct *work)
  2151. {
  2152. struct ceph_osd_client *osdc =
  2153. container_of(work, struct ceph_osd_client,
  2154. osds_timeout_work.work);
  2155. unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
  2156. struct ceph_osd *osd, *nosd;
  2157. dout("%s osdc %p\n", __func__, osdc);
  2158. down_write(&osdc->lock);
  2159. list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
  2160. if (time_before(jiffies, osd->lru_ttl))
  2161. break;
  2162. WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
  2163. WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
  2164. close_osd(osd);
  2165. }
  2166. up_write(&osdc->lock);
  2167. schedule_delayed_work(&osdc->osds_timeout_work,
  2168. round_jiffies_relative(delay));
  2169. }
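/*
 * Decoders for the bits of MOSDOpReply we care about.  These follow the
 * usual ceph struct_v/struct_cv versioned encoding and deliberately
 * reject features this client doesn't handle: an explicit locator key,
 * a namespace change on redirect, or an explicit hash value.
 */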
  2170. static int ceph_oloc_decode(void **p, void *end,
  2171. struct ceph_object_locator *oloc)
  2172. {
  2173. u8 struct_v, struct_cv;
  2174. u32 len;
  2175. void *struct_end;
  2176. int ret = 0;
  2177. ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
  2178. struct_v = ceph_decode_8(p);
  2179. struct_cv = ceph_decode_8(p);
  2180. if (struct_v < 3) {
  2181. pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
  2182. struct_v, struct_cv);
  2183. goto e_inval;
  2184. }
  2185. if (struct_cv > 6) {
  2186. pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
  2187. struct_v, struct_cv);
  2188. goto e_inval;
  2189. }
  2190. len = ceph_decode_32(p);
  2191. ceph_decode_need(p, end, len, e_inval);
  2192. struct_end = *p + len;
  2193. oloc->pool = ceph_decode_64(p);
  2194. *p += 4; /* skip preferred */
  2195. len = ceph_decode_32(p);
  2196. if (len > 0) {
  2197. pr_warn("ceph_object_locator::key is set\n");
  2198. goto e_inval;
  2199. }
  2200. if (struct_v >= 5) {
  2201. bool changed = false;
  2202. len = ceph_decode_32(p);
  2203. if (len > 0) {
  2204. ceph_decode_need(p, end, len, e_inval);
  2205. if (!oloc->pool_ns ||
  2206. ceph_compare_string(oloc->pool_ns, *p, len))
  2207. changed = true;
  2208. *p += len;
  2209. } else {
  2210. if (oloc->pool_ns)
  2211. changed = true;
  2212. }
  2213. if (changed) {
  2214. /* redirect changes namespace */
  2215. pr_warn("ceph_object_locator::nspace is changed\n");
  2216. goto e_inval;
  2217. }
  2218. }
  2219. if (struct_v >= 6) {
  2220. s64 hash = ceph_decode_64(p);
  2221. if (hash != -1) {
  2222. pr_warn("ceph_object_locator::hash is set\n");
  2223. goto e_inval;
  2224. }
  2225. }
  2226. /* skip the rest */
  2227. *p = struct_end;
  2228. out:
  2229. return ret;
  2230. e_inval:
  2231. ret = -EINVAL;
  2232. goto out;
  2233. }
  2234. static int ceph_redirect_decode(void **p, void *end,
  2235. struct ceph_request_redirect *redir)
  2236. {
  2237. u8 struct_v, struct_cv;
  2238. u32 len;
  2239. void *struct_end;
  2240. int ret;
  2241. ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
  2242. struct_v = ceph_decode_8(p);
  2243. struct_cv = ceph_decode_8(p);
  2244. if (struct_cv > 1) {
  2245. pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
  2246. struct_v, struct_cv);
  2247. goto e_inval;
  2248. }
  2249. len = ceph_decode_32(p);
  2250. ceph_decode_need(p, end, len, e_inval);
  2251. struct_end = *p + len;
  2252. ret = ceph_oloc_decode(p, end, &redir->oloc);
  2253. if (ret)
  2254. goto out;
  2255. len = ceph_decode_32(p);
  2256. if (len > 0) {
  2257. pr_warn("ceph_request_redirect::object_name is set\n");
  2258. goto e_inval;
  2259. }
  2260. len = ceph_decode_32(p);
  2261. *p += len; /* skip osd_instructions */
  2262. /* skip the rest */
  2263. *p = struct_end;
  2264. out:
  2265. return ret;
  2266. e_inval:
  2267. ret = -EINVAL;
  2268. goto out;
  2269. }
  2270. struct MOSDOpReply {
  2271. struct ceph_pg pgid;
  2272. u64 flags;
  2273. int result;
  2274. u32 epoch;
  2275. int num_ops;
  2276. u32 outdata_len[CEPH_OSD_MAX_OPS];
  2277. s32 rval[CEPH_OSD_MAX_OPS];
  2278. int retry_attempt;
  2279. struct ceph_eversion replay_version;
  2280. u64 user_version;
  2281. struct ceph_request_redirect redirect;
  2282. };
  2283. static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
  2284. {
  2285. void *p = msg->front.iov_base;
  2286. void *const end = p + msg->front.iov_len;
  2287. u16 version = le16_to_cpu(msg->hdr.version);
  2288. struct ceph_eversion bad_replay_version;
  2289. u8 decode_redir;
  2290. u32 len;
  2291. int ret;
  2292. int i;
  2293. ceph_decode_32_safe(&p, end, len, e_inval);
  2294. ceph_decode_need(&p, end, len, e_inval);
  2295. p += len; /* skip oid */
  2296. ret = ceph_decode_pgid(&p, end, &m->pgid);
  2297. if (ret)
  2298. return ret;
  2299. ceph_decode_64_safe(&p, end, m->flags, e_inval);
  2300. ceph_decode_32_safe(&p, end, m->result, e_inval);
  2301. ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
  2302. memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
  2303. p += sizeof(bad_replay_version);
  2304. ceph_decode_32_safe(&p, end, m->epoch, e_inval);
  2305. ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
  2306. if (m->num_ops > ARRAY_SIZE(m->outdata_len))
  2307. goto e_inval;
  2308. ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
  2309. e_inval);
  2310. for (i = 0; i < m->num_ops; i++) {
  2311. struct ceph_osd_op *op = p;
  2312. m->outdata_len[i] = le32_to_cpu(op->payload_len);
  2313. p += sizeof(*op);
  2314. }
  2315. ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
  2316. for (i = 0; i < m->num_ops; i++)
  2317. ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
  2318. if (version >= 5) {
  2319. ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
  2320. memcpy(&m->replay_version, p, sizeof(m->replay_version));
  2321. p += sizeof(m->replay_version);
  2322. ceph_decode_64_safe(&p, end, m->user_version, e_inval);
  2323. } else {
  2324. m->replay_version = bad_replay_version; /* struct */
  2325. m->user_version = le64_to_cpu(m->replay_version.version);
  2326. }
  2327. if (version >= 6) {
  2328. if (version >= 7)
  2329. ceph_decode_8_safe(&p, end, decode_redir, e_inval);
  2330. else
  2331. decode_redir = 1;
  2332. } else {
  2333. decode_redir = 0;
  2334. }
  2335. if (decode_redir) {
  2336. ret = ceph_redirect_decode(&p, end, &m->redirect);
  2337. if (ret)
  2338. return ret;
  2339. } else {
  2340. ceph_oloc_init(&m->redirect.oloc);
  2341. }
  2342. return 0;
  2343. e_inval:
  2344. return -EINVAL;
  2345. }
  2346. /*
  2347. * We are done with @req if
  2348. * - @m is a safe reply, or
  2349. * - @m is an unsafe reply and we didn't want a safe one
  2350. */
  2351. static bool done_request(const struct ceph_osd_request *req,
  2352. const struct MOSDOpReply *m)
  2353. {
  2354. return (m->result < 0 ||
  2355. (m->flags & CEPH_OSD_FLAG_ONDISK) ||
  2356. !(req->r_flags & CEPH_OSD_FLAG_ONDISK));
  2357. }
  2358. /*
  2359. * handle osd op reply. either call the callback if it is specified,
  2360. * or do the completion to wake up the waiting thread.
  2361. *
2362. * ->r_unsafe_callback is set?     yes                    no
2363. *
2364. * first reply is OK (needed       r_cb/r_completion,     r_cb/r_completion,
2365. * any or needed/got safe)         r_done_completion      r_done_completion
2366. *
2367. * first reply is unsafe           r_unsafe_cb(true)      (nothing)
2368. *
2369. * when we get the safe reply      r_unsafe_cb(false),    r_cb/r_completion,
2370. *                                 r_done_completion      r_done_completion
  2371. */
  2372. static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
  2373. {
  2374. struct ceph_osd_client *osdc = osd->o_osdc;
  2375. struct ceph_osd_request *req;
  2376. struct MOSDOpReply m;
  2377. u64 tid = le64_to_cpu(msg->hdr.tid);
  2378. u32 data_len = 0;
  2379. bool already_acked;
  2380. int ret;
  2381. int i;
  2382. dout("%s msg %p tid %llu\n", __func__, msg, tid);
  2383. down_read(&osdc->lock);
  2384. if (!osd_registered(osd)) {
  2385. dout("%s osd%d unknown\n", __func__, osd->o_osd);
  2386. goto out_unlock_osdc;
  2387. }
  2388. WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
  2389. mutex_lock(&osd->lock);
  2390. req = lookup_request(&osd->o_requests, tid);
  2391. if (!req) {
  2392. dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
  2393. goto out_unlock_session;
  2394. }
  2395. m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
  2396. ret = decode_MOSDOpReply(msg, &m);
  2397. m.redirect.oloc.pool_ns = NULL;
  2398. if (ret) {
  2399. pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
  2400. req->r_tid, ret);
  2401. ceph_msg_dump(msg);
  2402. goto fail_request;
  2403. }
  2404. dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
  2405. __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
  2406. m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
  2407. le64_to_cpu(m.replay_version.version), m.user_version);
  2408. if (m.retry_attempt >= 0) {
  2409. if (m.retry_attempt != req->r_attempts - 1) {
  2410. dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
  2411. req, req->r_tid, m.retry_attempt,
  2412. req->r_attempts - 1);
  2413. goto out_unlock_session;
  2414. }
  2415. } else {
  2416. WARN_ON(1); /* MOSDOpReply v4 is assumed */
  2417. }
  2418. if (!ceph_oloc_empty(&m.redirect.oloc)) {
  2419. dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
  2420. m.redirect.oloc.pool);
  2421. unlink_request(osd, req);
  2422. mutex_unlock(&osd->lock);
  2423. /*
  2424. * Not ceph_oloc_copy() - changing pool_ns is not
  2425. * supported.
  2426. */
  2427. req->r_t.target_oloc.pool = m.redirect.oloc.pool;
  2428. req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
  2429. req->r_tid = 0;
  2430. __submit_request(req, false);
  2431. goto out_unlock_osdc;
  2432. }
  2433. if (m.num_ops != req->r_num_ops) {
  2434. pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
  2435. req->r_num_ops, req->r_tid);
  2436. goto fail_request;
  2437. }
  2438. for (i = 0; i < req->r_num_ops; i++) {
  2439. dout(" req %p tid %llu op %d rval %d len %u\n", req,
  2440. req->r_tid, i, m.rval[i], m.outdata_len[i]);
  2441. req->r_ops[i].rval = m.rval[i];
  2442. req->r_ops[i].outdata_len = m.outdata_len[i];
  2443. data_len += m.outdata_len[i];
  2444. }
  2445. if (data_len != le32_to_cpu(msg->hdr.data_len)) {
  2446. pr_err("sum of lens %u != %u for tid %llu\n", data_len,
  2447. le32_to_cpu(msg->hdr.data_len), req->r_tid);
  2448. goto fail_request;
  2449. }
  2450. dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
  2451. req, req->r_tid, req->r_got_reply, m.result, data_len);
  2452. already_acked = req->r_got_reply;
  2453. if (!already_acked) {
  2454. req->r_result = m.result ?: data_len;
  2455. req->r_replay_version = m.replay_version; /* struct */
  2456. req->r_got_reply = true;
  2457. } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
  2458. dout("req %p tid %llu dup ack\n", req, req->r_tid);
  2459. goto out_unlock_session;
  2460. }
  2461. if (done_request(req, &m)) {
  2462. finish_request(req);
  2463. if (req->r_linger) {
  2464. WARN_ON(req->r_unsafe_callback);
  2465. dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
  2466. __complete_request(req);
  2467. }
  2468. }
  2469. mutex_unlock(&osd->lock);
  2470. up_read(&osdc->lock);
  2471. if (done_request(req, &m)) {
  2472. if (already_acked && req->r_unsafe_callback) {
  2473. dout("req %p tid %llu safe-cb\n", req, req->r_tid);
  2474. req->r_unsafe_callback(req, false);
  2475. } else if (!req->r_linger) {
  2476. dout("req %p tid %llu cb\n", req, req->r_tid);
  2477. __complete_request(req);
  2478. }
  2479. complete_all(&req->r_done_completion);
  2480. ceph_osdc_put_request(req);
  2481. } else {
  2482. if (req->r_unsafe_callback) {
  2483. dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
  2484. req->r_unsafe_callback(req, true);
  2485. } else {
  2486. WARN_ON(1);
  2487. }
  2488. }
  2489. return;
  2490. fail_request:
  2491. complete_request(req, -EIO);
  2492. out_unlock_session:
  2493. mutex_unlock(&osd->lock);
  2494. out_unlock_osdc:
  2495. up_read(&osdc->lock);
  2496. }
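/*
 * Track per-pool full flags across map changes: set_pool_was_full()
 * snapshots the flags before a new map is applied and
 * pool_cleared_full() then reports a full -> not-full transition, which
 * scan_requests() uses to resend writes that were paused on a full pool.
 */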
  2497. static void set_pool_was_full(struct ceph_osd_client *osdc)
  2498. {
  2499. struct rb_node *n;
  2500. for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
  2501. struct ceph_pg_pool_info *pi =
  2502. rb_entry(n, struct ceph_pg_pool_info, node);
  2503. pi->was_full = __pool_full(pi);
  2504. }
  2505. }
  2506. static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
  2507. {
  2508. struct ceph_pg_pool_info *pi;
  2509. pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
  2510. if (!pi)
  2511. return false;
  2512. return pi->was_full && !__pool_full(pi);
  2513. }
  2514. static enum calc_target_result
  2515. recalc_linger_target(struct ceph_osd_linger_request *lreq)
  2516. {
  2517. struct ceph_osd_client *osdc = lreq->osdc;
  2518. enum calc_target_result ct_res;
  2519. ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true);
  2520. if (ct_res == CALC_TARGET_NEED_RESEND) {
  2521. struct ceph_osd *osd;
  2522. osd = lookup_create_osd(osdc, lreq->t.osd, true);
  2523. if (osd != lreq->osd) {
  2524. unlink_linger(lreq->osd, lreq);
  2525. link_linger(osd, lreq);
  2526. }
  2527. }
  2528. return ct_res;
  2529. }
  2530. /*
  2531. * Requeue requests whose mapping to an OSD has changed.
  2532. */
  2533. static void scan_requests(struct ceph_osd *osd,
  2534. bool force_resend,
  2535. bool cleared_full,
  2536. bool check_pool_cleared_full,
  2537. struct rb_root *need_resend,
  2538. struct list_head *need_resend_linger)
  2539. {
  2540. struct ceph_osd_client *osdc = osd->o_osdc;
  2541. struct rb_node *n;
  2542. bool force_resend_writes;
  2543. for (n = rb_first(&osd->o_linger_requests); n; ) {
  2544. struct ceph_osd_linger_request *lreq =
  2545. rb_entry(n, struct ceph_osd_linger_request, node);
  2546. enum calc_target_result ct_res;
  2547. n = rb_next(n); /* recalc_linger_target() */
  2548. dout("%s lreq %p linger_id %llu\n", __func__, lreq,
  2549. lreq->linger_id);
  2550. ct_res = recalc_linger_target(lreq);
  2551. switch (ct_res) {
  2552. case CALC_TARGET_NO_ACTION:
  2553. force_resend_writes = cleared_full ||
  2554. (check_pool_cleared_full &&
  2555. pool_cleared_full(osdc, lreq->t.base_oloc.pool));
  2556. if (!force_resend && !force_resend_writes)
  2557. break;
  2558. /* fall through */
  2559. case CALC_TARGET_NEED_RESEND:
  2560. cancel_linger_map_check(lreq);
  2561. /*
  2562. * scan_requests() for the previous epoch(s)
  2563. * may have already added it to the list, since
  2564. * it's not unlinked here.
  2565. */
  2566. if (list_empty(&lreq->scan_item))
  2567. list_add_tail(&lreq->scan_item, need_resend_linger);
  2568. break;
  2569. case CALC_TARGET_POOL_DNE:
  2570. check_linger_pool_dne(lreq);
  2571. break;
  2572. }
  2573. }
  2574. for (n = rb_first(&osd->o_requests); n; ) {
  2575. struct ceph_osd_request *req =
  2576. rb_entry(n, struct ceph_osd_request, r_node);
  2577. enum calc_target_result ct_res;
  2578. n = rb_next(n); /* unlink_request(), check_pool_dne() */
  2579. dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
  2580. ct_res = calc_target(osdc, &req->r_t,
  2581. &req->r_last_force_resend, false);
  2582. switch (ct_res) {
  2583. case CALC_TARGET_NO_ACTION:
  2584. force_resend_writes = cleared_full ||
  2585. (check_pool_cleared_full &&
  2586. pool_cleared_full(osdc, req->r_t.base_oloc.pool));
  2587. if (!force_resend &&
  2588. (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
  2589. !force_resend_writes))
  2590. break;
  2591. /* fall through */
  2592. case CALC_TARGET_NEED_RESEND:
  2593. cancel_map_check(req);
  2594. unlink_request(osd, req);
  2595. insert_request(need_resend, req);
  2596. break;
  2597. case CALC_TARGET_POOL_DNE:
  2598. check_pool_dne(req);
  2599. break;
  2600. }
  2601. }
  2602. }
  2603. static int handle_one_map(struct ceph_osd_client *osdc,
  2604. void *p, void *end, bool incremental,
  2605. struct rb_root *need_resend,
  2606. struct list_head *need_resend_linger)
  2607. {
  2608. struct ceph_osdmap *newmap;
  2609. struct rb_node *n;
  2610. bool skipped_map = false;
  2611. bool was_full;
  2612. was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
  2613. set_pool_was_full(osdc);
  2614. if (incremental)
  2615. newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
  2616. else
  2617. newmap = ceph_osdmap_decode(&p, end);
  2618. if (IS_ERR(newmap))
  2619. return PTR_ERR(newmap);
  2620. if (newmap != osdc->osdmap) {
  2621. /*
  2622. * Preserve ->was_full before destroying the old map.
  2623. * For pools that weren't in the old map, ->was_full
  2624. * should be false.
  2625. */
  2626. for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
  2627. struct ceph_pg_pool_info *pi =
  2628. rb_entry(n, struct ceph_pg_pool_info, node);
  2629. struct ceph_pg_pool_info *old_pi;
  2630. old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
  2631. if (old_pi)
  2632. pi->was_full = old_pi->was_full;
  2633. else
  2634. WARN_ON(pi->was_full);
  2635. }
  2636. if (osdc->osdmap->epoch &&
  2637. osdc->osdmap->epoch + 1 < newmap->epoch) {
  2638. WARN_ON(incremental);
  2639. skipped_map = true;
  2640. }
  2641. ceph_osdmap_destroy(osdc->osdmap);
  2642. osdc->osdmap = newmap;
  2643. }
  2644. was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
  2645. scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
  2646. need_resend, need_resend_linger);
  2647. for (n = rb_first(&osdc->osds); n; ) {
  2648. struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
  2649. n = rb_next(n); /* close_osd() */
  2650. scan_requests(osd, skipped_map, was_full, true, need_resend,
  2651. need_resend_linger);
  2652. if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
  2653. memcmp(&osd->o_con.peer_addr,
  2654. ceph_osd_addr(osdc->osdmap, osd->o_osd),
  2655. sizeof(struct ceph_entity_addr)))
  2656. close_osd(osd);
  2657. }
  2658. return 0;
  2659. }
  2660. static void kick_requests(struct ceph_osd_client *osdc,
  2661. struct rb_root *need_resend,
  2662. struct list_head *need_resend_linger)
  2663. {
  2664. struct ceph_osd_linger_request *lreq, *nlreq;
  2665. struct rb_node *n;
  2666. for (n = rb_first(need_resend); n; ) {
  2667. struct ceph_osd_request *req =
  2668. rb_entry(n, struct ceph_osd_request, r_node);
  2669. struct ceph_osd *osd;
  2670. n = rb_next(n);
  2671. erase_request(need_resend, req); /* before link_request() */
  2672. WARN_ON(req->r_osd);
  2673. calc_target(osdc, &req->r_t, NULL, false);
  2674. osd = lookup_create_osd(osdc, req->r_t.osd, true);
  2675. link_request(osd, req);
  2676. if (!req->r_linger) {
  2677. if (!osd_homeless(osd) && !req->r_t.paused)
  2678. send_request(req);
  2679. } else {
  2680. cancel_linger_request(req);
  2681. }
  2682. }
  2683. list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
  2684. if (!osd_homeless(lreq->osd))
  2685. send_linger(lreq);
  2686. list_del_init(&lreq->scan_item);
  2687. }
  2688. }
  2689. /*
  2690. * Process updated osd map.
  2691. *
  2692. * The message contains any number of incremental and full maps, normally
  2693. * indicating some sort of topology change in the cluster. Kick requests
  2694. * off to different OSDs as needed.
  2695. */
  2696. void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
  2697. {
  2698. void *p = msg->front.iov_base;
  2699. void *const end = p + msg->front.iov_len;
  2700. u32 nr_maps, maplen;
  2701. u32 epoch;
  2702. struct ceph_fsid fsid;
  2703. struct rb_root need_resend = RB_ROOT;
  2704. LIST_HEAD(need_resend_linger);
  2705. bool handled_incremental = false;
  2706. bool was_pauserd, was_pausewr;
  2707. bool pauserd, pausewr;
  2708. int err;
  2709. dout("%s have %u\n", __func__, osdc->osdmap->epoch);
  2710. down_write(&osdc->lock);
  2711. /* verify fsid */
  2712. ceph_decode_need(&p, end, sizeof(fsid), bad);
  2713. ceph_decode_copy(&p, &fsid, sizeof(fsid));
  2714. if (ceph_check_fsid(osdc->client, &fsid) < 0)
  2715. goto bad;
  2716. was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
  2717. was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
  2718. ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
  2719. have_pool_full(osdc);
  2720. /* incremental maps */
  2721. ceph_decode_32_safe(&p, end, nr_maps, bad);
  2722. dout(" %d inc maps\n", nr_maps);
  2723. while (nr_maps > 0) {
  2724. ceph_decode_need(&p, end, 2*sizeof(u32), bad);
  2725. epoch = ceph_decode_32(&p);
  2726. maplen = ceph_decode_32(&p);
  2727. ceph_decode_need(&p, end, maplen, bad);
  2728. if (osdc->osdmap->epoch &&
  2729. osdc->osdmap->epoch + 1 == epoch) {
  2730. dout("applying incremental map %u len %d\n",
  2731. epoch, maplen);
  2732. err = handle_one_map(osdc, p, p + maplen, true,
  2733. &need_resend, &need_resend_linger);
  2734. if (err)
  2735. goto bad;
  2736. handled_incremental = true;
  2737. } else {
  2738. dout("ignoring incremental map %u len %d\n",
  2739. epoch, maplen);
  2740. }
  2741. p += maplen;
  2742. nr_maps--;
  2743. }
  2744. if (handled_incremental)
  2745. goto done;
  2746. /* full maps */
  2747. ceph_decode_32_safe(&p, end, nr_maps, bad);
  2748. dout(" %d full maps\n", nr_maps);
  2749. while (nr_maps) {
  2750. ceph_decode_need(&p, end, 2*sizeof(u32), bad);
  2751. epoch = ceph_decode_32(&p);
  2752. maplen = ceph_decode_32(&p);
  2753. ceph_decode_need(&p, end, maplen, bad);
  2754. if (nr_maps > 1) {
  2755. dout("skipping non-latest full map %u len %d\n",
  2756. epoch, maplen);
  2757. } else if (osdc->osdmap->epoch >= epoch) {
  2758. dout("skipping full map %u len %d, "
  2759. "older than our %u\n", epoch, maplen,
  2760. osdc->osdmap->epoch);
  2761. } else {
  2762. dout("taking full map %u len %d\n", epoch, maplen);
  2763. err = handle_one_map(osdc, p, p + maplen, false,
  2764. &need_resend, &need_resend_linger);
  2765. if (err)
  2766. goto bad;
  2767. }
  2768. p += maplen;
  2769. nr_maps--;
  2770. }
  2771. done:
  2772. /*
  2773. * subscribe to subsequent osdmap updates if full to ensure
  2774. * we find out when we are no longer full and stop returning
  2775. * ENOSPC.
  2776. */
  2777. pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
  2778. pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
  2779. ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
  2780. have_pool_full(osdc);
  2781. if (was_pauserd || was_pausewr || pauserd || pausewr)
  2782. maybe_request_map(osdc);
  2783. kick_requests(osdc, &need_resend, &need_resend_linger);
  2784. ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
  2785. osdc->osdmap->epoch);
  2786. up_write(&osdc->lock);
  2787. wake_up_all(&osdc->client->auth_wq);
  2788. return;
  2789. bad:
  2790. pr_err("osdc handle_map corrupt msg\n");
  2791. ceph_msg_dump(msg);
  2792. up_write(&osdc->lock);
  2793. }
  2794. /*
  2795. * Resubmit requests pending on the given osd.
  2796. */
  2797. static void kick_osd_requests(struct ceph_osd *osd)
  2798. {
  2799. struct rb_node *n;
  2800. for (n = rb_first(&osd->o_requests); n; ) {
  2801. struct ceph_osd_request *req =
  2802. rb_entry(n, struct ceph_osd_request, r_node);
  2803. n = rb_next(n); /* cancel_linger_request() */
  2804. if (!req->r_linger) {
  2805. if (!req->r_t.paused)
  2806. send_request(req);
  2807. } else {
  2808. cancel_linger_request(req);
  2809. }
  2810. }
  2811. for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
  2812. struct ceph_osd_linger_request *lreq =
  2813. rb_entry(n, struct ceph_osd_linger_request, node);
  2814. send_linger(lreq);
  2815. }
  2816. }
  2817. /*
  2818. * If the osd connection drops, we need to resubmit all requests.
  2819. */
  2820. static void osd_fault(struct ceph_connection *con)
  2821. {
  2822. struct ceph_osd *osd = con->private;
  2823. struct ceph_osd_client *osdc = osd->o_osdc;
  2824. dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
  2825. down_write(&osdc->lock);
  2826. if (!osd_registered(osd)) {
  2827. dout("%s osd%d unknown\n", __func__, osd->o_osd);
  2828. goto out_unlock;
  2829. }
  2830. if (!reopen_osd(osd))
  2831. kick_osd_requests(osd);
  2832. maybe_request_map(osdc);
  2833. out_unlock:
  2834. up_write(&osdc->lock);
  2835. }
  2836. /*
  2837. * Process osd watch notifications
  2838. */
  2839. static void handle_watch_notify(struct ceph_osd_client *osdc,
  2840. struct ceph_msg *msg)
  2841. {
  2842. void *p = msg->front.iov_base;
  2843. void *const end = p + msg->front.iov_len;
  2844. struct ceph_osd_linger_request *lreq;
  2845. struct linger_work *lwork;
  2846. u8 proto_ver, opcode;
  2847. u64 cookie, notify_id;
  2848. u64 notifier_id = 0;
  2849. s32 return_code = 0;
  2850. void *payload = NULL;
  2851. u32 payload_len = 0;
  2852. ceph_decode_8_safe(&p, end, proto_ver, bad);
  2853. ceph_decode_8_safe(&p, end, opcode, bad);
  2854. ceph_decode_64_safe(&p, end, cookie, bad);
  2855. p += 8; /* skip ver */
  2856. ceph_decode_64_safe(&p, end, notify_id, bad);
  2857. if (proto_ver >= 1) {
  2858. ceph_decode_32_safe(&p, end, payload_len, bad);
  2859. ceph_decode_need(&p, end, payload_len, bad);
  2860. payload = p;
  2861. p += payload_len;
  2862. }
  2863. if (le16_to_cpu(msg->hdr.version) >= 2)
  2864. ceph_decode_32_safe(&p, end, return_code, bad);
  2865. if (le16_to_cpu(msg->hdr.version) >= 3)
  2866. ceph_decode_64_safe(&p, end, notifier_id, bad);
  2867. down_read(&osdc->lock);
  2868. lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
  2869. if (!lreq) {
  2870. dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
  2871. cookie);
  2872. goto out_unlock_osdc;
  2873. }
  2874. mutex_lock(&lreq->lock);
  2875. dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
  2876. opcode, cookie, lreq, lreq->is_watch);
  2877. if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
  2878. if (!lreq->last_error) {
  2879. lreq->last_error = -ENOTCONN;
  2880. queue_watch_error(lreq);
  2881. }
  2882. } else if (!lreq->is_watch) {
  2883. /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
  2884. if (lreq->notify_id && lreq->notify_id != notify_id) {
  2885. dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
  2886. lreq->notify_id, notify_id);
  2887. } else if (!completion_done(&lreq->notify_finish_wait)) {
  2888. struct ceph_msg_data *data =
  2889. list_first_entry_or_null(&msg->data,
  2890. struct ceph_msg_data,
  2891. links);
  2892. if (data) {
  2893. if (lreq->preply_pages) {
  2894. WARN_ON(data->type !=
  2895. CEPH_MSG_DATA_PAGES);
  2896. *lreq->preply_pages = data->pages;
  2897. *lreq->preply_len = data->length;
  2898. } else {
  2899. ceph_release_page_vector(data->pages,
  2900. calc_pages_for(0, data->length));
  2901. }
  2902. }
  2903. lreq->notify_finish_error = return_code;
  2904. complete_all(&lreq->notify_finish_wait);
  2905. }
  2906. } else {
  2907. /* CEPH_WATCH_EVENT_NOTIFY */
  2908. lwork = lwork_alloc(lreq, do_watch_notify);
  2909. if (!lwork) {
  2910. pr_err("failed to allocate notify-lwork\n");
  2911. goto out_unlock_lreq;
  2912. }
  2913. lwork->notify.notify_id = notify_id;
  2914. lwork->notify.notifier_id = notifier_id;
  2915. lwork->notify.payload = payload;
  2916. lwork->notify.payload_len = payload_len;
  2917. lwork->notify.msg = ceph_msg_get(msg);
  2918. lwork_queue(lwork);
  2919. }
  2920. out_unlock_lreq:
  2921. mutex_unlock(&lreq->lock);
  2922. out_unlock_osdc:
  2923. up_read(&osdc->lock);
  2924. return;
  2925. bad:
  2926. pr_err("osdc handle_watch_notify corrupt msg\n");
  2927. }
  2928. /*
  2929. * Register request, send initial attempt.
  2930. */
  2931. int ceph_osdc_start_request(struct ceph_osd_client *osdc,
  2932. struct ceph_osd_request *req,
  2933. bool nofail)
  2934. {
  2935. down_read(&osdc->lock);
  2936. submit_request(req, false);
  2937. up_read(&osdc->lock);
  2938. return 0;
  2939. }
  2940. EXPORT_SYMBOL(ceph_osdc_start_request);
  2941. /*
  2942. * Unregister a registered request. The request is not completed:
  2943. * ->r_result isn't set and __complete_request() isn't called.
  2944. */
  2945. void ceph_osdc_cancel_request(struct ceph_osd_request *req)
  2946. {
  2947. struct ceph_osd_client *osdc = req->r_osdc;
  2948. down_write(&osdc->lock);
  2949. if (req->r_osd)
  2950. cancel_request(req);
  2951. up_write(&osdc->lock);
  2952. }
  2953. EXPORT_SYMBOL(ceph_osdc_cancel_request);
  2954. /*
  2955. * @timeout: in jiffies, 0 means "wait forever"
  2956. */
  2957. static int wait_request_timeout(struct ceph_osd_request *req,
  2958. unsigned long timeout)
  2959. {
  2960. long left;
  2961. dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
  2962. left = wait_for_completion_killable_timeout(&req->r_completion,
  2963. ceph_timeout_jiffies(timeout));
  2964. if (left <= 0) {
  2965. left = left ?: -ETIMEDOUT;
  2966. ceph_osdc_cancel_request(req);
  2967. } else {
  2968. left = req->r_result; /* completed */
  2969. }
  2970. return left;
  2971. }
  2972. /*
  2973. * wait for a request to complete
  2974. */
  2975. int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
  2976. struct ceph_osd_request *req)
  2977. {
  2978. return wait_request_timeout(req, 0);
  2979. }
  2980. EXPORT_SYMBOL(ceph_osdc_wait_request);
  2981. /*
  2982. * sync - wait for all in-flight requests to flush. avoid starvation.
  2983. */
  2984. void ceph_osdc_sync(struct ceph_osd_client *osdc)
  2985. {
  2986. struct rb_node *n, *p;
  2987. u64 last_tid = atomic64_read(&osdc->last_tid);
  2988. again:
  2989. down_read(&osdc->lock);
  2990. for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
  2991. struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
  2992. mutex_lock(&osd->lock);
  2993. for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
  2994. struct ceph_osd_request *req =
  2995. rb_entry(p, struct ceph_osd_request, r_node);
  2996. if (req->r_tid > last_tid)
  2997. break;
  2998. if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
  2999. continue;
  3000. ceph_osdc_get_request(req);
  3001. mutex_unlock(&osd->lock);
  3002. up_read(&osdc->lock);
  3003. dout("%s waiting on req %p tid %llu last_tid %llu\n",
  3004. __func__, req, req->r_tid, last_tid);
  3005. wait_for_completion(&req->r_done_completion);
  3006. ceph_osdc_put_request(req);
  3007. goto again;
  3008. }
  3009. mutex_unlock(&osd->lock);
  3010. }
  3011. up_read(&osdc->lock);
  3012. dout("%s done last_tid %llu\n", __func__, last_tid);
  3013. }
  3014. EXPORT_SYMBOL(ceph_osdc_sync);
  3015. static struct ceph_osd_request *
  3016. alloc_linger_request(struct ceph_osd_linger_request *lreq)
  3017. {
  3018. struct ceph_osd_request *req;
  3019. req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
  3020. if (!req)
  3021. return NULL;
  3022. ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
  3023. ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
  3024. if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
  3025. ceph_osdc_put_request(req);
  3026. return NULL;
  3027. }
  3028. return req;
  3029. }
  3030. /*
  3031. * Returns a handle, caller owns a ref.
  3032. */
  3033. struct ceph_osd_linger_request *
  3034. ceph_osdc_watch(struct ceph_osd_client *osdc,
  3035. struct ceph_object_id *oid,
  3036. struct ceph_object_locator *oloc,
  3037. rados_watchcb2_t wcb,
  3038. rados_watcherrcb_t errcb,
  3039. void *data)
  3040. {
  3041. struct ceph_osd_linger_request *lreq;
  3042. int ret;
  3043. lreq = linger_alloc(osdc);
  3044. if (!lreq)
  3045. return ERR_PTR(-ENOMEM);
  3046. lreq->is_watch = true;
  3047. lreq->wcb = wcb;
  3048. lreq->errcb = errcb;
  3049. lreq->data = data;
  3050. lreq->watch_valid_thru = jiffies;
  3051. ceph_oid_copy(&lreq->t.base_oid, oid);
  3052. ceph_oloc_copy(&lreq->t.base_oloc, oloc);
  3053. lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
  3054. lreq->mtime = CURRENT_TIME;
  3055. lreq->reg_req = alloc_linger_request(lreq);
  3056. if (!lreq->reg_req) {
  3057. ret = -ENOMEM;
  3058. goto err_put_lreq;
  3059. }
  3060. lreq->ping_req = alloc_linger_request(lreq);
  3061. if (!lreq->ping_req) {
  3062. ret = -ENOMEM;
  3063. goto err_put_lreq;
  3064. }
  3065. down_write(&osdc->lock);
  3066. linger_register(lreq); /* before osd_req_op_* */
  3067. osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
  3068. CEPH_OSD_WATCH_OP_WATCH);
  3069. osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
  3070. CEPH_OSD_WATCH_OP_PING);
  3071. linger_submit(lreq);
  3072. up_write(&osdc->lock);
  3073. ret = linger_reg_commit_wait(lreq);
  3074. if (ret) {
  3075. linger_cancel(lreq);
  3076. goto err_put_lreq;
  3077. }
  3078. return lreq;
  3079. err_put_lreq:
  3080. linger_put(lreq);
  3081. return ERR_PTR(ret);
  3082. }
  3083. EXPORT_SYMBOL(ceph_osdc_watch);
  3084. /*
  3085. * Releases a ref.
  3086. *
  3087. * Times out after mount_timeout to preserve rbd unmap behaviour
  3088. * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
  3089. * with mount_timeout").
  3090. */
  3091. int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
  3092. struct ceph_osd_linger_request *lreq)
  3093. {
  3094. struct ceph_options *opts = osdc->client->options;
  3095. struct ceph_osd_request *req;
  3096. int ret;
  3097. req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
  3098. if (!req)
  3099. return -ENOMEM;
  3100. ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
  3101. ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
  3102. req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
  3103. req->r_mtime = CURRENT_TIME;
  3104. osd_req_op_watch_init(req, 0, lreq->linger_id,
  3105. CEPH_OSD_WATCH_OP_UNWATCH);
  3106. ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
  3107. if (ret)
  3108. goto out_put_req;
  3109. ceph_osdc_start_request(osdc, req, false);
  3110. linger_cancel(lreq);
  3111. linger_put(lreq);
  3112. ret = wait_request_timeout(req, opts->mount_timeout);
  3113. out_put_req:
  3114. ceph_osdc_put_request(req);
  3115. return ret;
  3116. }
  3117. EXPORT_SYMBOL(ceph_osdc_unwatch);
  3118. static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
  3119. u64 notify_id, u64 cookie, void *payload,
  3120. size_t payload_len)
  3121. {
  3122. struct ceph_osd_req_op *op;
  3123. struct ceph_pagelist *pl;
  3124. int ret;
  3125. op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
  3126. pl = kmalloc(sizeof(*pl), GFP_NOIO);
  3127. if (!pl)
  3128. return -ENOMEM;
  3129. ceph_pagelist_init(pl);
  3130. ret = ceph_pagelist_encode_64(pl, notify_id);
  3131. ret |= ceph_pagelist_encode_64(pl, cookie);
  3132. if (payload) {
  3133. ret |= ceph_pagelist_encode_32(pl, payload_len);
  3134. ret |= ceph_pagelist_append(pl, payload, payload_len);
  3135. } else {
  3136. ret |= ceph_pagelist_encode_32(pl, 0);
  3137. }
  3138. if (ret) {
  3139. ceph_pagelist_release(pl);
  3140. return -ENOMEM;
  3141. }
  3142. ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
  3143. op->indata_len = pl->length;
  3144. return 0;
  3145. }
  3146. int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
  3147. struct ceph_object_id *oid,
  3148. struct ceph_object_locator *oloc,
  3149. u64 notify_id,
  3150. u64 cookie,
  3151. void *payload,
  3152. size_t payload_len)
  3153. {
  3154. struct ceph_osd_request *req;
  3155. int ret;
  3156. req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
  3157. if (!req)
  3158. return -ENOMEM;
  3159. ceph_oid_copy(&req->r_base_oid, oid);
  3160. ceph_oloc_copy(&req->r_base_oloc, oloc);
  3161. req->r_flags = CEPH_OSD_FLAG_READ;
  3162. ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
  3163. if (ret)
  3164. goto out_put_req;
  3165. ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
  3166. payload_len);
  3167. if (ret)
  3168. goto out_put_req;
  3169. ceph_osdc_start_request(osdc, req, false);
  3170. ret = ceph_osdc_wait_request(osdc, req);
  3171. out_put_req:
  3172. ceph_osdc_put_request(req);
  3173. return ret;
  3174. }
  3175. EXPORT_SYMBOL(ceph_osdc_notify_ack);
  3176. static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
  3177. u64 cookie, u32 prot_ver, u32 timeout,
  3178. void *payload, size_t payload_len)
  3179. {
  3180. struct ceph_osd_req_op *op;
  3181. struct ceph_pagelist *pl;
  3182. int ret;
  3183. op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
  3184. op->notify.cookie = cookie;
  3185. pl = kmalloc(sizeof(*pl), GFP_NOIO);
  3186. if (!pl)
  3187. return -ENOMEM;
  3188. ceph_pagelist_init(pl);
  3189. ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
  3190. ret |= ceph_pagelist_encode_32(pl, timeout);
  3191. ret |= ceph_pagelist_encode_32(pl, payload_len);
  3192. ret |= ceph_pagelist_append(pl, payload, payload_len);
  3193. if (ret) {
  3194. ceph_pagelist_release(pl);
  3195. return -ENOMEM;
  3196. }
  3197. ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
  3198. op->indata_len = pl->length;
  3199. return 0;
  3200. }
  3201. /*
  3202. * @timeout: in seconds
  3203. *
  3204. * @preply_{pages,len} are initialized both on success and error.
  3205. * The caller is responsible for:
  3206. *
  3207. * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
  3208. */
  3209. int ceph_osdc_notify(struct ceph_osd_client *osdc,
  3210. struct ceph_object_id *oid,
  3211. struct ceph_object_locator *oloc,
  3212. void *payload,
  3213. size_t payload_len,
  3214. u32 timeout,
  3215. struct page ***preply_pages,
  3216. size_t *preply_len)
  3217. {
  3218. struct ceph_osd_linger_request *lreq;
  3219. struct page **pages;
  3220. int ret;
  3221. WARN_ON(!timeout);
  3222. if (preply_pages) {
  3223. *preply_pages = NULL;
  3224. *preply_len = 0;
  3225. }
  3226. lreq = linger_alloc(osdc);
  3227. if (!lreq)
  3228. return -ENOMEM;
  3229. lreq->preply_pages = preply_pages;
  3230. lreq->preply_len = preply_len;
  3231. ceph_oid_copy(&lreq->t.base_oid, oid);
  3232. ceph_oloc_copy(&lreq->t.base_oloc, oloc);
  3233. lreq->t.flags = CEPH_OSD_FLAG_READ;
  3234. lreq->reg_req = alloc_linger_request(lreq);
  3235. if (!lreq->reg_req) {
  3236. ret = -ENOMEM;
  3237. goto out_put_lreq;
  3238. }
  3239. /* for notify_id */
  3240. pages = ceph_alloc_page_vector(1, GFP_NOIO);
  3241. if (IS_ERR(pages)) {
  3242. ret = PTR_ERR(pages);
  3243. goto out_put_lreq;
  3244. }
  3245. down_write(&osdc->lock);
  3246. linger_register(lreq); /* before osd_req_op_* */
  3247. ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
  3248. timeout, payload, payload_len);
  3249. if (ret) {
  3250. linger_unregister(lreq);
  3251. up_write(&osdc->lock);
  3252. ceph_release_page_vector(pages, 1);
  3253. goto out_put_lreq;
  3254. }
  3255. ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
  3256. response_data),
  3257. pages, PAGE_SIZE, 0, false, true);
  3258. linger_submit(lreq);
  3259. up_write(&osdc->lock);
  3260. ret = linger_reg_commit_wait(lreq);
  3261. if (!ret)
  3262. ret = linger_notify_finish_wait(lreq);
  3263. else
  3264. dout("lreq %p failed to initiate notify %d\n", lreq, ret);
  3265. linger_cancel(lreq);
  3266. out_put_lreq:
  3267. linger_put(lreq);
  3268. return ret;
  3269. }
  3270. EXPORT_SYMBOL(ceph_osdc_notify);
  3271. /*
  3272. * Return the number of milliseconds since the watch was last
  3273. * confirmed, or an error. If there is an error, the watch is no
  3274. * longer valid, and should be destroyed with ceph_osdc_unwatch().
  3275. */
  3276. int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
  3277. struct ceph_osd_linger_request *lreq)
  3278. {
  3279. unsigned long stamp, age;
  3280. int ret;
  3281. down_read(&osdc->lock);
  3282. mutex_lock(&lreq->lock);
  3283. stamp = lreq->watch_valid_thru;
  3284. if (!list_empty(&lreq->pending_lworks)) {
  3285. struct linger_work *lwork =
  3286. list_first_entry(&lreq->pending_lworks,
  3287. struct linger_work,
  3288. pending_item);
  3289. if (time_before(lwork->queued_stamp, stamp))
  3290. stamp = lwork->queued_stamp;
  3291. }
  3292. age = jiffies - stamp;
  3293. dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
  3294. lreq, lreq->linger_id, age, lreq->last_error);
  3295. /* we are truncating to msecs, so return a safe upper bound */
  3296. ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
  3297. mutex_unlock(&lreq->lock);
  3298. up_read(&osdc->lock);
  3299. return ret;
  3300. }
  3301. static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
  3302. {
  3303. u8 struct_v;
  3304. u32 struct_len;
  3305. int ret;
  3306. ret = ceph_start_decoding(p, end, 2, "watch_item_t",
  3307. &struct_v, &struct_len);
  3308. if (ret)
  3309. return ret;
  3310. ceph_decode_copy(p, &item->name, sizeof(item->name));
  3311. item->cookie = ceph_decode_64(p);
  3312. *p += 4; /* skip timeout_seconds */
  3313. if (struct_v >= 2) {
  3314. ceph_decode_copy(p, &item->addr, sizeof(item->addr));
  3315. ceph_decode_addr(&item->addr);
  3316. }
  3317. dout("%s %s%llu cookie %llu addr %s\n", __func__,
  3318. ENTITY_NAME(item->name), item->cookie,
  3319. ceph_pr_addr(&item->addr.in_addr));
  3320. return 0;
  3321. }
  3322. static int decode_watchers(void **p, void *end,
  3323. struct ceph_watch_item **watchers,
  3324. u32 *num_watchers)
  3325. {
  3326. u8 struct_v;
  3327. u32 struct_len;
  3328. int i;
  3329. int ret;
  3330. ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
  3331. &struct_v, &struct_len);
  3332. if (ret)
  3333. return ret;
  3334. *num_watchers = ceph_decode_32(p);
  3335. *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
  3336. if (!*watchers)
  3337. return -ENOMEM;
  3338. for (i = 0; i < *num_watchers; i++) {
  3339. ret = decode_watcher(p, end, *watchers + i);
  3340. if (ret) {
  3341. kfree(*watchers);
  3342. return ret;
  3343. }
  3344. }
  3345. return 0;
  3346. }
  3347. /*
  3348. * On success, the caller is responsible for:
  3349. *
  3350. * kfree(watchers);
  3351. */
  3352. int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
  3353. struct ceph_object_id *oid,
  3354. struct ceph_object_locator *oloc,
  3355. struct ceph_watch_item **watchers,
  3356. u32 *num_watchers)
  3357. {
  3358. struct ceph_osd_request *req;
  3359. struct page **pages;
  3360. int ret;
  3361. req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
  3362. if (!req)
  3363. return -ENOMEM;
  3364. ceph_oid_copy(&req->r_base_oid, oid);
  3365. ceph_oloc_copy(&req->r_base_oloc, oloc);
  3366. req->r_flags = CEPH_OSD_FLAG_READ;
  3367. ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
  3368. if (ret)
  3369. goto out_put_req;
  3370. pages = ceph_alloc_page_vector(1, GFP_NOIO);
  3371. if (IS_ERR(pages)) {
  3372. ret = PTR_ERR(pages);
  3373. goto out_put_req;
  3374. }
  3375. osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
  3376. ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
  3377. response_data),
  3378. pages, PAGE_SIZE, 0, false, true);
  3379. ceph_osdc_start_request(osdc, req, false);
  3380. ret = ceph_osdc_wait_request(osdc, req);
  3381. if (ret >= 0) {
  3382. void *p = page_address(pages[0]);
  3383. void *const end = p + req->r_ops[0].outdata_len;
  3384. ret = decode_watchers(&p, end, watchers, num_watchers);
  3385. }
  3386. out_put_req:
  3387. ceph_osdc_put_request(req);
  3388. return ret;
  3389. }
  3390. EXPORT_SYMBOL(ceph_osdc_list_watchers);
  3391. /*
  3392. * Call all pending notify callbacks - for use after a watch is
  3393. * unregistered, to make sure no more callbacks for it will be invoked
  3394. */
  3395. void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
  3396. {
  3397. dout("%s osdc %p\n", __func__, osdc);
  3398. flush_workqueue(osdc->notify_wq);
  3399. }
  3400. EXPORT_SYMBOL(ceph_osdc_flush_notifies);
  3401. void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
  3402. {
  3403. down_read(&osdc->lock);
  3404. maybe_request_map(osdc);
  3405. up_read(&osdc->lock);
  3406. }
  3407. EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
  3408. /*
  3409. * Execute an OSD class method on an object.
  3410. *
  3411. * @flags: CEPH_OSD_FLAG_*
  3412. * @resp_len: out param for reply length
  3413. */
  3414. int ceph_osdc_call(struct ceph_osd_client *osdc,
  3415. struct ceph_object_id *oid,
  3416. struct ceph_object_locator *oloc,
  3417. const char *class, const char *method,
  3418. unsigned int flags,
  3419. struct page *req_page, size_t req_len,
  3420. struct page *resp_page, size_t *resp_len)
  3421. {
  3422. struct ceph_osd_request *req;
  3423. int ret;
  3424. req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
  3425. if (!req)
  3426. return -ENOMEM;
  3427. ceph_oid_copy(&req->r_base_oid, oid);
  3428. ceph_oloc_copy(&req->r_base_oloc, oloc);
  3429. req->r_flags = flags;
  3430. ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
  3431. if (ret)
  3432. goto out_put_req;
  3433. osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
  3434. if (req_page)
  3435. osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
  3436. 0, false, false);
  3437. if (resp_page)
  3438. osd_req_op_cls_response_data_pages(req, 0, &resp_page,
  3439. PAGE_SIZE, 0, false, false);
  3440. ceph_osdc_start_request(osdc, req, false);
  3441. ret = ceph_osdc_wait_request(osdc, req);
  3442. if (ret >= 0) {
  3443. ret = req->r_ops[0].rval;
  3444. if (resp_page)
  3445. *resp_len = req->r_ops[0].outdata_len;
  3446. }
  3447. out_put_req:
  3448. ceph_osdc_put_request(req);
  3449. return ret;
  3450. }
  3451. EXPORT_SYMBOL(ceph_osdc_call);
  3452. /*
  3453. * init, shutdown
  3454. */
  3455. int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
  3456. {
  3457. int err;
  3458. dout("init\n");
  3459. osdc->client = client;
  3460. init_rwsem(&osdc->lock);
  3461. osdc->osds = RB_ROOT;
  3462. INIT_LIST_HEAD(&osdc->osd_lru);
  3463. spin_lock_init(&osdc->osd_lru_lock);
  3464. osd_init(&osdc->homeless_osd);
  3465. osdc->homeless_osd.o_osdc = osdc;
  3466. osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
  3467. osdc->last_linger_id = CEPH_LINGER_ID_START;
  3468. osdc->linger_requests = RB_ROOT;
  3469. osdc->map_checks = RB_ROOT;
  3470. osdc->linger_map_checks = RB_ROOT;
  3471. INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
  3472. INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
  3473. err = -ENOMEM;
  3474. osdc->osdmap = ceph_osdmap_alloc();
  3475. if (!osdc->osdmap)
  3476. goto out;
  3477. osdc->req_mempool = mempool_create_slab_pool(10,
  3478. ceph_osd_request_cache);
  3479. if (!osdc->req_mempool)
  3480. goto out_map;
  3481. err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
  3482. PAGE_SIZE, 10, true, "osd_op");
  3483. if (err < 0)
  3484. goto out_mempool;
  3485. err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
  3486. PAGE_SIZE, 10, true, "osd_op_reply");
  3487. if (err < 0)
  3488. goto out_msgpool;
  3489. err = -ENOMEM;
  3490. osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
  3491. if (!osdc->notify_wq)
  3492. goto out_msgpool_reply;
  3493. schedule_delayed_work(&osdc->timeout_work,
  3494. osdc->client->options->osd_keepalive_timeout);
  3495. schedule_delayed_work(&osdc->osds_timeout_work,
  3496. round_jiffies_relative(osdc->client->options->osd_idle_ttl));
  3497. return 0;
  3498. out_msgpool_reply:
  3499. ceph_msgpool_destroy(&osdc->msgpool_op_reply);
  3500. out_msgpool:
  3501. ceph_msgpool_destroy(&osdc->msgpool_op);
  3502. out_mempool:
  3503. mempool_destroy(osdc->req_mempool);
  3504. out_map:
  3505. ceph_osdmap_destroy(osdc->osdmap);
  3506. out:
  3507. return err;
  3508. }
  3509. void ceph_osdc_stop(struct ceph_osd_client *osdc)
  3510. {
  3511. flush_workqueue(osdc->notify_wq);
  3512. destroy_workqueue(osdc->notify_wq);
  3513. cancel_delayed_work_sync(&osdc->timeout_work);
  3514. cancel_delayed_work_sync(&osdc->osds_timeout_work);
  3515. down_write(&osdc->lock);
  3516. while (!RB_EMPTY_ROOT(&osdc->osds)) {
  3517. struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
  3518. struct ceph_osd, o_node);
  3519. close_osd(osd);
  3520. }
  3521. up_write(&osdc->lock);
  3522. WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
  3523. osd_cleanup(&osdc->homeless_osd);
  3524. WARN_ON(!list_empty(&osdc->osd_lru));
  3525. WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
  3526. WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
  3527. WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
  3528. WARN_ON(atomic_read(&osdc->num_requests));
  3529. WARN_ON(atomic_read(&osdc->num_homeless));
  3530. ceph_osdmap_destroy(osdc->osdmap);
  3531. mempool_destroy(osdc->req_mempool);
  3532. ceph_msgpool_destroy(&osdc->msgpool_op);
  3533. ceph_msgpool_destroy(&osdc->msgpool_op_reply);
  3534. }
  3535. /*
  3536. * Read some contiguous pages. If we cross a stripe boundary, shorten
  3537. * *plen. Return number of bytes read, or error.
  3538. */
  3539. int ceph_osdc_readpages(struct ceph_osd_client *osdc,
  3540. struct ceph_vino vino, struct ceph_file_layout *layout,
  3541. u64 off, u64 *plen,
  3542. u32 truncate_seq, u64 truncate_size,
  3543. struct page **pages, int num_pages, int page_align)
  3544. {
  3545. struct ceph_osd_request *req;
  3546. int rc = 0;
  3547. dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
  3548. vino.snap, off, *plen);
  3549. req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
  3550. CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
  3551. NULL, truncate_seq, truncate_size,
  3552. false);
  3553. if (IS_ERR(req))
  3554. return PTR_ERR(req);
  3555. /* it may be a short read due to an object boundary */
  3556. osd_req_op_extent_osd_data_pages(req, 0,
  3557. pages, *plen, page_align, false, false);
  3558. dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
  3559. off, *plen, *plen, page_align);
  3560. rc = ceph_osdc_start_request(osdc, req, false);
  3561. if (!rc)
  3562. rc = ceph_osdc_wait_request(osdc, req);
  3563. ceph_osdc_put_request(req);
  3564. dout("readpages result %d\n", rc);
  3565. return rc;
  3566. }
  3567. EXPORT_SYMBOL(ceph_osdc_readpages);
  3568. /*
  3569. * do a synchronous write on N pages
  3570. */
  3571. int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
  3572. struct ceph_file_layout *layout,
  3573. struct ceph_snap_context *snapc,
  3574. u64 off, u64 len,
  3575. u32 truncate_seq, u64 truncate_size,
  3576. struct timespec *mtime,
  3577. struct page **pages, int num_pages)
  3578. {
  3579. struct ceph_osd_request *req;
  3580. int rc = 0;
  3581. int page_align = off & ~PAGE_MASK;
  3582. req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
  3583. CEPH_OSD_OP_WRITE,
  3584. CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
  3585. snapc, truncate_seq, truncate_size,
  3586. true);
  3587. if (IS_ERR(req))
  3588. return PTR_ERR(req);
  3589. /* it may be a short write due to an object boundary */
  3590. osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
  3591. false, false);
  3592. dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
  3593. req->r_mtime = *mtime;
  3594. rc = ceph_osdc_start_request(osdc, req, true);
  3595. if (!rc)
  3596. rc = ceph_osdc_wait_request(osdc, req);
  3597. ceph_osdc_put_request(req);
  3598. if (rc == 0)
  3599. rc = len;
  3600. dout("writepages result %d\n", rc);
  3601. return rc;
  3602. }
  3603. EXPORT_SYMBOL(ceph_osdc_writepages);
  3604. int ceph_osdc_setup(void)
  3605. {
  3606. size_t size = sizeof(struct ceph_osd_request) +
  3607. CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
  3608. BUG_ON(ceph_osd_request_cache);
  3609. ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
  3610. 0, 0, NULL);
  3611. return ceph_osd_request_cache ? 0 : -ENOMEM;
  3612. }
  3613. EXPORT_SYMBOL(ceph_osdc_setup);
  3614. void ceph_osdc_cleanup(void)
  3615. {
  3616. BUG_ON(!ceph_osd_request_cache);
  3617. kmem_cache_destroy(ceph_osd_request_cache);
  3618. ceph_osd_request_cache = NULL;
  3619. }
  3620. EXPORT_SYMBOL(ceph_osdc_cleanup);
  3621. /*
  3622. * handle incoming message
  3623. */
  3624. static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
  3625. {
  3626. struct ceph_osd *osd = con->private;
  3627. struct ceph_osd_client *osdc = osd->o_osdc;
  3628. int type = le16_to_cpu(msg->hdr.type);
  3629. switch (type) {
  3630. case CEPH_MSG_OSD_MAP:
  3631. ceph_osdc_handle_map(osdc, msg);
  3632. break;
  3633. case CEPH_MSG_OSD_OPREPLY:
  3634. handle_reply(osd, msg);
  3635. break;
  3636. case CEPH_MSG_WATCH_NOTIFY:
  3637. handle_watch_notify(osdc, msg);
  3638. break;
  3639. default:
  3640. pr_err("received unknown message type %d %s\n", type,
  3641. ceph_msg_type_name(type));
  3642. }
  3643. ceph_msg_put(msg);
  3644. }
  3645. /*
  3646. * Lookup and return message for incoming reply. Don't try to do
  3647. * anything about a larger than preallocated data portion of the
  3648. * message at the moment - for now, just skip the message.
  3649. */
  3650. static struct ceph_msg *get_reply(struct ceph_connection *con,
  3651. struct ceph_msg_header *hdr,
  3652. int *skip)
  3653. {
  3654. struct ceph_osd *osd = con->private;
  3655. struct ceph_osd_client *osdc = osd->o_osdc;
  3656. struct ceph_msg *m = NULL;
  3657. struct ceph_osd_request *req;
  3658. int front_len = le32_to_cpu(hdr->front_len);
  3659. int data_len = le32_to_cpu(hdr->data_len);
  3660. u64 tid = le64_to_cpu(hdr->tid);
  3661. down_read(&osdc->lock);
  3662. if (!osd_registered(osd)) {
  3663. dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
  3664. *skip = 1;
  3665. goto out_unlock_osdc;
  3666. }
  3667. WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
  3668. mutex_lock(&osd->lock);
  3669. req = lookup_request(&osd->o_requests, tid);
  3670. if (!req) {
  3671. dout("%s osd%d tid %llu unknown, skipping\n", __func__,
  3672. osd->o_osd, tid);
  3673. *skip = 1;
  3674. goto out_unlock_session;
  3675. }
  3676. ceph_msg_revoke_incoming(req->r_reply);
  3677. if (front_len > req->r_reply->front_alloc_len) {
  3678. pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
  3679. __func__, osd->o_osd, req->r_tid, front_len,
  3680. req->r_reply->front_alloc_len);
  3681. m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
  3682. false);
  3683. if (!m)
  3684. goto out_unlock_session;
  3685. ceph_msg_put(req->r_reply);
  3686. req->r_reply = m;
  3687. }
  3688. if (data_len > req->r_reply->data_length) {
  3689. pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
  3690. __func__, osd->o_osd, req->r_tid, data_len,
  3691. req->r_reply->data_length);
  3692. m = NULL;
  3693. *skip = 1;
  3694. goto out_unlock_session;
  3695. }
  3696. m = ceph_msg_get(req->r_reply);
  3697. dout("get_reply tid %lld %p\n", tid, m);
  3698. out_unlock_session:
  3699. mutex_unlock(&osd->lock);
  3700. out_unlock_osdc:
  3701. up_read(&osdc->lock);
  3702. return m;
  3703. }
  3704. /*
  3705. * TODO: switch to a msg-owned pagelist
  3706. */
  3707. static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
  3708. {
  3709. struct ceph_msg *m;
  3710. int type = le16_to_cpu(hdr->type);
  3711. u32 front_len = le32_to_cpu(hdr->front_len);
  3712. u32 data_len = le32_to_cpu(hdr->data_len);
  3713. m = ceph_msg_new(type, front_len, GFP_NOIO, false);
  3714. if (!m)
  3715. return NULL;
  3716. if (data_len) {
  3717. struct page **pages;
  3718. struct ceph_osd_data osd_data;
  3719. pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
  3720. GFP_NOIO);
  3721. if (IS_ERR(pages)) {
  3722. ceph_msg_put(m);
  3723. return NULL;
  3724. }
  3725. ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
  3726. false);
  3727. ceph_osdc_msg_data_add(m, &osd_data);
  3728. }
  3729. return m;
  3730. }
  3731. static struct ceph_msg *alloc_msg(struct ceph_connection *con,
  3732. struct ceph_msg_header *hdr,
  3733. int *skip)
  3734. {
  3735. struct ceph_osd *osd = con->private;
  3736. int type = le16_to_cpu(hdr->type);
  3737. *skip = 0;
  3738. switch (type) {
  3739. case CEPH_MSG_OSD_MAP:
  3740. case CEPH_MSG_WATCH_NOTIFY:
  3741. return alloc_msg_with_page_vector(hdr);
  3742. case CEPH_MSG_OSD_OPREPLY:
  3743. return get_reply(con, hdr, skip);
  3744. default:
  3745. pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
  3746. osd->o_osd, type);
  3747. *skip = 1;
  3748. return NULL;
  3749. }
  3750. }
  3751. /*
  3752. * Wrappers to refcount containing ceph_osd struct
  3753. */
  3754. static struct ceph_connection *get_osd_con(struct ceph_connection *con)
  3755. {
  3756. struct ceph_osd *osd = con->private;
  3757. if (get_osd(osd))
  3758. return con;
  3759. return NULL;
  3760. }
  3761. static void put_osd_con(struct ceph_connection *con)
  3762. {
  3763. struct ceph_osd *osd = con->private;
  3764. put_osd(osd);
  3765. }
  3766. /*
  3767. * authentication
  3768. */
  3769. /*
  3770. * Note: returned pointer is the address of a structure that's
  3771. * managed separately. Caller must *not* attempt to free it.
  3772. */
  3773. static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
  3774. int *proto, int force_new)
  3775. {
  3776. struct ceph_osd *o = con->private;
  3777. struct ceph_osd_client *osdc = o->o_osdc;
  3778. struct ceph_auth_client *ac = osdc->client->monc.auth;
  3779. struct ceph_auth_handshake *auth = &o->o_auth;
  3780. if (force_new && auth->authorizer) {
  3781. ceph_auth_destroy_authorizer(auth->authorizer);
  3782. auth->authorizer = NULL;
  3783. }
  3784. if (!auth->authorizer) {
  3785. int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
  3786. auth);
  3787. if (ret)
  3788. return ERR_PTR(ret);
  3789. } else {
  3790. int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
  3791. auth);
  3792. if (ret)
  3793. return ERR_PTR(ret);
  3794. }
  3795. *proto = ac->protocol;
  3796. return auth;
  3797. }
  3798. static int verify_authorizer_reply(struct ceph_connection *con)
  3799. {
  3800. struct ceph_osd *o = con->private;
  3801. struct ceph_osd_client *osdc = o->o_osdc;
  3802. struct ceph_auth_client *ac = osdc->client->monc.auth;
  3803. return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
  3804. }
  3805. static int invalidate_authorizer(struct ceph_connection *con)
  3806. {
  3807. struct ceph_osd *o = con->private;
  3808. struct ceph_osd_client *osdc = o->o_osdc;
  3809. struct ceph_auth_client *ac = osdc->client->monc.auth;
  3810. ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
  3811. return ceph_monc_validate_auth(&osdc->client->monc);
  3812. }
  3813. static int osd_sign_message(struct ceph_msg *msg)
  3814. {
  3815. struct ceph_osd *o = msg->con->private;
  3816. struct ceph_auth_handshake *auth = &o->o_auth;
  3817. return ceph_auth_sign_message(auth, msg);
  3818. }
  3819. static int osd_check_message_signature(struct ceph_msg *msg)
  3820. {
  3821. struct ceph_osd *o = msg->con->private;
  3822. struct ceph_auth_handshake *auth = &o->o_auth;
  3823. return ceph_auth_check_message_signature(auth, msg);
  3824. }
  3825. static const struct ceph_connection_operations osd_con_ops = {
  3826. .get = get_osd_con,
  3827. .put = put_osd_con,
  3828. .dispatch = dispatch,
  3829. .get_authorizer = get_authorizer,
  3830. .verify_authorizer_reply = verify_authorizer_reply,
  3831. .invalidate_authorizer = invalidate_authorizer,
  3832. .alloc_msg = alloc_msg,
  3833. .sign_message = osd_sign_message,
  3834. .check_message_signature = osd_check_message_signature,
  3835. .fault = osd_fault,
  3836. };