소스 검색

Merge tag 'md/4.5' of git://neil.brown.name/md

Pull md updates from Neil Brown:
 "Mostly clustered-raid1 and raid5 journal updates.  one Y2038 fix and
  other minor stuff.

  One patch removes me from the MAINTAINERS file and adds a record of my
  md maintainership to Credits"

Many thanks to Neil, who has been around for a _looong_ time.

* tag 'md/4.5' of git://neil.brown.name/md: (26 commits)
  md/raid: only permit hot-add of compatible integrity profiles
  Remove myself as MD Maintainer, and add to Credits.
  raid5-cache: handle journal hotadd in quiesce
  MD: add journal with array suspended
  md: set MD_HAS_JOURNAL in correct places
  md: Remove 'ready' field from mddev.
  md: remove unnecesary md_new_event_inintr
  raid5: allow r5l_io_unit allocations to fail
  raid5-cache: use a mempool for the metadata block
  raid5-cache: use a bio_set
  raid5-cache: add journal hot add/remove support
  drivers: md: use ktime_get_real_seconds()
  md: avoid warning for 32-bit sector_t
  raid5-cache: free meta_page earlier
  raid5-cache: simplify r5l_move_io_unit_list
  md: update comment for md_allow_write
  md-cluster: update comments for MD_CLUSTER_SEND_LOCKED_ALREADY
  md-cluster: Protect communication with mutexes
  md-cluster: Defer MD reloading to mddev->thread
  md-cluster: update the documentation
  ...
Linus Torvalds 9 년 전
부모
커밋
3c28c9ccaf
13개의 변경된 파일649개의 추가작업 그리고 231개의 파일을 삭제
  1. 1 0
      CREDITS
  2. 228 86
      Documentation/md-cluster.txt
  3. 0 1
      MAINTAINERS
  4. 150 14
      drivers/md/md-cluster.c
  5. 2 0
      drivers/md/md-cluster.h
  6. 105 66
      drivers/md/md.c
  7. 7 4
      drivers/md/md.h
  8. 3 3
      drivers/md/multipath.c
  9. 3 3
      drivers/md/raid1.c
  10. 3 3
      drivers/md/raid10.c
  11. 119 39
      drivers/md/raid5-cache.c
  12. 26 10
      drivers/md/raid5.c
  13. 2 2
      include/uapi/linux/raid/md_u.h

+ 1 - 0
CREDITS

@@ -534,6 +534,7 @@ N: NeilBrown
 E: neil@brown.name
 P: 4096R/566281B9 1BC6 29EB D390 D870 7B5F  497A 39EC 9EDD 5662 81B9
 D: NFSD Maintainer 2000-2007
+D: MD Maintainer 2001-2016
 
 N: Zach Brown
 E: zab@zabbo.net

+ 228 - 86
Documentation/md-cluster.txt

@@ -3,7 +3,7 @@ The cluster MD is a shared-device RAID for a cluster.
 
 1. On-disk format
 
-Separate write-intent-bitmap are used for each cluster node.
+Separate write-intent-bitmaps are used for each cluster node.
 The bitmaps record all writes that may have been started on that node,
 and may not yet have finished. The on-disk layout is:
 
@@ -14,117 +14,161 @@ and may not yet have finished. The on-disk layout is:
 | bm super[2] + bits  | bm bits [2, contd]  | bm super[3] + bits  |
 | bm bits [3, contd]  |                     |                     |
 
-During "normal" functioning we assume the filesystem ensures that only one
-node writes to any given block at a time, so a write
-request will
+During "normal" functioning we assume the filesystem ensures that only
+one node writes to any given block at a time, so a write request will
+
  - set the appropriate bit (if not already set)
  - commit the write to all mirrors
  - schedule the bit to be cleared after a timeout.
 
-Reads are just handled normally.  It is up to the filesystem to
-ensure one node doesn't read from a location where another node (or the same
+Reads are just handled normally. It is up to the filesystem to ensure
+one node doesn't read from a location where another node (or the same
 node) is writing.
 
 
 2. DLM Locks for management
 
-There are two locks for managing the device:
+There are three groups of locks for managing the device:
 
 2.1 Bitmap lock resource (bm_lockres)
 
- The bm_lockres protects individual node bitmaps. They are named in the
- form bitmap001 for node 1, bitmap002 for node and so on. When a node
- joins the cluster, it acquires the lock in PW mode and it stays so
- during the lifetime the node is part of the cluster. The lock resource
- number is based on the slot number returned by the DLM subsystem. Since
- DLM starts node count from one and bitmap slots start from zero, one is
- subtracted from the DLM slot number to arrive at the bitmap slot number.
+ The bm_lockres protects individual node bitmaps. They are named in
+ the form bitmap000 for node 1, bitmap001 for node 2 and so on. When a
+ node joins the cluster, it acquires the lock in PW mode and it stays
+ so during the lifetime the node is part of the cluster. The lock
+ resource number is based on the slot number returned by the DLM
+ subsystem. Since DLM starts node count from one and bitmap slots
+ start from zero, one is subtracted from the DLM slot number to arrive
+ at the bitmap slot number.
+
+ The LVB of the bitmap lock for a particular node records the range
+ of sectors that are being re-synced by that node.  No other
+ node may write to those sectors.  This is used when a new nodes
+ joins the cluster.
+
+2.2 Message passing locks
+
+ Each node has to communicate with other nodes when starting or ending
+ resync, and for metadata superblock updates.  This communication is
+ managed through three locks: "token", "message", and "ack", together
+ with the Lock Value Block (LVB) of one of the "message" lock.
+
+2.3 new-device management
+
+ A single lock: "no-new-dev" is used to co-ordinate the addition of
+ new devices - this must be synchronized across the array.
+ Normally all nodes hold a concurrent-read lock on this device.
 
 3. Communication
 
-Each node has to communicate with other nodes when starting or ending
-resync, and metadata superblock updates.
+ Messages can be broadcast to all nodes, and the sender waits for all
+ other nodes to acknowledge the message before proceeding.  Only one
+ message can be processed at a time.
 
 3.1 Message Types
 
- There are 3 types, of messages which are passed
+ There are six types of messages which are passed:
 
- 3.1.1 METADATA_UPDATED: informs other nodes that the metadata has been
-   updated, and the node must re-read the md superblock. This is performed
-   synchronously.
+ 3.1.1 METADATA_UPDATED: informs other nodes that the metadata has
+   been updated, and the node must re-read the md superblock. This is
+   performed synchronously. It is primarily used to signal device
+   failure.
 
- 3.1.2 RESYNC: informs other nodes that a resync is initiated or ended
-   so that each node may suspend or resume the region.
+ 3.1.2 RESYNCING: informs other nodes that a resync is initiated or
+   ended so that each node may suspend or resume the region.  Each
+   RESYNCING message identifies a range of the devices that the
+   sending node is about to resync. This over-rides any pervious
+   notification from that node: only one ranged can be resynced at a
+   time per-node.
+
+ 3.1.3 NEWDISK: informs other nodes that a device is being added to
+   the array. Message contains an identifier for that device.  See
+   below for further details.
+
+ 3.1.4 REMOVE: A failed or spare device is being removed from the
+   array. The slot-number of the device is included in the message.
+
+ 3.1.5 RE_ADD: A failed device is being re-activated - the assumption
+   is that it has been determined to be working again.
+
+ 3.1.6 BITMAP_NEEDS_SYNC: if a node is stopped locally but the bitmap
+   isn't clean, then another node is informed to take the ownership of
+   resync.
 
 3.2 Communication mechanism
 
  The DLM LVB is used to communicate within nodes of the cluster. There
  are three resources used for the purpose:
 
-  3.2.1 Token: The resource which protects the entire communication
+  3.2.1 token: The resource which protects the entire communication
    system. The node having the token resource is allowed to
    communicate.
 
-  3.2.2 Message: The lock resource which carries the data to
+  3.2.2 message: The lock resource which carries the data to
    communicate.
 
-  3.2.3 Ack: The resource, acquiring which means the message has been
+  3.2.3 ack: The resource, acquiring which means the message has been
    acknowledged by all nodes in the cluster. The BAST of the resource
-   is used to inform the receive node that a node wants to communicate.
+   is used to inform the receiving node that a node wants to
+   communicate.
 
 The algorithm is:
 
- 1. receive status
+ 1. receive status - all nodes have concurrent-reader lock on "ack".
 
-   sender                         receiver                   receiver
-   ACK:CR                          ACK:CR                     ACK:CR
+   sender                         receiver                 receiver
+   "ack":CR                       "ack":CR                 "ack":CR
 
- 2. sender get EX of TOKEN
-    sender get EX of MESSAGE
+ 2. sender get EX on "token"
+    sender get EX on "message"
     sender                        receiver                 receiver
-    TOKEN:EX                       ACK:CR                   ACK:CR
-    MESSAGE:EX
-    ACK:CR
+    "token":EX                    "ack":CR                 "ack":CR
+    "message":EX
+    "ack":CR
 
