mon_client.c 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347
  1. #include <linux/ceph/ceph_debug.h>
  2. #include <linux/module.h>
  3. #include <linux/types.h>
  4. #include <linux/slab.h>
  5. #include <linux/random.h>
  6. #include <linux/sched.h>
  7. #include <linux/ceph/ceph_features.h>
  8. #include <linux/ceph/mon_client.h>
  9. #include <linux/ceph/libceph.h>
  10. #include <linux/ceph/debugfs.h>
  11. #include <linux/ceph/decode.h>
  12. #include <linux/ceph/auth.h>
  13. /*
  14. * Interact with Ceph monitor cluster. Handle requests for new map
  15. * versions, and periodically resend as needed. Also implement
  16. * statfs() and umount().
  17. *
  18. * A small cluster of Ceph "monitors" are responsible for managing critical
  19. * cluster configuration and state information. An odd number (e.g., 3, 5)
  20. * of cmon daemons use a modified version of the Paxos part-time parliament
  21. * algorithm to manage the MDS map (mds cluster membership), OSD map, and
  22. * list of clients who have mounted the file system.
  23. *
  24. * We maintain an open, active session with a monitor at all times in order to
  25. * receive timely MDSMap updates. We periodically send a keepalive byte on the
  26. * TCP socket to ensure we detect a failure. If the connection does break, we
  27. * randomly hunt for a new monitor. Once the connection is reestablished, we
  28. * resend any outstanding requests.
  29. */
  30. static const struct ceph_connection_operations mon_con_ops;
  31. static int __validate_auth(struct ceph_mon_client *monc);
  32. /*
  33. * Decode a monmap blob (e.g., during mount).
  34. */
  35. struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
  36. {
  37. struct ceph_monmap *m = NULL;
  38. int i, err = -EINVAL;
  39. struct ceph_fsid fsid;
  40. u32 epoch, num_mon;
  41. u32 len;
  42. ceph_decode_32_safe(&p, end, len, bad);
  43. ceph_decode_need(&p, end, len, bad);
  44. dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
  45. p += sizeof(u16); /* skip version */
  46. ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
  47. ceph_decode_copy(&p, &fsid, sizeof(fsid));
  48. epoch = ceph_decode_32(&p);
  49. num_mon = ceph_decode_32(&p);
  50. ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
  51. if (num_mon >= CEPH_MAX_MON)
  52. goto bad;
  53. m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
  54. if (m == NULL)
  55. return ERR_PTR(-ENOMEM);
  56. m->fsid = fsid;
  57. m->epoch = epoch;
  58. m->num_mon = num_mon;
  59. ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
  60. for (i = 0; i < num_mon; i++)
  61. ceph_decode_addr(&m->mon_inst[i].addr);
  62. dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
  63. m->num_mon);
  64. for (i = 0; i < m->num_mon; i++)
  65. dout("monmap_decode mon%d is %s\n", i,
  66. ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
  67. return m;
  68. bad:
  69. dout("monmap_decode failed with %d\n", err);
  70. kfree(m);
  71. return ERR_PTR(err);
  72. }
  73. /*
  74. * return true if *addr is included in the monmap.
  75. */
  76. int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
  77. {
  78. int i;
  79. for (i = 0; i < m->num_mon; i++)
  80. if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
  81. return 1;
  82. return 0;
  83. }
