addr.c
  1. #include <linux/ceph/ceph_debug.h>
  2. #include <linux/backing-dev.h>
  3. #include <linux/fs.h>
  4. #include <linux/mm.h>
  5. #include <linux/pagemap.h>
  6. #include <linux/writeback.h> /* generic_writepages */
  7. #include <linux/slab.h>
  8. #include <linux/pagevec.h>
  9. #include <linux/task_io_accounting_ops.h>
  10. #include "super.h"
  11. #include "mds_client.h"
  12. #include "cache.h"
  13. #include <linux/ceph/osd_client.h>
  14. /*
  15. * Ceph address space ops.
  16. *
  17. * There are a few funny things going on here.
  18. *
  19. * The page->private field is used to reference a struct
  20. * ceph_snap_context for _every_ dirty page. This indicates which
  21. * snapshot the page was logically dirtied in, and thus which snap
  22. * context needs to be associated with the osd write during writeback.
  23. *
  24. * Similarly, struct ceph_inode_info maintains a set of counters to
  25. * count dirty pages on the inode. In the absence of snapshots,
  26. * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
  27. *
  28. * When a snapshot is taken (that is, when the client receives
  29. * notification that a snapshot was taken), each inode with caps and
  30. * with dirty pages (dirty pages implies there is a cap) gets a new
  31. * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
  32. * order, new snaps go to the tail). The i_wrbuffer_ref_head count is
  33. * moved to capsnap->dirty. (Unless a sync write is currently in
  34. * progress. In that case, the capsnap is said to be "pending", new
  35. * writes cannot start, and the capsnap isn't "finalized" until the
  36. * write completes (or fails) and a final size/mtime for the inode for
  37. * that snap can be settled upon.) i_wrbuffer_ref_head is reset to 0.
  38. *
  39. * On writeback, we must submit writes to the osd IN SNAP ORDER. So,
  40. * we look for the first capsnap in i_cap_snaps and write out pages in
  41. * that snap context _only_. Then we move on to the next capsnap,
  42. * eventually reaching the "live" or "head" context (i.e., pages that
43. * are not yet snapped), where we write the most recently dirtied
  44. * pages.
  45. *
  46. * Invalidate and so forth must take care to ensure the dirty page
  47. * accounting is preserved.
  48. */
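/*
 * Concrete example of the accounting above: with three dirty pages and
 * no snapshots, i_wrbuffer_ref == i_wrbuffer_ref_head == 3. When a
 * snapshot notification arrives, a ceph_cap_snap is appended to
 * i_cap_snaps, the head count of 3 moves to the new capsnap, and
 * i_wrbuffer_ref_head resets to 0; pages dirtied afterwards bump the
 * head count again, and writeback drains the capsnap's pages before
 * touching the new head pages.
 */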
  49. #define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
  50. #define CONGESTION_OFF_THRESH(congestion_kb) \
  51. (CONGESTION_ON_THRESH(congestion_kb) - \
  52. (CONGESTION_ON_THRESH(congestion_kb) >> 2))
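/*
 * congestion_kb is a mount option in kilobytes; shifting right by
 * (PAGE_SHIFT-10) converts it to a page count. Writeback marks the bdi
 * congested once the in-flight dirty-page count passes the "on"
 * threshold and clears it again only below the "off" threshold (75% of
 * "on"), giving some hysteresis.
 */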
  53. static inline struct ceph_snap_context *page_snap_context(struct page *page)
  54. {
  55. if (PagePrivate(page))
  56. return (void *)page->private;
  57. return NULL;
  58. }
  59. /*
  60. * Dirty a page. Optimistically adjust accounting, on the assumption
  61. * that we won't race with invalidate. If we do, readjust.
  62. */
  63. static int ceph_set_page_dirty(struct page *page)
  64. {
  65. struct address_space *mapping = page->mapping;
  66. struct inode *inode;
  67. struct ceph_inode_info *ci;
  68. struct ceph_snap_context *snapc;
  69. int ret;
  70. if (unlikely(!mapping))
  71. return !TestSetPageDirty(page);
  72. if (PageDirty(page)) {
  73. dout("%p set_page_dirty %p idx %lu -- already dirty\n",
  74. mapping->host, page, page->index);
  75. BUG_ON(!PagePrivate(page));
  76. return 0;
  77. }
  78. inode = mapping->host;
  79. ci = ceph_inode(inode);
  80. /* dirty the head */
  81. spin_lock(&ci->i_ceph_lock);
  82. BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
  83. if (__ceph_have_pending_cap_snap(ci)) {
  84. struct ceph_cap_snap *capsnap =
  85. list_last_entry(&ci->i_cap_snaps,
  86. struct ceph_cap_snap,
  87. ci_item);
  88. snapc = ceph_get_snap_context(capsnap->context);
  89. capsnap->dirty_pages++;
  90. } else {
  91. BUG_ON(!ci->i_head_snapc);
  92. snapc = ceph_get_snap_context(ci->i_head_snapc);
  93. ++ci->i_wrbuffer_ref_head;
  94. }
  95. if (ci->i_wrbuffer_ref == 0)
  96. ihold(inode);
  97. ++ci->i_wrbuffer_ref;
  98. dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
  99. "snapc %p seq %lld (%d snaps)\n",
  100. mapping->host, page, page->index,
  101. ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
  102. ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
  103. snapc, snapc->seq, snapc->num_snaps);
  104. spin_unlock(&ci->i_ceph_lock);
  105. /*
  106. * Reference snap context in page->private. Also set
  107. * PagePrivate so that we get invalidatepage callback.
  108. */
  109. BUG_ON(PagePrivate(page));
  110. page->private = (unsigned long)snapc;
  111. SetPagePrivate(page);
  112. ret = __set_page_dirty_nobuffers(page);
  113. WARN_ON(!PageLocked(page));
  114. WARN_ON(!page->mapping);
  115. return ret;
  116. }
  117. /*
  118. * If we are truncating the full page (i.e. offset == 0), adjust the
  119. * dirty page counters appropriately. Only called if there is private
  120. * data on the page.
  121. */
  122. static void ceph_invalidatepage(struct page *page, unsigned int offset,
  123. unsigned int length)
  124. {
  125. struct inode *inode;
  126. struct ceph_inode_info *ci;
  127. struct ceph_snap_context *snapc = page_snap_context(page);
  128. inode = page->mapping->host;
  129. ci = ceph_inode(inode);
  130. if (offset != 0 || length != PAGE_SIZE) {
  131. dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
  132. inode, page, page->index, offset, length);
  133. return;
  134. }
  135. ceph_invalidate_fscache_page(inode, page);
  136. if (!PagePrivate(page))
  137. return;
  138. /*
  139. * We can get non-dirty pages here due to races between
  140. * set_page_dirty and truncate_complete_page; just spit out a
  141. * warning, in case we end up with accounting problems later.
  142. */
  143. if (!PageDirty(page))
  144. pr_err("%p invalidatepage %p page not dirty\n", inode, page);
  145. ClearPageChecked(page);
  146. dout("%p invalidatepage %p idx %lu full dirty page\n",
  147. inode, page, page->index);
  148. ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
  149. ceph_put_snap_context(snapc);
  150. page->private = 0;
  151. ClearPagePrivate(page);
  152. }
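/*
 * Called by the VM when it wants to free a clean page. Refuse if the
 * page is still busy in fscache, or if it still carries a snap context
 * in page->private (dirty-page accounting not yet torn down).
 */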
  153. static int ceph_releasepage(struct page *page, gfp_t g)
  154. {
  155. dout("%p releasepage %p idx %lu\n", page->mapping->host,
  156. page, page->index);
  157. WARN_ON(PageDirty(page));
  158. /* Can we release the page from the cache? */
  159. if (!ceph_release_fscache_page(page, g))
  160. return 0;
  161. return !PagePrivate(page);
  162. }
  163. /*
  164. * read a single page, without unlocking it.
  165. */
  166. static int readpage_nounlock(struct file *filp, struct page *page)
  167. {
  168. struct inode *inode = file_inode(filp);
  169. struct ceph_inode_info *ci = ceph_inode(inode);
  170. struct ceph_osd_client *osdc =
  171. &ceph_inode_to_client(inode)->client->osdc;
  172. int err = 0;
  173. u64 off = page_offset(page);
  174. u64 len = PAGE_SIZE;
  175. if (off >= i_size_read(inode)) {
  176. zero_user_segment(page, 0, PAGE_SIZE);
  177. SetPageUptodate(page);
  178. return 0;
  179. }
  180. if (ci->i_inline_version != CEPH_INLINE_NONE) {
  181. /*
  182. * Uptodate inline data should have been added
  183. * into page cache while getting Fcr caps.
  184. */
  185. if (off == 0)
  186. return -EINVAL;
  187. zero_user_segment(page, 0, PAGE_SIZE);
  188. SetPageUptodate(page);
  189. return 0;
  190. }
  191. err = ceph_readpage_from_fscache(inode, page);
  192. if (err == 0)
  193. goto out;
  194. dout("readpage inode %p file %p page %p index %lu\n",
  195. inode, filp, page, page->index);
  196. err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
  197. off, &len,
  198. ci->i_truncate_seq, ci->i_truncate_size,
  199. &page, 1, 0);
  200. if (err == -ENOENT)
  201. err = 0;
  202. if (err < 0) {
  203. SetPageError(page);
  204. ceph_fscache_readpage_cancel(inode, page);
  205. goto out;
  206. }
  207. if (err < PAGE_SIZE)
  208. /* zero fill remainder of page */
  209. zero_user_segment(page, err, PAGE_SIZE);
  210. else
  211. flush_dcache_page(page);
  212. SetPageUptodate(page);
  213. ceph_readpage_to_fscache(inode, page);
  214. out:
  215. return err < 0 ? err : 0;
  216. }
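/*
 * Synchronous ->readpage: read one page and unlock it whether or not
 * the read succeeded.
 */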
  217. static int ceph_readpage(struct file *filp, struct page *page)
  218. {
  219. int r = readpage_nounlock(filp, page);
  220. unlock_page(page);
  221. return r;
  222. }
  223. /*
  224. * Finish an async read(ahead) op.
  225. */
  226. static void finish_read(struct ceph_osd_request *req)
  227. {
  228. struct inode *inode = req->r_inode;
  229. struct ceph_osd_data *osd_data;
  230. int rc = req->r_result <= 0 ? req->r_result : 0;
  231. int bytes = req->r_result >= 0 ? req->r_result : 0;
  232. int num_pages;
  233. int i;
  234. dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
  235. /* unlock all pages, zeroing any data we didn't read */
  236. osd_data = osd_req_op_extent_osd_data(req, 0);
  237. BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
  238. num_pages = calc_pages_for((u64)osd_data->alignment,
  239. (u64)osd_data->length);
  240. for (i = 0; i < num_pages; i++) {
  241. struct page *page = osd_data->pages[i];
  242. if (rc < 0 && rc != -ENOENT) {
  243. ceph_fscache_readpage_cancel(inode, page);
  244. goto unlock;
  245. }
  246. if (bytes < (int)PAGE_SIZE) {
  247. /* zero (remainder of) page */
  248. int s = bytes < 0 ? 0 : bytes;
  249. zero_user_segment(page, s, PAGE_SIZE);
  250. }
  251. dout("finish_read %p uptodate %p idx %lu\n", inode, page,
  252. page->index);
  253. flush_dcache_page(page);
  254. SetPageUptodate(page);
  255. ceph_readpage_to_fscache(inode, page);
  256. unlock:
  257. unlock_page(page);
  258. put_page(page);
  259. bytes -= PAGE_SIZE;
  260. }
  261. kfree(osd_data->pages);
  262. }
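/* Unlock every page in a page vector; used on start_read() error paths. */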
  263. static void ceph_unlock_page_vector(struct page **pages, int num_pages)
  264. {
  265. int i;
  266. for (i = 0; i < num_pages; i++)
  267. unlock_page(pages[i]);
  268. }
  269. /*
  270. * start an async read(ahead) operation. return nr_pages we submitted
  271. * a read for on success, or negative error code.
  272. */
  273. static int start_read(struct inode *inode, struct list_head *page_list, int max)
  274. {
  275. struct ceph_osd_client *osdc =
  276. &ceph_inode_to_client(inode)->client->osdc;
  277. struct ceph_inode_info *ci = ceph_inode(inode);
  278. struct page *page = list_entry(page_list->prev, struct page, lru);
  279. struct ceph_vino vino;
  280. struct ceph_osd_request *req;
  281. u64 off;
  282. u64 len;
  283. int i;
  284. struct page **pages;
  285. pgoff_t next_index;
  286. int nr_pages = 0;
  287. int ret;
  288. off = (u64) page_offset(page);
  289. /* count pages */
  290. next_index = page->index;
  291. list_for_each_entry_reverse(page, page_list, lru) {
  292. if (page->index != next_index)
  293. break;
  294. nr_pages++;
  295. next_index++;
  296. if (max && nr_pages == max)
  297. break;
  298. }
  299. len = nr_pages << PAGE_SHIFT;
  300. dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
  301. off, len);
  302. vino = ceph_vino(inode);
  303. req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
  304. 0, 1, CEPH_OSD_OP_READ,
  305. CEPH_OSD_FLAG_READ, NULL,
  306. ci->i_truncate_seq, ci->i_truncate_size,
  307. false);
  308. if (IS_ERR(req))
  309. return PTR_ERR(req);
  310. /* build page vector */
  311. nr_pages = calc_pages_for(0, len);
  312. pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
  313. ret = -ENOMEM;
  314. if (!pages)
  315. goto out;
  316. for (i = 0; i < nr_pages; ++i) {
  317. page = list_entry(page_list->prev, struct page, lru);
  318. BUG_ON(PageLocked(page));
  319. list_del(&page->lru);
  320. dout("start_read %p adding %p idx %lu\n", inode, page,
  321. page->index);
  322. if (add_to_page_cache_lru(page, &inode->i_data, page->index,
  323. GFP_KERNEL)) {
  324. ceph_fscache_uncache_page(inode, page);
  325. put_page(page);
  326. dout("start_read %p add_to_page_cache failed %p\n",
  327. inode, page);
  328. nr_pages = i;
  329. goto out_pages;
  330. }
  331. pages[i] = page;
  332. }
  333. osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
  334. req->r_callback = finish_read;
  335. req->r_inode = inode;
  336. dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
  337. ret = ceph_osdc_start_request(osdc, req, false);
  338. if (ret < 0)
  339. goto out_pages;
  340. ceph_osdc_put_request(req);
  341. return nr_pages;
  342. out_pages:
  343. ceph_unlock_page_vector(pages, nr_pages);
  344. ceph_release_page_vector(pages, nr_pages);
  345. out:
  346. ceph_osdc_put_request(req);
  347. return ret;
  348. }
  349. /*
  350. * Read multiple pages. Leave pages we don't read + unlock in page_list;
  351. * the caller (VM) cleans them up.
  352. */
  353. static int ceph_readpages(struct file *file, struct address_space *mapping,
  354. struct list_head *page_list, unsigned nr_pages)
  355. {
  356. struct inode *inode = file_inode(file);
  357. struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
  358. int rc = 0;
  359. int max = 0;
  360. if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
  361. return -EINVAL;
  362. rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
  363. &nr_pages);
  364. if (rc == 0)
  365. goto out;
  366. if (fsc->mount_options->rsize >= PAGE_SIZE)
  367. max = (fsc->mount_options->rsize + PAGE_SIZE - 1)
  368. >> PAGE_SHIFT;
  369. dout("readpages %p file %p nr_pages %d max %d\n", inode,
  370. file, nr_pages,
  371. max);
  372. while (!list_empty(page_list)) {
  373. rc = start_read(inode, page_list, max);
  374. if (rc < 0)
  375. goto out;
  376. BUG_ON(rc == 0);
  377. }
  378. out:
  379. ceph_fscache_readpages_cancel(inode, page_list);
  380. dout("readpages %p file %p ret %d\n", inode, file, rc);
  381. return rc;
  382. }
  383. /*
  384. * Get ref for the oldest snapc for an inode with dirty data... that is, the
  385. * only snap context we are allowed to write back.
  386. */
  387. static struct ceph_snap_context *get_oldest_context(struct inode *inode,
  388. loff_t *snap_size)
  389. {
  390. struct ceph_inode_info *ci = ceph_inode(inode);
  391. struct ceph_snap_context *snapc = NULL;
  392. struct ceph_cap_snap *capsnap = NULL;
  393. spin_lock(&ci->i_ceph_lock);
  394. list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
  395. dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
  396. capsnap->context, capsnap->dirty_pages);
  397. if (capsnap->dirty_pages) {
  398. snapc = ceph_get_snap_context(capsnap->context);
  399. if (snap_size)
  400. *snap_size = capsnap->size;
  401. break;
  402. }
  403. }
  404. if (!snapc && ci->i_wrbuffer_ref_head) {
  405. snapc = ceph_get_snap_context(ci->i_head_snapc);
  406. dout(" head snapc %p has %d dirty pages\n",
  407. snapc, ci->i_wrbuffer_ref_head);
  408. }
  409. spin_unlock(&ci->i_ceph_lock);
  410. return snapc;
  411. }
  412. /*
  413. * Write a single page, but leave the page locked.
  414. *
  415. * If we get a write error, set the page error bit, but still adjust the
  416. * dirty page accounting (i.e., page is no longer dirty).
  417. */
  418. static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
  419. {
  420. struct inode *inode;
  421. struct ceph_inode_info *ci;
  422. struct ceph_fs_client *fsc;
  423. struct ceph_osd_client *osdc;
  424. struct ceph_snap_context *snapc, *oldest;
  425. loff_t page_off = page_offset(page);
  426. loff_t snap_size = -1;
  427. long writeback_stat;
  428. u64 truncate_size;
  429. u32 truncate_seq;
  430. int err = 0, len = PAGE_SIZE;
  431. dout("writepage %p idx %lu\n", page, page->index);
  432. if (!page->mapping || !page->mapping->host) {
  433. dout("writepage %p - no mapping\n", page);
  434. return -EFAULT;
  435. }
  436. inode = page->mapping->host;
  437. ci = ceph_inode(inode);
  438. fsc = ceph_inode_to_client(inode);
  439. osdc = &fsc->client->osdc;
  440. /* verify this is a writeable snap context */
  441. snapc = page_snap_context(page);
  442. if (snapc == NULL) {
  443. dout("writepage %p page %p not dirty?\n", inode, page);
  444. goto out;
  445. }
  446. oldest = get_oldest_context(inode, &snap_size);
  447. if (snapc->seq > oldest->seq) {
  448. dout("writepage %p page %p snapc %p not writeable - noop\n",
  449. inode, page, snapc);
  450. /* we should only noop if called by kswapd */
  451. WARN_ON((current->flags & PF_MEMALLOC) == 0);
  452. ceph_put_snap_context(oldest);
  453. goto out;
  454. }
  455. ceph_put_snap_context(oldest);
  456. spin_lock(&ci->i_ceph_lock);
  457. truncate_seq = ci->i_truncate_seq;
  458. truncate_size = ci->i_truncate_size;
  459. if (snap_size == -1)
  460. snap_size = i_size_read(inode);
  461. spin_unlock(&ci->i_ceph_lock);
  462. /* is this a partial page at end of file? */
  463. if (page_off >= snap_size) {
  464. dout("%p page eof %llu\n", page, snap_size);
  465. goto out;
  466. }
  467. if (snap_size < page_off + len)
  468. len = snap_size - page_off;
  469. dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
  470. inode, page, page->index, page_off, len, snapc);
  471. writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
  472. if (writeback_stat >
  473. CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
  474. set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
  475. set_page_writeback(page);
  476. err = ceph_osdc_writepages(osdc, ceph_vino(inode),
  477. &ci->i_layout, snapc,
  478. page_off, len,
  479. truncate_seq, truncate_size,
  480. &inode->i_mtime, &page, 1);
  481. if (err < 0) {
  482. struct writeback_control tmp_wbc;
  483. if (!wbc)
  484. wbc = &tmp_wbc;
  485. if (err == -ERESTARTSYS) {
  486. /* killed by SIGKILL */
  487. dout("writepage interrupted page %p\n", page);
  488. redirty_page_for_writepage(wbc, page);
  489. end_page_writeback(page);
  490. goto out;
  491. }
  492. dout("writepage setting page/mapping error %d %p\n",
  493. err, page);
  494. SetPageError(page);
  495. mapping_set_error(&inode->i_data, err);
  496. wbc->pages_skipped++;
  497. } else {
  498. dout("writepage cleaned page %p\n", page);
  499. err = 0; /* vfs expects us to return 0 */
  500. }
  501. page->private = 0;
  502. ClearPagePrivate(page);
  503. end_page_writeback(page);
  504. ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
  505. ceph_put_snap_context(snapc); /* page's reference */
  506. out:
  507. return err;
  508. }
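/*
 * ->writepage entry point: pin the inode, write the single page via
 * writepage_nounlock(), then unlock it. -ERESTARTSYS (direct reclaimer
 * killed by SIGKILL) is translated to 0 so the caller does not flag a
 * mapping/page error.
 */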
  509. static int ceph_writepage(struct page *page, struct writeback_control *wbc)
  510. {
  511. int err;
  512. struct inode *inode = page->mapping->host;
  513. BUG_ON(!inode);
  514. ihold(inode);
  515. err = writepage_nounlock(page, wbc);
  516. if (err == -ERESTARTSYS) {
  517. /* direct memory reclaimer was killed by SIGKILL. return 0
  518. * to prevent caller from setting mapping/page error */
  519. err = 0;
  520. }
  521. unlock_page(page);
  522. iput(inode);
  523. return err;
  524. }
  525. /*
  526. * lame release_pages helper. release_pages() isn't exported to
  527. * modules.
  528. */
  529. static void ceph_release_pages(struct page **pages, int num)
  530. {
  531. struct pagevec pvec;
  532. int i;
  533. pagevec_init(&pvec, 0);
  534. for (i = 0; i < num; i++) {
  535. if (pagevec_add(&pvec, pages[i]) == 0)
  536. pagevec_release(&pvec);
  537. }
  538. pagevec_release(&pvec);
  539. }
  540. /*
  541. * async writeback completion handler.
  542. *
  543. * If we get an error, set the mapping error bit, but not the individual
  544. * page error bits.
  545. */
  546. static void writepages_finish(struct ceph_osd_request *req)
  547. {
  548. struct inode *inode = req->r_inode;
  549. struct ceph_inode_info *ci = ceph_inode(inode);
  550. struct ceph_osd_data *osd_data;
  551. struct page *page;
  552. int num_pages, total_pages = 0;
  553. int i, j;
  554. int rc = req->r_result;
  555. struct ceph_snap_context *snapc = req->r_snapc;
  556. struct address_space *mapping = inode->i_mapping;
  557. struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
  558. bool remove_page;
  559. dout("writepages_finish %p rc %d\n", inode, rc);
  560. if (rc < 0)
  561. mapping_set_error(mapping, rc);
  562. /*
  563. * We lost the cache cap, need to truncate the page before
  564. * it is unlocked, otherwise we'd truncate it later in the
  565. * page truncation thread, possibly losing some data that
  566. * raced its way in
  567. */
  568. remove_page = !(ceph_caps_issued(ci) &
  569. (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
  570. /* clean all pages */
  571. for (i = 0; i < req->r_num_ops; i++) {
  572. if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
  573. break;
  574. osd_data = osd_req_op_extent_osd_data(req, i);
  575. BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
  576. num_pages = calc_pages_for((u64)osd_data->alignment,
  577. (u64)osd_data->length);
  578. total_pages += num_pages;
  579. for (j = 0; j < num_pages; j++) {
  580. page = osd_data->pages[j];
  581. BUG_ON(!page);
  582. WARN_ON(!PageUptodate(page));
  583. if (atomic_long_dec_return(&fsc->writeback_count) <
  584. CONGESTION_OFF_THRESH(
  585. fsc->mount_options->congestion_kb))
  586. clear_bdi_congested(&fsc->backing_dev_info,
  587. BLK_RW_ASYNC);
  588. if (rc < 0)
  589. SetPageError(page);
  590. ceph_put_snap_context(page_snap_context(page));
  591. page->private = 0;
  592. ClearPagePrivate(page);
  593. dout("unlocking %p\n", page);
  594. end_page_writeback(page);
  595. if (remove_page)
  596. generic_error_remove_page(inode->i_mapping,
  597. page);
  598. unlock_page(page);
  599. }
  600. dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
  601. inode, osd_data->length, rc >= 0 ? num_pages : 0);
  602. ceph_release_pages(osd_data->pages, num_pages);
  603. }
  604. ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
  605. osd_data = osd_req_op_extent_osd_data(req, 0);
  606. if (osd_data->pages_from_pool)
  607. mempool_free(osd_data->pages,
  608. ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
  609. else
  610. kfree(osd_data->pages);
  611. ceph_osdc_put_request(req);
  612. }
  613. /*
  614. * initiate async writeback
  615. */
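/*
 * Overall flow below: pick the oldest snap context that still has dirty
 * data, walk the mapping for dirty pages belonging to that context,
 * batch them into OSD requests (up to wsize bytes each, splitting at
 * strip unit boundaries), submit them with writepages_finish() as the
 * completion callback, and loop until the range (or nr_to_write) is
 * exhausted.
 */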
  616. static int ceph_writepages_start(struct address_space *mapping,
  617. struct writeback_control *wbc)
  618. {
  619. struct inode *inode = mapping->host;
  620. struct ceph_inode_info *ci = ceph_inode(inode);
  621. struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
  622. struct ceph_vino vino = ceph_vino(inode);
  623. pgoff_t index, start, end;
  624. int range_whole = 0;
  625. int should_loop = 1;
  626. pgoff_t max_pages = 0, max_pages_ever = 0;
  627. struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
  628. struct pagevec pvec;
  629. int done = 0;
  630. int rc = 0;
  631. unsigned wsize = 1 << inode->i_blkbits;
  632. struct ceph_osd_request *req = NULL;
  633. int do_sync = 0;
  634. loff_t snap_size, i_size;
  635. u64 truncate_size;
  636. u32 truncate_seq;
  637. /*
  638. * Include a 'sync' in the OSD request if this is a data
  639. * integrity write (e.g., O_SYNC write or fsync()), or if our
  640. * cap is being revoked.
  641. */
  642. if ((wbc->sync_mode == WB_SYNC_ALL) ||
  643. ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
  644. do_sync = 1;
  645. dout("writepages_start %p dosync=%d (mode=%s)\n",
  646. inode, do_sync,
  647. wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
  648. (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
  649. if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
  650. if (ci->i_wrbuffer_ref > 0) {
  651. pr_warn_ratelimited(
  652. "writepage_start %p %lld forced umount\n",
  653. inode, ceph_ino(inode));
  654. }
  655. mapping_set_error(mapping, -EIO);
  656. return -EIO; /* we're in a forced umount, don't write! */
  657. }
  658. if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
  659. wsize = fsc->mount_options->wsize;
  660. if (wsize < PAGE_SIZE)
  661. wsize = PAGE_SIZE;
  662. max_pages_ever = wsize >> PAGE_SHIFT;
  663. pagevec_init(&pvec, 0);
  664. /* where to start/end? */
  665. if (wbc->range_cyclic) {
  666. start = mapping->writeback_index; /* Start from prev offset */
  667. end = -1;
  668. dout(" cyclic, start at %lu\n", start);
  669. } else {
  670. start = wbc->range_start >> PAGE_SHIFT;
  671. end = wbc->range_end >> PAGE_SHIFT;
  672. if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
  673. range_whole = 1;
  674. should_loop = 0;
  675. dout(" not cyclic, %lu to %lu\n", start, end);
  676. }
  677. index = start;
  678. retry:
  679. /* find oldest snap context with dirty data */
  680. ceph_put_snap_context(snapc);
  681. snap_size = -1;
  682. snapc = get_oldest_context(inode, &snap_size);
  683. if (!snapc) {
  684. /* hmm, why does writepages get called when there
  685. is no dirty data? */
  686. dout(" no snap context with dirty data?\n");
  687. goto out;
  688. }
  689. dout(" oldest snapc is %p seq %lld (%d snaps)\n",
  690. snapc, snapc->seq, snapc->num_snaps);
  691. spin_lock(&ci->i_ceph_lock);
  692. truncate_seq = ci->i_truncate_seq;
  693. truncate_size = ci->i_truncate_size;
  694. i_size = i_size_read(inode);
  695. spin_unlock(&ci->i_ceph_lock);
  696. if (last_snapc && snapc != last_snapc) {
  697. /* if we switched to a newer snapc, restart our scan at the
  698. * start of the original file range. */
  699. dout(" snapc differs from last pass, restarting at %lu\n",
  700. index);
  701. index = start;
  702. }
  703. last_snapc = snapc;
  704. while (!done && index <= end) {
  705. unsigned i;
  706. int first;
  707. pgoff_t strip_unit_end = 0;
  708. int num_ops = 0, op_idx;
  709. int pvec_pages, locked_pages = 0;
  710. struct page **pages = NULL, **data_pages;
  711. mempool_t *pool = NULL; /* Becomes non-null if mempool used */
  712. struct page *page;
  713. int want;
  714. u64 offset = 0, len = 0;
  715. max_pages = max_pages_ever;
  716. get_more_pages:
  717. first = -1;
  718. want = min(end - index,
  719. min((pgoff_t)PAGEVEC_SIZE,
  720. max_pages - (pgoff_t)locked_pages) - 1)
  721. + 1;
  722. pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
  723. PAGECACHE_TAG_DIRTY,
  724. want);
  725. dout("pagevec_lookup_tag got %d\n", pvec_pages);
  726. if (!pvec_pages && !locked_pages)
  727. break;
  728. for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
  729. page = pvec.pages[i];
  730. dout("? %p idx %lu\n", page, page->index);
  731. if (locked_pages == 0)
  732. lock_page(page); /* first page */
  733. else if (!trylock_page(page))
  734. break;
  735. /* only dirty pages, or our accounting breaks */
  736. if (unlikely(!PageDirty(page)) ||
  737. unlikely(page->mapping != mapping)) {
  738. dout("!dirty or !mapping %p\n", page);
  739. unlock_page(page);
  740. break;
  741. }
  742. if (!wbc->range_cyclic && page->index > end) {
  743. dout("end of range %p\n", page);
  744. done = 1;
  745. unlock_page(page);
  746. break;
  747. }
  748. if (strip_unit_end && (page->index > strip_unit_end)) {
  749. dout("end of strip unit %p\n", page);
  750. unlock_page(page);
  751. break;
  752. }
  753. if (wbc->sync_mode != WB_SYNC_NONE) {
  754. dout("waiting on writeback %p\n", page);
  755. wait_on_page_writeback(page);
  756. }
  757. if (page_offset(page) >=
  758. (snap_size == -1 ? i_size : snap_size)) {
  759. dout("%p page eof %llu\n", page,
  760. (snap_size == -1 ? i_size : snap_size));
  761. done = 1;
  762. unlock_page(page);
  763. break;
  764. }
  765. if (PageWriteback(page)) {
  766. dout("%p under writeback\n", page);
  767. unlock_page(page);
  768. break;
  769. }
  770. /* only if matching snap context */
  771. pgsnapc = page_snap_context(page);
  772. if (pgsnapc->seq > snapc->seq) {
  773. dout("page snapc %p %lld > oldest %p %lld\n",
  774. pgsnapc, pgsnapc->seq, snapc, snapc->seq);
  775. unlock_page(page);
  776. if (!locked_pages)
  777. continue; /* keep looking for snap */
  778. break;
  779. }
  780. if (!clear_page_dirty_for_io(page)) {
  781. dout("%p !clear_page_dirty_for_io\n", page);
  782. unlock_page(page);
  783. break;
  784. }
  785. /*
  786. * We have something to write. If this is
  787. * the first locked page this time through,
788. * calculate max possible write size and
  789. * allocate a page array
  790. */
  791. if (locked_pages == 0) {
  792. u64 objnum;
  793. u64 objoff;
  794. /* prepare async write request */
  795. offset = (u64)page_offset(page);
  796. len = wsize;
  797. rc = ceph_calc_file_object_mapping(&ci->i_layout,
  798. offset, len,
  799. &objnum, &objoff,
  800. &len);
  801. if (rc < 0) {
  802. unlock_page(page);
  803. break;
  804. }
  805. num_ops = 1 + do_sync;
  806. strip_unit_end = page->index +
  807. ((len - 1) >> PAGE_SHIFT);
  808. BUG_ON(pages);
  809. max_pages = calc_pages_for(0, (u64)len);
  810. pages = kmalloc(max_pages * sizeof (*pages),
  811. GFP_NOFS);
  812. if (!pages) {
  813. pool = fsc->wb_pagevec_pool;
  814. pages = mempool_alloc(pool, GFP_NOFS);
  815. BUG_ON(!pages);
  816. }
  817. len = 0;
  818. } else if (page->index !=
  819. (offset + len) >> PAGE_SHIFT) {
  820. if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
  821. CEPH_OSD_MAX_OPS)) {
  822. redirty_page_for_writepage(wbc, page);
  823. unlock_page(page);
  824. break;
  825. }
  826. num_ops++;
  827. offset = (u64)page_offset(page);
  828. len = 0;
  829. }
  830. /* note position of first page in pvec */
  831. if (first < 0)
  832. first = i;
  833. dout("%p will write page %p idx %lu\n",
  834. inode, page, page->index);
  835. if (atomic_long_inc_return(&fsc->writeback_count) >
  836. CONGESTION_ON_THRESH(
  837. fsc->mount_options->congestion_kb)) {
  838. set_bdi_congested(&fsc->backing_dev_info,
  839. BLK_RW_ASYNC);
  840. }
  841. pages[locked_pages] = page;
  842. locked_pages++;
  843. len += PAGE_SIZE;
  844. }
  845. /* did we get anything? */
  846. if (!locked_pages)
  847. goto release_pvec_pages;
  848. if (i) {
  849. int j;
  850. BUG_ON(!locked_pages || first < 0);
  851. if (pvec_pages && i == pvec_pages &&
  852. locked_pages < max_pages) {
  853. dout("reached end pvec, trying for more\n");
  854. pagevec_reinit(&pvec);
  855. goto get_more_pages;
  856. }
  857. /* shift unused pages over in the pvec... we
  858. * will need to release them below. */
  859. for (j = i; j < pvec_pages; j++) {
  860. dout(" pvec leftover page %p\n", pvec.pages[j]);
  861. pvec.pages[j-i+first] = pvec.pages[j];
  862. }
  863. pvec.nr -= i-first;
  864. }
  865. new_request:
  866. offset = page_offset(pages[0]);
  867. len = wsize;
  868. req = ceph_osdc_new_request(&fsc->client->osdc,
  869. &ci->i_layout, vino,
  870. offset, &len, 0, num_ops,
  871. CEPH_OSD_OP_WRITE,
  872. CEPH_OSD_FLAG_WRITE |
  873. CEPH_OSD_FLAG_ONDISK,
  874. snapc, truncate_seq,
  875. truncate_size, false);
  876. if (IS_ERR(req)) {
  877. req = ceph_osdc_new_request(&fsc->client->osdc,
  878. &ci->i_layout, vino,
  879. offset, &len, 0,
  880. min(num_ops,
  881. CEPH_OSD_SLAB_OPS),
  882. CEPH_OSD_OP_WRITE,
  883. CEPH_OSD_FLAG_WRITE |
  884. CEPH_OSD_FLAG_ONDISK,
  885. snapc, truncate_seq,
  886. truncate_size, true);
  887. BUG_ON(IS_ERR(req));
  888. }
  889. BUG_ON(len < page_offset(pages[locked_pages - 1]) +
  890. PAGE_SIZE - offset);
  891. req->r_callback = writepages_finish;
  892. req->r_inode = inode;
  893. /* Format the osd request message and submit the write */
  894. len = 0;
  895. data_pages = pages;
  896. op_idx = 0;
  897. for (i = 0; i < locked_pages; i++) {
  898. u64 cur_offset = page_offset(pages[i]);
  899. if (offset + len != cur_offset) {
  900. if (op_idx + do_sync + 1 == req->r_num_ops)
  901. break;
  902. osd_req_op_extent_dup_last(req, op_idx,
  903. cur_offset - offset);
  904. dout("writepages got pages at %llu~%llu\n",
  905. offset, len);
  906. osd_req_op_extent_osd_data_pages(req, op_idx,
  907. data_pages, len, 0,
  908. !!pool, false);
  909. osd_req_op_extent_update(req, op_idx, len);
  910. len = 0;
  911. offset = cur_offset;
  912. data_pages = pages + i;
  913. op_idx++;
  914. }
  915. set_page_writeback(pages[i]);
  916. len += PAGE_SIZE;
  917. }
  918. if (snap_size != -1) {
  919. len = min(len, snap_size - offset);
  920. } else if (i == locked_pages) {
  921. /* writepages_finish() clears writeback pages
  922. * according to the data length, so make sure
  923. * data length covers all locked pages */
  924. u64 min_len = len + 1 - PAGE_SIZE;
  925. len = min(len, (u64)i_size_read(inode) - offset);
  926. len = max(len, min_len);
  927. }
  928. dout("writepages got pages at %llu~%llu\n", offset, len);
  929. osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
  930. 0, !!pool, false);
  931. osd_req_op_extent_update(req, op_idx, len);
  932. if (do_sync) {
  933. op_idx++;
  934. osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
  935. }
  936. BUG_ON(op_idx + 1 != req->r_num_ops);
  937. pool = NULL;
  938. if (i < locked_pages) {
  939. BUG_ON(num_ops <= req->r_num_ops);
  940. num_ops -= req->r_num_ops;
  941. num_ops += do_sync;
  942. locked_pages -= i;
  943. /* allocate new pages array for next request */
  944. data_pages = pages;
  945. pages = kmalloc(locked_pages * sizeof (*pages),
  946. GFP_NOFS);
  947. if (!pages) {
  948. pool = fsc->wb_pagevec_pool;
  949. pages = mempool_alloc(pool, GFP_NOFS);
  950. BUG_ON(!pages);
  951. }
  952. memcpy(pages, data_pages + i,
  953. locked_pages * sizeof(*pages));
  954. memset(data_pages + i, 0,
  955. locked_pages * sizeof(*pages));
  956. } else {
  957. BUG_ON(num_ops != req->r_num_ops);
  958. index = pages[i - 1]->index + 1;
  959. /* request message now owns the pages array */
  960. pages = NULL;
  961. }
  962. req->r_mtime = inode->i_mtime;
  963. rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
  964. BUG_ON(rc);
  965. req = NULL;
  966. wbc->nr_to_write -= i;
  967. if (pages)
  968. goto new_request;
  969. if (wbc->nr_to_write <= 0)
  970. done = 1;
  971. release_pvec_pages:
  972. dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
  973. pvec.nr ? pvec.pages[0] : NULL);
  974. pagevec_release(&pvec);
  975. if (locked_pages && !done)
  976. goto retry;
  977. }
  978. if (should_loop && !done) {
  979. /* more to do; loop back to beginning of file */
  980. dout("writepages looping back to beginning of file\n");
  981. should_loop = 0;
  982. index = 0;
  983. goto retry;
  984. }
  985. if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
  986. mapping->writeback_index = index;
  987. out:
  988. ceph_osdc_put_request(req);
  989. ceph_put_snap_context(snapc);
  990. dout("writepages done, rc = %d\n", rc);
  991. return rc;
  992. }
  993. /*
  994. * See if a given @snapc is either writeable, or already written.
  995. */
  996. static int context_is_writeable_or_written(struct inode *inode,
  997. struct ceph_snap_context *snapc)
  998. {
  999. struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
  1000. int ret = !oldest || snapc->seq <= oldest->seq;
  1001. ceph_put_snap_context(oldest);
  1002. return ret;
  1003. }
  1004. /*
  1005. * We are only allowed to write into/dirty the page if the page is
  1006. * clean, or already dirty within the same snap context.
  1007. *
  1008. * called with page locked.
  1009. * return success with page locked,
  1010. * or any failure (incl -EAGAIN) with page unlocked.
  1011. */
  1012. static int ceph_update_writeable_page(struct file *file,
  1013. loff_t pos, unsigned len,
  1014. struct page *page)
  1015. {
  1016. struct inode *inode = file_inode(file);
  1017. struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
  1018. struct ceph_inode_info *ci = ceph_inode(inode);
  1019. loff_t page_off = pos & PAGE_MASK;
  1020. int pos_in_page = pos & ~PAGE_MASK;
  1021. int end_in_page = pos_in_page + len;
  1022. loff_t i_size;
  1023. int r;
  1024. struct ceph_snap_context *snapc, *oldest;
  1025. if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
  1026. dout(" page %p forced umount\n", page);
  1027. unlock_page(page);
  1028. return -EIO;
  1029. }
  1030. retry_locked:
1031. /* writepages currently holds the page lock, but wait for writeback here in case that changes later */
  1032. wait_on_page_writeback(page);
  1033. snapc = page_snap_context(page);
  1034. if (snapc && snapc != ci->i_head_snapc) {
  1035. /*
  1036. * this page is already dirty in another (older) snap
  1037. * context! is it writeable now?
  1038. */
  1039. oldest = get_oldest_context(inode, NULL);
  1040. if (snapc->seq > oldest->seq) {
  1041. ceph_put_snap_context(oldest);
  1042. dout(" page %p snapc %p not current or oldest\n",
  1043. page, snapc);
  1044. /*
  1045. * queue for writeback, and wait for snapc to
  1046. * be writeable or written
  1047. */
  1048. snapc = ceph_get_snap_context(snapc);
  1049. unlock_page(page);
  1050. ceph_queue_writeback(inode);
  1051. r = wait_event_killable(ci->i_cap_wq,
  1052. context_is_writeable_or_written(inode, snapc));
  1053. ceph_put_snap_context(snapc);
  1054. if (r == -ERESTARTSYS)
  1055. return r;
  1056. return -EAGAIN;
  1057. }
  1058. ceph_put_snap_context(oldest);
  1059. /* yay, writeable, do it now (without dropping page lock) */
  1060. dout(" page %p snapc %p not current, but oldest\n",
  1061. page, snapc);
  1062. if (!clear_page_dirty_for_io(page))
  1063. goto retry_locked;
  1064. r = writepage_nounlock(page, NULL);
  1065. if (r < 0)
  1066. goto fail_nosnap;
  1067. goto retry_locked;
  1068. }
  1069. if (PageUptodate(page)) {
  1070. dout(" page %p already uptodate\n", page);
  1071. return 0;
  1072. }
  1073. /* full page? */
  1074. if (pos_in_page == 0 && len == PAGE_SIZE)
  1075. return 0;
  1076. /* past end of file? */
  1077. i_size = i_size_read(inode);
  1078. if (page_off >= i_size ||
  1079. (pos_in_page == 0 && (pos+len) >= i_size &&
  1080. end_in_page - pos_in_page != PAGE_SIZE)) {
  1081. dout(" zeroing %p 0 - %d and %d - %d\n",
  1082. page, pos_in_page, end_in_page, (int)PAGE_SIZE);
  1083. zero_user_segments(page,
  1084. 0, pos_in_page,
  1085. end_in_page, PAGE_SIZE);
  1086. return 0;
  1087. }
  1088. /* we need to read it. */
  1089. r = readpage_nounlock(file, page);
  1090. if (r < 0)
  1091. goto fail_nosnap;
  1092. goto retry_locked;
  1093. fail_nosnap:
  1094. unlock_page(page);
  1095. return r;
  1096. }
  1097. /*
  1098. * We are only allowed to write into/dirty the page if the page is
  1099. * clean, or already dirty within the same snap context.
  1100. */
  1101. static int ceph_write_begin(struct file *file, struct address_space *mapping,
  1102. loff_t pos, unsigned len, unsigned flags,
  1103. struct page **pagep, void **fsdata)
  1104. {
  1105. struct inode *inode = file_inode(file);
  1106. struct page *page;
  1107. pgoff_t index = pos >> PAGE_SHIFT;
  1108. int r;
  1109. do {
  1110. /* get a page */
  1111. page = grab_cache_page_write_begin(mapping, index, 0);
  1112. if (!page)
  1113. return -ENOMEM;
  1114. dout("write_begin file %p inode %p page %p %d~%d\n", file,
  1115. inode, page, (int)pos, (int)len);
  1116. r = ceph_update_writeable_page(file, pos, len, page);
  1117. if (r < 0)
  1118. put_page(page);
  1119. else
  1120. *pagep = page;
  1121. } while (r == -EAGAIN);
  1122. return r;
  1123. }
  1124. /*
  1125. * we don't do anything in here that simple_write_end doesn't do
  1126. * except adjust dirty page accounting
  1127. */
  1128. static int ceph_write_end(struct file *file, struct address_space *mapping,
  1129. loff_t pos, unsigned len, unsigned copied,
  1130. struct page *page, void *fsdata)
  1131. {
  1132. struct inode *inode = file_inode(file);
  1133. unsigned from = pos & (PAGE_SIZE - 1);
  1134. int check_cap = 0;
  1135. dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
  1136. inode, page, (int)pos, (int)copied, (int)len);
  1137. /* zero the stale part of the page if we did a short copy */
  1138. if (copied < len)
  1139. zero_user_segment(page, from+copied, len);
  1140. /* did file size increase? */
  1141. if (pos+copied > i_size_read(inode))
  1142. check_cap = ceph_inode_set_size(inode, pos+copied);
  1143. if (!PageUptodate(page))
  1144. SetPageUptodate(page);
  1145. set_page_dirty(page);
  1146. unlock_page(page);
  1147. put_page(page);
  1148. if (check_cap)
  1149. ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
  1150. return copied;
  1151. }
  1152. /*
  1153. * we set .direct_IO to indicate direct io is supported, but since we
  1154. * intercept O_DIRECT reads and writes early, this function should
  1155. * never get called.
  1156. */
  1157. static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
  1158. {
  1159. WARN_ON(1);
  1160. return -EINVAL;
  1161. }
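/*
 * Address space operations: wires the handlers above into the VFS page
 * cache.
 */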
  1162. const struct address_space_operations ceph_aops = {
  1163. .readpage = ceph_readpage,
  1164. .readpages = ceph_readpages,
  1165. .writepage = ceph_writepage,
  1166. .writepages = ceph_writepages_start,
  1167. .write_begin = ceph_write_begin,
  1168. .write_end = ceph_write_end,
  1169. .set_page_dirty = ceph_set_page_dirty,
  1170. .invalidatepage = ceph_invalidatepage,
  1171. .releasepage = ceph_releasepage,
  1172. .direct_IO = ceph_direct_io,
  1173. };
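/*
 * Block all signals except SIGKILL while the fault paths below wait for
 * caps and OSD replies; ceph_restore_sigs() puts the old mask back.
 */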
  1174. static void ceph_block_sigs(sigset_t *oldset)
  1175. {
  1176. sigset_t mask;
  1177. siginitsetinv(&mask, sigmask(SIGKILL));
  1178. sigprocmask(SIG_BLOCK, &mask, oldset);
  1179. }
  1180. static void ceph_restore_sigs(sigset_t *oldset)
  1181. {
  1182. sigprocmask(SIG_SETMASK, oldset, NULL);
  1183. }
  1184. /*
  1185. * vm ops
  1186. */
  1187. static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
  1188. {
  1189. struct inode *inode = file_inode(vma->vm_file);
  1190. struct ceph_inode_info *ci = ceph_inode(inode);
  1191. struct ceph_file_info *fi = vma->vm_file->private_data;
  1192. struct page *pinned_page = NULL;
  1193. loff_t off = vmf->pgoff << PAGE_SHIFT;
  1194. int want, got, ret;
  1195. sigset_t oldset;
  1196. ceph_block_sigs(&oldset);
  1197. dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
  1198. inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
  1199. if (fi->fmode & CEPH_FILE_MODE_LAZY)
  1200. want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
  1201. else
  1202. want = CEPH_CAP_FILE_CACHE;
  1203. got = 0;
  1204. ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
  1205. if (ret < 0)
  1206. goto out_restore;
  1207. dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
  1208. inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
  1209. if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
  1210. ci->i_inline_version == CEPH_INLINE_NONE)
  1211. ret = filemap_fault(vma, vmf);
  1212. else
  1213. ret = -EAGAIN;
  1214. dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
  1215. inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got), ret);
  1216. if (pinned_page)
  1217. put_page(pinned_page);
  1218. ceph_put_cap_refs(ci, got);
  1219. if (ret != -EAGAIN)
  1220. goto out_restore;
  1221. /* read inline data */
  1222. if (off >= PAGE_SIZE) {
  1223. /* does not support inline data > PAGE_SIZE */
  1224. ret = VM_FAULT_SIGBUS;
  1225. } else {
  1226. int ret1;
  1227. struct address_space *mapping = inode->i_mapping;
  1228. struct page *page = find_or_create_page(mapping, 0,
  1229. mapping_gfp_constraint(mapping,
  1230. ~__GFP_FS));
  1231. if (!page) {
  1232. ret = VM_FAULT_OOM;
  1233. goto out_inline;
  1234. }
  1235. ret1 = __ceph_do_getattr(inode, page,
  1236. CEPH_STAT_CAP_INLINE_DATA, true);
  1237. if (ret1 < 0 || off >= i_size_read(inode)) {
  1238. unlock_page(page);
  1239. put_page(page);
  1240. if (ret1 < 0)
  1241. ret = ret1;
  1242. else
  1243. ret = VM_FAULT_SIGBUS;
  1244. goto out_inline;
  1245. }
  1246. if (ret1 < PAGE_SIZE)
  1247. zero_user_segment(page, ret1, PAGE_SIZE);
  1248. else
  1249. flush_dcache_page(page);
  1250. SetPageUptodate(page);
  1251. vmf->page = page;
  1252. ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
  1253. out_inline:
  1254. dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
  1255. inode, off, (size_t)PAGE_SIZE, ret);
  1256. }
  1257. out_restore:
  1258. ceph_restore_sigs(&oldset);
  1259. if (ret < 0)
  1260. ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
  1261. return ret;
  1262. }
  1263. /*
  1264. * Reuse write_begin here for simplicity.
  1265. */
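/*
 * page_mkwrite: uninline any inline data first, take Fb (buffer-write)
 * caps covering the page, then dirty the page under the correct snap
 * context via ceph_update_writeable_page().
 */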
  1266. static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
  1267. {
  1268. struct inode *inode = file_inode(vma->vm_file);
  1269. struct ceph_inode_info *ci = ceph_inode(inode);
  1270. struct ceph_file_info *fi = vma->vm_file->private_data;
  1271. struct ceph_cap_flush *prealloc_cf;
  1272. struct page *page = vmf->page;
  1273. loff_t off = page_offset(page);
  1274. loff_t size = i_size_read(inode);
  1275. size_t len;
  1276. int want, got, ret;
  1277. sigset_t oldset;
  1278. prealloc_cf = ceph_alloc_cap_flush();
  1279. if (!prealloc_cf)
  1280. return VM_FAULT_OOM;
  1281. ceph_block_sigs(&oldset);
  1282. if (ci->i_inline_version != CEPH_INLINE_NONE) {
  1283. struct page *locked_page = NULL;
  1284. if (off == 0) {
  1285. lock_page(page);
  1286. locked_page = page;
  1287. }
  1288. ret = ceph_uninline_data(vma->vm_file, locked_page);
  1289. if (locked_page)
  1290. unlock_page(locked_page);
  1291. if (ret < 0)
  1292. goto out_free;
  1293. }
  1294. if (off + PAGE_SIZE <= size)
  1295. len = PAGE_SIZE;
  1296. else
  1297. len = size & ~PAGE_MASK;
  1298. dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
  1299. inode, ceph_vinop(inode), off, len, size);
  1300. if (fi->fmode & CEPH_FILE_MODE_LAZY)
  1301. want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
  1302. else
  1303. want = CEPH_CAP_FILE_BUFFER;
  1304. got = 0;
  1305. ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
  1306. &got, NULL);
  1307. if (ret < 0)
  1308. goto out_free;
  1309. dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
  1310. inode, off, len, ceph_cap_string(got));
  1311. /* Update time before taking page lock */
  1312. file_update_time(vma->vm_file);
  1313. do {
  1314. lock_page(page);
  1315. if ((off > size) || (page->mapping != inode->i_mapping)) {
  1316. unlock_page(page);
  1317. ret = VM_FAULT_NOPAGE;
  1318. break;
  1319. }
  1320. ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
  1321. if (ret >= 0) {
  1322. /* success. we'll keep the page locked. */
  1323. set_page_dirty(page);
  1324. ret = VM_FAULT_LOCKED;
  1325. }
  1326. } while (ret == -EAGAIN);
  1327. if (ret == VM_FAULT_LOCKED ||
  1328. ci->i_inline_version != CEPH_INLINE_NONE) {
  1329. int dirty;
  1330. spin_lock(&ci->i_ceph_lock);
  1331. ci->i_inline_version = CEPH_INLINE_NONE;
  1332. dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
  1333. &prealloc_cf);
  1334. spin_unlock(&ci->i_ceph_lock);
  1335. if (dirty)
  1336. __mark_inode_dirty(inode, dirty);
  1337. }
  1338. dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
  1339. inode, off, len, ceph_cap_string(got), ret);
  1340. ceph_put_cap_refs(ci, got);
  1341. out_free:
  1342. ceph_restore_sigs(&oldset);
  1343. ceph_free_cap_flush(prealloc_cf);
  1344. if (ret < 0)
  1345. ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
  1346. return ret;
  1347. }
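/*
 * Copy inline file data (received from the MDS) into page 0 of the page
 * cache, zero-filling the remainder of the page.
 */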
  1348. void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
  1349. char *data, size_t len)
  1350. {
  1351. struct address_space *mapping = inode->i_mapping;
  1352. struct page *page;
  1353. if (locked_page) {
  1354. page = locked_page;
  1355. } else {
  1356. if (i_size_read(inode) == 0)
  1357. return;
  1358. page = find_or_create_page(mapping, 0,
  1359. mapping_gfp_constraint(mapping,
  1360. ~__GFP_FS));
  1361. if (!page)
  1362. return;
  1363. if (PageUptodate(page)) {
  1364. unlock_page(page);
  1365. put_page(page);
  1366. return;
  1367. }
  1368. }
  1369. dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
  1370. inode, ceph_vinop(inode), len, locked_page);
  1371. if (len > 0) {
  1372. void *kaddr = kmap_atomic(page);
  1373. memcpy(kaddr, data, len);
  1374. kunmap_atomic(kaddr);
  1375. }
  1376. if (page != locked_page) {
  1377. if (len < PAGE_SIZE)
  1378. zero_user_segment(page, len, PAGE_SIZE);
  1379. else
  1380. flush_dcache_page(page);
  1381. SetPageUptodate(page);
  1382. unlock_page(page);
  1383. put_page(page);
  1384. }
  1385. }
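/*
 * Migrate inline data out to the file's first object: create the object,
 * then write the data guarded by a CMPXATTR on "inline_version" and
 * record the new version with SETXATTR, so a racing uninline cannot
 * clobber newer data.
 */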
int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct page *page = NULL;
	u64 len, inline_version;
	int err = 0;
	bool from_pagecache = false;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	if (inline_version == 1 || /* initial version, no data */
	    inline_version == CEPH_INLINE_NONE)
		goto out;

	/* find the inline data: caller's page, the page cache, or fetch it */
	if (locked_page) {
		page = locked_page;
		WARN_ON(!PageUptodate(page));
	} else if (ceph_caps_issued(ci) &
		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
		page = find_get_page(inode->i_mapping, 0);
		if (page) {
			if (PageUptodate(page)) {
				from_pagecache = true;
				lock_page(page);
			} else {
				put_page(page);
				page = NULL;
			}
		}
	}

	if (page) {
		len = i_size_read(inode);
		if (len > PAGE_SIZE)
			len = PAGE_SIZE;
	} else {
		page = __page_cache_alloc(GFP_NOFS);
		if (!page) {
			err = -ENOMEM;
			goto out;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0) {
			/* no inline data */
			if (err == -ENODATA)
				err = 0;
			goto out;
		}
		len = err;
	}

	/* first request: make sure the backing data object exists */
	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out;

	/*
	 * second request: op 0 compares the "inline_version" xattr (so a
	 * newer copy is never clobbered), op 1 writes the data, op 2
	 * records the version that was migrated.
	 */
	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
out_put:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)	/* cmpxattr guard failed: already uninlined */
		err = 0;
out:
	if (page && page != locked_page) {
		if (from_pagecache) {
			unlock_page(page);
			put_page(page);
		} else
			__free_pages(page, 0);
	}

	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}
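
/*
 * mmap() support: page faults and write notifications are routed to
 * ceph_filemap_fault and ceph_page_mkwrite via ceph_vmops.
 */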
static const struct vm_operations_struct ceph_vmops = {
	.fault		= ceph_filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->readpage)
		return -ENOEXEC;
	file_accessed(file);
	vma->vm_ops = &ceph_vmops;
	return 0;
}
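
/*
 * Cached pool access checks.  Each probed (pool id, pool namespace) pair
 * keeps a POOL_READ/POOL_WRITE bitmask in mdsc->pool_perm_tree.
 */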
enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};
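
/*
 * Look up (or establish) this client's access to @pool / @pool_ns.  On a
 * cache miss, a dummy STAT read and an exclusive CREATE write against the
 * inode's first object probe what the OSDs will allow; the result is then
 * inserted into the rbtree.  Returns a POOL_READ/POOL_WRITE mask or a
 * negative errno.
 */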
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	/* fast path: look for a cached entry under the read lock */
	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	/* recheck under the write lock, remembering the insertion point */
	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	/* probe read access: STAT on the inode's first object */
	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* probe write access: exclusive CREATE on the same object */
	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	wr_req->r_mtime = ci->vfs_inode.i_mtime;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	/* -ENOENT still proves read access; -EEXIST still proves write access */
	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM)
		goto out_unlock;

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}
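
/*
 * Verify that this client may read and/or write the data pool backing @ci.
 * The first check per layout calls __ceph_pool_perm_get(); the result is
 * cached in i_ceph_flags (CEPH_I_POOL_PERM/_RD/_WR), and the check is
 * redone if the layout changed while the probe was in flight.
 */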
int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
	s64 pool;
	struct ceph_string *pool_ns;
	int ret, flags;

	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %lld no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %lld no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
	} else {
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}
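
/*
 * Free every cached pool permission entry when the MDS client goes away.
 */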
void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}