-    Sender checks that it still needs to send a message. Messages received
-    or other events that happened while waiting for the TOKEN may have made
-    this message inappropriate or redundant.
+    Sender checks that it still needs to send a message. Messages
+    received or other events that happened while waiting for the
+    "token" may have made this message inappropriate or redundant.
 
- 3. sender write LVB.
-    sender down-convert MESSAGE from EX to CW
-    sender try to get EX of ACK
-    [ wait until all receiver has *processed* the MESSAGE ]
+ 3. sender writes LVB.
+    sender down-convert "message" from EX to CW
+    sender try to get EX of "ack"
+    [ wait until all receivers have *processed* the "message" ]
 
-                                     [ triggered by bast of ACK ]
-                                     receiver get CR of MESSAGE
+                                     [ triggered by bast of "ack" ]
+                                     receiver get CR on "message"
                                      receiver read LVB
                                      receiver processes the message
                                      [ wait finish ]
-                                     receiver release ACK
-
-   sender                         receiver                   receiver
-   TOKEN:EX                       MESSAGE:CR                 MESSAGE:CR
-   MESSAGE:CR
-   ACK:EX
-
- 4. triggered by grant of EX on ACK (indicating all receivers have processed
-    message)
-    sender down-convert ACK from EX to CR
-    sender release MESSAGE
-    sender release TOKEN
-                               receiver upconvert to PR of MESSAGE
-                               receiver get CR of ACK
-                               receiver release MESSAGE
+                                     receiver releases "ack"
+                                     receiver tries to get PR on "message"
+
+   sender                         receiver                  receiver
+   "token":EX                     "message":CR              "message":CR
+   "message":CW
+   "ack":EX
+
+ 4. triggered by grant of EX on "ack" (indicating all receivers
+    have processed message)
+    sender down-converts "ack" from EX to CR
+    sender releases "message"
+    sender releases "token"
+                               receiver upconvert to PR on "message"
+                               receiver get CR of "ack"
+                               receiver release "message"
 
    sender                      receiver                   receiver
-   ACK:CR                       ACK:CR                     ACK:CR
+   "ack":CR                    "ack":CR                   "ack":CR
 
 
 4. Handling Failures
 
 4.1 Node Failure
- When a node fails, the DLM informs the cluster with the slot. The node
- starts a cluster recovery thread. The cluster recovery thread:
+
+ When a node fails, the DLM informs the cluster with the slot
+ number. The node starts a cluster recovery thread. The cluster
+ recovery thread:
+
 	- acquires the bitmap<number> lock of the failed node
 	- opens the bitmap
 	- reads the bitmap of the failed node
@@ -132,45 +176,143 @@ The algorithm is:
 	- cleans the bitmap of the failed node
 	- releases bitmap<number> lock of the failed node
 	- initiates resync of the bitmap on the current node
+		md_check_recovery is invoked within recover_bitmaps,
+		then md_check_recovery -> metadata_update_start/finish,
+		it will lock the communication by lock_comm.
+		Which means when one node is resyncing it blocks all
+		other nodes from writing anywhere on the array.
 
- The resync process, is the regular md resync. However, in a clustered
+ The resync process is the regular md resync. However, in a clustered
  environment when a resync is performed, it needs to tell other nodes
  of the areas which are suspended. Before a resync starts, the node
- send out RESYNC_START with the (lo,hi) range of the area which needs
- to be suspended. Each node maintains a suspend_list, which contains
- the list  of ranges which are currently suspended. On receiving
- RESYNC_START, the node adds the range to the suspend_list. Similarly,
- when the node performing resync finishes, it send RESYNC_FINISHED
- to other nodes and other nodes remove the corresponding entry from
- the suspend_list.
+ send out RESYNCING with the (lo,hi) range of the area which needs to
+ be suspended. Each node maintains a suspend_list, which contains the
+ list of ranges which are currently suspended. On receiving RESYNCING,
+ the node adds the range to the suspend_list. Similarly, when the node
+ performing resync finishes, it sends RESYNCING with an empty range to
+ other nodes and other nodes remove the corresponding entry from the
+ suspend_list.
 
- A helper function, should_suspend() can be used to check if a particular
- I/O range should be suspended or not.
+ A helper function, ->area_resyncing() can be used to check if a
+ particular I/O range should be suspended or not.
 
 4.2 Device Failure
+
  Device failures are handled and communicated with the metadata update
- routine.
+ routine.  When a node detects a device failure it does not allow
+ any further writes to that device until the failure has been
+ acknowledged by all other nodes.
 
 5. Adding a new Device
-For adding a new device, it is necessary that all nodes "see" the new device
-to be added. For this, the following algorithm is used:
+
+ For adding a new device, it is necessary that all nodes "see" the new
+ device to be added. For this, the following algorithm is used:
 
     1. Node 1 issues mdadm --manage /dev/mdX --add /dev/sdYY which issues
-       ioctl(ADD_NEW_DISC with disc.state set to MD_DISK_CLUSTER_ADD)
-    2. Node 1 sends NEWDISK with uuid and slot number
+       ioctl(ADD_NEW_DISK with disc.state set to MD_DISK_CLUSTER_ADD)
+    2. Node 1 sends a NEWDISK message with uuid and slot number
     3. Other nodes issue kobject_uevent_env with uuid and slot number
        (Steps 4,5 could be a udev rule)
     4. In userspace, the node searches for the disk, perhaps
        using blkid -t SUB_UUID=""
-    5. Other nodes issue either of the following depending on whether the disk
-       was found:
+    5. Other nodes issue either of the following depending on whether
+       the disk was found:
        ioctl(ADD_NEW_DISK with disc.state set to MD_DISK_CANDIDATE and
-                disc.number set to slot number)
+             disc.number set to slot number)
        ioctl(CLUSTERED_DISK_NACK)
-    6. Other nodes drop lock on no-new-devs (CR) if device is found
-    7. Node 1 attempts EX lock on no-new-devs
-    8. If node 1 gets the lock, it sends METADATA_UPDATED after unmarking the disk
-       as SpareLocal
-    9. If not (get no-new-dev lock), it fails the operation and sends METADATA_UPDATED
-    10. Other nodes get the information whether a disk is added or not
-	by the following METADATA_UPDATED.
+    6. Other nodes drop lock on "no-new-devs" (CR) if device is found
+    7. Node 1 attempts EX lock on "no-new-dev"
+    8. If node 1 gets the lock, it sends METADATA_UPDATED after
+       unmarking the disk as SpareLocal
+    9. If not (get "no-new-dev" lock), it fails the operation and sends
+       METADATA_UPDATED.
+   10. Other nodes get the information whether a disk is added or not
+       by the following METADATA_UPDATED.
+
+6. Module interface.
+
+ There are 17 call-backs which the md core can make to the cluster
+ module.  Understanding these can give a good overview of the whole
+ process.
+
+6.1 join(nodes) and leave()
+
+ These are called when an array is started with a clustered bitmap,
+ and when the array is stopped.  join() ensures the cluster is
+ available and initializes the various resources.
+ Only the first 'nodes' nodes in the cluster can use the array.
+
+6.2 slot_number()
+
+ Reports the slot number advised by the cluster infrastructure.
+ Range is from 0 to nodes-1.
+
+6.3 resync_info_update()
+
+ This updates the resync range that is stored in the bitmap lock.
+ The starting point is updated as the resync progresses.  The
+ end point is always the end of the array.
+ It does *not* send a RESYNCING message.
+
+6.4 resync_start(), resync_finish()
+
+ These are called when resync/recovery/reshape starts or stops.
+ They update the resyncing range in the bitmap lock and also
+ send a RESYNCING message.  resync_start reports the whole
+ array as resyncing, resync_finish reports none of it.
+
+ resync_finish() also sends a BITMAP_NEEDS_SYNC message which
+ allows some other node to take over.
+
+6.5 metadata_update_start(), metadata_update_finish(),
+    metadata_update_cancel().
+
+ metadata_update_start is used to get exclusive access to
+ the metadata.  If a change is still needed once that access is
+ gained, metadata_update_finish() will send a METADATA_UPDATE
+ message to all other nodes, otherwise metadata_update_cancel()
+ can be used to release the lock.
+
+6.6 area_resyncing()
+
+ This combines two elements of functionality.
+
+ Firstly, it will check if any node is currently resyncing
+ anything in a given range of sectors.  If any resync is found,
+ then the caller will avoid writing or read-balancing in that
+ range.
+
+ Secondly, while node recovery is happening it reports that
+ all areas are resyncing for READ requests.  This avoids races
+ between the cluster-filesystem and the cluster-RAID handling
+ a node failure.
+
+6.7 add_new_disk_start(), add_new_disk_finish(), new_disk_ack()
+
+ These are used to manage the new-disk protocol described above.
+ When a new device is added, add_new_disk_start() is called before
+ it is bound to the array and, if that succeeds, add_new_disk_finish()
+ is called the device is fully added.
+
+ When a device is added in acknowledgement to a previous
+ request, or when the device is declared "unavailable",
+ new_disk_ack() is called.
+
+6.8 remove_disk()
+
+ This is called when a spare or failed device is removed from
+ the array.  It causes a REMOVE message to be send to other nodes.
+
+6.9 gather_bitmaps()
+
+ This sends a RE_ADD message to all other nodes and then
+ gathers bitmap information from all bitmaps.  This combined
+ bitmap is then used to recovery the re-added device.
+
+6.10 lock_all_bitmaps() and unlock_all_bitmaps()
+
+ These are called when change bitmap to none. If a node plans
+ to clear the cluster raid's bitmap, it need to make sure no other
+ nodes are using the raid which is achieved by lock all bitmap
+ locks within the cluster, and also those locks are unlocked
+ accordingly.