/*
 * Send an auth request.
 *
 * @len: number of bytes already encoded into m_auth's front buffer by
 *       the auth layer.  m_auth is a preallocated, reused message, so
 *       any previously queued copy is revoked before resending.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
        monc->pending_auth = 1;
        monc->m_auth->front.iov_len = len;
        monc->m_auth->hdr.front_len = cpu_to_le32(len);
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_get(monc->m_auth); /* keep our ref */
        ceph_con_send(&monc->con, monc->m_auth);
}
/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
        dout("__close_session closing mon%d\n", monc->cur_mon);
        /* pull back all reusable messages still queued or in flight */
        ceph_msg_revoke(monc->m_auth);
        ceph_msg_revoke_incoming(monc->m_auth_reply);
        ceph_msg_revoke(monc->m_subscribe);
        ceph_msg_revoke_incoming(monc->m_subscribe_ack);
        ceph_con_close(&monc->con);

        monc->pending_auth = 0;
        /* the auth handshake must start over on the next session */
        ceph_auth_reset(monc->auth);
}
  110. /*
  111. * Pick a new monitor at random and set cur_mon. If we are repicking
  112. * (i.e. cur_mon is already set), be sure to pick a different one.
  113. */
  114. static void pick_new_mon(struct ceph_mon_client *monc)
  115. {
  116. int old_mon = monc->cur_mon;
  117. BUG_ON(monc->monmap->num_mon < 1);
  118. if (monc->monmap->num_mon == 1) {
  119. monc->cur_mon = 0;
  120. } else {
  121. int max = monc->monmap->num_mon;
  122. int o = -1;
  123. int n;
  124. if (monc->cur_mon >= 0) {
  125. if (monc->cur_mon < monc->monmap->num_mon)
  126. o = monc->cur_mon;
  127. if (o >= 0)
  128. max--;
  129. }
  130. n = prandom_u32() % max;
  131. if (o >= 0 && n >= o)
  132. n++;
  133. monc->cur_mon = n;
  134. }
  135. dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
  136. monc->cur_mon, monc->monmap->num_mon);
  137. }
/*
 * Open a session with a new monitor.
 *
 * Picks a (different) monitor, applies hunt backoff if we have ever
 * connected before, and kicks off the auth handshake.
 */
static void __open_session(struct ceph_mon_client *monc)
{
        int ret;

        pick_new_mon(monc);

        monc->hunting = true;
        if (monc->had_a_connection) {
                /* exponential backoff on repeated hunts, capped */
                monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
                if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
                        monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
        }

        monc->sub_renew_after = jiffies; /* i.e., expired */
        monc->sub_renew_sent = 0;

        dout("%s opening mon%d\n", __func__, monc->cur_mon);
        ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
                      &monc->monmap->mon_inst[monc->cur_mon].addr);

        /*
         * send an initial keepalive to ensure our timestamp is valid
         * by the time we are in an OPENED state
         */
        ceph_con_keepalive(&monc->con);

        /* initiate authentication handshake */
        ret = ceph_auth_build_hello(monc->auth,
                                    monc->m_auth->front.iov_base,
                                    monc->m_auth->front_alloc_len);
        BUG_ON(ret <= 0);
        __send_prepared_auth_request(monc, ret);
}
/*
 * Tear down the current session and start a new one.  Log only on the
 * first transition into hunting mode (hunting is re-set to true inside
 * __open_session()).
 */
static void reopen_session(struct ceph_mon_client *monc)
{
        if (!monc->hunting)
                pr_info("mon%d %s session lost, hunting for new mon\n",
                        monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));

        __close_session(monc);
        __open_session(monc);
}
  176. /*
  177. * Reschedule delayed work timer.
  178. */
  179. static void __schedule_delayed(struct ceph_mon_client *monc)
  180. {
  181. unsigned long delay;
  182. if (monc->hunting)
  183. delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
  184. else
  185. delay = CEPH_MONC_PING_INTERVAL;
  186. dout("__schedule_delayed after %lu\n", delay);
  187. mod_delayed_work(system_wq, &monc->delayed_work,
  188. round_jiffies_relative(delay));
  189. }
/*
 * Subscription names indexed by CEPH_SUB_* id; these strings are sent
 * verbatim to the monitor in subscribe requests.
 */
