- #include <linux/ceph/ceph_debug.h>
- #include <linux/module.h>
- #include <linux/types.h>
- #include <linux/slab.h>
- #include <linux/random.h>
- #include <linux/sched.h>
- #include <linux/ceph/ceph_features.h>
- #include <linux/ceph/mon_client.h>
- #include <linux/ceph/libceph.h>
- #include <linux/ceph/debugfs.h>
- #include <linux/ceph/decode.h>
- #include <linux/ceph/auth.h>
- /*
- * Interact with Ceph monitor cluster. Handle requests for new map
- * versions, and periodically resend as needed. Also implement
- * statfs() and umount().
- *
- * A small cluster of Ceph "monitors" is responsible for managing critical
- * cluster configuration and state information. An odd number (e.g., 3, 5)
- * of cmon daemons use a modified version of the Paxos part-time parliament
- * algorithm to manage the MDS map (mds cluster membership), OSD map, and
- * list of clients who have mounted the file system.
- *
- * We maintain an open, active session with a monitor at all times in order to
- * receive timely MDSMap updates. We periodically send a keepalive byte on the
- * TCP socket to ensure we detect a failure. If the connection does break, we
- * randomly hunt for a new monitor. Once the connection is reestablished, we
- * resend any outstanding requests.
- */
- static const struct ceph_connection_operations mon_con_ops;
- static int __validate_auth(struct ceph_mon_client *monc);
- /*
- * Decode a monmap blob (e.g., during mount).
- */
- struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
- {
- struct ceph_monmap *m = NULL;
- int i, err = -EINVAL;
- struct ceph_fsid fsid;
- u32 epoch, num_mon;
- u32 len;
- ceph_decode_32_safe(&p, end, len, bad);
- ceph_decode_need(&p, end, len, bad);
- dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
- p += sizeof(u16); /* skip version */
- ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
- ceph_decode_copy(&p, &fsid, sizeof(fsid));
- epoch = ceph_decode_32(&p);
- num_mon = ceph_decode_32(&p);
- ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
- if (num_mon >= CEPH_MAX_MON)
- goto bad;
- m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
- if (m == NULL)
- return ERR_PTR(-ENOMEM);
- m->fsid = fsid;
- m->epoch = epoch;
- m->num_mon = num_mon;
- ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
- for (i = 0; i < num_mon; i++)
- ceph_decode_addr(&m->mon_inst[i].addr);
- dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
- m->num_mon);
- for (i = 0; i < m->num_mon; i++)
- dout("monmap_decode mon%d is %s\n", i,
- ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
- return m;
- bad:
- dout("monmap_decode failed with %d\n", err);
- kfree(m);
- return ERR_PTR(err);
- }
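- /*
- * For reference, the blob decoded above looks roughly like this on the
- * wire (a sketch of what this decoder actually consumes, not a complete
- * monmap specification):
- *
- *   __le32 len;                          payload length that follows
- *   __le16 version;                      skipped above
- *   struct ceph_fsid fsid;
- *   __le32 epoch;
- *   __le32 num_mon;
- *   struct ceph_entity_inst mon_inst[num_mon];
- */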
- /*
- * return true if *addr is included in the monmap.
- */
- int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
- {
- int i;
- for (i = 0; i < m->num_mon; i++)
- if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
- return 1;
- return 0;
- }
- /*
- * Send an auth request.
- */
- static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
- {
- monc->pending_auth = 1;
- monc->m_auth->front.iov_len = len;
- monc->m_auth->hdr.front_len = cpu_to_le32(len);
- ceph_msg_revoke(monc->m_auth);
- ceph_msg_get(monc->m_auth); /* keep our ref */
- ceph_con_send(&monc->con, monc->m_auth);
- }
- /*
- * Close monitor session, if any.
- */
- static void __close_session(struct ceph_mon_client *monc)
- {
- dout("__close_session closing mon%d\n", monc->cur_mon);
- ceph_msg_revoke(monc->m_auth);
- ceph_msg_revoke_incoming(monc->m_auth_reply);
- ceph_msg_revoke(monc->m_subscribe);
- ceph_msg_revoke_incoming(monc->m_subscribe_ack);
- ceph_con_close(&monc->con);
- monc->pending_auth = 0;
- ceph_auth_reset(monc->auth);
- }
- /*
- * Pick a new monitor at random and set cur_mon. If we are repicking
- * (i.e. cur_mon is already set), be sure to pick a different one.
- */
- static void pick_new_mon(struct ceph_mon_client *monc)
- {
- int old_mon = monc->cur_mon;
- BUG_ON(monc->monmap->num_mon < 1);
- if (monc->monmap->num_mon == 1) {
- monc->cur_mon = 0;
- } else {
- int max = monc->monmap->num_mon;
- int o = -1;
- int n;
- if (monc->cur_mon >= 0) {
- if (monc->cur_mon < monc->monmap->num_mon)
- o = monc->cur_mon;
- if (o >= 0)
- max--;
- }
- n = prandom_u32() % max;
- if (o >= 0 && n >= o)
- n++;
- monc->cur_mon = n;
- }
- dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
- monc->cur_mon, monc->monmap->num_mon);
- }
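- /*
- * Worked example of the repick logic above (illustrative only): with
- * num_mon == 3 and cur_mon == 1, max drops to 2, so n is drawn from
- * {0, 1}; the "n >= o" adjustment then maps that to {0, 2}, i.e. every
- * monitor except the current one is chosen with equal probability.
- */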
- /*
- * Open a session with a new monitor.
- */
- static void __open_session(struct ceph_mon_client *monc)
- {
- int ret;
- pick_new_mon(monc);
- monc->hunting = true;
- if (monc->had_a_connection) {
- monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
- if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
- monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
- }
- monc->sub_renew_after = jiffies; /* i.e., expired */
- monc->sub_renew_sent = 0;
- dout("%s opening mon%d\n", __func__, monc->cur_mon);
- ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
- &monc->monmap->mon_inst[monc->cur_mon].addr);
- /*
- * send an initial keepalive to ensure our timestamp is valid
- * by the time we are in an OPENED state
- */
- ceph_con_keepalive(&monc->con);
- /* initiate authentication handshake */
- ret = ceph_auth_build_hello(monc->auth,
- monc->m_auth->front.iov_base,
- monc->m_auth->front_alloc_len);
- BUG_ON(ret <= 0);
- __send_prepared_auth_request(monc, ret);
- }
- static void reopen_session(struct ceph_mon_client *monc)
- {
- if (!monc->hunting)
- pr_info("mon%d %s session lost, hunting for new mon\n",
- monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));
- __close_session(monc);
- __open_session(monc);
- }
- /*
- * Reschedule delayed work timer.
- */
- static void __schedule_delayed(struct ceph_mon_client *monc)
- {
- unsigned long delay;
- if (monc->hunting)
- delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
- else
- delay = CEPH_MONC_PING_INTERVAL;
- dout("__schedule_delayed after %lu\n", delay);
- mod_delayed_work(system_wq, &monc->delayed_work,
- round_jiffies_relative(delay));
- }
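- /*
- * Hunting uses exponential backoff: __open_session() scales hunt_mult by
- * CEPH_MONC_HUNT_BACKOFF once we have had at least one successful
- * connection (capped at CEPH_MONC_HUNT_MAX_MULT), and finish_hunting()
- * halves it again when a connection succeeds.  Assuming a backoff factor
- * of 2 (an assumption -- see the constants in mon_client.h for the actual
- * values), consecutive failed hunts are retried after roughly
- * 1x, 2x, 4x, ... CEPH_MONC_HUNT_INTERVAL.
- */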
- const char *ceph_sub_str[] = {
- [CEPH_SUB_MONMAP] = "monmap",
- [CEPH_SUB_OSDMAP] = "osdmap",
- [CEPH_SUB_FSMAP] = "fsmap.user",
- [CEPH_SUB_MDSMAP] = "mdsmap",
- };
- /*
- * Send subscribe request for one or more maps, according to
- * monc->subs.
- */
- static void __send_subscribe(struct ceph_mon_client *monc)
- {
- struct ceph_msg *msg = monc->m_subscribe;
- void *p = msg->front.iov_base;
- void *const end = p + msg->front_alloc_len;
- int num = 0;
- int i;
- dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
- BUG_ON(monc->cur_mon < 0);
- if (!monc->sub_renew_sent)
- monc->sub_renew_sent = jiffies | 1; /* never 0 */
- msg->hdr.version = cpu_to_le16(2);
- for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
- if (monc->subs[i].want)
- num++;
- }
- BUG_ON(num < 1); /* monmap sub is always there */
- ceph_encode_32(&p, num);
- for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
- char buf[32];
- int len;
- if (!monc->subs[i].want)
- continue;
- len = sprintf(buf, "%s", ceph_sub_str[i]);
- if (i == CEPH_SUB_MDSMAP &&
- monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
- len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
- dout("%s %s start %llu flags 0x%x\n", __func__, buf,
- le64_to_cpu(monc->subs[i].item.start),
- monc->subs[i].item.flags);
- ceph_encode_string(&p, end, buf, len);
- memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
- p += sizeof(monc->subs[i].item);
- }
- BUG_ON(p > end);
- msg->front.iov_len = p - msg->front.iov_base;
- msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- ceph_msg_revoke(msg);
- ceph_con_send(&monc->con, ceph_msg_get(msg));
- }
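- /*
- * For reference, the front payload assembled above is laid out as (a
- * sketch of the v2 encoding emitted by this function):
- *
- *   __le32 num;                          number of subscriptions
- *   num times:
- *     string name;                       __le32 len + bytes, e.g. "osdmap"
- *     struct ceph_mon_subscribe_item {
- *       __le64 start;                    first epoch of interest
- *       __u8 flags;                      CEPH_SUBSCRIBE_ONETIME or 0
- *     };
- */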
- static void handle_subscribe_ack(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
- {
- unsigned int seconds;
- struct ceph_mon_subscribe_ack *h = msg->front.iov_base;
- if (msg->front.iov_len < sizeof(*h))
- goto bad;
- seconds = le32_to_cpu(h->duration);
- mutex_lock(&monc->mutex);
- if (monc->sub_renew_sent) {
- /*
- * This is only needed for legacy (infernalis or older)
- * MONs -- see delayed_work().
- */
- monc->sub_renew_after = monc->sub_renew_sent +
- (seconds >> 1) * HZ - 1;
- dout("%s sent %lu duration %d renew after %lu\n", __func__,
- monc->sub_renew_sent, seconds, monc->sub_renew_after);
- monc->sub_renew_sent = 0;
- } else {
- dout("%s sent %lu renew after %lu, ignoring\n", __func__,
- monc->sub_renew_sent, monc->sub_renew_after);
- }
- mutex_unlock(&monc->mutex);
- return;
- bad:
- pr_err("got corrupt subscribe-ack msg\n");
- ceph_msg_dump(msg);
- }
- /*
- * Register interest in a map
- *
- * @sub: one of CEPH_SUB_*
- * @epoch: X for "every map since X", or 0 for "just the latest"
- */
- static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
- u32 epoch, bool continuous)
- {
- __le64 start = cpu_to_le64(epoch);
- u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
- dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
- epoch, continuous);
- if (monc->subs[sub].want &&
- monc->subs[sub].item.start == start &&
- monc->subs[sub].item.flags == flags)
- return false;
- monc->subs[sub].item.start = start;
- monc->subs[sub].item.flags = flags;
- monc->subs[sub].want = true;
- return true;
- }
- bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
- bool continuous)
- {
- bool need_request;
- mutex_lock(&monc->mutex);
- need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
- mutex_unlock(&monc->mutex);
- return need_request;
- }
- EXPORT_SYMBOL(ceph_monc_want_map);
- /*
- * Keep track of which maps we have
- *
- * @sub: one of CEPH_SUB_*
- */
- static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
- u32 epoch)
- {
- dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
- if (monc->subs[sub].want) {
- if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
- monc->subs[sub].want = false;
- else
- monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
- }
- monc->subs[sub].have = epoch;
- }
- void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
- {
- mutex_lock(&monc->mutex);
- __ceph_monc_got_map(monc, sub, epoch);
- mutex_unlock(&monc->mutex);
- }
- EXPORT_SYMBOL(ceph_monc_got_map);
- void ceph_monc_renew_subs(struct ceph_mon_client *monc)
- {
- mutex_lock(&monc->mutex);
- __send_subscribe(monc);
- mutex_unlock(&monc->mutex);
- }
- EXPORT_SYMBOL(ceph_monc_renew_subs);
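- /*
- * Illustrative usage (a sketch, not code from this file): a map consumer
- * such as the OSD client can ask for the next osdmap epoch and, only if
- * the subscription actually changed, push an updated subscribe request:
- *
- *   if (ceph_monc_want_map(&client->monc, CEPH_SUB_OSDMAP,
- *                          have_epoch + 1, false))
- *           ceph_monc_renew_subs(&client->monc);
- *
- * "client" and "have_epoch" are assumed to come from the caller.
- */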
- /*
- * Wait for an osdmap with a given epoch.
- *
- * @epoch: epoch to wait for
- * @timeout: in jiffies, 0 means "wait forever"
- */
- int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
- unsigned long timeout)
- {
- unsigned long started = jiffies;
- long ret;
- mutex_lock(&monc->mutex);
- while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
- mutex_unlock(&monc->mutex);
- if (timeout && time_after_eq(jiffies, started + timeout))
- return -ETIMEDOUT;
- ret = wait_event_interruptible_timeout(monc->client->auth_wq,
- monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
- ceph_timeout_jiffies(timeout));
- if (ret < 0)
- return ret;
- mutex_lock(&monc->mutex);
- }
- mutex_unlock(&monc->mutex);
- return 0;
- }
- EXPORT_SYMBOL(ceph_monc_wait_osdmap);
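- /*
- * Illustrative usage (a sketch with assumed caller-provided context):
- *
- *   ret = ceph_monc_wait_osdmap(&client->monc, want_epoch,
- *                               client->options->mount_timeout);
- *
- * ret is 0 once subs[CEPH_SUB_OSDMAP].have reaches want_epoch,
- * -ETIMEDOUT if the timeout expires first, or another negative error if
- * the wait is interrupted.
- */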
- /*
- * Open a session with a random monitor. Request monmap and osdmap,
- * which are waited upon in __ceph_open_session().
- */
- int ceph_monc_open_session(struct ceph_mon_client *monc)
- {
- mutex_lock(&monc->mutex);
- __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
- __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
- __open_session(monc);
- __schedule_delayed(monc);
- mutex_unlock(&monc->mutex);
- return 0;
- }
- EXPORT_SYMBOL(ceph_monc_open_session);
- static void ceph_monc_handle_map(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
- {
- struct ceph_client *client = monc->client;
- struct ceph_monmap *monmap = NULL, *old = monc->monmap;
- void *p, *end;
- mutex_lock(&monc->mutex);
- dout("handle_monmap\n");
- p = msg->front.iov_base;
- end = p + msg->front.iov_len;
- monmap = ceph_monmap_decode(p, end);
- if (IS_ERR(monmap)) {
- pr_err("problem decoding monmap, %d\n",
- (int)PTR_ERR(monmap));
- goto out;
- }
- if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
- kfree(monmap);
- goto out;
- }
- client->monc.monmap = monmap;
- kfree(old);
- __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
- client->have_fsid = true;
- out:
- mutex_unlock(&monc->mutex);
- wake_up_all(&client->auth_wq);
- }
- /*
- * generic requests (currently statfs, mon_get_version, mon_command)
- */
- DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)
- static void release_generic_request(struct kref *kref)
- {
- struct ceph_mon_generic_request *req =
- container_of(kref, struct ceph_mon_generic_request, kref);
- dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
- req->reply);
- WARN_ON(!RB_EMPTY_NODE(&req->node));
- if (req->reply)
- ceph_msg_put(req->reply);
- if (req->request)
- ceph_msg_put(req->request);
- kfree(req);
- }
- static void put_generic_request(struct ceph_mon_generic_request *req)
- {
- if (req)
- kref_put(&req->kref, release_generic_request);
- }
- static void get_generic_request(struct ceph_mon_generic_request *req)
- {
- kref_get(&req->kref);
- }
- static struct ceph_mon_generic_request *
- alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
- {
- struct ceph_mon_generic_request *req;
- req = kzalloc(sizeof(*req), gfp);
- if (!req)
- return NULL;
- req->monc = monc;
- kref_init(&req->kref);
- RB_CLEAR_NODE(&req->node);
- init_completion(&req->completion);
- dout("%s greq %p\n", __func__, req);
- return req;
- }
- static void register_generic_request(struct ceph_mon_generic_request *req)
- {
- struct ceph_mon_client *monc = req->monc;
- WARN_ON(req->tid);
- get_generic_request(req);
- req->tid = ++monc->last_tid;
- insert_generic_request(&monc->generic_request_tree, req);
- }
- static void send_generic_request(struct ceph_mon_client *monc,
- struct ceph_mon_generic_request *req)
- {
- WARN_ON(!req->tid);
- dout("%s greq %p tid %llu\n", __func__, req, req->tid);
- req->request->hdr.tid = cpu_to_le64(req->tid);
- ceph_con_send(&monc->con, ceph_msg_get(req->request));
- }
- static void __finish_generic_request(struct ceph_mon_generic_request *req)
- {
- struct ceph_mon_client *monc = req->monc;
- dout("%s greq %p tid %llu\n", __func__, req, req->tid);
- erase_generic_request(&monc->generic_request_tree, req);
- ceph_msg_revoke(req->request);
- ceph_msg_revoke_incoming(req->reply);
- }
- static void finish_generic_request(struct ceph_mon_generic_request *req)
- {
- __finish_generic_request(req);
- put_generic_request(req);
- }
- static void complete_generic_request(struct ceph_mon_generic_request *req)
- {
- if (req->complete_cb)
- req->complete_cb(req);
- else
- complete_all(&req->completion);
- put_generic_request(req);
- }
- static void cancel_generic_request(struct ceph_mon_generic_request *req)
- {
- struct ceph_mon_client *monc = req->monc;
- struct ceph_mon_generic_request *lookup_req;
- dout("%s greq %p tid %llu\n", __func__, req, req->tid);
- mutex_lock(&monc->mutex);
- lookup_req = lookup_generic_request(&monc->generic_request_tree,
- req->tid);
- if (lookup_req) {
- WARN_ON(lookup_req != req);
- finish_generic_request(req);
- }
- mutex_unlock(&monc->mutex);
- }
- static int wait_generic_request(struct ceph_mon_generic_request *req)
- {
- int ret;
- dout("%s greq %p tid %llu\n", __func__, req, req->tid);
- ret = wait_for_completion_interruptible(&req->completion);
- if (ret)
- cancel_generic_request(req);
- else
- ret = req->result; /* completed */
- return ret;
- }
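- /*
- * Summary of the generic request lifecycle implemented by the helpers
- * above (a sketch; see ceph_monc_do_statfs() below for a real caller):
- *
- *   req = alloc_generic_request(monc, gfp);
- *   mutex_lock(&monc->mutex);
- *   register_generic_request(req);       assigns tid, takes an extra ref
- *   ...fill req->request front...
- *   send_generic_request(monc, req);
- *   mutex_unlock(&monc->mutex);
- *   ret = wait_generic_request(req);     or rely on req->complete_cb
- *   put_generic_request(req);
- *
- * Reply handlers look the request up by tid, set req->result and the
- * req->u payload, then call __finish_generic_request() and
- * complete_generic_request().
- */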
- static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
- struct ceph_msg_header *hdr,
- int *skip)
- {
- struct ceph_mon_client *monc = con->private;
- struct ceph_mon_generic_request *req;
- u64 tid = le64_to_cpu(hdr->tid);
- struct ceph_msg *m;
- mutex_lock(&monc->mutex);
- req = lookup_generic_request(&monc->generic_request_tree, tid);
- if (!req) {
- dout("get_generic_reply %lld dne\n", tid);
- *skip = 1;
- m = NULL;
- } else {
- dout("get_generic_reply %lld got %p\n", tid, req->reply);
- *skip = 0;
- m = ceph_msg_get(req->reply);
- /*
- * we don't need to track the connection reading into
- * this reply because we only have one open connection
- * at a time, ever.
- */
- }
- mutex_unlock(&monc->mutex);
- return m;
- }
- /*
- * statfs
- */
- static void handle_statfs_reply(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
- {
- struct ceph_mon_generic_request *req;
- struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
- u64 tid = le64_to_cpu(msg->hdr.tid);
- dout("%s msg %p tid %llu\n", __func__, msg, tid);
- if (msg->front.iov_len != sizeof(*reply))
- goto bad;
- mutex_lock(&monc->mutex);
- req = lookup_generic_request(&monc->generic_request_tree, tid);
- if (!req) {
- mutex_unlock(&monc->mutex);
- return;
- }
- req->result = 0;
- *req->u.st = reply->st; /* struct */
- __finish_generic_request(req);
- mutex_unlock(&monc->mutex);
- complete_generic_request(req);
- return;
- bad:
- pr_err("corrupt statfs reply, tid %llu\n", tid);
- ceph_msg_dump(msg);
- }
- /*
- * Do a synchronous statfs().
- */
- int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
- {
- struct ceph_mon_generic_request *req;
- struct ceph_mon_statfs *h;
- int ret = -ENOMEM;
- req = alloc_generic_request(monc, GFP_NOFS);
- if (!req)
- goto out;
- req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
- true);
- if (!req->request)
- goto out;
- req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
- if (!req->reply)
- goto out;
- req->u.st = buf;
- mutex_lock(&monc->mutex);
- register_generic_request(req);
- /* fill out request */
- h = req->request->front.iov_base;
- h->monhdr.have_version = 0;
- h->monhdr.session_mon = cpu_to_le16(-1);
- h->monhdr.session_mon_tid = 0;
- h->fsid = monc->monmap->fsid;
- send_generic_request(monc, req);
- mutex_unlock(&monc->mutex);
- ret = wait_generic_request(req);
- out:
- put_generic_request(req);
- return ret;
- }
- EXPORT_SYMBOL(ceph_monc_do_statfs);
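- /*
- * Illustrative usage (a sketch; "client" is assumed to be provided by a
- * filesystem statfs handler):
- *
- *   struct ceph_statfs st;
- *   int err = ceph_monc_do_statfs(&client->monc, &st);
- *
- * On success the reply fields (e.g. st.kb, st.kb_used, st.kb_avail,
- * st.num_objects) are little-endian and need le64_to_cpu() before use.
- */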
- static void handle_get_version_reply(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
- {
- struct ceph_mon_generic_request *req;
- u64 tid = le64_to_cpu(msg->hdr.tid);
- void *p = msg->front.iov_base;
- void *end = p + msg->front_alloc_len;
- u64 handle;
- dout("%s msg %p tid %llu\n", __func__, msg, tid);
- ceph_decode_need(&p, end, 2*sizeof(u64), bad);
- handle = ceph_decode_64(&p);
- if (tid != 0 && tid != handle)
- goto bad;
- mutex_lock(&monc->mutex);
- req = lookup_generic_request(&monc->generic_request_tree, handle);
- if (!req) {
- mutex_unlock(&monc->mutex);
- return;
- }
- req->result = 0;
- req->u.newest = ceph_decode_64(&p);
- __finish_generic_request(req);
- mutex_unlock(&monc->mutex);
- complete_generic_request(req);
- return;
- bad:
- pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
- ceph_msg_dump(msg);
- }
- static struct ceph_mon_generic_request *
- __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
- ceph_monc_callback_t cb, u64 private_data)
- {
- struct ceph_mon_generic_request *req;
- req = alloc_generic_request(monc, GFP_NOIO);
- if (!req)
- goto err_put_req;
- req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
- sizeof(u64) + sizeof(u32) + strlen(what),
- GFP_NOIO, true);
- if (!req->request)
- goto err_put_req;
- req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
- true);
- if (!req->reply)
- goto err_put_req;
- req->complete_cb = cb;
- req->private_data = private_data;
- mutex_lock(&monc->mutex);
- register_generic_request(req);
- {
- void *p = req->request->front.iov_base;
- void *const end = p + req->request->front_alloc_len;
- ceph_encode_64(&p, req->tid); /* handle */
- ceph_encode_string(&p, end, what, strlen(what));
- WARN_ON(p != end);
- }
- send_generic_request(monc, req);
- mutex_unlock(&monc->mutex);
- return req;
- err_put_req:
- put_generic_request(req);
- return ERR_PTR(-ENOMEM);
- }
- /*
- * Send MMonGetVersion and wait for the reply.
- *
- * @what: one of "mdsmap", "osdmap" or "monmap"
- */
- int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
- u64 *newest)
- {
- struct ceph_mon_generic_request *req;
- int ret;
- req = __ceph_monc_get_version(monc, what, NULL, 0);
- if (IS_ERR(req))
- return PTR_ERR(req);
- ret = wait_generic_request(req);
- if (!ret)
- *newest = req->u.newest;
- put_generic_request(req);
- return ret;
- }
- EXPORT_SYMBOL(ceph_monc_get_version);
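- /*
- * Illustrative usage (a sketch, assuming "client" is a struct ceph_client):
- *
- *   u64 newest = 0;
- *   int err = ceph_monc_get_version(&client->monc, "osdmap", &newest);
- *
- * On success, "newest" holds the latest osdmap version known to the
- * monitors, which can then be compared against the locally held epoch.
- */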
- /*
- * Send MMonGetVersion without waiting for the reply; @cb is invoked
- * with the request once the reply arrives.
- *
- * @what: one of "mdsmap", "osdmap" or "monmap"
- */
- int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
- ceph_monc_callback_t cb, u64 private_data)
- {
- struct ceph_mon_generic_request *req;
- req = __ceph_monc_get_version(monc, what, cb, private_data);
- if (IS_ERR(req))
- return PTR_ERR(req);
- put_generic_request(req);
- return 0;
- }
- EXPORT_SYMBOL(ceph_monc_get_version_async);
- static void handle_command_ack(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
- {
- struct ceph_mon_generic_request *req;
- void *p = msg->front.iov_base;
- void *const end = p + msg->front_alloc_len;
- u64 tid = le64_to_cpu(msg->hdr.tid);
- dout("%s msg %p tid %llu\n", __func__, msg, tid);
- ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
- sizeof(u32), bad);
- p += sizeof(struct ceph_mon_request_header);
- mutex_lock(&monc->mutex);
- req = lookup_generic_request(&monc->generic_request_tree, tid);
- if (!req) {
- mutex_unlock(&monc->mutex);
- return;
- }
- req->result = ceph_decode_32(&p);
- __finish_generic_request(req);
- mutex_unlock(&monc->mutex);
- complete_generic_request(req);
- return;
- bad:
- pr_err("corrupt mon_command ack, tid %llu\n", tid);
- ceph_msg_dump(msg);
- }
- int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
- struct ceph_entity_addr *client_addr)
- {
- struct ceph_mon_generic_request *req;
- struct ceph_mon_command *h;
- int ret = -ENOMEM;
- int len;
- req = alloc_generic_request(monc, GFP_NOIO);
- if (!req)
- goto out;
- req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
- if (!req->request)
- goto out;
- req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
- true);
- if (!req->reply)
- goto out;
- mutex_lock(&monc->mutex);
- register_generic_request(req);
- h = req->request->front.iov_base;
- h->monhdr.have_version = 0;
- h->monhdr.session_mon = cpu_to_le16(-1);
- h->monhdr.session_mon_tid = 0;
- h->fsid = monc->monmap->fsid;
- h->num_strs = cpu_to_le32(1);
- len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
- \"blacklistop\": \"add\", \
- \"addr\": \"%pISpc/%u\" }",
- &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
- h->str_len = cpu_to_le32(len);
- send_generic_request(monc, req);
- mutex_unlock(&monc->mutex);
- ret = wait_generic_request(req);
- out:
- put_generic_request(req);
- return ret;
- }
- EXPORT_SYMBOL(ceph_monc_blacklist_add);
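- /*
- * Illustrative usage (a sketch; "locker_addr" is an assumed
- * caller-provided struct ceph_entity_addr, e.g. the address of a client
- * whose exclusive lock is being broken):
- *
- *   ret = ceph_monc_blacklist_add(&client->monc, &locker_addr);
- *
- * A zero return means the monitors accepted the blacklist command
- * assembled above.
- */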
- /*
- * Resend pending generic requests.
- */
- static void __resend_generic_request(struct ceph_mon_client *monc)
- {
- struct ceph_mon_generic_request *req;
- struct rb_node *p;
- for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
- req = rb_entry(p, struct ceph_mon_generic_request, node);
- ceph_msg_revoke(req->request);
- ceph_msg_revoke_incoming(req->reply);
- ceph_con_send(&monc->con, ceph_msg_get(req->request));
- }
- }
- /*
- * Delayed work. If we are still hunting for a monitor, keep trying.
- * Otherwise, renew/retry the subscription as needed (in case it is
- * timing out, or we got an ENOMEM), and keep the monitor connection
- * alive with periodic keepalives.
- */
- static void delayed_work(struct work_struct *work)
- {
- struct ceph_mon_client *monc =
- container_of(work, struct ceph_mon_client, delayed_work.work);
- dout("monc delayed_work\n");
- mutex_lock(&monc->mutex);
- if (monc->hunting) {
- dout("%s continuing hunt\n", __func__);
- reopen_session(monc);
- } else {
- int is_auth = ceph_auth_is_authenticated(monc->auth);
- if (ceph_con_keepalive_expired(&monc->con,
- CEPH_MONC_PING_TIMEOUT)) {
- dout("monc keepalive timeout\n");
- is_auth = 0;
- reopen_session(monc);
- }
- if (!monc->hunting) {
- ceph_con_keepalive(&monc->con);
- __validate_auth(monc);
- }
- if (is_auth &&
- !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
- unsigned long now = jiffies;
- dout("%s renew subs? now %lu renew after %lu\n",
- __func__, now, monc->sub_renew_after);
- if (time_after_eq(now, monc->sub_renew_after))
- __send_subscribe(monc);
- }
- }
- __schedule_delayed(monc);
- mutex_unlock(&monc->mutex);
- }
- /*
- * On startup, we build a temporary monmap populated with the IPs
- * provided by mount(2).
- */
- static int build_initial_monmap(struct ceph_mon_client *monc)
- {
- struct ceph_options *opt = monc->client->options;
- struct ceph_entity_addr *mon_addr = opt->mon_addr;
- int num_mon = opt->num_mon;
- int i;
- /* build initial monmap */
- monc->monmap = kzalloc(sizeof(*monc->monmap) +
- num_mon*sizeof(monc->monmap->mon_inst[0]),
- GFP_KERNEL);
- if (!monc->monmap)
- return -ENOMEM;
- for (i = 0; i < num_mon; i++) {
- monc->monmap->mon_inst[i].addr = mon_addr[i];
- monc->monmap->mon_inst[i].addr.nonce = 0;
- monc->monmap->mon_inst[i].name.type =
- CEPH_ENTITY_TYPE_MON;
- monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
- }
- monc->monmap->num_mon = num_mon;
- return 0;
- }
- int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
- {
- int err = 0;
- dout("init\n");
- memset(monc, 0, sizeof(*monc));
- monc->client = cl;
- monc->monmap = NULL;
- mutex_init(&monc->mutex);
- err = build_initial_monmap(monc);
- if (err)
- goto out;
- /* connection */
- /* authentication */
- monc->auth = ceph_auth_init(cl->options->name,
- cl->options->key);
- if (IS_ERR(monc->auth)) {
- err = PTR_ERR(monc->auth);
- goto out_monmap;
- }
- monc->auth->want_keys =
- CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
- CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
- /* msgs */
- err = -ENOMEM;
- monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
- sizeof(struct ceph_mon_subscribe_ack),
- GFP_KERNEL, true);
- if (!monc->m_subscribe_ack)
- goto out_auth;
- monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
- GFP_KERNEL, true);
- if (!monc->m_subscribe)
- goto out_subscribe_ack;
- monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
- GFP_KERNEL, true);
- if (!monc->m_auth_reply)
- goto out_subscribe;
- monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
- monc->pending_auth = 0;
- if (!monc->m_auth)
- goto out_auth_reply;
- ceph_con_init(&monc->con, monc, &mon_con_ops,
- &monc->client->msgr);
- monc->cur_mon = -1;
- monc->had_a_connection = false;
- monc->hunt_mult = 1;
- INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
- monc->generic_request_tree = RB_ROOT;
- monc->last_tid = 0;
- monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;
- return 0;
- out_auth_reply:
- ceph_msg_put(monc->m_auth_reply);
- out_subscribe:
- ceph_msg_put(monc->m_subscribe);
- out_subscribe_ack:
- ceph_msg_put(monc->m_subscribe_ack);
- out_auth:
- ceph_auth_destroy(monc->auth);
- out_monmap:
- kfree(monc->monmap);
- out:
- return err;
- }
- EXPORT_SYMBOL(ceph_monc_init);
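- /*
- * Typical lifecycle (a sketch; error handling and the surrounding
- * ceph_client setup are assumed):
- *
- *   err = ceph_monc_init(&client->monc, client);
- *   err = ceph_monc_open_session(&client->monc);
- *   ...use the session...
- *   ceph_monc_stop(&client->monc);
- */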
- void ceph_monc_stop(struct ceph_mon_client *monc)
- {
- dout("stop\n");
- cancel_delayed_work_sync(&monc->delayed_work);
- mutex_lock(&monc->mutex);
- __close_session(monc);
- monc->cur_mon = -1;
- mutex_unlock(&monc->mutex);
- /*
- * flush msgr queue before we destroy ourselves to ensure that:
- * - any work that references our embedded con is finished.
- * - any osd_client or other work that may reference an authorizer
- * finishes before we shut down the auth subsystem.
- */
- ceph_msgr_flush();
- ceph_auth_destroy(monc->auth);
- WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));
- ceph_msg_put(monc->m_auth);
- ceph_msg_put(monc->m_auth_reply);
- ceph_msg_put(monc->m_subscribe);
- ceph_msg_put(monc->m_subscribe_ack);
- kfree(monc->monmap);
- }
- EXPORT_SYMBOL(ceph_monc_stop);
- static void finish_hunting(struct ceph_mon_client *monc)
- {
- if (monc->hunting) {
- dout("%s found mon%d\n", __func__, monc->cur_mon);
- monc->hunting = false;
- monc->had_a_connection = true;
- monc->hunt_mult /= 2; /* reduce by 50% */
- if (monc->hunt_mult < 1)
- monc->hunt_mult = 1;
- }
- }
- static void handle_auth_reply(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
- {
- int ret;
- int was_auth = 0;
- mutex_lock(&monc->mutex);
- was_auth = ceph_auth_is_authenticated(monc->auth);
- monc->pending_auth = 0;
- ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
- msg->front.iov_len,
- monc->m_auth->front.iov_base,
- monc->m_auth->front_alloc_len);
- if (ret > 0) {
- __send_prepared_auth_request(monc, ret);
- goto out;
- }
- finish_hunting(monc);
- if (ret < 0) {
- monc->client->auth_err = ret;
- } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
- dout("authenticated, starting session\n");
- monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
- monc->client->msgr.inst.name.num =
- cpu_to_le64(monc->auth->global_id);
- __send_subscribe(monc);
- __resend_generic_request(monc);
- pr_info("mon%d %s session established\n", monc->cur_mon,
- ceph_pr_addr(&monc->con.peer_addr.in_addr));
- }
- out:
- mutex_unlock(&monc->mutex);
- if (monc->client->auth_err < 0)
- wake_up_all(&monc->client->auth_wq);
- }
- static int __validate_auth(struct ceph_mon_client *monc)
- {
- int ret;
- if (monc->pending_auth)
- return 0;
- ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
- monc->m_auth->front_alloc_len);
- if (ret <= 0)
- return ret; /* either an error, or no need to authenticate */
- __send_prepared_auth_request(monc, ret);
- return 0;
- }
- int ceph_monc_validate_auth(struct ceph_mon_client *monc)
- {
- int ret;
- mutex_lock(&monc->mutex);
- ret = __validate_auth(monc);
- mutex_unlock(&monc->mutex);
- return ret;
- }
- EXPORT_SYMBOL(ceph_monc_validate_auth);
- /*
- * handle incoming message
- */
- static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
- {
- struct ceph_mon_client *monc = con->private;
- int type = le16_to_cpu(msg->hdr.type);
- if (!monc)
- return;
- switch (type) {
- case CEPH_MSG_AUTH_REPLY:
- handle_auth_reply(monc, msg);
- break;
- case CEPH_MSG_MON_SUBSCRIBE_ACK:
- handle_subscribe_ack(monc, msg);
- break;
- case CEPH_MSG_STATFS_REPLY:
- handle_statfs_reply(monc, msg);
- break;
- case CEPH_MSG_MON_GET_VERSION_REPLY:
- handle_get_version_reply(monc, msg);
- break;
- case CEPH_MSG_MON_COMMAND_ACK:
- handle_command_ack(monc, msg);
- break;
- case CEPH_MSG_MON_MAP:
- ceph_monc_handle_map(monc, msg);
- break;
- case CEPH_MSG_OSD_MAP:
- ceph_osdc_handle_map(&monc->client->osdc, msg);
- break;
- default:
- /* can the chained handler handle it? */
- if (monc->client->extra_mon_dispatch &&
- monc->client->extra_mon_dispatch(monc->client, msg) == 0)
- break;
-
- pr_err("received unknown message type %d %s\n", type,
- ceph_msg_type_name(type));
- }
- ceph_msg_put(msg);
- }
- /*
- * Allocate memory for incoming message
- */
- static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
- struct ceph_msg_header *hdr,
- int *skip)
- {
- struct ceph_mon_client *monc = con->private;
- int type = le16_to_cpu(hdr->type);
- int front_len = le32_to_cpu(hdr->front_len);
- struct ceph_msg *m = NULL;
- *skip = 0;
- switch (type) {
- case CEPH_MSG_MON_SUBSCRIBE_ACK:
- m = ceph_msg_get(monc->m_subscribe_ack);
- break;
- case CEPH_MSG_STATFS_REPLY:
- case CEPH_MSG_MON_COMMAND_ACK:
- return get_generic_reply(con, hdr, skip);
- case CEPH_MSG_AUTH_REPLY:
- m = ceph_msg_get(monc->m_auth_reply);
- break;
- case CEPH_MSG_MON_GET_VERSION_REPLY:
- if (le64_to_cpu(hdr->tid) != 0)
- return get_generic_reply(con, hdr, skip);
- /*
- * Older OSDs don't set reply tid even if the original
- * request had a non-zero tid. Work around this weirdness
- * by falling through to the allocate case.
- */
- case CEPH_MSG_MON_MAP:
- case CEPH_MSG_MDS_MAP:
- case CEPH_MSG_OSD_MAP:
- case CEPH_MSG_FS_MAP_USER:
- m = ceph_msg_new(type, front_len, GFP_NOFS, false);
- if (!m)
- return NULL; /* ENOMEM--return skip == 0 */
- break;
- }
- if (!m) {
- pr_info("alloc_msg unknown type %d\n", type);
- *skip = 1;
- } else if (front_len > m->front_alloc_len) {
- pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
- front_len, m->front_alloc_len,
- (unsigned int)con->peer_name.type,
- le64_to_cpu(con->peer_name.num));
- ceph_msg_put(m);
- m = ceph_msg_new(type, front_len, GFP_NOFS, false);
- }
- return m;
- }
- /*
- * If the monitor connection resets, pick a new monitor and resubmit
- * any pending requests.
- */
- static void mon_fault(struct ceph_connection *con)
- {
- struct ceph_mon_client *monc = con->private;
- mutex_lock(&monc->mutex);
- dout("%s mon%d\n", __func__, monc->cur_mon);
- if (monc->cur_mon >= 0) {
- if (!monc->hunting) {
- dout("%s hunting for new mon\n", __func__);
- reopen_session(monc);
- __schedule_delayed(monc);
- } else {
- dout("%s already hunting\n", __func__);
- }
- }
- mutex_unlock(&monc->mutex);
- }
- /*
- * We can ignore refcounting on the connection struct, as all references
- * will come from the messenger workqueue, which is drained prior to
- * mon_client destruction.
- */
- static struct ceph_connection *con_get(struct ceph_connection *con)
- {
- return con;
- }
- static void con_put(struct ceph_connection *con)
- {
- }
- static const struct ceph_connection_operations mon_con_ops = {
- .get = con_get,
- .put = con_put,
- .dispatch = dispatch,
- .fault = mon_fault,
- .alloc_msg = mon_alloc_msg,
- };