+ 0 - 1
MAINTAINERS

@@ -9999,7 +9999,6 @@ S:	Supported
 F:	drivers/media/pci/solo6x10/
 
 SOFTWARE RAID (Multiple Disks) SUPPORT
-M:	Neil Brown <neilb@suse.com>
 L:	linux-raid@vger.kernel.org
 S:	Supported
 F:	drivers/md/

+ 150 - 14
drivers/md/md-cluster.c

@@ -48,13 +48,29 @@ struct resync_info {
 #define		MD_CLUSTER_SUSPEND_READ_BALANCING	2
 #define		MD_CLUSTER_BEGIN_JOIN_CLUSTER		3
 
+/* Lock the send communication. This is done through
+ * bit manipulation as opposed to a mutex in order to
+ * accomodate lock and hold. See next comment.
+ */
+#define		MD_CLUSTER_SEND_LOCK			4
+/* If cluster operations (such as adding a disk) must lock the
+ * communication channel, so as to perform extra operations
+ * (update metadata) and no other operation is allowed on the
+ * MD. Token needs to be locked and held until the operation
+ * completes witha md_update_sb(), which would eventually release
+ * the lock.
+ */
+#define		MD_CLUSTER_SEND_LOCKED_ALREADY		5
+
 
 struct md_cluster_info {
 	/* dlm lock space and resources for clustered raid. */
 	dlm_lockspace_t *lockspace;
 	int slot_number;
 	struct completion completion;
+	struct mutex recv_mutex;
 	struct dlm_lock_resource *bitmap_lockres;
+	struct dlm_lock_resource **other_bitmap_lockres;
 	struct dlm_lock_resource *resync_lockres;
 	struct list_head suspend_list;
 	spinlock_t suspend_lock;
@@ -67,6 +83,7 @@ struct md_cluster_info {
 	struct dlm_lock_resource *no_new_dev_lockres;
 	struct md_thread *recv_thread;
 	struct completion newdisk_completion;
+	wait_queue_head_t wait;
 	unsigned long state;
 };
 
@@ -431,8 +448,10 @@ static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
 {
 	struct md_cluster_info *cinfo = mddev->cluster_info;
-	md_reload_sb(mddev, le32_to_cpu(msg->raid_slot));
+	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
+	set_bit(MD_RELOAD_SB, &mddev->flags);
 	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
+	md_wakeup_thread(mddev->thread);
 }
 
 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
@@ -440,8 +459,11 @@ static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
 	struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
 						   le32_to_cpu(msg->raid_slot));
 
-	if (rdev)
-		md_kick_rdev_from_array(rdev);
+	if (rdev) {
+		set_bit(ClusterRemove, &rdev->flags);
+		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
+		md_wakeup_thread(mddev->thread);
+	}
 	else
 		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
 			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
@@ -502,9 +524,11 @@ static void recv_daemon(struct md_thread *thread)
 	struct cluster_msg msg;
 	int ret;
 
+	mutex_lock(&cinfo->recv_mutex);
 	/*get CR on Message*/
 	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
 		pr_err("md/raid1:failed to get CR on MESSAGE\n");
+		mutex_unlock(&cinfo->recv_mutex);
 		return;
 	}
 
@@ -528,33 +552,45 @@ static void recv_daemon(struct md_thread *thread)
 	ret = dlm_unlock_sync(message_lockres);
 	if (unlikely(ret != 0))
 		pr_info("unlock msg failed return %d\n", ret);
+	mutex_unlock(&cinfo->recv_mutex);
 }
 
-/* lock_comm()
+/* lock_token()
  * Takes the lock on the TOKEN lock resource so no other
  * node can communicate while the operation is underway.
- * If called again, and the TOKEN lock is alread in EX mode
- * return success. However, care must be taken that unlock_comm()
- * is called only once.
  */
-static int lock_comm(struct md_cluster_info *cinfo)
+static int lock_token(struct md_cluster_info *cinfo)
 {
 	int error;
 
-	if (cinfo->token_lockres->mode == DLM_LOCK_EX)
-		return 0;
-
 	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
 	if (error)
 		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
 				__func__, __LINE__, error);
+
+	/* Lock the receive sequence */
+	mutex_lock(&cinfo->recv_mutex);
 	return error;
 }
 
+/* lock_comm()
+ * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
+ */
+static int lock_comm(struct md_cluster_info *cinfo)
+{
+	wait_event(cinfo->wait,
+		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
+
+	return lock_token(cinfo);
+}
+
 static void unlock_comm(struct md_cluster_info *cinfo)
 {
 	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
+	mutex_unlock(&cinfo->recv_mutex);
 	dlm_unlock_sync(cinfo->token_lockres);
+	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
+	wake_up(&cinfo->wait);
 }
 
 /* __sendmsg()
@@ -707,6 +743,8 @@ static int join(struct mddev *mddev, int nodes)
 	spin_lock_init(&cinfo->suspend_lock);
 	init_completion(&cinfo->completion);
 	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
+	init_waitqueue_head(&cinfo->wait);
+	mutex_init(&cinfo->recv_mutex);
 
 	mddev->cluster_info = cinfo;
 
@@ -800,6 +838,7 @@ static void resync_bitmap(struct mddev *mddev)
 			__func__, __LINE__, err);
 }
 
+static void unlock_all_bitmaps(struct mddev *mddev);
 static int leave(struct mddev *mddev)
 {
 	struct md_cluster_info *cinfo = mddev->cluster_info;
@@ -820,6 +859,7 @@ static int leave(struct mddev *mddev)
 	lockres_free(cinfo->ack_lockres);
 	lockres_free(cinfo->no_new_dev_lockres);
 	lockres_free(cinfo->bitmap_lockres);
+	unlock_all_bitmaps(mddev);
 	dlm_release_lockspace(cinfo->lockspace, 2);
 	return 0;
 }
@@ -835,9 +875,25 @@ static int slot_number(struct mddev *mddev)
 	return cinfo->slot_number - 1;
 }
 
+/*
+ * Check if the communication is already locked, else lock the communication
+ * channel.
+ * If it is already locked, token is in EX mode, and hence lock_token()
+ * should not be called.
+ */
 static int metadata_update_start(struct mddev *mddev)
 {
-	return lock_comm(mddev->cluster_info);
+	struct md_cluster_info *cinfo = mddev->cluster_info;
+
+	wait_event(cinfo->wait,
+		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
+		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
+
+	/* If token is already locked, return 0 */
+	if (cinfo->token_lockres->mode == DLM_LOCK_EX)
+		return 0;
+
+	return lock_token(cinfo);
 }
 
 static int metadata_update_finish(struct mddev *mddev)
@@ -862,6 +918,7 @@ static int metadata_update_finish(struct mddev *mddev)
 		ret = __sendmsg(cinfo, &cmsg);
 	} else
 		pr_warn("md-cluster: No good device id found to send\n");
+	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
 	unlock_comm(cinfo);
 	return ret;
 }
@@ -869,6 +926,7 @@ static int metadata_update_finish(struct mddev *mddev)
 static void metadata_update_cancel(struct mddev *mddev)
 {
 	struct md_cluster_info *cinfo = mddev->cluster_info;
+	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
 	unlock_comm(cinfo);
 }
 
@@ -882,8 +940,16 @@ static int resync_start(struct mddev *mddev)
 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
 {
 	struct md_cluster_info *cinfo = mddev->cluster_info;
+	struct resync_info ri;
 	struct cluster_msg cmsg = {0};
 
+	/* do not send zero again, if we have sent before */
+	if (hi == 0) {
+		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
+		if (le64_to_cpu(ri.hi) == 0)
+			return 0;
+	}
+
 	add_resync_info(cinfo->bitmap_lockres, lo, hi);
 	/* Re-acquire the lock to refresh LVB */
 	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
@@ -954,14 +1020,30 @@ static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
 		ret = -ENOENT;
 	if (ret)
 		unlock_comm(cinfo);
-	else
+	else {
 		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
+		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
+		 * will run soon after add_new_disk, the below path will be
+		 * invoked:
+		 *   md_wakeup_thread(mddev->thread)
+		 *	-> conf->thread (raid1d)
+		 *	-> md_check_recovery -> md_update_sb
+		 *	-> metadata_update_start/finish
+		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
+		 *
+		 * For other failure cases, metadata_update_cancel and
+		 * add_new_disk_cancel also clear below bit as well.
+		 * */
+		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
+		wake_up(&cinfo->wait);
+	}
 	return ret;
 }
 
 static void add_new_disk_cancel(struct mddev *mddev)
 {
 	struct md_cluster_info *cinfo = mddev->cluster_info;
+	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
 	unlock_comm(cinfo);
 }
 