const char *ceph_sub_str[] = {
        [CEPH_SUB_MONMAP] = "monmap",
        [CEPH_SUB_OSDMAP] = "osdmap",
        [CEPH_SUB_FSMAP] = "fsmap.user",
        [CEPH_SUB_MDSMAP] = "mdsmap",
};
/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 *
 * Reuses the preallocated m_subscribe message.  All callers in this
 * file hold monc->mutex.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
        struct ceph_msg *msg = monc->m_subscribe;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        int num = 0;
        int i;

        dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

        BUG_ON(monc->cur_mon < 0);

        if (!monc->sub_renew_sent)
                monc->sub_renew_sent = jiffies | 1; /* never 0 */

        msg->hdr.version = cpu_to_le16(2);

        /* count wanted subscriptions first; the count prefixes the list */
        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                if (monc->subs[i].want)
                        num++;
        }
        BUG_ON(num < 1); /* monmap sub is always there */
        ceph_encode_32(&p, num);
        for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
                char buf[32];
                int len;

                if (!monc->subs[i].want)
                        continue;

                /* the mdsmap sub name carries the fs cluster id, if set */
                len = sprintf(buf, "%s", ceph_sub_str[i]);
                if (i == CEPH_SUB_MDSMAP &&
                    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
                        len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

                dout("%s %s start %llu flags 0x%x\n", __func__, buf,
                     le64_to_cpu(monc->subs[i].item.start),
                     monc->subs[i].item.flags);
                /* encode name string, then the raw subscribe item */
                ceph_encode_string(&p, end, buf, len);
                memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
                p += sizeof(monc->subs[i].item);
        }

        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
        /* revoke any still-queued previous copy before resending */
        ceph_msg_revoke(msg);
        ceph_con_send(&monc->con, ceph_msg_get(msg));
}
/*
 * Handle a MSubscribeAck from the monitor: record when our
 * subscription should be renewed.  On a malformed message, log and
 * dump the message instead.
 */
static void handle_subscribe_ack(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        unsigned int seconds;
        struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        seconds = le32_to_cpu(h->duration);

        mutex_lock(&monc->mutex);
        if (monc->sub_renew_sent) {
                /*
                 * This is only needed for legacy (infernalis or older)
                 * MONs -- see delayed_work().
                 */
                /* renew at roughly half the advertised duration */
                monc->sub_renew_after = monc->sub_renew_sent +
                                        (seconds >> 1) * HZ - 1;
                dout("%s sent %lu duration %d renew after %lu\n", __func__,
                     monc->sub_renew_sent, seconds, monc->sub_renew_after);
                monc->sub_renew_sent = 0;
        } else {
                /* ack with no outstanding request -- stale, ignore */
                dout("%s sent %lu renew after %lu, ignoring\n", __func__,
                     monc->sub_renew_sent, monc->sub_renew_after);
        }
        mutex_unlock(&monc->mutex);
        return;

bad:
        pr_err("got corrupt subscribe-ack msg\n");
        ceph_msg_dump(msg);
}
  269. /*
  270. * Register interest in a map
  271. *
  272. * @sub: one of CEPH_SUB_*
  273. * @epoch: X for "every map since X", or 0 for "just the latest"
  274. */
  275. static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
  276. u32 epoch, bool continuous)
  277. {
  278. __le64 start = cpu_to_le64(epoch);
  279. u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
  280. dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
  281. epoch, continuous);
  282. if (monc->subs[sub].want &&
  283. monc->subs[sub].item.start == start &&
  284. monc->subs[sub].item.flags == flags)
  285. return false;
  286. monc->subs[sub].item.start = start;
  287. monc->subs[sub].item.flags = flags;
  288. monc->subs[sub].want = true;
  289. return true;
  290. }
  291. bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
  292. bool continuous)
  293. {
  294. bool need_request;
  295. mutex_lock(&monc->mutex);
  296. need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
  297. mutex_unlock(&monc->mutex);
  298. return need_request;
  299. }
  300. EXPORT_SYMBOL(ceph_monc_want_map);
  301. /*
  302. * Keep track of which maps we have
  303. *
  304. * @sub: one of CEPH_SUB_*
  305. */
  306. static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
  307. u32 epoch)
  308. {
  309. dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
  310. if (monc->subs[sub].want) {
  311. if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
  312. monc->subs[sub].want = false;
  313. else
  314. monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
  315. }
  316. monc->subs[sub].have = epoch;
  317. }
/* Locked wrapper around __ceph_monc_got_map(). */
void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
        mutex_lock(&monc->mutex);
        __ceph_monc_got_map(monc, sub, epoch);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);
/* Resend the current subscription set to the monitor. */
void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        __send_subscribe(monc);
        mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);
