mon_client.c

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
        struct ceph_monmap *m = NULL;
        int i, err = -EINVAL;
        struct ceph_fsid fsid;
        u32 epoch, num_mon;
        u16 version;
        u32 len;

        ceph_decode_32_safe(&p, end, len, bad);
        ceph_decode_need(&p, end, len, bad);

        dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));

        ceph_decode_16_safe(&p, end, version, bad);

        ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        epoch = ceph_decode_32(&p);

        num_mon = ceph_decode_32(&p);
        ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

        if (num_mon >= CEPH_MAX_MON)
                goto bad;
        m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
        if (m == NULL)
                return ERR_PTR(-ENOMEM);
        m->fsid = fsid;
        m->epoch = epoch;
        m->num_mon = num_mon;
        ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
        for (i = 0; i < num_mon; i++)
                ceph_decode_addr(&m->mon_inst[i].addr);

        dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
             m->num_mon);
        for (i = 0; i < m->num_mon; i++)
                dout("monmap_decode mon%d is %s\n", i,
                     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
        return m;

bad:
        dout("monmap_decode failed with %d\n", err);
        kfree(m);
        return ERR_PTR(err);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
        int i;

        for (i = 0; i < m->num_mon; i++)
                if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
                        return 1;
        return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
        monc->pending_auth = 1;
        monc->m_auth->front.iov_len = len;
        monc->m_auth->hdr.front_len = cpu_to_le32(len);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_get(monc->m_auth);  /* keep our ref */
        ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
        dout("__close_session closing mon%d\n", monc->cur_mon);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_revoke_incoming(monc->m_auth_reply);
        ceph_msg_revoke(monc->m_subscribe);
        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
        ceph_con_close(&monc->con);

        monc->pending_auth = 0;
        ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
        int old_mon = monc->cur_mon;

        BUG_ON(monc->monmap->num_mon < 1);

        if (monc->monmap->num_mon == 1) {
                monc->cur_mon = 0;
        } else {
                int max = monc->monmap->num_mon;
                int o = -1;
                int n;

                if (monc->cur_mon >= 0) {
                        if (monc->cur_mon < monc->monmap->num_mon)
                                o = monc->cur_mon;
                        if (o >= 0)
                                max--;
                }

                n = prandom_u32() % max;
                if (o >= 0 && n >= o)
                        n++;

                monc->cur_mon = n;
        }

        dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
             monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
        int ret;

        pick_new_mon(monc);

        monc->hunting = true;
        if (monc->had_a_connection) {
                monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
                if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
                        monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
        }

        monc->sub_renew_after = jiffies; /* i.e., expired */
        monc->sub_renew_sent = 0;

        dout("%s opening mon%d\n", __func__, monc->cur_mon);
        ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
                      &monc->monmap->mon_inst[monc->cur_mon].addr);

        /*
         * send an initial keepalive to ensure our timestamp is valid
         * by the time we are in an OPENED state
         */
        ceph_con_keepalive(&monc->con);

        /* initiate authentication handshake */
        ret = ceph_auth_build_hello(monc->auth,
                                    monc->m_auth->front.iov_base,
                                    monc->m_auth->front_alloc_len);
        BUG_ON(ret <= 0);
        __send_prepared_auth_request(monc, ret);
}
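
/*
 * Drop the current session (if any) and hunt: close the connection,
 * reset auth state, and open a session with a (possibly different)
 * monitor.
 */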
static void reopen_session(struct ceph_mon_client *monc)
{
        if (!monc->hunting)
                pr_info("mon%d %s session lost, hunting for new mon\n",
                        monc->cur_mon,
                        ceph_pr_addr(&monc->con.peer_addr.in_addr));

        __close_session(monc);
        __open_session(monc);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
        unsigned long delay;

        if (monc->hunting)
                delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
        else
                delay = CEPH_MONC_PING_INTERVAL;
        dout("__schedule_delayed after %lu\n", delay);
        mod_delayed_work(system_wq, &monc->delayed_work,
                         round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
        [CEPH_SUB_MDSMAP] = "mdsmap",
        [CEPH_SUB_MONMAP] = "monmap",
        [CEPH_SUB_OSDMAP] = "osdmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
        struct ceph_msg *msg = monc->m_subscribe;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        int num = 0;
        int i;

        dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

        BUG_ON(monc->cur_mon < 0);

        if (!monc->sub_renew_sent)
                monc->sub_renew_sent = jiffies | 1; /* never 0 */

        msg->hdr.version = cpu_to_le16(2);

        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                if (monc->subs[i].want)
                        num++;
        }
        BUG_ON(num < 1); /* monmap sub is always there */
        ceph_encode_32(&p, num);
        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                const char *s = ceph_sub_str[i];

                if (!monc->subs[i].want)
                        continue;

                dout("%s %s start %llu flags 0x%x\n", __func__, s,
                     le64_to_cpu(monc->subs[i].item.start),
                     monc->subs[i].item.flags);
                ceph_encode_string(&p, end, s, strlen(s));
                memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
                p += sizeof(monc->subs[i].item);
        }

        BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
        ceph_msg_revoke(msg);
        ceph_con_send(&monc->con, ceph_msg_get(msg));
}
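
/*
 * Handle the monitor's ack to our subscribe request.  The ack carries
 * the granted duration; schedule a renewal at roughly half that so the
 * subscription never lapses.
 */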
static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        unsigned int seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        seconds = le32_to_cpu(h->duration);

        mutex_lock(&monc->mutex);
        if (monc->sub_renew_sent) {
                monc->sub_renew_after = monc->sub_renew_sent +
                                            (seconds >> 1) * HZ - 1;
                dout("%s sent %lu duration %d renew after %lu\n", __func__,
                     monc->sub_renew_sent, seconds, monc->sub_renew_after);
                monc->sub_renew_sent = 0;
        } else {
                dout("%s sent %lu renew after %lu, ignoring\n", __func__,
                     monc->sub_renew_sent, monc->sub_renew_after);
        }
        mutex_unlock(&monc->mutex);
        return;

bad:
        pr_err("got corrupt subscribe-ack msg\n");
        ceph_msg_dump(msg);
}

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
                                 u32 epoch, bool continuous)
{
        __le64 start = cpu_to_le64(epoch);
        u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

        dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
             epoch, continuous);

        if (monc->subs[sub].want &&
            monc->subs[sub].item.start == start &&
            monc->subs[sub].item.flags == flags)
                return false;

        monc->subs[sub].item.start = start;
        monc->subs[sub].item.flags = flags;
        monc->subs[sub].want = true;

        return true;
}
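
/*
 * Locked wrapper around __ceph_monc_want_map().  Returns true if the
 * want set changed and a new subscribe request should be sent.
 */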
bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
                        bool continuous)
{
        bool need_request;

        mutex_lock(&monc->mutex);
        need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
        mutex_unlock(&monc->mutex);

        return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
                                u32 epoch)
{
        dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

        if (monc->subs[sub].want) {
                if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
                        monc->subs[sub].want = false;
                else
                        monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
        }

        monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_got_map(monc, sub, epoch);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
        dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
        mutex_lock(&monc->mutex);
        if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
                                 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
                __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_request_next_osdmap);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
                          unsigned long timeout)
{
        unsigned long started = jiffies;
        long ret;

        mutex_lock(&monc->mutex);
        while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
                mutex_unlock(&monc->mutex);

                if (timeout && time_after_eq(jiffies, started + timeout))
                        return -ETIMEDOUT;

                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
                                monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
                                ceph_timeout_jiffies(timeout));
                if (ret < 0)
                        return ret;

                mutex_lock(&monc->mutex);
        }

        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);

/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
        __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;

        mutex_lock(&monc->mutex);

        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        monmap = ceph_monmap_decode(p, end);
        if (IS_ERR(monmap)) {
                pr_err("problem decoding monmap, %d\n",
                       (int)PTR_ERR(monmap));
                goto out;
        }

        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
                kfree(monmap);
                goto out;
        }

        client->monc.monmap = monmap;
        kfree(old);

        __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
        client->have_fsid = true;

out:
        mutex_unlock(&monc->mutex);
        wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
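
/*
 * Look up a pending generic request by tid.  Caller must hold
 * monc->mutex.
 */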
static struct ceph_mon_generic_request *__lookup_generic_req(
        struct ceph_mon_client *monc, u64 tid)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *n = monc->generic_request_tree.rb_node;

        while (n) {
                req = rb_entry(n, struct ceph_mon_generic_request, node);
                if (tid < req->tid)
                        n = n->rb_left;
                else if (tid > req->tid)
                        n = n->rb_right;
                else
                        return req;
        }
        return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
                                     struct ceph_mon_generic_request *new)
{
        struct rb_node **p = &monc->generic_request_tree.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_mon_generic_request *req = NULL;

        while (*p) {
                parent = *p;
                req = rb_entry(parent, struct ceph_mon_generic_request, node);
                if (new->tid < req->tid)
                        p = &(*p)->rb_left;
                else if (new->tid > req->tid)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->node, parent, p);
        rb_insert_color(&new->node, &monc->generic_request_tree);
}
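
/*
 * Generic requests are refcounted: the submitter and the reply path
 * each hold a reference, and the request (along with its preallocated
 * request/reply messages) is freed when the last reference drops.
 */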
static void release_generic_request(struct kref *kref)
{
        struct ceph_mon_generic_request *req =
                container_of(kref, struct ceph_mon_generic_request, kref);

        if (req->reply)
                ceph_msg_put(req->reply);
        if (req->request)
                ceph_msg_put(req->request);

        kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
        kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
        kref_get(&req->kref);
}
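
/*
 * Return the preallocated reply message for the request matching
 * hdr->tid, or tell the messenger to skip the payload if the request
 * is no longer registered.
 */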
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
                                          struct ceph_msg_header *hdr,
                                          int *skip)
{
        struct ceph_mon_client *monc = con->private;
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(hdr->tid);
        struct ceph_msg *m;

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, tid);
        if (!req) {
                dout("get_generic_reply %lld dne\n", tid);
                *skip = 1;
                m = NULL;
        } else {
                dout("get_generic_reply %lld got %p\n", tid, req->reply);
                *skip = 0;
                m = ceph_msg_get(req->reply);
                /*
                 * we don't need to track the connection reading into
                 * this reply because we only have one open connection
                 * at a time, ever.
                 */
        }
        mutex_unlock(&monc->mutex);
        return m;
}
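
/*
 * Register and send a request, then wait (interruptibly) for the
 * reply.  Called with monc->mutex held; the mutex is dropped across
 * the wait and retaken to unregister the request.
 */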
static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
                                struct ceph_mon_generic_request *req)
{
        int err;

        /* register request */
        req->tid = tid != 0 ? tid : ++monc->last_tid;
        req->request->hdr.tid = cpu_to_le64(req->tid);
        __insert_generic_request(monc, req);
        monc->num_generic_requests++;
        ceph_con_send(&monc->con, ceph_msg_get(req->request));
        mutex_unlock(&monc->mutex);

        err = wait_for_completion_interruptible(&req->completion);

        mutex_lock(&monc->mutex);
        rb_erase(&req->node, &monc->generic_request_tree);
        monc->num_generic_requests--;

        if (!err)
                err = req->result;
        return err;
}

static int do_generic_request(struct ceph_mon_client *monc,
                              struct ceph_mon_generic_request *req)
{
        int err;

        mutex_lock(&monc->mutex);
        err = __do_generic_request(monc, 0, req);
        mutex_unlock(&monc->mutex);

        return err;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        if (msg->front.iov_len != sizeof(*reply))
                goto bad;
        dout("handle_statfs_reply %p tid %llu\n", msg, tid);

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, tid);
        if (req) {
                *(struct ceph_statfs *)req->buf = reply->st;
                req->result = 0;
                get_generic_request(req);
        }
        mutex_unlock(&monc->mutex);
        if (req) {
                complete_all(&req->completion);
                put_generic_request(req);
        }
        return;

bad:
        pr_err("corrupt statfs reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
        int err;

        req = kzalloc(sizeof(*req), GFP_NOFS);
        if (!req)
                return -ENOMEM;

        kref_init(&req->kref);
        req->buf = buf;
        init_completion(&req->completion);

        err = -ENOMEM;
        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
                                    true);
        if (!req->request)
                goto out;
        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
                                  true);
        if (!req->reply)
                goto out;

        /* fill out request */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;

        err = do_generic_request(monc, req);

out:
        put_generic_request(req);
        return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
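
/*
 * Handle an MMonGetVersion reply: the handle we encoded into the
 * request comes back in the payload and identifies the waiting
 * request.
 */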
static void handle_get_version_reply(struct ceph_mon_client *monc,
                                     struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(msg->hdr.tid);
        void *p = msg->front.iov_base;
        void *end = p + msg->front_alloc_len;
        u64 handle;

        dout("%s %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, 2*sizeof(u64), bad);
        handle = ceph_decode_64(&p);
        if (tid != 0 && tid != handle)
                goto bad;

        mutex_lock(&monc->mutex);
        req = __lookup_generic_req(monc, handle);
        if (req) {
                *(u64 *)req->buf = ceph_decode_64(&p);
                req->result = 0;
                get_generic_request(req);
        }
        mutex_unlock(&monc->mutex);
        if (req) {
                complete_all(&req->completion);
                put_generic_request(req);
        }

        return;
bad:
        pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
                             u64 *newest)
{
        struct ceph_mon_generic_request *req;
        void *p, *end;
        u64 tid;
        int err;

        req = kzalloc(sizeof(*req), GFP_NOFS);
        if (!req)
                return -ENOMEM;

        kref_init(&req->kref);
        req->buf = newest;
        init_completion(&req->completion);

        req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
                                    sizeof(u64) + sizeof(u32) + strlen(what),
                                    GFP_NOFS, true);
        if (!req->request) {
                err = -ENOMEM;
                goto out;
        }

        req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
                                  GFP_NOFS, true);
        if (!req->reply) {
                err = -ENOMEM;
                goto out;
        }

        p = req->request->front.iov_base;
        end = p + req->request->front_alloc_len;

        /* fill out request */
        mutex_lock(&monc->mutex);
        tid = ++monc->last_tid;
        ceph_encode_64(&p, tid); /* handle */
        ceph_encode_string(&p, end, what, strlen(what));

        err = __do_generic_request(monc, tid, req);

        mutex_unlock(&monc->mutex);
out:
        put_generic_request(req);
        return err;
}
EXPORT_SYMBOL(ceph_monc_do_get_version);

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
        struct ceph_mon_generic_request *req;
        struct rb_node *p;

        for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_mon_generic_request, node);
                ceph_msg_revoke(req->request);
                ceph_msg_revoke_incoming(req->reply);
                ceph_con_send(&monc->con, ceph_msg_get(req->request));
        }
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
        struct ceph_mon_client *monc =
                container_of(work, struct ceph_mon_client, delayed_work.work);

        dout("monc delayed_work\n");
        mutex_lock(&monc->mutex);
        if (monc->hunting) {
                dout("%s continuing hunt\n", __func__);
                reopen_session(monc);
        } else {
                int is_auth = ceph_auth_is_authenticated(monc->auth);

                if (ceph_con_keepalive_expired(&monc->con,
                                               CEPH_MONC_PING_TIMEOUT)) {
                        dout("monc keepalive timeout\n");
                        is_auth = 0;
                        reopen_session(monc);
                }

                if (!monc->hunting) {
                        ceph_con_keepalive(&monc->con);
                        __validate_auth(monc);
                }

                if (is_auth) {
                        unsigned long now = jiffies;

                        dout("%s renew subs? now %lu renew after %lu\n",
                             __func__, now, monc->sub_renew_after);
                        if (time_after_eq(now, monc->sub_renew_after))
                                __send_subscribe(monc);
                }
        }
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
        struct ceph_options *opt = monc->client->options;
        struct ceph_entity_addr *mon_addr = opt->mon_addr;
        int num_mon = opt->num_mon;
        int i;

        /* build initial monmap */
        monc->monmap = kzalloc(sizeof(*monc->monmap) +
                               num_mon*sizeof(monc->monmap->mon_inst[0]),
                               GFP_KERNEL);
        if (!monc->monmap)
                return -ENOMEM;
        for (i = 0; i < num_mon; i++) {
                monc->monmap->mon_inst[i].addr = mon_addr[i];
                monc->monmap->mon_inst[i].addr.nonce = 0;
                monc->monmap->mon_inst[i].name.type =
                        CEPH_ENTITY_TYPE_MON;
                monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
        }
        monc->monmap->num_mon = num_mon;
        return 0;
}
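
/*
 * Initialize the mon_client: build the initial monmap, set up the
 * auth handler, and preallocate the messages exchanged over the
 * monitor session.
 */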
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
        int err = 0;

        dout("init\n");
        memset(monc, 0, sizeof(*monc));
        monc->client = cl;
        monc->monmap = NULL;
        mutex_init(&monc->mutex);

        err = build_initial_monmap(monc);
        if (err)
                goto out;

        /* connection */
        /* authentication */
        monc->auth = ceph_auth_init(cl->options->name,
                                    cl->options->key);
        if (IS_ERR(monc->auth)) {
                err = PTR_ERR(monc->auth);
                goto out_monmap;
        }
        monc->auth->want_keys =
                CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
                CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

        /* msgs */
        err = -ENOMEM;
        monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
                                     sizeof(struct ceph_mon_subscribe_ack),
                                     GFP_NOFS, true);
        if (!monc->m_subscribe_ack)
                goto out_auth;

        monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
                                         true);
        if (!monc->m_subscribe)
                goto out_subscribe_ack;

        monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
                                          true);
        if (!monc->m_auth_reply)
                goto out_subscribe;

        monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
        monc->pending_auth = 0;
        if (!monc->m_auth)
                goto out_auth_reply;

        ceph_con_init(&monc->con, monc, &mon_con_ops,
                      &monc->client->msgr);

        monc->cur_mon = -1;
        monc->had_a_connection = false;
        monc->hunt_mult = 1;

        INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
        monc->generic_request_tree = RB_ROOT;
        monc->num_generic_requests = 0;
        monc->last_tid = 0;

        return 0;

out_auth_reply:
        ceph_msg_put(monc->m_auth_reply);
out_subscribe:
        ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
        ceph_msg_put(monc->m_subscribe_ack);
out_auth:
        ceph_auth_destroy(monc->auth);
out_monmap:
        kfree(monc->monmap);
out:
        return err;
}
EXPORT_SYMBOL(ceph_monc_init);
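
/*
 * Tear down the mon_client: cancel the periodic work, close the
 * session, and release the auth handler and preallocated messages.
 */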
void ceph_monc_stop(struct ceph_mon_client *monc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&monc->delayed_work);

        mutex_lock(&monc->mutex);
        __close_session(monc);
        monc->cur_mon = -1;
        mutex_unlock(&monc->mutex);

        /*
         * flush msgr queue before we destroy ourselves to ensure that:
         *  - any work that references our embedded con is finished.
         *  - any osd_client or other work that may reference an
         *    authorizer finishes before we shut down the auth subsystem.
         */
        ceph_msgr_flush();

        ceph_auth_destroy(monc->auth);

        ceph_msg_put(monc->m_auth);
        ceph_msg_put(monc->m_auth_reply);
        ceph_msg_put(monc->m_subscribe);
        ceph_msg_put(monc->m_subscribe_ack);

        kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);
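
/*
 * The hunt succeeded: note that we have a usable connection and halve
 * the backoff multiplier applied to the hunt interval.
 */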
static void finish_hunting(struct ceph_mon_client *monc)
{
        if (monc->hunting) {
                dout("%s found mon%d\n", __func__, monc->cur_mon);
                monc->hunting = false;
                monc->had_a_connection = true;
                monc->hunt_mult /= 2; /* reduce by 50% */
                if (monc->hunt_mult < 1)
                        monc->hunt_mult = 1;
        }
}
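
/*
 * Process an auth reply.  A positive return from the auth layer means
 * another round of the handshake is needed; otherwise we are either
 * authenticated (start the session) or have a fatal auth error.
 */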
static void handle_auth_reply(struct ceph_mon_client *monc,
                              struct ceph_msg *msg)
{
        int ret;
        int was_auth = 0;

        mutex_lock(&monc->mutex);
        was_auth = ceph_auth_is_authenticated(monc->auth);
        monc->pending_auth = 0;
        ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
                                     msg->front.iov_len,
                                     monc->m_auth->front.iov_base,
                                     monc->m_auth->front_alloc_len);
        if (ret > 0) {
                __send_prepared_auth_request(monc, ret);
                goto out;
        }

        finish_hunting(monc);

        if (ret < 0) {
                monc->client->auth_err = ret;
        } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
                dout("authenticated, starting session\n");

                monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
                monc->client->msgr.inst.name.num =
                                        cpu_to_le64(monc->auth->global_id);

                __send_subscribe(monc);
                __resend_generic_request(monc);

                pr_info("mon%d %s session established\n", monc->cur_mon,
                        ceph_pr_addr(&monc->con.peer_addr.in_addr));
        }

out:
        mutex_unlock(&monc->mutex);
        if (monc->client->auth_err < 0)
                wake_up_all(&monc->client->auth_wq);
}
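
/*
 * Build and send a new auth request if the auth layer wants one; a
 * no-op while a request is already pending.  Caller must hold
 * monc->mutex.
 */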
static int __validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        if (monc->pending_auth)
                return 0;

        ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
                              monc->m_auth->front_alloc_len);
        if (ret <= 0)
                return ret; /* either an error, or no need to authenticate */
        __send_prepared_auth_request(monc, ret);
        return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
        int ret;

        mutex_lock(&monc->mutex);
        ret = __validate_auth(monc);
        mutex_unlock(&monc->mutex);

        return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(msg->hdr.type);

        if (!monc)
                return;

        switch (type) {
        case CEPH_MSG_AUTH_REPLY:
                handle_auth_reply(monc, msg);
                break;

        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                handle_subscribe_ack(monc, msg);
                break;

        case CEPH_MSG_STATFS_REPLY:
                handle_statfs_reply(monc, msg);
                break;

        case CEPH_MSG_MON_GET_VERSION_REPLY:
                handle_get_version_reply(monc, msg);
                break;

        case CEPH_MSG_MON_MAP:
                ceph_monc_handle_map(monc, msg);
                break;

        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(&monc->client->osdc, msg);
                break;

        default:
                /* can the chained handler handle it? */
                if (monc->client->extra_mon_dispatch &&
                    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
                        break;

                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
        ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
                                      struct ceph_msg_header *hdr,
                                      int *skip)
{
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        struct ceph_msg *m = NULL;

        *skip = 0;

        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msg_get(monc->m_subscribe_ack);
                break;
        case CEPH_MSG_STATFS_REPLY:
                return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
                m = ceph_msg_get(monc->m_auth_reply);
                break;
        case CEPH_MSG_MON_GET_VERSION_REPLY:
                if (le64_to_cpu(hdr->tid) != 0)
                        return get_generic_reply(con, hdr, skip);

                /*
                 * Older OSDs don't set reply tid even if the original
                 * request had a non-zero tid.  Work around this weirdness
                 * by falling through to the allocate case.
                 */
        case CEPH_MSG_MON_MAP:
        case CEPH_MSG_MDS_MAP:
        case CEPH_MSG_OSD_MAP:
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
                if (!m)
                        return NULL;    /* ENOMEM--return skip == 0 */
                break;
        }

        if (!m) {
                pr_info("alloc_msg unknown type %d\n", type);
                *skip = 1;
        } else if (front_len > m->front_alloc_len) {
                pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
                        front_len, m->front_alloc_len,
                        (unsigned int)con->peer_name.type,
                        le64_to_cpu(con->peer_name.num));
                ceph_msg_put(m);
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
        }

        return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
        struct ceph_mon_client *monc = con->private;

        mutex_lock(&monc->mutex);
        dout("%s mon%d\n", __func__, monc->cur_mon);
        if (monc->cur_mon >= 0) {
                if (!monc->hunting) {
                        dout("%s hunting for new mon\n", __func__);
                        reopen_session(monc);
                        __schedule_delayed(monc);
                } else {
                        dout("%s already hunting\n", __func__);
                }
        }
        mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
        return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
};