@@ -986,7 +1068,59 @@ static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
 	struct md_cluster_info *cinfo = mddev->cluster_info;
 	cmsg.type = cpu_to_le32(REMOVE);
 	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
-	return __sendmsg(cinfo, &cmsg);
+	return sendmsg(cinfo, &cmsg);
+}
+
+static int lock_all_bitmaps(struct mddev *mddev)
+{
+	int slot, my_slot, ret, held = 1, i = 0;
+	char str[64];
+	struct md_cluster_info *cinfo = mddev->cluster_info;
+
+	cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
+					     sizeof(struct dlm_lock_resource *),
+					     GFP_KERNEL);
+	if (!cinfo->other_bitmap_lockres) {
+		pr_err("md: can't alloc mem for other bitmap locks\n");
+		return 0;
+	}
+
+	my_slot = slot_number(mddev);
+	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
+		if (slot == my_slot)
+			continue;
+
+		memset(str, '\0', 64);
+		snprintf(str, 64, "bitmap%04d", slot);
+		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
+		if (!cinfo->other_bitmap_lockres[i])
+			return -ENOMEM;
+
+		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
+		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
+		if (ret)
+			held = -1;
+		i++;
+	}
+
+	return held;
+}
+
+static void unlock_all_bitmaps(struct mddev *mddev)
+{
+	struct md_cluster_info *cinfo = mddev->cluster_info;
+	int i;
+
+	/* release other node's bitmap lock if they are existed */
+	if (cinfo->other_bitmap_lockres) {
+		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
+			if (cinfo->other_bitmap_lockres[i]) {
+				dlm_unlock_sync(cinfo->other_bitmap_lockres[i]);
+				lockres_free(cinfo->other_bitmap_lockres[i]);
+			}
+		}
+		kfree(cinfo->other_bitmap_lockres);
+	}
 }
 
 static int gather_bitmaps(struct md_rdev *rdev)
@@ -1034,6 +1168,8 @@ static struct md_cluster_operations cluster_ops = {
 	.new_disk_ack = new_disk_ack,
 	.remove_disk = remove_disk,
 	.gather_bitmaps = gather_bitmaps,
+	.lock_all_bitmaps = lock_all_bitmaps,
+	.unlock_all_bitmaps = unlock_all_bitmaps,
 };
 
 static int __init cluster_init(void)

+ 2 - 0
drivers/md/md-cluster.h

@@ -24,6 +24,8 @@ struct md_cluster_operations {
 	int (*new_disk_ack)(struct mddev *mddev, bool ack);
 	int (*remove_disk)(struct mddev *mddev, struct md_rdev *rdev);
 	int (*gather_bitmaps)(struct md_rdev *rdev);
+	int (*lock_all_bitmaps)(struct mddev *mddev);
+	void (*unlock_all_bitmaps)(struct mddev *mddev);
 };
 
 #endif /* _MD_CLUSTER_H */

+ 105 - 66
drivers/md/md.c

@@ -206,15 +206,6 @@ void md_new_event(struct mddev *mddev)
 }
 EXPORT_SYMBOL_GPL(md_new_event);
 
-/* Alternate version that can be called from interrupts
- * when calling sysfs_notify isn't needed.
- */
-static void md_new_event_inintr(struct mddev *mddev)
-{
-	atomic_inc(&md_event_count);
-	wake_up(&md_event_waiters);
-}
-
 /*
  * Enables to iterate over all existing md arrays
  * all_mddevs_lock protects this list.
@@ -260,8 +251,7 @@ static blk_qc_t md_make_request(struct request_queue *q, struct bio *bio)
 
 	blk_queue_split(q, &bio, q->bio_split);
 
-	if (mddev == NULL || mddev->pers == NULL
-	    || !mddev->ready) {
+	if (mddev == NULL || mddev->pers == NULL) {
 		bio_io_error(bio);
 		return BLK_QC_T_NONE;
 	}
@@ -1026,8 +1016,9 @@ static int super_90_load(struct md_rdev *rdev, struct md_rdev *refdev, int minor
 	 * (not needed for Linear and RAID0 as metadata doesn't
 	 * record this size)
 	 */
-	if (rdev->sectors >= (2ULL << 32) && sb->level >= 1)
-		rdev->sectors = (2ULL << 32) - 2;
+	if (IS_ENABLED(CONFIG_LBDAF) && (u64)rdev->sectors >= (2ULL << 32) &&
+	    sb->level >= 1)
+		rdev->sectors = (sector_t)(2ULL << 32) - 2;
 
 	if (rdev->sectors < ((sector_t)sb->size) * 2 && sb->level >= 1)
 		/* "this cannot possibly happen" ... */
@@ -1199,13 +1190,13 @@ static void super_90_sync(struct mddev *mddev, struct md_rdev *rdev)
 	memcpy(&sb->set_uuid2, mddev->uuid+8, 4);
 	memcpy(&sb->set_uuid3, mddev->uuid+12,4);
 
-	sb->ctime = mddev->ctime;
+	sb->ctime = clamp_t(time64_t, mddev->ctime, 0, U32_MAX);
 	sb->level = mddev->level;
 	sb->size = mddev->dev_sectors / 2;
 	sb->raid_disks = mddev->raid_disks;
 	sb->md_minor = mddev->md_minor;
 	sb->not_persistent = 0;
-	sb->utime = mddev->utime;
+	sb->utime = clamp_t(time64_t, mddev->utime, 0, U32_MAX);
 	sb->state = 0;
 	sb->events_hi = (mddev->events>>32);
 	sb->events_lo = (u32)mddev->events;
@@ -1320,8 +1311,9 @@ super_90_rdev_size_change(struct md_rdev *rdev, sector_t num_sectors)
 	/* Limit to 4TB as metadata cannot record more than that.
 	 * 4TB == 2^32 KB, or 2*2^32 sectors.
 	 */
-	if (num_sectors >= (2ULL << 32) && rdev->mddev->level >= 1)
-		num_sectors = (2ULL << 32) - 2;
+	if (IS_ENABLED(CONFIG_LBDAF) && (u64)num_sectors >= (2ULL << 32) &&
+	    rdev->mddev->level >= 1)
+		num_sectors = (sector_t)(2ULL << 32) - 2;
 	md_super_write(rdev->mddev, rdev, rdev->sb_start, rdev->sb_size,
 		       rdev->sb_page);
 	md_super_wait(rdev->mddev);
@@ -1542,8 +1534,8 @@ static int super_1_validate(struct mddev *mddev, struct md_rdev *rdev)
 		mddev->patch_version = 0;
 		mddev->external = 0;
 		mddev->chunk_sectors = le32_to_cpu(sb->chunksize);
-		mddev->ctime = le64_to_cpu(sb->ctime) & ((1ULL << 32)-1);
-		mddev->utime = le64_to_cpu(sb->utime) & ((1ULL << 32)-1);
+		mddev->ctime = le64_to_cpu(sb->ctime);
+		mddev->utime = le64_to_cpu(sb->utime);
 		mddev->level = le32_to_cpu(sb->level);
 		mddev->clevel[0] = 0;
 		mddev->layout = le32_to_cpu(sb->layout);
@@ -1602,6 +1594,11 @@ static int super_1_validate(struct mddev *mddev, struct md_rdev *rdev)
 			mddev->new_chunk_sectors = mddev->chunk_sectors;
 		}
 
+		if (le32_to_cpu(sb->feature_map) & MD_FEATURE_JOURNAL) {
+			set_bit(MD_HAS_JOURNAL, &mddev->flags);
+			if (mddev->recovery_cp == MaxSector)
+				set_bit(MD_JOURNAL_CLEAN, &mddev->flags);
+		}
 	} else if (mddev->pers == NULL) {
 		/* Insist of good event counter while assembling, except for
 		 * spares (which don't need an event count) */
@@ -1648,8 +1645,6 @@ static int super_1_validate(struct mddev *mddev, struct md_rdev *rdev)
 			}
 			set_bit(Journal, &rdev->flags);
 			rdev->journal_tail = le64_to_cpu(sb->journal_tail);
-			if (mddev->recovery_cp == MaxSector)
-				set_bit(MD_JOURNAL_CLEAN, &mddev->flags);
 			rdev->raid_disk = 0;
 			break;
 		default:
@@ -1669,8 +1664,6 @@ static int super_1_validate(struct mddev *mddev, struct md_rdev *rdev)
 			set_bit(WriteMostly, &rdev->flags);
 		if (le32_to_cpu(sb->feature_map) & MD_FEATURE_REPLACEMENT)
 			set_bit(Replacement, &rdev->flags);
-		if (le32_to_cpu(sb->feature_map) & MD_FEATURE_JOURNAL)
-			set_bit(MD_HAS_JOURNAL, &mddev->flags);
 	} else /* MULTIPATH are always insync */
 		set_bit(In_sync, &rdev->flags);
 
@@ -2014,28 +2007,32 @@ int md_integrity_register(struct mddev *mddev)
 }
 EXPORT_SYMBOL(md_integrity_register);
 
-/* Disable data integrity if non-capable/non-matching disk is being added */
-void md_integrity_add_rdev(struct md_rdev *rdev, struct mddev *mddev)
+/*
+ * Attempt to add an rdev, but only if it is consistent with the current
+ * integrity profile
+ */
+int md_integrity_add_rdev(struct md_rdev *rdev, struct mddev *mddev)
 {
 	struct blk_integrity *bi_rdev;
 	struct blk_integrity *bi_mddev;
+	char name[BDEVNAME_SIZE];
 
 	if (!mddev->gendisk)
-		return;
+		return 0;
 
 	bi_rdev = bdev_get_integrity(rdev->bdev);
 	bi_mddev = blk_get_integrity(mddev->gendisk);
 
 	if (!bi_mddev) /* nothing to do */
-		return;
-	if (rdev->raid_disk < 0) /* skip spares */
-		return;
-	if (bi_rdev && blk_integrity_compare(mddev->gendisk,
-					     rdev->bdev->bd_disk) >= 0)
-		return;
-	WARN_ON_ONCE(!mddev->suspended);
-	printk(KERN_NOTICE "disabling data integrity on %s\n", mdname(mddev));
-	blk_integrity_unregister(mddev->gendisk);
+		return 0;
+
+	if (blk_integrity_compare(mddev->gendisk, rdev->bdev->bd_disk) != 0) {
+		printk(KERN_NOTICE "%s: incompatible integrity profile for %s\n",
+				mdname(mddev), bdevname(rdev->bdev, name));
+		return -ENXIO;
+	}
+
+	return 0;
 }
 EXPORT_SYMBOL(md_integrity_add_rdev);
 
@@ -2050,8 +2047,9 @@ static int bind_rdev_to_array(struct md_rdev *rdev, struct mddev *mddev)
 		return -EEXIST;
 
 	/* make sure rdev->sectors exceeds mddev->dev_sectors */