/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 *
 * Returns 0 once subs[CEPH_SUB_OSDMAP].have reaches @epoch,
 * -ETIMEDOUT on timeout, or a negative value if interrupted.
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
                          unsigned long timeout)
{
        unsigned long started = jiffies;
        long ret;

        mutex_lock(&monc->mutex);
        while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
                /* drop the mutex while sleeping; recheck under it below */
                mutex_unlock(&monc->mutex);

                if (timeout && time_after_eq(jiffies, started + timeout))
                        return -ETIMEDOUT;

                /*
                 * NOTE(review): the wait condition reads ->have without
                 * the mutex -- presumably a benign racy read rechecked
                 * under the lock on the next loop iteration.
                 */
                ret = wait_event_interruptible_timeout(monc->client->auth_wq,
                                        monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
                                        ceph_timeout_jiffies(timeout));
                if (ret < 0)
                        return ret;

                mutex_lock(&monc->mutex);
        }

        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
        mutex_lock(&monc->mutex);
        /* continuous monmap sub; one-shot osdmap sub for mount */
        __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
        __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
        __open_session(monc);
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);
/*
 * Handle an incoming monmap message: decode it, verify the fsid, and
 * swap it in for the old map (which is freed).  Wakes auth_wq waiters
 * in all cases so mount can make progress or observe the failure.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
                                 struct ceph_msg *msg)
{
        struct ceph_client *client = monc->client;
        struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;

        mutex_lock(&monc->mutex);

        dout("handle_monmap\n");
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        monmap = ceph_monmap_decode(p, end);
        if (IS_ERR(monmap)) {
                pr_err("problem decoding monmap, %d\n",
                       (int)PTR_ERR(monmap));
                goto out;
        }

        /* reject maps from a foreign cluster */
        if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
                kfree(monmap);
                goto out;
        }

        client->monc.monmap = monmap;
        kfree(old);

        __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
        client->have_fsid = true;

out:
        mutex_unlock(&monc->mutex);
        wake_up_all(&client->auth_wq);
}
/*
 * generic requests (currently statfs, mon_get_version)
 */
/* generates insert_/erase_/lookup_generic_request(), keyed by req->tid */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
  406. static void release_generic_request(struct kref *kref)
  407. {
  408. struct ceph_mon_generic_request *req =
  409. container_of(kref, struct ceph_mon_generic_request, kref);
  410. dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
  411. req->reply);
  412. WARN_ON(!RB_EMPTY_NODE(&req->node));
  413. if (req->reply)
  414. ceph_msg_put(req->reply);
  415. if (req->request)
  416. ceph_msg_put(req->request);
  417. kfree(req);
  418. }
  419. static void put_generic_request(struct ceph_mon_generic_request *req)
  420. {
  421. if (req)
  422. kref_put(&req->kref, release_generic_request);
  423. }
/* Take an additional reference on @req. */
static void get_generic_request(struct ceph_mon_generic_request *req)
{
        kref_get(&req->kref);
}
  428. static struct ceph_mon_generic_request *
  429. alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
  430. {
  431. struct ceph_mon_generic_request *req;
  432. req = kzalloc(sizeof(*req), gfp);
  433. if (!req)
  434. return NULL;
  435. req->monc = monc;
  436. kref_init(&req->kref);
  437. RB_CLEAR_NODE(&req->node);
  438. init_completion(&req->completion);
  439. dout("%s greq %p\n", __func__, req);
  440. return req;
  441. }
/*
 * Assign a fresh tid and insert the request into the tid tree.  The
 * tree holds its own reference (the get below), dropped when the
 * request is finished or canceled.
 */
static void register_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        WARN_ON(req->tid);

        get_generic_request(req);
        req->tid = ++monc->last_tid;
        insert_generic_request(&monc->generic_request_tree, req);
}
/*
 * Stamp the request message with its tid and queue it on the monitor
 * connection.  The connection takes its own message reference.
 */
static void send_generic_request(struct ceph_mon_client *monc,
                                 struct ceph_mon_generic_request *req)
{
        WARN_ON(!req->tid);

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        req->request->hdr.tid = cpu_to_le64(req->tid);
        ceph_con_send(&monc->con, ceph_msg_get(req->request));
}
/*
 * Remove the request from the tid tree and revoke its in-flight
 * messages.  Callers in this file hold monc->mutex.
 */
