/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 */

#include <linux/module.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "bitmap.h"
#include "md-cluster.h"

#define LVB_SIZE	64
#define NEW_DEV_TIMEOUT	5000

struct dlm_lock_resource {
        dlm_lockspace_t *ls;
        struct dlm_lksb lksb;
        char *name; /* lock name. */
        uint32_t flags; /* flags to pass to dlm_lock() */
        struct completion completion; /* completion for synchronized locking */
        void (*bast)(void *arg, int mode); /* blocking AST function pointer */
        struct mddev *mddev; /* pointing back to mddev. */
        int mode;
};

struct suspend_info {
        int slot;
        sector_t lo;
        sector_t hi;
        struct list_head list;
};
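
/*
 * resync_info mirrors the lo/hi range of a suspend_info entry in the
 * little-endian layout exchanged through the bitmap lock's LVB.
 */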
struct resync_info {
        __le64 lo;
        __le64 hi;
};

/* md_cluster_info flags */
#define MD_CLUSTER_WAITING_FOR_NEWDISK		1
#define MD_CLUSTER_SUSPEND_READ_BALANCING	2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER		3
/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define MD_CLUSTER_SEND_LOCK			4
/* Cluster operations (such as adding a disk) that must lock the
 * communication channel to perform extra work (updating metadata)
 * set this bit; no other operation is allowed on the MD meanwhile.
 * The token needs to be locked and held until the operation
 * completes with a md_update_sb(), which eventually releases
 * the lock.
 */
#define MD_CLUSTER_SEND_LOCKED_ALREADY		5

struct md_cluster_info {
        /* dlm lock space and resources for clustered raid. */
        dlm_lockspace_t *lockspace;
        int slot_number;
        struct completion completion;
        struct mutex recv_mutex;
        struct dlm_lock_resource *bitmap_lockres;
        struct dlm_lock_resource **other_bitmap_lockres;
        struct dlm_lock_resource *resync_lockres;
        struct list_head suspend_list;
        spinlock_t suspend_lock;
        struct md_thread *recovery_thread;
        unsigned long recovery_map;
        /* communication lock resources */
        struct dlm_lock_resource *ack_lockres;
        struct dlm_lock_resource *message_lockres;
        struct dlm_lock_resource *token_lockres;
        struct dlm_lock_resource *no_new_dev_lockres;
        struct md_thread *recv_thread;
        struct completion newdisk_completion;
        wait_queue_head_t wait;
        unsigned long state;
        /* record the region in RESYNCING message */
        sector_t sync_low;
        sector_t sync_hi;
};

enum msg_type {
        METADATA_UPDATED = 0,
        RESYNCING,
        NEWDISK,
        REMOVE,
        RE_ADD,
        BITMAP_NEEDS_SYNC,
};

struct cluster_msg {
        __le32 type;
        __le32 slot;
        /* TODO: Unionize this for smaller footprint */
        __le64 low;
        __le64 high;
        char uuid[16];
        __le32 raid_slot;
};
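
/*
 * Completion AST for the synchronous lock helpers below: dlm_lock()
 * returns as soon as the request is queued, so dlm_lock_sync() blocks
 * on res->completion until this AST reports that the request finished.
 */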
static void sync_ast(void *arg)
{
        struct dlm_lock_resource *res;

        res = arg;
        complete(&res->completion);
}

static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
        int ret = 0;

        ret = dlm_lock(res->ls, mode, &res->lksb,
                        res->flags, res->name, strlen(res->name),
                        0, sync_ast, res, res->bast);
        if (ret)
                return ret;
        wait_for_completion(&res->completion);
        if (res->lksb.sb_status == 0)
                res->mode = mode;
        return res->lksb.sb_status;
}
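
/*
 * "Unlocking" is implemented as a down-convert to NL rather than a real
 * dlm_unlock(), so the lock resource and its LVB stay valid for reuse.
 */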
static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
        return dlm_lock_sync(res, DLM_LOCK_NL);
}

static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
                char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
        struct dlm_lock_resource *res = NULL;
        int ret, namelen;
        struct md_cluster_info *cinfo = mddev->cluster_info;

        res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
        if (!res)
                return NULL;
        init_completion(&res->completion);
        res->ls = cinfo->lockspace;
        res->mddev = mddev;
        res->mode = DLM_LOCK_IV;
        namelen = strlen(name);
        res->name = kzalloc(namelen + 1, GFP_KERNEL);
        if (!res->name) {
                pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
                goto out_err;
        }
        strlcpy(res->name, name, namelen + 1);
        if (with_lvb) {
                res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
                if (!res->lksb.sb_lvbptr) {
                        pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
                        goto out_err;
                }
                res->flags = DLM_LKF_VALBLK;
        }

        if (bastfn)
                res->bast = bastfn;

        res->flags |= DLM_LKF_EXPEDITE;

        ret = dlm_lock_sync(res, DLM_LOCK_NL);
        if (ret) {
                pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
                goto out_err;
        }
        res->flags &= ~DLM_LKF_EXPEDITE;
        res->flags |= DLM_LKF_CONVERT;

        return res;
out_err:
        kfree(res->lksb.sb_lvbptr);
        kfree(res->name);
        kfree(res);
        return NULL;
}

static void lockres_free(struct dlm_lock_resource *res)
{
        int ret;

        if (!res)
                return;

        /* cancel a lock request or a conversion request that is blocked */
        res->flags |= DLM_LKF_CANCEL;
retry:
        ret = dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
        if (unlikely(ret != 0)) {
                pr_info("%s: failed to unlock %s return %d\n", __func__, res->name, ret);

                /* if a lock conversion is cancelled, the lock is put back
                 * on the grant queue, so we need to ensure it is unlocked */
                if (ret == -DLM_ECANCEL)
                        goto retry;
        }
        res->flags &= ~DLM_LKF_CANCEL;
        wait_for_completion(&res->completion);

        kfree(res->name);
        kfree(res->lksb.sb_lvbptr);
        kfree(res);
}
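
/*
 * The resync window [lo, hi] is published to the other nodes through
 * the lock value block (LVB) of this node's bitmap lock resource.
 */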
static void add_resync_info(struct dlm_lock_resource *lockres,
                sector_t lo, sector_t hi)
{
        struct resync_info *ri;

        ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
        ri->lo = cpu_to_le64(lo);
        ri->hi = cpu_to_le64(hi);
}

static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
{
        struct resync_info ri;
        struct suspend_info *s = NULL;
        sector_t hi = 0;

        dlm_lock_sync(lockres, DLM_LOCK_CR);
        memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
        hi = le64_to_cpu(ri.hi);
        if (hi > 0) {
                s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
                if (s) {
                        s->hi = hi;
                        s->lo = le64_to_cpu(ri.lo);
                }
        }
        /* drop the CR lock even if the allocation above failed */
        dlm_unlock_sync(lockres);
        return s;
}

static void recover_bitmaps(struct md_thread *thread)
{
        struct mddev *mddev = thread->mddev;
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct dlm_lock_resource *bm_lockres;
        char str[64];
        int slot, ret;
        struct suspend_info *s, *tmp;
        sector_t lo, hi;

        while (cinfo->recovery_map) {
                slot = fls64((u64)cinfo->recovery_map) - 1;

                /* Clear suspend_area associated with the bitmap */
                spin_lock_irq(&cinfo->suspend_lock);
                list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
                        if (slot == s->slot) {
                                list_del(&s->list);
                                kfree(s);
                        }
                spin_unlock_irq(&cinfo->suspend_lock);

                snprintf(str, 64, "bitmap%04d", slot);
                bm_lockres = lockres_init(mddev, str, NULL, 1);
                if (!bm_lockres) {
                        pr_err("md-cluster: Cannot initialize bitmaps\n");
                        goto clear_bit;
                }

                ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
                if (ret) {
                        pr_err("md-cluster: Could not DLM lock %s: %d\n",
                                        str, ret);
                        goto clear_bit;
                }
                ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
                if (ret) {
                        pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
                        goto dlm_unlock;
                }
                if (hi > 0) {
                        if (lo < mddev->recovery_cp)
                                mddev->recovery_cp = lo;
                        /* wake up the thread to continue resync in case
                         * resync is not finished */
                        if (mddev->recovery_cp != MaxSector) {
                                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
                                md_wakeup_thread(mddev->thread);
                        }
                }
dlm_unlock:
                dlm_unlock_sync(bm_lockres);
clear_bit:
                lockres_free(bm_lockres);
                clear_bit(slot, &cinfo->recovery_map);
        }
}
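
/*
 * DLM recovery callbacks: recover_prep() suspends read balancing while
 * lockspace recovery is in flight, recover_slot() queues bitmap recovery
 * for a failed node's slot, and recover_done() records our slot number
 * once membership has settled (letting join() continue).
 */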
static void recover_prep(void *arg)
{
        struct mddev *mddev = arg;
        struct md_cluster_info *cinfo = mddev->cluster_info;
        set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

static void __recover_slot(struct mddev *mddev, int slot)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;

        set_bit(slot, &cinfo->recovery_map);
        if (!cinfo->recovery_thread) {
                cinfo->recovery_thread = md_register_thread(recover_bitmaps,
                                mddev, "recover");
                if (!cinfo->recovery_thread) {
                        pr_warn("md-cluster: Could not create recovery thread\n");
                        return;
                }
        }
        md_wakeup_thread(cinfo->recovery_thread);
}

static void recover_slot(void *arg, struct dlm_slot *slot)
{
        struct mddev *mddev = arg;
        struct md_cluster_info *cinfo = mddev->cluster_info;

        pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
                        mddev->bitmap_info.cluster_name,
                        slot->nodeid, slot->slot,
                        cinfo->slot_number);
        /* subtract one since DLM slot numbers start at 1 while
         * cluster-md slot numbers start at 0 */
        __recover_slot(mddev, slot->slot - 1);
}

static void recover_done(void *arg, struct dlm_slot *slots,
                int num_slots, int our_slot,
                uint32_t generation)
{
        struct mddev *mddev = arg;
        struct md_cluster_info *cinfo = mddev->cluster_info;

        cinfo->slot_number = our_slot;
        /* completion only needs to be signalled when a node joins the
         * cluster; it doesn't need to run during another node's failure */
        if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
                complete(&cinfo->completion);
                clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
        }
        clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

/* These ops are called when a node joins the cluster, and to perform
 * lock recovery if a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
        .recover_prep = recover_prep,
        .recover_slot = recover_slot,
        .recover_done = recover_done,
};

/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
        struct dlm_lock_resource *res = arg;
        struct md_cluster_info *cinfo = res->mddev->cluster_info;

        if (mode == DLM_LOCK_EX)
                md_wakeup_thread(cinfo->recv_thread);
}

static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
        struct suspend_info *s, *tmp;

        list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
                if (slot == s->slot) {
                        list_del(&s->list);
                        kfree(s);
                        break;
                }
}

static void remove_suspend_info(struct mddev *mddev, int slot)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        spin_lock_irq(&cinfo->suspend_lock);
        __remove_suspend_info(cinfo, slot);
        spin_unlock_irq(&cinfo->suspend_lock);
        mddev->pers->quiesce(mddev, 2);
}

static void process_suspend_info(struct mddev *mddev,
                int slot, sector_t lo, sector_t hi)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct suspend_info *s;

        if (!hi) {
                remove_suspend_info(mddev, slot);
                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
                md_wakeup_thread(mddev->thread);
                return;
        }

        /*
         * The bitmaps are not the same for different nodes.
         * If RESYNCING is happening in one node, then
         * the node which received the RESYNCING message
         * probably will perform resync with the region
         * [lo, hi] again, so we could reduce resync time
         * a lot if we can ensure that the bitmaps among
         * different nodes match up well.
         *
         * sync_low/hi is used to record the region which
         * arrived in the previous RESYNCING message.
         *
         * Call bitmap_sync_with_cluster to clear
         * NEEDED_MASK and set RESYNC_MASK since
         * the resync thread is running in another node,
         * so we don't need to do the resync again
         * with the same section */
        bitmap_sync_with_cluster(mddev, cinfo->sync_low,
                                        cinfo->sync_hi,
                                        lo, hi);
        cinfo->sync_low = lo;
        cinfo->sync_hi = hi;

        s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
        if (!s)
                return;
        s->slot = slot;
        s->lo = lo;
        s->hi = hi;
        mddev->pers->quiesce(mddev, 1);
        mddev->pers->quiesce(mddev, 0);
        spin_lock_irq(&cinfo->suspend_lock);
        /* Remove existing entry (if exists) before adding */
        __remove_suspend_info(cinfo, slot);
        list_add(&s->list, &cinfo->suspend_list);
        spin_unlock_irq(&cinfo->suspend_lock);
        mddev->pers->quiesce(mddev, 2);
}
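
/*
 * A NEWDISK message is answered from user space: raise a udev CHANGE
 * event carrying the device UUID and raid slot, then wait (bounded by
 * NEW_DEV_TIMEOUT) for user space to confirm through new_disk_ack().
 */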
static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
        char disk_uuid[64];
        struct md_cluster_info *cinfo = mddev->cluster_info;
        char event_name[] = "EVENT=ADD_DEVICE";
        char raid_slot[16];
        char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
        int len;

        len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
        sprintf(disk_uuid + len, "%pU", cmsg->uuid);
        snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
        pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
        init_completion(&cinfo->newdisk_completion);
        set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
        kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
        wait_for_completion_timeout(&cinfo->newdisk_completion,
                        NEW_DEV_TIMEOUT);
        clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}

static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
        set_bit(MD_RELOAD_SB, &mddev->flags);
        dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
        md_wakeup_thread(mddev->thread);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
        struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
                                                   le32_to_cpu(msg->raid_slot));

        if (rdev) {
                set_bit(ClusterRemove, &rdev->flags);
                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
                md_wakeup_thread(mddev->thread);
        }
        else
                pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
}

static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
        struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
                                                   le32_to_cpu(msg->raid_slot));

        if (rdev && test_bit(Faulty, &rdev->flags))
                clear_bit(Faulty, &rdev->flags);
        else
                pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
}

static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
        if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
                "node %d received its own msg\n", le32_to_cpu(msg->slot)))
                return;
        switch (le32_to_cpu(msg->type)) {
        case METADATA_UPDATED:
                process_metadata_update(mddev, msg);
                break;
        case RESYNCING:
                process_suspend_info(mddev, le32_to_cpu(msg->slot),
                                     le64_to_cpu(msg->low),
                                     le64_to_cpu(msg->high));
                break;
        case NEWDISK:
                process_add_new_disk(mddev, msg);
                break;
        case REMOVE:
                process_remove_disk(mddev, msg);
                break;
        case RE_ADD:
                process_readd_disk(mddev, msg);
                break;
        case BITMAP_NEEDS_SYNC:
                __recover_slot(mddev, le32_to_cpu(msg->slot));
                break;
        default:
                pr_warn("%s:%d Received unknown message from %d\n",
                        __func__, __LINE__, le32_to_cpu(msg->slot));
        }
}

/*
 * thread for receiving message: woken by ack_bast() whenever another
 * node up-converts the ack lock, signalling a pending message
 */
static void recv_daemon(struct md_thread *thread)
{
        struct md_cluster_info *cinfo = thread->mddev->cluster_info;
        struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
        struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
        struct cluster_msg msg;
        int ret;

        mutex_lock(&cinfo->recv_mutex);
        /* get CR on Message */
        if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
                pr_err("md/raid1: failed to get CR on MESSAGE\n");
                mutex_unlock(&cinfo->recv_mutex);
                return;
        }

        /* read the LVB and process the message it carries */
        memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
        process_recvd_msg(thread->mddev, &msg);

        /* release CR on ack_lockres */
        ret = dlm_unlock_sync(ack_lockres);
        if (unlikely(ret != 0))
                pr_info("unlock ack failed return %d\n", ret);
        /* up-convert to PR on message_lockres */
        ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
        if (unlikely(ret != 0))
                pr_info("lock PR on msg failed return %d\n", ret);
        /* get CR on ack_lockres again */
        ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
        if (unlikely(ret != 0))
                pr_info("lock CR on ack failed return %d\n", ret);
        /* release CR on message_lockres */
        ret = dlm_unlock_sync(message_lockres);
        if (unlikely(ret != 0))
                pr_info("unlock msg failed return %d\n", ret);
        mutex_unlock(&cinfo->recv_mutex);
}

/* lock_token()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_token(struct md_cluster_info *cinfo)
{
        int error;

        error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
        if (error)
                pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
                                __func__, __LINE__, error);

        /* Lock the receive sequence */
        mutex_lock(&cinfo->recv_mutex);
        return error;
}

/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 */
static int lock_comm(struct md_cluster_info *cinfo)
{
        wait_event(cinfo->wait,
                   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));

        return lock_token(cinfo);
}

static void unlock_comm(struct md_cluster_info *cinfo)
{
        WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
        mutex_unlock(&cinfo->recv_mutex);
        dlm_unlock_sync(cinfo->token_lockres);
        clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
        wake_up(&cinfo->wait);
}
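
/*
 * The usual send path is lock_comm() -> __sendmsg() -> unlock_comm().
 * Metadata updates instead keep the token held across md_update_sb():
 * the holder sets MD_CLUSTER_SEND_LOCKED_ALREADY (see add_new_disk())
 * and metadata_update_finish()/metadata_update_cancel() release it.
 */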

/* __sendmsg()
 * This function performs the actual sending of the message. It is
 * usually called after performing the encompassing operation.
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on
 *    other nodes and the other nodes read the message. The thread will wait
 *    here until all other nodes have released the ack lock resource.
 * 5. Downconverts ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
        int error;
        int slot = cinfo->slot_number - 1;

        cmsg->slot = cpu_to_le32(slot);
        /* get EX on Message */
        error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
        if (error) {
                pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
                goto failed_message;
        }

        memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
                        sizeof(struct cluster_msg));
        /* down-convert EX to CW on Message */
        error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
        if (error) {
                pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
                                error);
                goto failed_ack;
        }

        /* up-convert CR to EX on Ack */
        error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
        if (error) {
                pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
                                error);
                goto failed_ack;
        }

        /* down-convert EX to CR on Ack */
        error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
        if (error) {
                pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
                                error);
                goto failed_ack;
        }

failed_ack:
        error = dlm_unlock_sync(cinfo->message_lockres);
        if (unlikely(error != 0)) {
                pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
                        error);
                /* in case the message can't be released due to some reason */
                goto failed_ack;
        }
failed_message:
        return error;
}

static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
        int ret;

        lock_comm(cinfo);
        ret = __sendmsg(cinfo, cmsg);
        unlock_comm(cinfo);
        return ret;
}
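
/*
 * Called at join time: walk every other node's bitmap lock resource.
 * If the PW lock cannot be taken (-EAGAIN), that node is mid-resync and
 * its LVB tells us which region to suspend; otherwise read its bitmap
 * and pull recovery_cp back if dirty regions still need recovery.
 */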
static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        int i, ret = 0;
        struct dlm_lock_resource *bm_lockres;
        struct suspend_info *s;
        char str[64];
        sector_t lo, hi;

        for (i = 0; i < total_slots; i++) {
                memset(str, '\0', 64);
                snprintf(str, 64, "bitmap%04d", i);
                bm_lockres = lockres_init(mddev, str, NULL, 1);
                if (!bm_lockres)
                        return -ENOMEM;
                if (i == (cinfo->slot_number - 1)) {
                        lockres_free(bm_lockres);
                        continue;
                }

                bm_lockres->flags |= DLM_LKF_NOQUEUE;
                ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
                if (ret == -EAGAIN) {
                        memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
                        s = read_resync_info(mddev, bm_lockres);
                        if (s) {
                                pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
                                                __func__, __LINE__,
                                                (unsigned long long) s->lo,
                                                (unsigned long long) s->hi, i);
                                spin_lock_irq(&cinfo->suspend_lock);
                                s->slot = i;
                                list_add(&s->list, &cinfo->suspend_list);
                                spin_unlock_irq(&cinfo->suspend_lock);
                        }
                        ret = 0;
                        lockres_free(bm_lockres);
                        continue;
                }
                if (ret) {
                        lockres_free(bm_lockres);
                        goto out;
                }

                /* Read the disk bitmap sb and check if it needs recovery */
                ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
                if (ret) {
                        pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
                        lockres_free(bm_lockres);
                        continue;
                }
                if ((hi > 0) && (lo < mddev->recovery_cp)) {
                        set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
                        mddev->recovery_cp = lo;
                        md_check_recovery(mddev);
                }

                dlm_unlock_sync(bm_lockres);
                lockres_free(bm_lockres);
        }
out:
        return ret;
}

static int join(struct mddev *mddev, int nodes)
{
        struct md_cluster_info *cinfo;
        int ret, ops_rv;
        char str[64];

        cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
        if (!cinfo)
                return -ENOMEM;

        INIT_LIST_HEAD(&cinfo->suspend_list);
        spin_lock_init(&cinfo->suspend_lock);
        init_completion(&cinfo->completion);
        set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
        init_waitqueue_head(&cinfo->wait);
        mutex_init(&cinfo->recv_mutex);

        mddev->cluster_info = cinfo;

        memset(str, 0, 64);
        sprintf(str, "%pU", mddev->uuid);
        ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
                                DLM_LSFL_FS, LVB_SIZE,
                                &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
        if (ret)
                goto err;
        wait_for_completion(&cinfo->completion);
        if (nodes < cinfo->slot_number) {
                pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).\n",
                        cinfo->slot_number, nodes);
                ret = -ERANGE;
                goto err;
        }
        /* Initiate the communication resources */
        ret = -ENOMEM;
        cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
        if (!cinfo->recv_thread) {
                pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
                goto err;
        }
        cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
        if (!cinfo->message_lockres)
                goto err;
        cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
        if (!cinfo->token_lockres)
                goto err;
        cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
        if (!cinfo->no_new_dev_lockres)
                goto err;

        ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
        if (ret) {
                ret = -EAGAIN;
                pr_err("md-cluster: can't join cluster to avoid lock issue\n");
                goto err;
        }
        cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
        if (!cinfo->ack_lockres) {
                ret = -ENOMEM;
                goto err;
        }
        /* get sync CR lock on ACK. */
        ret = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
        if (ret)
                pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
                                ret);
        dlm_unlock_sync(cinfo->token_lockres);
        /* get sync CR lock on no-new-dev. */
        ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
        if (ret)
                pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);

        pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
        snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
        cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
        if (!cinfo->bitmap_lockres) {
                ret = -ENOMEM;
                goto err;
        }
        if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
                pr_err("Failed to get bitmap lock\n");
                ret = -EINVAL;
                goto err;
        }

        cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
        if (!cinfo->resync_lockres) {
                ret = -ENOMEM;
                goto err;
        }

        ret = gather_all_resync_info(mddev, nodes);
        if (ret)
                goto err;

        return 0;
err:
        md_unregister_thread(&cinfo->recovery_thread);
        md_unregister_thread(&cinfo->recv_thread);
        lockres_free(cinfo->message_lockres);
        lockres_free(cinfo->token_lockres);
        lockres_free(cinfo->ack_lockres);
        lockres_free(cinfo->no_new_dev_lockres);
        lockres_free(cinfo->resync_lockres);
        lockres_free(cinfo->bitmap_lockres);
        if (cinfo->lockspace)
                dlm_release_lockspace(cinfo->lockspace, 2);
        mddev->cluster_info = NULL;
        kfree(cinfo);
        return ret;
}

static void resync_bitmap(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct cluster_msg cmsg = {0};
        int err;

        cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
        err = sendmsg(cinfo, &cmsg);
        if (err)
                pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
                        __func__, __LINE__, err);
}

static void unlock_all_bitmaps(struct mddev *mddev);
static int leave(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;

        if (!cinfo)
                return 0;

        /* A BITMAP_NEEDS_SYNC message should be sent when a node is
         * leaving the cluster with a dirty bitmap; it can only be
         * delivered while the dlm connection is still available */
        if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
                resync_bitmap(mddev);

        md_unregister_thread(&cinfo->recovery_thread);
        md_unregister_thread(&cinfo->recv_thread);
        lockres_free(cinfo->message_lockres);
        lockres_free(cinfo->token_lockres);
        lockres_free(cinfo->ack_lockres);
        lockres_free(cinfo->no_new_dev_lockres);
        lockres_free(cinfo->resync_lockres);
        lockres_free(cinfo->bitmap_lockres);
        unlock_all_bitmaps(mddev);
        dlm_release_lockspace(cinfo->lockspace, 2);
        return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we subtract one
 */
static int slot_number(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;

        return cinfo->slot_number - 1;
}

/*
 * Check if the communication is already locked, else lock the communication
 * channel.
 * If it is already locked, token is in EX mode, and hence lock_token()
 * should not be called.
 */
static int metadata_update_start(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;

        wait_event(cinfo->wait,
                   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
                   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

        /* If token is already locked, return 0 */
        if (cinfo->token_lockres->mode == DLM_LOCK_EX)
                return 0;

        return lock_token(cinfo);
}

static int metadata_update_finish(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct cluster_msg cmsg;
        struct md_rdev *rdev;
        int ret = 0;
        int raid_slot = -1;

        memset(&cmsg, 0, sizeof(cmsg));
        cmsg.type = cpu_to_le32(METADATA_UPDATED);
        /* Pick up a good active device number to send. */
        rdev_for_each(rdev, mddev)
                if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
                        raid_slot = rdev->desc_nr;
                        break;
                }
        if (raid_slot >= 0) {
                cmsg.raid_slot = cpu_to_le32(raid_slot);
                ret = __sendmsg(cinfo, &cmsg);
        } else
                pr_warn("md-cluster: No good device id found to send\n");
        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
        unlock_comm(cinfo);
        return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
        unlock_comm(cinfo);
}

static int resync_start(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        return dlm_lock_sync(cinfo->resync_lockres, DLM_LOCK_EX);
}

static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct resync_info ri;
        struct cluster_msg cmsg = {0};

        /* do not send a zero range again if we already sent one before */
        if (hi == 0) {
                memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
                if (le64_to_cpu(ri.hi) == 0)
                        return 0;
        }

        add_resync_info(cinfo->bitmap_lockres, lo, hi);
        /* Re-acquire the lock to refresh LVB */
        dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
        cmsg.type = cpu_to_le32(RESYNCING);
        cmsg.low = cpu_to_le64(lo);
        cmsg.high = cpu_to_le64(hi);

        return sendmsg(cinfo, &cmsg);
}

static int resync_finish(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        dlm_unlock_sync(cinfo->resync_lockres);
        return resync_info_update(mddev, 0, 0);
}
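
/*
 * area_resyncing() is how the RAID personalities (e.g. raid1) ask
 * whether an I/O range overlaps a region that another node is currently
 * resyncing, or whether read balancing is suspended during DLM recovery.
 */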
static int area_resyncing(struct mddev *mddev, int direction,
                sector_t lo, sector_t hi)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        int ret = 0;
        struct suspend_info *s;

        if ((direction == READ) &&
                test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
                return 1;

        spin_lock_irq(&cinfo->suspend_lock);
        if (list_empty(&cinfo->suspend_list))
                goto out;
        list_for_each_entry(s, &cinfo->suspend_list, list)
                if (hi > s->lo && lo < s->hi) {
                        ret = 1;
                        break;
                }
out:
        spin_unlock_irq(&cinfo->suspend_lock);
        return ret;
}

/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        struct cluster_msg cmsg;
        int ret = 0;
        struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
        char *uuid = sb->device_uuid;

        memset(&cmsg, 0, sizeof(cmsg));
        cmsg.type = cpu_to_le32(NEWDISK);
        memcpy(cmsg.uuid, uuid, 16);
        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
        lock_comm(cinfo);
        ret = __sendmsg(cinfo, &cmsg);
        if (ret) {
                unlock_comm(cinfo);
                return ret;
        }
        cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
        ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
        cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
        /* Some node does not "see" the device */
        if (ret == -EAGAIN)
                ret = -ENOENT;
        if (ret)
                unlock_comm(cinfo);
        else {
                dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
                /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
                 * will run soon after add_new_disk, the below path will be
                 * invoked:
                 *   md_wakeup_thread(mddev->thread)
                 *     -> conf->thread (raid1d)
                 *     -> md_check_recovery -> md_update_sb
                 *     -> metadata_update_start/finish
                 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
                 *
                 * For other failure cases, metadata_update_cancel and
                 * add_new_disk_cancel also clear this bit.
                 */
                set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
                wake_up(&cinfo->wait);
        }
        return ret;
}

static void add_new_disk_cancel(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
        unlock_comm(cinfo);
}

static int new_disk_ack(struct mddev *mddev, bool ack)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;

        if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
                pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
                return -EINVAL;
        }

        if (ack)
                dlm_unlock_sync(cinfo->no_new_dev_lockres);
        complete(&cinfo->newdisk_completion);
        return 0;
}

static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
        struct cluster_msg cmsg = {0};
        struct md_cluster_info *cinfo = mddev->cluster_info;
        cmsg.type = cpu_to_le32(REMOVE);
        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
        return sendmsg(cinfo, &cmsg);
}

static int lock_all_bitmaps(struct mddev *mddev)
{
        int slot, my_slot, ret, held = 1, i = 0;
        char str[64];
        struct md_cluster_info *cinfo = mddev->cluster_info;

        cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
                                              sizeof(struct dlm_lock_resource *),
                                              GFP_KERNEL);
        if (!cinfo->other_bitmap_lockres) {
                pr_err("md: can't alloc mem for other bitmap locks\n");
                return 0;
        }

        my_slot = slot_number(mddev);
        for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
                if (slot == my_slot)
                        continue;

                memset(str, '\0', 64);
                snprintf(str, 64, "bitmap%04d", slot);
                cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
                if (!cinfo->other_bitmap_lockres[i])
                        return -ENOMEM;

                cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
                ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
                if (ret)
                        held = -1;
                i++;
        }

        return held;
}

static void unlock_all_bitmaps(struct mddev *mddev)
{
        struct md_cluster_info *cinfo = mddev->cluster_info;
        int i;

        /* release the other nodes' bitmap locks if they exist */
        if (cinfo->other_bitmap_lockres) {
                for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
                        if (cinfo->other_bitmap_lockres[i]) {
                                dlm_unlock_sync(cinfo->other_bitmap_lockres[i]);
                                lockres_free(cinfo->other_bitmap_lockres[i]);
                        }
                }
                kfree(cinfo->other_bitmap_lockres);
        }
}

static int gather_bitmaps(struct md_rdev *rdev)
{
        int sn, err;
        sector_t lo, hi;
        struct cluster_msg cmsg = {0};
        struct mddev *mddev = rdev->mddev;
        struct md_cluster_info *cinfo = mddev->cluster_info;

        cmsg.type = cpu_to_le32(RE_ADD);
        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
        err = sendmsg(cinfo, &cmsg);
        if (err)
                goto out;

        for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
                if (sn == (cinfo->slot_number - 1))
                        continue;
                err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
                if (err) {
                        pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
                        goto out;
                }
                if ((hi > 0) && (lo < mddev->recovery_cp))
                        mddev->recovery_cp = lo;
        }
out:
        return err;
}

static struct md_cluster_operations cluster_ops = {
        .join   = join,
        .leave  = leave,
        .slot_number = slot_number,
        .resync_start = resync_start,
        .resync_finish = resync_finish,
        .resync_info_update = resync_info_update,
        .metadata_update_start = metadata_update_start,
        .metadata_update_finish = metadata_update_finish,
        .metadata_update_cancel = metadata_update_cancel,
        .area_resyncing = area_resyncing,
        .add_new_disk = add_new_disk,
        .add_new_disk_cancel = add_new_disk_cancel,
        .new_disk_ack = new_disk_ack,
        .remove_disk = remove_disk,
        .gather_bitmaps = gather_bitmaps,
        .lock_all_bitmaps = lock_all_bitmaps,
        .unlock_all_bitmaps = unlock_all_bitmaps,
};
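
/*
 * The MD core reaches these ops through the pointer installed by
 * register_md_cluster_operations() below; a call site would look
 * roughly like this (illustrative sketch, not code from this file):
 *
 *	if (mddev_is_clustered(mddev))
 *		md_cluster_ops->resync_start(mddev);
 */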

static int __init cluster_init(void)
{
        pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
        pr_info("Registering Cluster MD functions\n");
        register_md_cluster_operations(&cluster_ops, THIS_MODULE);
        return 0;
}

static void cluster_exit(void)
{
        unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");