-	if (rdev->sectors && (mddev->dev_sectors == 0 ||
-			rdev->sectors < mddev->dev_sectors)) {
+	if (!test_bit(Journal, &rdev->flags) &&
+	    rdev->sectors &&
+	    (mddev->dev_sectors == 0 || rdev->sectors < mddev->dev_sectors)) {
 		if (mddev->pers) {
 			/* Cannot change size, so fail
 			 * If mddev->level <= 0, then we don't care
@@ -2082,7 +2080,8 @@ static int bind_rdev_to_array(struct md_rdev *rdev, struct mddev *mddev)
 		}
 	}
 	rcu_read_unlock();
-	if (mddev->max_disks && rdev->desc_nr >= mddev->max_disks) {
+	if (!test_bit(Journal, &rdev->flags) &&
+	    mddev->max_disks && rdev->desc_nr >= mddev->max_disks) {
 		printk(KERN_WARNING "md: %s: array is limited to %d devices\n",
 		       mdname(mddev), mddev->max_disks);
 		return -EBUSY;
@@ -2331,7 +2330,7 @@ repeat:
 
 	spin_lock(&mddev->lock);
 
-	mddev->utime = get_seconds();
+	mddev->utime = ktime_get_real_seconds();
 
 	if (test_and_clear_bit(MD_CHANGE_DEVS, &mddev->flags))
 		force_change = 1;
@@ -2457,15 +2456,20 @@ static int add_bound_rdev(struct md_rdev *rdev)
 {
 	struct mddev *mddev = rdev->mddev;
 	int err = 0;
+	bool add_journal = test_bit(Journal, &rdev->flags);
 
-	if (!mddev->pers->hot_remove_disk) {
+	if (!mddev->pers->hot_remove_disk || add_journal) {
 		/* If there is hot_add_disk but no hot_remove_disk
 		 * then added disks for geometry changes,
 		 * and should be added immediately.
 		 */
 		super_types[mddev->major_version].
 			validate_super(mddev, rdev);
+		if (add_journal)
+			mddev_suspend(mddev);
 		err = mddev->pers->hot_add_disk(mddev, rdev);
+		if (add_journal)
+			mddev_resume(mddev);
 		if (err) {
 			unbind_rdev_from_array(rdev);
 			export_rdev(rdev);
@@ -5299,7 +5303,6 @@ int md_run(struct mddev *mddev)
 	smp_wmb();
 	spin_lock(&mddev->lock);
 	mddev->pers = pers;
-	mddev->ready = 1;
 	spin_unlock(&mddev->lock);
 	rdev_for_each(rdev, mddev)
 		if (rdev->raid_disk >= 0)
@@ -5499,7 +5502,6 @@ static void __md_stop(struct mddev *mddev)
 	/* Ensure ->event_work is done */
 	flush_workqueue(md_misc_wq);
 	spin_lock(&mddev->lock);
-	mddev->ready = 0;
 	mddev->pers = NULL;
 	spin_unlock(&mddev->lock);
 	pers->free(mddev, mddev->private);
@@ -5837,7 +5839,7 @@ static int get_array_info(struct mddev *mddev, void __user *arg)
 	info.major_version = mddev->major_version;
 	info.minor_version = mddev->minor_version;
 	info.patch_version = MD_PATCHLEVEL_VERSION;
-	info.ctime         = mddev->ctime;
+	info.ctime         = clamp_t(time64_t, mddev->ctime, 0, U32_MAX);
 	info.level         = mddev->level;
 	info.size          = mddev->dev_sectors / 2;
 	if (info.size != mddev->dev_sectors / 2) /* overflow */
@@ -5847,7 +5849,7 @@ static int get_array_info(struct mddev *mddev, void __user *arg)
 	info.md_minor      = mddev->md_minor;
 	info.not_persistent= !mddev->persistent;
 
-	info.utime         = mddev->utime;
+	info.utime         = clamp_t(time64_t, mddev->utime, 0, U32_MAX);
 	info.state         = 0;
 	if (mddev->in_sync)
 		info.state = (1<<MD_SB_CLEAN);
@@ -6038,8 +6040,23 @@ static int add_new_disk(struct mddev *mddev, mdu_disk_info_t *info)
 		else
 			clear_bit(WriteMostly, &rdev->flags);
 
-		if (info->state & (1<<MD_DISK_JOURNAL))
+		if (info->state & (1<<MD_DISK_JOURNAL)) {
+			struct md_rdev *rdev2;
+			bool has_journal = false;
+
+			/* make sure no existing journal disk */
+			rdev_for_each(rdev2, mddev) {
+				if (test_bit(Journal, &rdev2->flags)) {
+					has_journal = true;
+					break;
+				}
+			}
+			if (has_journal) {
+				export_rdev(rdev);
+				return -EBUSY;
+			}
 			set_bit(Journal, &rdev->flags);
+		}
 		/*
 		 * check whether the device shows up in other nodes
 		 */
@@ -6130,15 +6147,11 @@ static int hot_remove_disk(struct mddev *mddev, dev_t dev)
 {
 	char b[BDEVNAME_SIZE];
 	struct md_rdev *rdev;
-	int ret = -1;
 
 	rdev = find_rdev(mddev, dev);
 	if (!rdev)
 		return -ENXIO;
 
-	if (mddev_is_clustered(mddev))
-		ret = md_cluster_ops->metadata_update_start(mddev);
-
 	if (rdev->raid_disk < 0)
 		goto kick_rdev;
 
@@ -6149,7 +6162,7 @@ static int hot_remove_disk(struct mddev *mddev, dev_t dev)
 		goto busy;
 
 kick_rdev:
-	if (mddev_is_clustered(mddev) && ret == 0)
+	if (mddev_is_clustered(mddev))
 		md_cluster_ops->remove_disk(mddev, rdev);
 
 	md_kick_rdev_from_array(rdev);
@@ -6158,9 +6171,6 @@ kick_rdev:
 
 	return 0;
 busy:
-	if (mddev_is_clustered(mddev) && ret == 0)
-		md_cluster_ops->metadata_update_cancel(mddev);
-
 	printk(KERN_WARNING "md: cannot remove active disk %s from %s ...\n",
 		bdevname(rdev->bdev,b), mdname(mddev));
 	return -EBUSY;
@@ -6354,13 +6364,13 @@ static int set_array_info(struct mddev *mddev, mdu_array_info_t *info)
 		/* ensure mddev_put doesn't delete this now that there
 		 * is some minimal configuration.
 		 */
-		mddev->ctime         = get_seconds();
+		mddev->ctime         = ktime_get_real_seconds();
 		return 0;
 	}
 	mddev->major_version = MD_MAJOR_VERSION;
 	mddev->minor_version = MD_MINOR_VERSION;
 	mddev->patch_version = MD_PATCHLEVEL_VERSION;
-	mddev->ctime         = get_seconds();
+	mddev->ctime         = ktime_get_real_seconds();
 
 	mddev->level         = info->level;
 	mddev->clevel[0]     = 0;
@@ -6602,6 +6612,19 @@ static int update_array_info(struct mddev *mddev, mdu_array_info_t *info)
 				rv = -EINVAL;
 				goto err;
 			}
+			if (mddev->bitmap_info.nodes) {
+				/* hold PW on all the bitmap lock */
+				if (md_cluster_ops->lock_all_bitmaps(mddev) <= 0) {
+					printk("md: can't change bitmap to none since the"
+					       " array is in use by more than one node\n");
+					rv = -EPERM;
+					md_cluster_ops->unlock_all_bitmaps(mddev);
+					goto err;
+				}
+
+				mddev->bitmap_info.nodes = 0;
+				md_cluster_ops->leave(mddev);
+			}
 			mddev->pers->quiesce(mddev, 1);
 			bitmap_destroy(mddev);
 			mddev->pers->quiesce(mddev, 0);
@@ -7180,7 +7203,7 @@ void md_error(struct mddev *mddev, struct md_rdev *rdev)
 	md_wakeup_thread(mddev->thread);
 	if (mddev->event_work.func)
 		queue_work(md_misc_wq, &mddev->event_work);
-	md_new_event_inintr(mddev);
+	md_new_event(mddev);
 }
 EXPORT_SYMBOL(md_error);
 
@@ -7704,7 +7727,7 @@ EXPORT_SYMBOL(md_write_end);
  * attempting a GFP_KERNEL allocation while holding the mddev lock.
  * Must be called with mddev_lock held.
  *
- * In the ->external case MD_CHANGE_CLEAN can not be cleared until mddev->lock
+ * In the ->external case MD_CHANGE_PENDING can not be cleared until mddev->lock
  * is dropped, so return -EAGAIN after notifying userspace.
  */
 int md_allow_write(struct mddev *mddev)
@@ -8169,19 +8192,20 @@ static int remove_and_add_spares(struct mddev *mddev,
 			continue;
 		if (test_bit(Faulty, &rdev->flags))
 			continue;
-		if (test_bit(Journal, &rdev->flags))
-			continue;
-		if (mddev->ro &&
-		    ! (rdev->saved_raid_disk >= 0 &&
-		       !test_bit(Bitmap_sync, &rdev->flags)))
-			continue;
+		if (!test_bit(Journal, &rdev->flags)) {
+			if (mddev->ro &&
+			    ! (rdev->saved_raid_disk >= 0 &&
+			       !test_bit(Bitmap_sync, &rdev->flags)))
+				continue;
 
-		rdev->recovery_offset = 0;
+			rdev->recovery_offset = 0;
+		}
 		if (mddev->pers->
 		    hot_add_disk(mddev, rdev) == 0) {
 			if (sysfs_link_rdev(mddev, rdev))
 				/* failure here is OK */;
-			spares++;
+			if (!test_bit(Journal, &rdev->flags))
+				spares++;
 			md_new_event(mddev);
 			set_bit(MD_CHANGE_DEVS, &mddev->flags);
 		}
@@ -8276,6 +8300,7 @@ void md_check_recovery(struct mddev *mddev)
 		(mddev->flags & MD_UPDATE_SB_FLAGS & ~ (1<<MD_CHANGE_PENDING)) ||
 		test_bit(MD_RECOVERY_NEEDED, &mddev->recovery) ||
 		test_bit(MD_RECOVERY_DONE, &mddev->recovery) ||
+		test_bit(MD_RELOAD_SB, &mddev->flags) ||
 		(mddev->external == 0 && mddev->safemode == 1) ||
 		(mddev->safemode == 2 && ! atomic_read(&mddev->writes_pending)
 		 && !mddev->in_sync && mddev->recovery_cp == MaxSector)
@@ -8314,6 +8339,21 @@ void md_check_recovery(struct mddev *mddev)
 			goto unlock;
 		}
 
+		if (mddev_is_clustered(mddev)) {
+			struct md_rdev *rdev;
+			/* kick the device if another node issued a
+			 * remove disk.
+			 */
+			rdev_for_each(rdev, mddev) {
+				if (test_and_clear_bit(ClusterRemove, &rdev->flags) &&
+						rdev->raid_disk < 0)
+					md_kick_rdev_from_array(rdev);
+			}
+
+			if (test_and_clear_bit(MD_RELOAD_SB, &mddev->flags))
+				md_reload_sb(mddev, mddev->good_device_nr);
+		}
+
 		if (!mddev->external) {
 			int did_change = 0;
 			spin_lock(&mddev->lock);
@@ -8635,7 +8675,6 @@ static void check_sb_changes(struct mddev *mddev, struct md_rdev *rdev)
 				ret = remove_and_add_spares(mddev, rdev2);
 				pr_info("Activated spare: %s\n",
 						bdevname(rdev2->bdev,b));
-				continue;
 			}
 			/* device faulty
 			 * We just want to do the minimum to mark the disk

+ 7 - 4
drivers/md/md.h

@@ -162,6 +162,7 @@ enum flag_bits {
 				 * Usually, this device should be faster
 				 * than other devices in the array
 				 */
+	ClusterRemove,
 };
 
 static inline int is_badblock(struct md_rdev *rdev, sector_t s, int sectors,
@@ -200,6 +201,9 @@ struct mddev {
 				 */
 #define MD_JOURNAL_CLEAN 5	/* A raid with journal is already clean */
 #define MD_HAS_JOURNAL	6	/* The raid array has journal feature set */
+#define MD_RELOAD_SB	7	/* Reload the superblock because another node
+				 * updated it.
+				 */
 
 	int				suspended;
 	atomic_t			active_io;
@@ -208,8 +212,6 @@ struct mddev {
 						       * are happening, so run/
 						       * takeover/stop are not safe
 						       */
-	int				ready; /* See when safe to pass
-						* IO requests down */
 	struct gendisk			*gendisk;
 
 	struct kobject			kobj;
@@ -226,7 +228,7 @@ struct mddev {
 							 * managed externally */
 	char				metadata_type[17]; /* externally set*/
 	int				chunk_sectors;
-	time_t				ctime, utime;
+	time64_t			ctime, utime;
 	int				level, layout;
 	char				clevel[16];
 	int				raid_disks;
@@ -430,6 +432,7 @@ struct mddev {
 	struct work_struct event_work;	/* used by dm to report failure event */
 	void (*sync_super)(struct mddev *mddev, struct md_rdev *rdev);
 	struct md_cluster_info		*cluster_info;
+	unsigned int			good_device_nr;	/* good device num within cluster raid */
 };
 
 static inline int __must_check mddev_lock(struct mddev *mddev)
@@ -623,7 +626,7 @@ extern void md_wait_for_blocked_rdev(struct md_rdev *rdev, struct mddev *mddev);
 extern void md_set_array_sectors(struct mddev *mddev, sector_t array_sectors);
 extern int md_check_no_bitmap(struct mddev *mddev);
 extern int md_integrity_register(struct mddev *mddev);
-extern void md_integrity_add_rdev(struct md_rdev *rdev, struct mddev *mddev);
+extern int md_integrity_add_rdev(struct md_rdev *rdev, struct mddev *mddev);
 extern int strict_strtoul_scaled(const char *cp, unsigned long *res, int scale);
 
 extern void mddev_init(struct mddev *mddev);

+ 3 - 3
drivers/md/multipath.c

@@ -257,6 +257,9 @@ static int multipath_add_disk(struct mddev *mddev, struct md_rdev *rdev)
 			disk_stack_limits(mddev->gendisk, rdev->bdev,
 					  rdev->data_offset << 9);
 
+			err = md_integrity_add_rdev(rdev, mddev);
+			if (err)
+				break;
 			spin_lock_irq(&conf->device_lock);
 			mddev->degraded--;
 			rdev->raid_disk = path;
@@ -264,9 +267,6 @@ static int multipath_add_disk(struct mddev *mddev, struct md_rdev *rdev)
 			spin_unlock_irq(&conf->device_lock);
 			rcu_assign_pointer(p->rdev, rdev);
 			err = 0;
-			mddev_suspend(mddev);
-			md_integrity_add_rdev(rdev, mddev);
-			mddev_resume(mddev);
 			break;
 		}
 

+ 3 - 3
drivers/md/raid1.c

@@ -1589,6 +1589,9 @@ static int raid1_add_disk(struct mddev *mddev, struct md_rdev *rdev)
 	if (mddev->recovery_disabled == conf->recovery_disabled)
 		return -EBUSY;
 
+	if (md_integrity_add_rdev(rdev, mddev))
+		return -ENXIO;
+
 	if (rdev->raid_disk >= 0)
 		first = last = rdev->raid_disk;
 
@@ -1632,9 +1635,6 @@ static int raid1_add_disk(struct mddev *mddev, struct md_rdev *rdev)
 			break;
 		}
 	}
-	mddev_suspend(mddev);
-	md_integrity_add_rdev(rdev, mddev);
-	mddev_resume(mddev);
 	if (mddev->queue && blk_queue_discard(bdev_get_queue(rdev->bdev)))
 		queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, mddev->queue);
 	print_conf(conf);

+ 3 - 3
drivers/md/raid10.c

@@ -1698,6 +1698,9 @@ static int raid10_add_disk(struct mddev *mddev, struct md_rdev *rdev)
 	if (rdev->saved_raid_disk < 0 && !_enough(conf, 1, -1))
 		return -EINVAL;
 
+	if (md_integrity_add_rdev(rdev, mddev))
+		return -ENXIO;
+
 	if (rdev->raid_disk >= 0)
 		first = last = rdev->raid_disk;
 
@@ -1739,9 +1742,6 @@ static int raid10_add_disk(struct mddev *mddev, struct md_rdev *rdev)
 		rcu_assign_pointer(p->rdev, rdev);
 		break;
 	}