static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);
        erase_generic_request(&monc->generic_request_tree, req);

        ceph_msg_revoke(req->request);
        ceph_msg_revoke_incoming(req->reply);
}
/* Unregister the request and drop the tree's reference on it. */
static void finish_generic_request(struct ceph_mon_generic_request *req)
{
        __finish_generic_request(req);
        put_generic_request(req);
}
/*
 * Deliver the result: either via the async callback or by waking any
 * synchronous waiter, then drop the tree's reference.
 */
static void complete_generic_request(struct ceph_mon_generic_request *req)
{
        if (req->complete_cb)
                req->complete_cb(req);
        else
                complete_all(&req->completion);
        put_generic_request(req);
}
/*
 * Cancel an outstanding request (e.g. after an interrupted wait).
 * Only finishes it if it is still registered -- a concurrent reply
 * handler may already have taken it out of the tree.
 */
static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
        struct ceph_mon_client *monc = req->monc;
        struct ceph_mon_generic_request *lookup_req;

        dout("%s greq %p tid %llu\n", __func__, req, req->tid);

        mutex_lock(&monc->mutex);
        lookup_req = lookup_generic_request(&monc->generic_request_tree,
                                            req->tid);
        if (lookup_req) {
                WARN_ON(lookup_req != req);
                finish_generic_request(req);
        }

        mutex_unlock(&monc->mutex);
}
  493. static int wait_generic_request(struct ceph_mon_generic_request *req)
  494. {
  495. int ret;
  496. dout("%s greq %p tid %llu\n", __func__, req, req->tid);
  497. ret = wait_for_completion_interruptible(&req->completion);
  498. if (ret)
  499. cancel_generic_request(req);
  500. else
  501. ret = req->result; /* completed */
  502. return ret;
  503. }
  504. static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
  505. struct ceph_msg_header *hdr,
  506. int *skip)
  507. {
  508. struct ceph_mon_client *monc = con->private;
  509. struct ceph_mon_generic_request *req;
  510. u64 tid = le64_to_cpu(hdr->tid);
  511. struct ceph_msg *m;
  512. mutex_lock(&monc->mutex);
  513. req = lookup_generic_request(&monc->generic_request_tree, tid);
  514. if (!req) {
  515. dout("get_generic_reply %lld dne\n", tid);
  516. *skip = 1;
  517. m = NULL;
  518. } else {
  519. dout("get_generic_reply %lld got %p\n", tid, req->reply);
  520. *skip = 0;
  521. m = ceph_msg_get(req->reply);
  522. /*
  523. * we don't need to track the connection reading into
  524. * this reply because we only have one open connection
  525. * at a time, ever.
  526. */
  527. }
  528. mutex_unlock(&monc->mutex);
  529. return m;
  530. }
/*
 * statfs
 */
/*
 * Handle a statfs reply: copy the stats into the waiting request's
 * buffer and complete it.  Unmatched tids are silently dropped;
 * malformed replies are logged and dumped.
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        if (msg->front.iov_len != sizeof(*reply))
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        *req->u.st = reply->st; /* struct */
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt statfs reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}
/*
 * Do a synchronous statfs().
 *
 * @buf: filled in with the cluster stats on success.
 * Returns 0 on success or a negative errno.
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_statfs *h;
        int ret = -ENOMEM;

        req = alloc_generic_request(monc, GFP_NOFS);
        if (!req)
                goto out;

        req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
                                    true);
        if (!req->request)
                goto out;

        req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
        if (!req->reply)
                goto out;

        req->u.st = buf;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        /* fill out request */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        ret = wait_generic_request(req);
out:
        /* put_generic_request() tolerates NULL on early failure */
        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
/*
 * Handle a mon_get_version reply: decode the handle and newest version,
 * match the handle against the registered request, and complete it.
 */
static void handle_get_version_reply(struct ceph_mon_client *monc,
                                     struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        u64 tid = le64_to_cpu(msg->hdr.tid);
        void *p = msg->front.iov_base;
        void *end = p + msg->front_alloc_len;
        u64 handle;

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, 2*sizeof(u64), bad);
        handle = ceph_decode_64(&p);
        /* a nonzero header tid must agree with the handle in the payload */
        if (tid != 0 && tid != handle)
                goto bad;

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, handle);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = 0;
        req->u.newest = ceph_decode_64(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
}
/*
 * Build, register and send a MMonGetVersion request for map @what.
 *
 * @cb: optional completion callback; NULL means the caller will wait
 *      synchronously on the request's completion.
 * Returns the request (with one caller-owned reference) or
 * ERR_PTR(-ENOMEM).
 */
static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
                        ceph_monc_callback_t cb, u64 private_data)
{
        struct ceph_mon_generic_request *req;

        req = alloc_generic_request(monc, GFP_NOIO);
        if (!req)
                goto err_put_req;

        req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
                                    sizeof(u64) + sizeof(u32) + strlen(what),
                                    GFP_NOIO, true);
        if (!req->request)
                goto err_put_req;

        req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
                                  true);
        if (!req->reply)
                goto err_put_req;

        req->complete_cb = cb;
        req->private_data = private_data;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        {
                /* encode: handle (= tid), then the map name string */
                void *p = req->request->front.iov_base;
                void *const end = p + req->request->front_alloc_len;

                ceph_encode_64(&p, req->tid); /* handle */
                ceph_encode_string(&p, end, what, strlen(what));
                WARN_ON(p != end);
        }
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        return req;

err_put_req:
        put_generic_request(req);
        return ERR_PTR(-ENOMEM);
}
  658. /*
  659. * Send MMonGetVersion and wait for the reply.
  660. *
  661. * @what: one of "mdsmap", "osdmap" or "monmap"
  662. */
  663. int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
  664. u64 *newest)
  665. {
  666. struct ceph_mon_generic_request *req;
  667. int ret;
  668. req = __ceph_monc_get_version(monc, what, NULL, 0);
  669. if (IS_ERR(req))
  670. return PTR_ERR(req);
  671. ret = wait_generic_request(req);
  672. if (!ret)
  673. *newest = req->u.newest;
  674. put_generic_request(req);
  675. return ret;
  676. }
  677. EXPORT_SYMBOL(ceph_monc_get_version);
/*
 * Send MMonGetVersion,
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 *
 * Fire-and-forget variant: @cb is invoked from the reply path; the
 * caller's reference is dropped immediately after sending.
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
                                ceph_monc_callback_t cb, u64 private_data)
{
        struct ceph_mon_generic_request *req;

        req = __ceph_monc_get_version(monc, what, cb, private_data);
        if (IS_ERR(req))
                return PTR_ERR(req);

        put_generic_request(req);
        return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);
/*
 * Handle a MMonCommandAck: decode the result code past the mon request
 * header and complete the matching generic request.
 */
static void handle_command_ack(struct ceph_mon_client *monc,
                               struct ceph_msg *msg)
{
        struct ceph_mon_generic_request *req;
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        u64 tid = le64_to_cpu(msg->hdr.tid);

        dout("%s msg %p tid %llu\n", __func__, msg, tid);

        ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
                                            sizeof(u32), bad);
        p += sizeof(struct ceph_mon_request_header);

        mutex_lock(&monc->mutex);
        req = lookup_generic_request(&monc->generic_request_tree, tid);
        if (!req) {
                mutex_unlock(&monc->mutex);
                return;
        }

        req->result = ceph_decode_32(&p);
        __finish_generic_request(req);
        mutex_unlock(&monc->mutex);

        complete_generic_request(req);
        return;

bad:
        pr_err("corrupt mon_command ack, tid %llu\n", tid);
        ceph_msg_dump(msg);
}
/*
 * Synchronously ask the monitors to blacklist @client_addr ("osd
 * blacklist add" mon command).  Returns 0 on success or a negative
 * errno.
 */
int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
                            struct ceph_entity_addr *client_addr)
{
        struct ceph_mon_generic_request *req;
        struct ceph_mon_command *h;
        int ret = -ENOMEM;
        int len;

        req = alloc_generic_request(monc, GFP_NOIO);
        if (!req)
                goto out;

        req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
        if (!req->request)
                goto out;