-	mddev_suspend(mddev);
-	md_integrity_add_rdev(rdev, mddev);
-	mddev_resume(mddev);
 	if (mddev->queue && blk_queue_discard(bdev_get_queue(rdev->bdev)))
 		queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, mddev->queue);
 

+ 119 - 39
drivers/md/raid5-cache.c

@@ -34,6 +34,12 @@
 #define RECLAIM_MAX_FREE_SPACE (10 * 1024 * 1024 * 2) /* sector */
 #define RECLAIM_MAX_FREE_SPACE_SHIFT (2)
 
+/*
+ * We only need 2 bios per I/O unit to make progress, but ensure we
+ * have a few more available to not get too tight.
+ */
+#define R5L_POOL_SIZE	4
+
 struct r5l_log {
 	struct md_rdev *rdev;
 
@@ -69,7 +75,12 @@ struct r5l_log {
 	struct list_head finished_ios;	/* io_units which settle down in log disk */
 	struct bio flush_bio;
 
+	struct list_head no_mem_stripes;   /* pending stripes, -ENOMEM */
+
 	struct kmem_cache *io_kc;
+	mempool_t *io_pool;
+	struct bio_set *bs;
+	mempool_t *meta_pool;
 
 	struct md_thread *reclaim_thread;
 	unsigned long reclaim_target;	/* number of space that need to be
@@ -150,27 +161,6 @@ static bool r5l_has_free_space(struct r5l_log *log, sector_t size)
 	return log->device_size > used_size + size;
 }
 
-static void r5l_free_io_unit(struct r5l_log *log, struct r5l_io_unit *io)
-{
-	__free_page(io->meta_page);
-	kmem_cache_free(log->io_kc, io);
-}
-
-static void r5l_move_io_unit_list(struct list_head *from, struct list_head *to,
-				  enum r5l_io_unit_state state)
-{
-	struct r5l_io_unit *io;
-
-	while (!list_empty(from)) {
-		io = list_first_entry(from, struct r5l_io_unit, log_sibling);
-		/* don't change list order */
-		if (io->state >= state)
-			list_move_tail(&io->log_sibling, to);
-		else
-			break;
-	}
-}
-
 static void __r5l_set_io_unit_state(struct r5l_io_unit *io,
 				    enum r5l_io_unit_state state)
 {
@@ -206,6 +196,20 @@ static void r5l_log_run_stripes(struct r5l_log *log)
 	}
 }
 
+static void r5l_move_to_end_ios(struct r5l_log *log)
+{
+	struct r5l_io_unit *io, *next;
+
+	assert_spin_locked(&log->io_list_lock);
+
+	list_for_each_entry_safe(io, next, &log->running_ios, log_sibling) {
+		/* don't change list order */
+		if (io->state < IO_UNIT_IO_END)
+			break;
+		list_move_tail(&io->log_sibling, &log->io_end_ios);
+	}
+}
+
 static void r5l_log_endio(struct bio *bio)
 {
 	struct r5l_io_unit *io = bio->bi_private;
@@ -216,12 +220,12 @@ static void r5l_log_endio(struct bio *bio)
 		md_error(log->rdev->mddev, log->rdev);
 
 	bio_put(bio);
+	mempool_free(io->meta_page, log->meta_pool);
 
 	spin_lock_irqsave(&log->io_list_lock, flags);
 	__r5l_set_io_unit_state(io, IO_UNIT_IO_END);
 	if (log->need_cache_flush)
-		r5l_move_io_unit_list(&log->running_ios, &log->io_end_ios,
-				      IO_UNIT_IO_END);
+		r5l_move_to_end_ios(log);
 	else
 		r5l_log_run_stripes(log);
 	spin_unlock_irqrestore(&log->io_list_lock, flags);
@@ -255,7 +259,7 @@ static void r5l_submit_current_io(struct r5l_log *log)
 
 static struct bio *r5l_bio_alloc(struct r5l_log *log)
 {
-	struct bio *bio = bio_kmalloc(GFP_NOIO | __GFP_NOFAIL, BIO_MAX_PAGES);
+	struct bio *bio = bio_alloc_bioset(GFP_NOIO, BIO_MAX_PAGES, log->bs);
 
 	bio->bi_rw = WRITE;
 	bio->bi_bdev = log->rdev->bdev;
@@ -286,15 +290,19 @@ static struct r5l_io_unit *r5l_new_meta(struct r5l_log *log)
 	struct r5l_io_unit *io;
 	struct r5l_meta_block *block;
 
-	/* We can't handle memory allocate failure so far */
-	io = kmem_cache_zalloc(log->io_kc, GFP_NOIO | __GFP_NOFAIL);
+	io = mempool_alloc(log->io_pool, GFP_ATOMIC);
+	if (!io)
+		return NULL;
+	memset(io, 0, sizeof(*io));
+
 	io->log = log;
 	INIT_LIST_HEAD(&io->log_sibling);
 	INIT_LIST_HEAD(&io->stripe_list);
 	io->state = IO_UNIT_RUNNING;
 
-	io->meta_page = alloc_page(GFP_NOIO | __GFP_NOFAIL | __GFP_ZERO);
+	io->meta_page = mempool_alloc(log->meta_pool, GFP_NOIO);
 	block = page_address(io->meta_page);
+	clear_page(block);
 	block->magic = cpu_to_le32(R5LOG_MAGIC);
 	block->version = R5LOG_VERSION;
 	block->seq = cpu_to_le64(log->seq);
@@ -324,8 +332,12 @@ static int r5l_get_meta(struct r5l_log *log, unsigned int payload_size)
 	    log->current_io->meta_offset + payload_size > PAGE_SIZE)
 		r5l_submit_current_io(log);
 
-	if (!log->current_io)
+	if (!log->current_io) {
 		log->current_io = r5l_new_meta(log);
+		if (!log->current_io)
+			return -ENOMEM;
+	}
+
 	return 0;
 }
 
@@ -370,11 +382,12 @@ static void r5l_append_payload_page(struct r5l_log *log, struct page *page)
 	r5_reserve_log_entry(log, io);
 }
 