        req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
                                  true);
        if (!req->reply)
                goto out;

        mutex_lock(&monc->mutex);
        register_generic_request(req);
        /* fill out the command header and the single JSON argument string */
        h = req->request->front.iov_base;
        h->monhdr.have_version = 0;
        h->monhdr.session_mon = cpu_to_le16(-1);
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
        h->num_strs = cpu_to_le32(1);
        len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
                \"blacklistop\": \"add\", \
                \"addr\": \"%pISpc/%u\" }",
                      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
        h->str_len = cpu_to_le32(len);
        send_generic_request(monc, req);
        mutex_unlock(&monc->mutex);

        ret = wait_generic_request(req);
out:
        put_generic_request(req);
        return ret;
}
EXPORT_SYMBOL(ceph_monc_blacklist_add);
  758. /*
  759. * Resend pending generic requests.
  760. */
  761. static void __resend_generic_request(struct ceph_mon_client *monc)
  762. {
  763. struct ceph_mon_generic_request *req;
  764. struct rb_node *p;
  765. for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
  766. req = rb_entry(p, struct ceph_mon_generic_request, node);
  767. ceph_msg_revoke(req->request);
  768. ceph_msg_revoke_incoming(req->reply);
  769. ceph_con_send(&monc->con, ceph_msg_get(req->request));
  770. }
  771. }
/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
        struct ceph_mon_client *monc =
                container_of(work, struct ceph_mon_client, delayed_work.work);

        dout("monc delayed_work\n");
        mutex_lock(&monc->mutex);
        if (monc->hunting) {
                dout("%s continuing hunt\n", __func__);
                reopen_session(monc);
        } else {
                int is_auth = ceph_auth_is_authenticated(monc->auth);

                /* dead peer?  start hunting for a replacement */
                if (ceph_con_keepalive_expired(&monc->con,
                                               CEPH_MONC_PING_TIMEOUT)) {
                        dout("monc keepalive timeout\n");
                        is_auth = 0;
                        reopen_session(monc);
                }

                if (!monc->hunting) {
                        ceph_con_keepalive(&monc->con);
                        __validate_auth(monc);
                }

                /*
                 * Legacy mons without STATEFUL_SUB need periodic
                 * subscription renewal; see handle_subscribe_ack().
                 */
                if (is_auth &&
                    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
                        unsigned long now = jiffies;

                        dout("%s renew subs? now %lu renew after %lu\n",
                             __func__, now, monc->sub_renew_after);
                        if (time_after_eq(now, monc->sub_renew_after))
                                __send_subscribe(monc);
                }
        }
        __schedule_delayed(monc);
        mutex_unlock(&monc->mutex);
}
  810. /*
  811. * On startup, we build a temporary monmap populated with the IPs
  812. * provided by mount(2).
  813. */
  814. static int build_initial_monmap(struct ceph_mon_client *monc)
  815. {
  816. struct ceph_options *opt = monc->client->options;
  817. struct ceph_entity_addr *mon_addr = opt->mon_addr;
  818. int num_mon = opt->num_mon;
  819. int i;
  820. /* build initial monmap */
  821. monc->monmap = kzalloc(sizeof(*monc->monmap) +
  822. num_mon*sizeof(monc->monmap->mon_inst[0]),
  823. GFP_KERNEL);
  824. if (!monc->monmap)
  825. return -ENOMEM;
  826. for (i = 0; i < num_mon; i++) {
  827. monc->monmap->mon_inst[i].addr = mon_addr[i];
  828. monc->monmap->mon_inst[i].addr.nonce = 0;
  829. monc->monmap->mon_inst[i].name.type =
  830. CEPH_ENTITY_TYPE_MON;
  831. monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
  832. }
  833. monc->monmap->num_mon = num_mon;
  834. return 0;
  835. }
/*
 * Initialize a mon_client embedded in @cl.
 *
 * Builds the initial monmap from the mount options, sets up the auth
 * handler, and preallocates all messages that are exchanged with the
 * monitors so a session can later be opened without allocating.
 * Returns 0 or a negative errno; on error everything allocated so far
 * is torn down via the goto chain (in reverse allocation order).
 */
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;	/* no session yet */
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);
/*
 * Tear down a mon_client previously set up with ceph_monc_init():
 * stop the delayed work, close the session, and release the auth
 * handler, the preallocated messages and the monmap.
 */
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	/* all generic requests should have completed by this point */
	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);
  925. static void finish_hunting(struct ceph_mon_client *monc)
  926. {
  927. if (monc->hunting) {
  928. dout("%s found mon%d\n", __func__, monc->cur_mon);
  929. monc->hunting = false;
  930. monc->had_a_connection = true;
  931. monc->hunt_mult /= 2; /* reduce by 50% */
  932. if (monc->hunt_mult < 1)
  933. monc->hunt_mult = 1;
  934. }
  935. }
/*
 * Process an AUTH_REPLY from the monitor: feed it to the auth handler
 * and either continue the handshake (positive return from
 * ceph_handle_auth_reply()), record a fatal authentication error
 * (negative return), or - on a freshly completed authentication -
 * start the session by subscribing and resending pending requests.
 */
static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	was_auth = ceph_auth_is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		/* handshake not finished - ret is the length of the next
		 * request that was prepared into m_auth */
		__send_prepared_auth_request(monc, ret);
		goto out;
	}

	finish_hunting(monc);

	if (ret < 0) {
		monc->client->auth_err = ret;
	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
	}

out:
	mutex_unlock(&monc->mutex);
	/*
	 * NOTE(review): auth_err is read here after dropping the mutex,
	 * while it is written under the mutex above - confirm no other
	 * writer can race with this check.
	 */
	if (monc->client->auth_err < 0)
		wake_up_all(&monc->client->auth_wq);
}
  970. static int __validate_auth(struct ceph_mon_client *monc)
  971. {
  972. int ret;
  973. if (monc->pending_auth)
  974. return 0;
  975. ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
  976. monc->m_auth->front_alloc_len);
  977. if (ret <= 0)
  978. return ret; /* either an error, or no need to authenticate */
  979. __send_prepared_auth_request(monc, ret);
  980. return 0;
  981. }
  982. int ceph_monc_validate_auth(struct ceph_mon_client *monc)
  983. {
  984. int ret;
  985. mutex_lock(&monc->mutex);
  986. ret = __validate_auth(monc);
  987. mutex_unlock(&monc->mutex);
  988. return ret;
  989. }
  990. EXPORT_SYMBOL(ceph_monc_validate_auth);
/*
 * Handle an incoming message: route it to the matching handler by
 * message type, falling back to the client's chained dispatcher for
 * types the mon_client doesn't know about.  Consumes the msg ref.
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	/* drop the ref handed to us by the messenger */
	ceph_msg_put(msg);
}
/*
 * Allocate memory for incoming message.
 *
 * Preallocated messages are reused (with a ref taken) where possible;
 * generic-request replies are looked up by tid; map messages are
 * freshly allocated.  *skip = 1 tells the messenger to discard the
 * message instead of reading it in.
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
	case CEPH_MSG_MON_COMMAND_ACK:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Workaround this weirdness
		 * by falling through to the allocate case.
		 */
		/* fall through */
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_FS_MAP_USER:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		/* preallocated message too small - reallocate to fit */
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}
  1084. /*
  1085. * If the monitor connection resets, pick a new monitor and resubmit
  1086. * any pending requests.
  1087. */
  1088. static void mon_fault(struct ceph_connection *con)
  1089. {
  1090. struct ceph_mon_client *monc = con->private;
  1091. mutex_lock(&monc->mutex);
  1092. dout("%s mon%d\n", __func__, monc->cur_mon);
  1093. if (monc->cur_mon >= 0) {
  1094. if (!monc->hunting) {
  1095. dout("%s hunting for new mon\n", __func__);
  1096. reopen_session(monc);
  1097. __schedule_delayed(monc);
  1098. } else {
  1099. dout("%s already hunting\n", __func__);
  1100. }
  1101. }
  1102. mutex_unlock(&monc->mutex);
  1103. }
/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;	/* no ref taken - see comment above */
}
static void con_put(struct ceph_connection *con)
{
	/* intentionally empty: mon_client does no connection refcounting */
}
/* monitor client connection operations, wired up in ceph_monc_init() */
static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};