-static void r5l_log_stripe(struct r5l_log *log, struct stripe_head *sh,
+static int r5l_log_stripe(struct r5l_log *log, struct stripe_head *sh,
 			   int data_pages, int parity_pages)
 {
 	int i;
 	int meta_size;
+	int ret;
 	struct r5l_io_unit *io;
 
 	meta_size =
@@ -383,7 +396,10 @@ static void r5l_log_stripe(struct r5l_log *log, struct stripe_head *sh,
 		sizeof(struct r5l_payload_data_parity) +
 		sizeof(__le32) * parity_pages;
 
-	r5l_get_meta(log, meta_size);
+	ret = r5l_get_meta(log, meta_size);
+	if (ret)
+		return ret;
+
 	io = log->current_io;
 
 	for (i = 0; i < sh->disks; i++) {
@@ -413,6 +429,8 @@ static void r5l_log_stripe(struct r5l_log *log, struct stripe_head *sh,
 	list_add_tail(&sh->log_list, &io->stripe_list);
 	atomic_inc(&io->pending_stripe);
 	sh->log_io = io;
+
+	return 0;
 }
 
 static void r5l_wake_reclaim(struct r5l_log *log, sector_t space);
@@ -427,6 +445,7 @@ int r5l_write_stripe(struct r5l_log *log, struct stripe_head *sh)
 	int meta_size;
 	int reserve;
 	int i;
+	int ret = 0;
 
 	if (!log)
 		return -EAGAIN;
@@ -475,17 +494,22 @@ int r5l_write_stripe(struct r5l_log *log, struct stripe_head *sh)
 	mutex_lock(&log->io_mutex);
 	/* meta + data */
 	reserve = (1 + write_disks) << (PAGE_SHIFT - 9);
-	if (r5l_has_free_space(log, reserve))
-		r5l_log_stripe(log, sh, data_pages, parity_pages);
-	else {
+	if (!r5l_has_free_space(log, reserve)) {
 		spin_lock(&log->no_space_stripes_lock);
 		list_add_tail(&sh->log_list, &log->no_space_stripes);
 		spin_unlock(&log->no_space_stripes_lock);
 
 		r5l_wake_reclaim(log, reserve);
+	} else {
+		ret = r5l_log_stripe(log, sh, data_pages, parity_pages);
+		if (ret) {
+			spin_lock_irq(&log->io_list_lock);
+			list_add_tail(&sh->log_list, &log->no_mem_stripes);
+			spin_unlock_irq(&log->io_list_lock);
+		}
 	}
-	mutex_unlock(&log->io_mutex);
 
+	mutex_unlock(&log->io_mutex);
 	return 0;
 }
 
@@ -538,6 +562,21 @@ static sector_t r5l_reclaimable_space(struct r5l_log *log)
 				 log->next_checkpoint);
 }
 
+static void r5l_run_no_mem_stripe(struct r5l_log *log)
+{
+	struct stripe_head *sh;
+
+	assert_spin_locked(&log->io_list_lock);
+
+	if (!list_empty(&log->no_mem_stripes)) {
+		sh = list_first_entry(&log->no_mem_stripes,
+				      struct stripe_head, log_list);
+		list_del_init(&sh->log_list);
+		set_bit(STRIPE_HANDLE, &sh->state);
+		raid5_release_stripe(sh);
+	}
+}
+
 static bool r5l_complete_finished_ios(struct r5l_log *log)
 {
 	struct r5l_io_unit *io, *next;
@@ -554,7 +593,8 @@ static bool r5l_complete_finished_ios(struct r5l_log *log)
 		log->next_cp_seq = io->seq;
 
 		list_del(&io->log_sibling);
-		r5l_free_io_unit(log, io);
+		mempool_free(io, log->io_pool);
+		r5l_run_no_mem_stripe(log);
 
 		found = true;
 	}
@@ -787,6 +827,13 @@ void r5l_quiesce(struct r5l_log *log, int state)
 		return;
 	if (state == 0) {
 		log->in_teardown = 0;
+		/*
+		 * This is a special case for hotadd. In suspend, the array has
+		 * no journal. In resume, journal is initialized as well as the
+		 * reclaim thread.
+		 */
+		if (log->reclaim_thread)
+			return;
 		log->reclaim_thread = md_register_thread(r5l_reclaim_thread,
 					log->rdev->mddev, "reclaim");
 	} else if (state == 1) {
@@ -806,10 +853,18 @@ void r5l_quiesce(struct r5l_log *log, int state)
 
 bool r5l_log_disk_error(struct r5conf *conf)
 {
+	struct r5l_log *log;
+	bool ret;
 	/* don't allow write if journal disk is missing */
-	if (!conf->log)
-		return test_bit(MD_HAS_JOURNAL, &conf->mddev->flags);
-	return test_bit(Faulty, &conf->log->rdev->flags);
+	rcu_read_lock();
+	log = rcu_dereference(conf->log);
+
+	if (!log)
+		ret = test_bit(MD_HAS_JOURNAL, &conf->mddev->flags);
+	else
+		ret = test_bit(Faulty, &log->rdev->flags);
+	rcu_read_unlock();
+	return ret;
 }
 
 struct r5l_recovery_ctx {
@@ -1160,23 +1215,45 @@ int r5l_init_log(struct r5conf *conf, struct md_rdev *rdev)
 	if (!log->io_kc)
 		goto io_kc;
 
+	log->io_pool = mempool_create_slab_pool(R5L_POOL_SIZE, log->io_kc);
+	if (!log->io_pool)
+		goto io_pool;
+
+	log->bs = bioset_create(R5L_POOL_SIZE, 0);
+	if (!log->bs)
+		goto io_bs;
+
+	log->meta_pool = mempool_create_page_pool(R5L_POOL_SIZE, 0);
+	if (!log->meta_pool)
+		goto out_mempool;
+
 	log->reclaim_thread = md_register_thread(r5l_reclaim_thread,
 						 log->rdev->mddev, "reclaim");
 	if (!log->reclaim_thread)
 		goto reclaim_thread;
 	init_waitqueue_head(&log->iounit_wait);
 
+	INIT_LIST_HEAD(&log->no_mem_stripes);
+
 	INIT_LIST_HEAD(&log->no_space_stripes);
 	spin_lock_init(&log->no_space_stripes_lock);
 
 	if (r5l_load_log(log))
 		goto error;
 
-	conf->log = log;
+	rcu_assign_pointer(conf->log, log);
+	set_bit(MD_HAS_JOURNAL, &conf->mddev->flags);
 	return 0;
+
 error:
 	md_unregister_thread(&log->reclaim_thread);
 reclaim_thread:
+	mempool_destroy(log->meta_pool);
+out_mempool:
+	bioset_free(log->bs);
+io_bs:
+	mempool_destroy(log->io_pool);
+io_pool:
 	kmem_cache_destroy(log->io_kc);
 io_kc:
 	kfree(log);
@@ -1186,6 +1263,9 @@ io_kc:
 void r5l_exit_log(struct r5l_log *log)
 {
 	md_unregister_thread(&log->reclaim_thread);
+	mempool_destroy(log->meta_pool);
+	bioset_free(log->bs);
+	mempool_destroy(log->io_pool);
 	kmem_cache_destroy(log->io_kc);
 	kfree(log);
 }

+ 26 - 10
drivers/md/raid5.c

@@ -772,8 +772,6 @@ static void stripe_add_to_batch_list(struct r5conf *conf, struct stripe_head *sh
 	int hash;
 	int dd_idx;
 
-	if (!stripe_can_batch(sh))
-		return;
 	/* Don't cross chunks, so stripe pd_idx/qd_idx is the same */
 	tmp_sec = sh->sector;
 	if (!sector_div(tmp_sec, conf->chunk_sectors))
@@ -7141,14 +7139,19 @@ static int raid5_remove_disk(struct mddev *mddev, struct md_rdev *rdev)
 	struct disk_info *p = conf->disks + number;
 
 	print_raid5_conf(conf);
-	if (test_bit(Journal, &rdev->flags)) {
+	if (test_bit(Journal, &rdev->flags) && conf->log) {
+		struct r5l_log *log;
 		/*
-		 * journal disk is not removable, but we need give a chance to
-		 * update superblock of other disks. Otherwise journal disk
-		 * will be considered as 'fresh'
+		 * we can't wait pending write here, as this is called in
+		 * raid5d, wait will deadlock.
 		 */
-		set_bit(MD_CHANGE_DEVS, &mddev->flags);
-		return -EINVAL;
+		if (atomic_read(&mddev->writes_pending))
+			return -EBUSY;
+		log = conf->log;
+		conf->log = NULL;
+		synchronize_rcu();
+		r5l_exit_log(log);
+		return 0;
 	}
 	if (rdev == p->rdev)
 		rdevp = &p->rdev;
@@ -7212,8 +7215,21 @@ static int raid5_add_disk(struct mddev *mddev, struct md_rdev *rdev)
 	int first = 0;
 	int last = conf->raid_disks - 1;
 
-	if (test_bit(Journal, &rdev->flags))
-		return -EINVAL;
+	if (test_bit(Journal, &rdev->flags)) {
+		char b[BDEVNAME_SIZE];
+		if (conf->log)
+			return -EBUSY;
+
+		rdev->raid_disk = 0;
+		/*
+		 * The array is in readonly mode if journal is missing, so no
+		 * write requests running. We should be safe
+		 */
+		r5l_init_log(conf, rdev);
+		printk(KERN_INFO"md/raid:%s: using device %s as journal\n",
+		       mdname(mddev), bdevname(rdev->bdev, b));
+		return 0;
+	}
 	if (mddev->recovery_disabled == conf->recovery_disabled)
 		return -EBUSY;
 

+ 2 - 2
include/uapi/linux/raid/md_u.h

@@ -80,7 +80,7 @@ typedef struct mdu_array_info_s {
 	int major_version;
 	int minor_version;
 	int patch_version;
-	int ctime;
+	unsigned int ctime;
 	int level;
 	int size;
 	int nr_disks;
@@ -91,7 +91,7 @@ typedef struct mdu_array_info_s {
 	/*
 	 * Generic state information
 	 */
-	int utime;		/*  0 Superblock update time		      */
+	unsigned int utime;	/*  0 Superblock update time		      */
 	int state;		/*  1 State bits (clean, ...)		      */
 	int active_disks;	/*  2 Number of currently active disks	      */
 	int working_disks;	/*  3 Number of working disks		      */