
Merge tag 'nfs-rdma-3.16' of git://git.linux-nfs.org/projects/anna/nfs-rdma into linux-next

Pull NFS client-side changes for RDMA from Anna Schumaker

These patches are mostly cleanups and bugfixes for using RDMA as an
over-the-wire transport.

Highlights include:

- Remove obsolete memory registration modes.
- Remove BUG_ON() call sites to keep clients running.
- Fix deadlocks, NULL-pointer dereferences, and memory leaks.

* tag 'nfs-rdma-3.16' of git://git.linux-nfs.org/projects/anna/nfs-rdma: (24 commits)
  xprtrdma: Disconnect on registration failure
  xprtrdma: Remove BUG_ON() call sites
  xprtrdma: Avoid deadlock when credit window is reset
  SUNRPC: Move congestion window constants to header file
  xprtrdma: Reset connection timeout after successful reconnect
  xprtrdma: Use macros for reconnection timeout constants
  xprtrdma: Allocate missing pagelist
  xprtrdma: Remove Tavor MTU setting
  xprtrdma: Ensure ia->ri_id->qp is not NULL when reconnecting
  xprtrdma: Reduce the number of hardway buffer allocations
  xprtrdma: Limit work done by completion handler
  xprtrmda: Reduce calls to ib_poll_cq() in completion handlers
  xprtrmda: Reduce lock contention in completion handlers
  xprtrdma: Split the completion queue
  xprtrdma: Make rpcrdma_ep_destroy() return void
  xprtrdma: Simplify rpcrdma_deregister_external() synopsis
  xprtrdma: mount reports "Invalid mount option" if memreg mode not supported
  xprtrdma: Fall back to MTHCAFMR when FRMR is not supported
  xprtrdma: Remove REGISTER memory registration mode
  xprtrdma: Remove MEMWINDOWS registration modes
  ...
Trond Myklebust, 11 years ago
commit 7b160cfd19

include/linux/sunrpc/xprt.h (+6 -0)

@@ -24,6 +24,12 @@
 #define RPC_MAX_SLOT_TABLE_LIMIT	(65536U)
 #define RPC_MAX_SLOT_TABLE	RPC_MAX_SLOT_TABLE_LIMIT
 
+#define RPC_CWNDSHIFT		(8U)
+#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT)
+#define RPC_INITCWND		RPC_CWNDSCALE
+#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT)
+#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
+
 /*
  * This describes a timeout strategy
  */
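
The moved macros implement a fixed-point congestion window: RPC_CWNDSCALE (1 << 8 = 256) is the unit charged against the window for each request in flight, so with max_reqs = 16, RPC_MAXCWND(xprt) is 16 << 8 = 4096. A hedged sketch of the arithmetic (the helper below is illustrative, not part of the patch):

    /* Illustrative sketch only. Each in-flight request charges
     * RPC_CWNDSCALE (256) units of xprt->cong against xprt->cwnd.
     */
    static int cwnd_has_room(struct rpc_xprt *xprt)
    {
            /* e.g. cwnd = RPC_INITCWND (256): exactly one request fits */
            return !RPCXPRT_CONGESTED(xprt);
    }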

net/sunrpc/xprt.c (+9 -19)

@@ -71,24 +71,6 @@ static void	 xprt_destroy(struct rpc_xprt *xprt);
 static DEFINE_SPINLOCK(xprt_list_lock);
 static LIST_HEAD(xprt_list);
 
-/*
- * The transport code maintains an estimate on the maximum number of out-
- * standing RPC requests, using a smoothed version of the congestion
- * avoidance implemented in 44BSD. This is basically the Van Jacobson
- * congestion algorithm: If a retransmit occurs, the congestion window is
- * halved; otherwise, it is incremented by 1/cwnd when
- *
- *	-	a reply is received and
- *	-	a full number of requests are outstanding and
- *	-	the congestion window hasn't been updated recently.
- */
-#define RPC_CWNDSHIFT		(8U)
-#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT)
-#define RPC_INITCWND		RPC_CWNDSCALE
-#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT)
-
-#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
-
 /**
  * xprt_register_transport - register a transport implementation
  * @transport: transport to register
@@ -446,7 +428,15 @@ EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
  * @task: recently completed RPC request used to adjust window
  * @result: result code of completed RPC request
  *
- * We use a time-smoothed congestion estimator to avoid heavy oscillation.
+ * The transport code maintains an estimate on the maximum number of out-
+ * standing RPC requests, using a smoothed version of the congestion
+ * avoidance implemented in 44BSD. This is basically the Van Jacobson
+ * congestion algorithm: If a retransmit occurs, the congestion window is
+ * halved; otherwise, it is incremented by 1/cwnd when
+ *
+ *	-	a reply is received and
+ *	-	a full number of requests are outstanding and
+ *	-	the congestion window hasn't been updated recently.
  */
 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
 {
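
The relocated comment describes the 44BSD/Van Jacobson rule that xprt_adjust_cwnd() applies. A minimal sketch of the update, assuming the fixed-point macros above (this mirrors the kernel's logic but is not a verbatim copy of the function body):

    static void adjust_cwnd_sketch(struct rpc_xprt *xprt, int result)
    {
            unsigned long cwnd = xprt->cwnd;

            if (result >= 0 && cwnd <= xprt->cong) {
                    /* additive increase: roughly 1/cwnd of a slot,
                     * with (cwnd >> 1) added for rounding */
                    cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                    if (cwnd > RPC_MAXCWND(xprt))
                            cwnd = RPC_MAXCWND(xprt);
            } else if (result == -ETIMEDOUT) {
                    /* multiplicative decrease: halve on retransmit */
                    cwnd >>= 1;
                    if (cwnd < RPC_CWNDSCALE)
                            cwnd = RPC_CWNDSCALE;
            }
            xprt->cwnd = cwnd;
    }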

net/sunrpc/xprtrdma/rpc_rdma.c (+53 -66)

@@ -78,8 +78,7 @@ static const char transfertypes[][12] = {
  * elements. Segments are then coalesced when registered, if possible
  * within the selected memreg mode.
  *
- * Note, this routine is never called if the connection's memory
- * registration strategy is 0 (bounce buffers).
+ * Returns positive number of segments converted, or a negative errno.
  */
 
 static int
@@ -102,10 +101,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 	page_base = xdrbuf->page_base & ~PAGE_MASK;
 	p = 0;
 	while (len && n < nsegs) {
+		if (!ppages[p]) {
+			/* alloc the pagelist for receiving buffer */
+			ppages[p] = alloc_page(GFP_ATOMIC);
+			if (!ppages[p])
+				return -ENOMEM;
+		}
 		seg[n].mr_page = ppages[p];
 		seg[n].mr_offset = (void *)(unsigned long) page_base;
 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-		BUG_ON(seg[n].mr_len > PAGE_SIZE);
+		if (seg[n].mr_len > PAGE_SIZE)
+			return -EIO;
 		len -= seg[n].mr_len;
 		++n;
 		++p;
@@ -114,7 +120,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 
 	/* Message overflows the seg array */
 	if (len && n == nsegs)
-		return 0;
+		return -EIO;
 
 	if (xdrbuf->tail[0].iov_len) {
 		/* the rpcrdma protocol allows us to omit any trailing
@@ -123,7 +129,7 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 			return n;
 		if (n == nsegs)
 			/* Tail remains, but we're out of segments */
-			return 0;
+			return -EIO;
 		seg[n].mr_page = NULL;
 		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
 		seg[n].mr_len = xdrbuf->tail[0].iov_len;
@@ -164,15 +170,17 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
  *  Reply chunk (a counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
  */
 
-static unsigned int
+static ssize_t
 rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
 {
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int nsegs, nchunks = 0;
+	int n, nsegs, nchunks = 0;
 	unsigned int pos;
 	struct rpcrdma_mr_seg *seg = req->rl_segments;
 	struct rpcrdma_read_chunk *cur_rchunk = NULL;
@@ -198,12 +206,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 		pos = target->head[0].iov_len;
 
 	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
-	if (nsegs == 0)
-		return 0;
+	if (nsegs < 0)
+		return nsegs;
 
 	do {
-		/* bind/register the memory, then build chunk from result. */
-		int n = rpcrdma_register_external(seg, nsegs,
+		n = rpcrdma_register_external(seg, nsegs,
 						cur_wchunk != NULL, r_xprt);
 		if (n <= 0)
 			goto out;
@@ -248,10 +255,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 	/* success. all failures return above */
 	req->rl_nchunks = nchunks;
 
-	BUG_ON(nchunks == 0);
-	BUG_ON((r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
-	       && (nchunks > 3));
-
 	/*
 	 * finish off header. If write, marshal discrim and nchunks.
 	 */
@@ -278,8 +281,8 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 out:
 	for (pos = 0; nchunks--;)
 		pos += rpcrdma_deregister_external(
-				&req->rl_segments[pos], r_xprt, NULL);
-	return 0;
+				&req->rl_segments[pos], r_xprt);
+	return n;
 }
 
 /*
@@ -361,6 +364,8 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
  *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
  *  [2] -- optional padding.
  *  [3] -- if padded, header only in [1] and data here.
+ *
+ * Returns zero on success, otherwise a negative errno.
  */
 
 int
@@ -370,7 +375,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	char *base;
-	size_t hdrlen, rpclen, padlen;
+	size_t rpclen, padlen;
+	ssize_t hdrlen;
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
 
@@ -441,14 +447,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	/* The following simplification is not true forever */
 	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
 		wtype = rpcrdma_noch;
-	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
-
-	if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS &&
-	    (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) {
-		/* forced to "pure inline"? */
-		dprintk("RPC:       %s: too much data (%d/%d) for inline\n",
-			__func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len);
-		return -1;
+	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
+			__func__);
+		return -EIO;
 	}
 
 	hdrlen = 28; /*sizeof *headerp;*/
@@ -474,8 +476,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
 			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
 			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
-			BUG_ON(wtype != rpcrdma_noch);
-
+			if (wtype != rpcrdma_noch) {
+				dprintk("RPC:       %s: invalid chunk list\n",
+					__func__);
+				return -EIO;
+			}
 		} else {
 			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
 			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
@@ -492,8 +497,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 			 * on receive. Therefore, we request a reply chunk
 			 * for non-writes wherever feasible and efficient.
 			 */
-			if (wtype == rpcrdma_noch &&
-			    r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER)
+			if (wtype == rpcrdma_noch)
 				wtype = rpcrdma_replych;
 		}
 	}
@@ -511,9 +515,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		hdrlen = rpcrdma_create_chunks(rqst,
 					&rqst->rq_rcv_buf, headerp, wtype);
 	}
-
-	if (hdrlen == 0)
-		return -1;
+	if (hdrlen < 0)
+		return hdrlen;
 
 	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
 		" headerp 0x%p base 0x%p lkey 0x%x\n",
@@ -680,15 +683,11 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 	rqst->rq_private_buf = rqst->rq_rcv_buf;
 }
 
-/*
- * This function is called when an async event is posted to
- * the connection which changes the connection state. All it
- * does at this point is mark the connection up/down, the rpc
- * timers do the rest.
- */
 void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
+rpcrdma_connect_worker(struct work_struct *work)
 {
+	struct rpcrdma_ep *ep =
+		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
 	struct rpc_xprt *xprt = ep->rep_xprt;
 
 	spin_lock_bh(&xprt->transport_lock);
@@ -705,13 +704,15 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
 }
 
 /*
- * This function is called when memory window unbind which we are waiting
- * for completes. Just use rr_func (zeroed by upcall) to signal completion.
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
  */
-static void
-rpcrdma_unbind_func(struct rpcrdma_rep *rep)
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
 {
-	wake_up(&rep->rr_unbind);
+	schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
 /*
@@ -728,7 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	struct rpc_xprt *xprt = rep->rr_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	__be32 *iptr;
-	int i, rdmalen, status;
+	int rdmalen, status;
+	unsigned long cwnd;
 
 	/* Check status. If bad, signal disconnect and return rep to pool */
 	if (rep->rr_len == ~0U) {
@@ -783,6 +785,7 @@ repost:
 
 	/* from here on, the reply is no longer an orphan */
 	req->rl_reply = rep;
+	xprt->reestablish_timeout = 0;
 
 	/* check for expected message types */
 	/* The order of some of these tests is important. */
@@ -857,26 +860,10 @@ badheader:
 		break;
 	}
 
-	/* If using mw bind, start the deregister process now. */
-	/* (Note: if mr_free(), cannot perform it here, in tasklet context) */
-	if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) {
-	case RPCRDMA_MEMWINDOWS:
-		for (i = 0; req->rl_nchunks-- > 1;)
-			i += rpcrdma_deregister_external(
-				&req->rl_segments[i], r_xprt, NULL);
-		/* Optionally wait (not here) for unbinds to complete */
-		rep->rr_func = rpcrdma_unbind_func;
-		(void) rpcrdma_deregister_external(&req->rl_segments[i],
-						   r_xprt, rep);
-		break;
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-		for (i = 0; req->rl_nchunks--;)
-			i += rpcrdma_deregister_external(&req->rl_segments[i],
-							 r_xprt, NULL);
-		break;
-	default:
-		break;
-	}
+	cwnd = xprt->cwnd;
+	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+	if (xprt->cwnd > cwnd)
+		xprt_release_rqst_cong(rqst->rq_task);
 
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 			__func__, xprt, rqst, status);
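
The reply handler now converts the server's advertised credit count directly into a congestion window instead of starting memory-window deregistration here. For example, a server granting 32 credits yields cwnd = 32 << RPC_CWNDSHIFT = 8192, allowing 32 outstanding requests. A hedged sketch of the conversion (the helper name is illustrative):

    static void update_cwnd_from_credits(struct rpc_xprt *xprt,
                                         struct rpc_rqst *rqst, int credits)
    {
            unsigned long cwnd = xprt->cwnd;

            xprt->cwnd = credits << RPC_CWNDSHIFT;  /* 32 credits -> 8192 */
            if (xprt->cwnd > cwnd)                  /* window grew: kick a waiter */
                    xprt_release_rqst_cong(rqst->rq_task);
    }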

net/sunrpc/xprtrdma/transport.c (+30 -60)

@@ -149,6 +149,11 @@ static struct ctl_table sunrpc_table[] = {
 
 #endif
 
+#define RPCRDMA_BIND_TO		(60U * HZ)
+#define RPCRDMA_INIT_REEST_TO	(5U * HZ)
+#define RPCRDMA_MAX_REEST_TO	(30U * HZ)
+#define RPCRDMA_IDLE_DISC_TO	(5U * 60 * HZ)
+
 static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */
 
 static void
@@ -229,7 +234,6 @@ static void
 xprt_rdma_destroy(struct rpc_xprt *xprt)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	int rc;
 
 	dprintk("RPC:       %s: called\n", __func__);
 
@@ -238,10 +242,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
 	xprt_clear_connected(xprt);
 
 	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
-	rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
-	if (rc)
-		dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
-			__func__, rc);
+	rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
 	rpcrdma_ia_close(&r_xprt->rx_ia);
 
 	xprt_rdma_free_addresses(xprt);
@@ -289,9 +290,9 @@ xprt_setup_rdma(struct xprt_create *args)
 
 	/* 60 second timeout, no retries */
 	xprt->timeout = &xprt_rdma_default_timeout;
-	xprt->bind_timeout = (60U * HZ);
-	xprt->reestablish_timeout = (5U * HZ);
-	xprt->idle_timeout = (5U * 60 * HZ);
+	xprt->bind_timeout = RPCRDMA_BIND_TO;
+	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 
 	xprt->resvport = 0;		/* privileged port not needed */
 	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
@@ -391,7 +392,7 @@ out4:
 	xprt_rdma_free_addresses(xprt);
 	rc = -EINVAL;
 out3:
-	(void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
+	rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
 out2:
 	rpcrdma_ia_close(&new_xprt->rx_ia);
 out1:
@@ -436,10 +437,10 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 		schedule_delayed_work(&r_xprt->rdma_connect,
 			xprt->reestablish_timeout);
 		xprt->reestablish_timeout <<= 1;
-		if (xprt->reestablish_timeout > (30 * HZ))
-			xprt->reestablish_timeout = (30 * HZ);
-		else if (xprt->reestablish_timeout < (5 * HZ))
-			xprt->reestablish_timeout = (5 * HZ);
+		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
+			xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
+		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 	} else {
 		schedule_delayed_work(&r_xprt->rdma_connect, 0);
 		if (!RPC_IS_ASYNC(task))
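
With the constants named, the reconnect delay doubles on each attempt and is clamped to the 5 to 30 second range: 5s, 10s, 20s, 30s, 30s, and so on. A small sketch of the clamp (the helper is illustrative, not part of the patch):

    static unsigned long next_reest_timeout(unsigned long to)
    {
            to <<= 1;                               /* 5s -> 10s -> 20s -> 40s... */
            if (to > RPCRDMA_MAX_REEST_TO)
                    to = RPCRDMA_MAX_REEST_TO;      /* ...capped at 30s */
            else if (to < RPCRDMA_INIT_REEST_TO)
                    to = RPCRDMA_INIT_REEST_TO;
            return to;
    }
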
@@ -447,23 +448,6 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 	}
 }
 
-static int
-xprt_rdma_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
-
-	/* == RPC_CWNDSCALE @ init, but *after* setup */
-	if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
-		r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
-		dprintk("RPC:       %s: cwndscale %lu\n", __func__,
-			r_xprt->rx_buf.rb_cwndscale);
-		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
-	}
-	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
-	return xprt_reserve_xprt_cong(xprt, task);
-}
-
 /*
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
@@ -479,7 +463,8 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 	struct rpcrdma_req *req, *nreq;
 
 	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
-	BUG_ON(NULL == req);
+	if (req == NULL)
+		return NULL;
 
 	if (size > req->rl_size) {
 		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
@@ -503,18 +488,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 		 * If the allocation or registration fails, the RPC framework
 		 * will (doggedly) retry.
 		 */
-		if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
-				RPCRDMA_BOUNCEBUFFERS) {
-			/* forced to "pure inline" */
-			dprintk("RPC:       %s: too much data (%zd) for inline "
-					"(r/w max %d/%d)\n", __func__, size,
-					rpcx_to_rdmad(xprt).inline_rsize,
-					rpcx_to_rdmad(xprt).inline_wsize);
-			size = req->rl_size;
-			rpc_exit(task, -EIO);		/* fail the operation */
-			rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
-			goto out;
-		}
 		if (task->tk_flags & RPC_TASK_SWAPPER)
 			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
 		else
@@ -543,7 +516,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 		req = nreq;
 	}
 	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
-out:
 	req->rl_connect_cookie = 0;	/* our reserved value */
 	return req->rl_xdr_buf;
 
@@ -579,9 +551,7 @@ xprt_rdma_free(void *buffer)
 		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
 
 	/*
-	 * Finish the deregistration. When using mw bind, this was
-	 * begun in rpcrdma_reply_handler(). In all other modes, we
-	 * do it here, in thread context. The process is considered
+	 * Finish the deregistration.  The process is considered
 	 * complete when the rr_func vector becomes NULL - this
 	 * was put in place during rpcrdma_reply_handler() - the wait
 	 * call below will not block if the dereg is "done". If
@@ -590,12 +560,7 @@ xprt_rdma_free(void *buffer)
 	for (i = 0; req->rl_nchunks;) {
 		--req->rl_nchunks;
 		i += rpcrdma_deregister_external(
-			&req->rl_segments[i], r_xprt, NULL);
-	}
-
-	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
-		rep->rr_func = NULL;	/* abandon the callback */
-		req->rl_reply = NULL;
+			&req->rl_segments[i], r_xprt);
 	}
 
 	if (req->rl_iov.length == 0) {	/* see allocate above */
@@ -630,13 +595,12 @@ xprt_rdma_send_request(struct rpc_task *task)
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	int rc;
 
-	/* marshal the send itself */
-	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
-		r_xprt->rx_stats.failed_marshal_count++;
-		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
-			__func__);
-		return -EIO;
+	if (req->rl_niovs == 0) {
+		rc = rpcrdma_marshal_req(rqst);
+		if (rc < 0)
+			goto failed_marshal;
 	}
 
 	if (req->rl_reply == NULL) 		/* e.g. reconnection */
@@ -660,6 +624,12 @@ xprt_rdma_send_request(struct rpc_task *task)
 	rqst->rq_bytes_sent = 0;
 	return 0;
 
+failed_marshal:
+	r_xprt->rx_stats.failed_marshal_count++;
+	dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n",
+		__func__, rc);
+	if (rc == -EIO)
+		return -EIO;
 drop_connection:
 	xprt_disconnect_done(xprt);
 	return -ENOTCONN;	/* implies disconnect */
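
Worth noting is how the new failed_marshal label routes errors: -EIO is returned to the RPC layer as a permanent failure, while any other negative status (for instance -ENOMEM from the GFP_ATOMIC pagelist allocation added in rpc_rdma.c) falls through to drop_connection and surfaces as -ENOTCONN, forcing a reconnect and retry. In sketch form:

    /* Sketch of the send-path error policy after this patch:
     *   rc == -EIO   -> return -EIO (don't retry; request is unmarshalable)
     *   other rc < 0 -> xprt_disconnect_done(xprt); return -ENOTCONN
     */
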
@@ -705,7 +675,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
  */
 
 static struct rpc_xprt_ops xprt_rdma_procs = {
-	.reserve_xprt		= xprt_rdma_reserve_xprt,
+	.reserve_xprt		= xprt_reserve_xprt_cong,
 	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
 	.alloc_slot		= xprt_alloc_slot,
 	.release_request	= xprt_release_rqst_cong,       /* ditto */

net/sunrpc/xprtrdma/verbs.c (+302 -451)

@@ -48,8 +48,8 @@
  */
 
 #include <linux/interrupt.h>
-#include <linux/pci.h>	/* for Tavor hack below */
 #include <linux/slab.h>
+#include <asm/bitops.h>
 
 #include "xprt_rdma.h"
 
@@ -142,98 +142,139 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
 	}
 }
 
-static inline
-void rpcrdma_event_process(struct ib_wc *wc)
+static void
+rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 {
-	struct rpcrdma_mw *frmr;
-	struct rpcrdma_rep *rep =
-			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
+	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
 
-	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
-		__func__, rep, wc->status, wc->opcode, wc->byte_len);
+	dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
+		__func__, frmr, wc->status, wc->opcode);
 
-	if (!rep) /* send or bind completion that we don't care about */
+	if (wc->wr_id == 0ULL)
 		return;
-
-	if (IB_WC_SUCCESS != wc->status) {
-		dprintk("RPC:       %s: WC opcode %d status %X, connection lost\n",
-			__func__, wc->opcode, wc->status);
-		rep->rr_len = ~0U;
-		if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
-			rpcrdma_schedule_tasklet(rep);
+	if (wc->status != IB_WC_SUCCESS)
 		return;
-	}
 
-	switch (wc->opcode) {
-	case IB_WC_FAST_REG_MR:
-		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+	if (wc->opcode == IB_WC_FAST_REG_MR)
 		frmr->r.frmr.state = FRMR_IS_VALID;
-		break;
-	case IB_WC_LOCAL_INV:
-		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+	else if (wc->opcode == IB_WC_LOCAL_INV)
 		frmr->r.frmr.state = FRMR_IS_INVALID;
-		break;
-	case IB_WC_RECV:
-		rep->rr_len = wc->byte_len;
-		ib_dma_sync_single_for_cpu(
-			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
-			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
-		/* Keep (only) the most recent credits, after check validity */
-		if (rep->rr_len >= 16) {
-			struct rpcrdma_msg *p =
-					(struct rpcrdma_msg *) rep->rr_base;
-			unsigned int credits = ntohl(p->rm_credit);
-			if (credits == 0) {
-				dprintk("RPC:       %s: server"
-					" dropped credits to 0!\n", __func__);
-				/* don't deadlock */
-				credits = 1;
-			} else if (credits > rep->rr_buffer->rb_max_requests) {
-				dprintk("RPC:       %s: server"
-					" over-crediting: %d (%d)\n",
-					__func__, credits,
-					rep->rr_buffer->rb_max_requests);
-				credits = rep->rr_buffer->rb_max_requests;
-			}
-			atomic_set(&rep->rr_buffer->rb_credits, credits);
-		}
-		/* fall through */
-	case IB_WC_BIND_MW:
-		rpcrdma_schedule_tasklet(rep);
-		break;
-	default:
-		dprintk("RPC:       %s: unexpected WC event %X\n",
-			__func__, wc->opcode);
-		break;
-	}
 }
 
-static inline int
-rpcrdma_cq_poll(struct ib_cq *cq)
+static int
+rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
 {
-	struct ib_wc wc;
-	int rc;
+	struct ib_wc *wcs;
+	int budget, count, rc;
 
-	for (;;) {
-		rc = ib_poll_cq(cq, 1, &wc);
-		if (rc < 0) {
-			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
-				__func__, rc);
+	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
+	do {
+		wcs = ep->rep_send_wcs;
+
+		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
+		if (rc <= 0)
 			return rc;
-		}
-		if (rc == 0)
-			break;
 
-		rpcrdma_event_process(&wc);
+		count = rc;
+		while (count-- > 0)
+			rpcrdma_sendcq_process_wc(wcs++);
+	} while (rc == RPCRDMA_POLLSIZE && --budget);
+	return 0;
+}
+
+/*
+ * Handle send, fast_reg_mr, and local_inv completions.
+ *
+ * Send events are typically suppressed and thus do not result
+ * in an upcall. Occasionally one is signaled, however. This
+ * prevents the provider's completion queue from wrapping and
+ * losing a completion.
+ */
+static void
+rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
+{
+	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
+	int rc;
+
+	rc = rpcrdma_sendcq_poll(cq, ep);
+	if (rc) {
+		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
+			__func__, rc);
+		return;
 	}
 
+	rc = ib_req_notify_cq(cq,
+			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
+	if (rc == 0)
+		return;
+	if (rc < 0) {
+		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
+			__func__, rc);
+		return;
+	}
+
+	rpcrdma_sendcq_poll(cq, ep);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
+{
+	struct rpcrdma_rep *rep =
+			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+
+	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
+		__func__, rep, wc->status, wc->opcode, wc->byte_len);
+
+	if (wc->status != IB_WC_SUCCESS) {
+		rep->rr_len = ~0U;
+		goto out_schedule;
+	}
+	if (wc->opcode != IB_WC_RECV)
+		return;
+
+	rep->rr_len = wc->byte_len;
+	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
+
+	if (rep->rr_len >= 16) {
+		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
+		unsigned int credits = ntohl(p->rm_credit);
+
+		if (credits == 0)
+			credits = 1;	/* don't deadlock */
+		else if (credits > rep->rr_buffer->rb_max_requests)
+			credits = rep->rr_buffer->rb_max_requests;
+		atomic_set(&rep->rr_buffer->rb_credits, credits);
+	}
+
+out_schedule:
+	rpcrdma_schedule_tasklet(rep);
+}
+
+static int
+rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+{
+	struct ib_wc *wcs;
+	int budget, count, rc;
+
+	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
+	do {
+		wcs = ep->rep_recv_wcs;
+
+		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
+		if (rc <= 0)
+			return rc;
+
+		count = rc;
+		while (count-- > 0)
+			rpcrdma_recvcq_process_wc(wcs++);
+	} while (rc == RPCRDMA_POLLSIZE && --budget);
 	return 0;
 	return 0;
 }
 
 /*
+ * Handle receive completions.
  *
  *
- * This upcall handles recv, send, bind and unbind events.
  * It is reentrant but processes single events in order to maintain
  * ordering of receives to keep server credits.
  *
  * connection shutdown. That is, the structures required for
  * connection shutdown. That is, the structures required for
  * the completion of the reply handler must remain intact until
  * all memory has been reclaimed.
- * Note that send events are suppressed and do not result in an upcall.
  */
  */
 static void
+rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 {
 {
+	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
 	int rc;
 
-	if (rc)
+	rc = rpcrdma_recvcq_poll(cq, ep);
+	if (rc) {
+		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
+			__func__, rc);
 		return;
 		return;
 
 
-	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
-	if (rc) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
+	rc = ib_req_notify_cq(cq,
+			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
+	if (rc == 0)
+		return;
+	if (rc < 0) {
+		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
 			__func__, rc);
 			__func__, rc);
 		return;
 	}
 
+	rpcrdma_recvcq_poll(cq, ep);
 }
 }
 
 #ifdef RPC_DEBUG
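
The split completion queues are drained with a bounded polling loop: each pass pulls up to RPCRDMA_POLLSIZE completions, and the loop gives up after RPCRDMA_WC_BUDGET total so the handler cannot monopolize softirq context; re-arming with IB_CQ_REPORT_MISSED_EVENTS then catches completions that race in. Assuming the values these constants take in xprt_rdma.h elsewhere in this series (128 and 16, an assumption not shown in this diff), that is at most 128 / 16 = 8 rounds per upcall:

    /* Sketch: bounded CQ drain, as in rpcrdma_sendcq_poll() above.
     * budget = 128 / 16 = 8 polling rounds per upcall at most.
     */
    int budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
    do {
            rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);     /* <= 16 WCs */
            /* process the rc returned completions here */
    } while (rc == RPCRDMA_POLLSIZE && --budget);
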
@@ -493,54 +539,32 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
 	}
 
-	switch (memreg) {
-	case RPCRDMA_MEMWINDOWS:
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
-			dprintk("RPC:       %s: MEMWINDOWS registration "
-				"specified but not supported by adapter, "
-				"using slower RPCRDMA_REGISTER\n",
-				__func__);
-			memreg = RPCRDMA_REGISTER;
-		}
-		break;
-	case RPCRDMA_MTHCAFMR:
-		if (!ia->ri_id->device->alloc_fmr) {
-#if RPCRDMA_PERSISTENT_REGISTRATION
-			dprintk("RPC:       %s: MTHCAFMR registration "
-				"specified but not supported by adapter, "
-				"using riskier RPCRDMA_ALLPHYSICAL\n",
-				__func__);
-			memreg = RPCRDMA_ALLPHYSICAL;
-#else
-			dprintk("RPC:       %s: MTHCAFMR registration "
-				"specified but not supported by adapter, "
-				"using slower RPCRDMA_REGISTER\n",
-				__func__);
-			memreg = RPCRDMA_REGISTER;
-#endif
-		}
-		break;
-	case RPCRDMA_FRMR:
+	if (memreg == RPCRDMA_FRMR) {
 		/* Requires both frmr reg and local dma lkey */
 		/* Requires both frmr reg and local dma lkey */
 		if ((devattr.device_cap_flags &
 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
 			dprintk("RPC:       %s: FRMR registration "
 			dprintk("RPC:       %s: FRMR registration "
-				"specified but not supported by adapter, "
-				"using riskier RPCRDMA_ALLPHYSICAL\n",
-				__func__);
+				"not supported by HCA\n", __func__);
+			memreg = RPCRDMA_MTHCAFMR;
+		} else {
+			/* Mind the ia limit on FRMR page list depth */
+			ia->ri_max_frmr_depth = min_t(unsigned int,
+				RPCRDMA_MAX_DATA_SEGS,
+				devattr.max_fast_reg_page_list_len);
+		}
+	}
+	if (memreg == RPCRDMA_MTHCAFMR) {
+		if (!ia->ri_id->device->alloc_fmr) {
+			dprintk("RPC:       %s: MTHCAFMR registration "
+				"not supported by HCA\n", __func__);
+#if RPCRDMA_PERSISTENT_REGISTRATION
 			memreg = RPCRDMA_ALLPHYSICAL;
 			memreg = RPCRDMA_ALLPHYSICAL;
 #else
-				"specified but not supported by adapter, "
-				"using slower RPCRDMA_REGISTER\n",
-				__func__);
-			memreg = RPCRDMA_REGISTER;
+			rc = -ENOMEM;
+			goto out2;
 #endif
 #endif
 		}
 	}
 	}
 
 	/*
@@ -552,8 +576,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	 * adapter.
 	 */
 	switch (memreg) {
-	case RPCRDMA_BOUNCEBUFFERS:
-	case RPCRDMA_REGISTER:
 	case RPCRDMA_FRMR:
 		break;
 #if RPCRDMA_PERSISTENT_REGISTRATION
@@ -563,30 +585,26 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 				IB_ACCESS_REMOTE_READ;
 		goto register_setup;
 #endif
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
-		mem_priv = IB_ACCESS_LOCAL_WRITE |
-				IB_ACCESS_MW_BIND;
-		goto register_setup;
 	case RPCRDMA_MTHCAFMR:
 		if (ia->ri_have_dma_lkey)
 			break;
 		mem_priv = IB_ACCESS_LOCAL_WRITE;
+#if RPCRDMA_PERSISTENT_REGISTRATION
 	register_setup:
+#endif
 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
 		if (IS_ERR(ia->ri_bind_mem)) {
 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
-				"phys register failed with %lX\n\t"
-				"Will continue with degraded performance\n",
+				"phys register failed with %lX\n",
 				__func__, PTR_ERR(ia->ri_bind_mem));
-			memreg = RPCRDMA_REGISTER;
-			ia->ri_bind_mem = NULL;
+			rc = -ENOMEM;
+			goto out2;
 		}
 		break;
 	default:
-		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
-				__func__, memreg);
-		rc = -EINVAL;
+		printk(KERN_ERR "RPC: Unsupported memory "
+				"registration mode: %d\n", memreg);
+		rc = -ENOMEM;
 		goto out2;
 	}
 	dprintk("RPC:       %s: memory registration strategy is %d\n",
@@ -640,6 +658,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 				struct rpcrdma_create_data_internal *cdata)
 {
 	struct ib_device_attr devattr;
+	struct ib_cq *sendcq, *recvcq;
 	int rc, err;
 
 	rc = ib_query_device(ia->ri_id->device, &devattr);
@@ -659,32 +678,42 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.srq = NULL;
 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 	switch (ia->ri_memreg_strategy) {
-	case RPCRDMA_FRMR:
+	case RPCRDMA_FRMR: {
+		int depth = 7;
+
 		/* Add room for frmr register and invalidate WRs.
 		 * 1. FRMR reg WR for head
 		 * 2. FRMR invalidate WR for head
-		 * 3. FRMR reg WR for pagelist
-		 * 4. FRMR invalidate WR for pagelist
+		 * 3. N FRMR reg WRs for pagelist
+		 * 4. N FRMR invalidate WRs for pagelist
 		 * 5. FRMR reg WR for tail
 		 * 6. FRMR invalidate WR for tail
 		 * 7. The RDMA_SEND WR
 		 */
-		ep->rep_attr.cap.max_send_wr *= 7;
+
+		/* Calculate N if the device max FRMR depth is smaller than
+		 * RPCRDMA_MAX_DATA_SEGS.
+		 */
+		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
+			int delta = RPCRDMA_MAX_DATA_SEGS -
+				    ia->ri_max_frmr_depth;
+
+			do {
+				depth += 2; /* FRMR reg + invalidate */
+				delta -= ia->ri_max_frmr_depth;
+			} while (delta > 0);
+
+		}
+		ep->rep_attr.cap.max_send_wr *= depth;
 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
-			cdata->max_requests = devattr.max_qp_wr / 7;
+			cdata->max_requests = devattr.max_qp_wr / depth;
 			if (!cdata->max_requests)
 				return -EINVAL;
-			ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7;
+			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
+						       depth;
 		}
 		break;
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
-		/* Add room for mw_binds+unbinds - overkill! */
-		ep->rep_attr.cap.max_send_wr++;
-		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
-		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
-			return -EINVAL;
-		break;
+	}
 	default:
 		break;
 	}
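
A worked example of the new depth calculation: with RPCRDMA_MAX_DATA_SEGS = 64 (an assumed value for this series) and a device reporting max_fast_reg_page_list_len = 16, delta starts at 48 and each round adds one reg/invalidate WR pair, so depth goes 7 -> 9 -> 11 -> 13 while delta goes 48 -> 32 -> 16 -> 0, and max_send_wr is scaled by 13 instead of 7. The same computation as a standalone sketch:

    static int frmr_wr_depth(int max_segs, int max_frmr_depth)
    {
            int depth = 7;                  /* head, tail, send, one reg/inv pair */
            int delta = max_segs - max_frmr_depth;

            while (delta > 0) {
                    depth += 2;             /* one FRMR reg + one invalidate */
                    delta -= max_frmr_depth;
            }
            return depth;                   /* 13 for (64, 16) */
    }
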
@@ -705,46 +734,51 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		ep->rep_attr.cap.max_recv_sge);
 
 	/* set trigger for requesting send completion */
-	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
-	switch (ia->ri_memreg_strategy) {
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
-		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
-		break;
-	default:
-		break;
-	}
+	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 	if (ep->rep_cqinit <= 2)
 		ep->rep_cqinit = 0;
 	INIT_CQCOUNT(ep);
 	ep->rep_ia = ia;
 	init_waitqueue_head(&ep->rep_connect_wait);
+	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
-	/*
-	 * Create a single cq for receive dto and mw_bind (only ever
-	 * care about unbind, really). Send completions are suppressed.
-	 * Use single threaded tasklet upcalls to maintain ordering.
-	 */
-	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
-				  rpcrdma_cq_async_error_upcall, NULL,
-				  ep->rep_attr.cap.max_recv_wr +
+	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
+				  rpcrdma_cq_async_error_upcall, ep,
 				  ep->rep_attr.cap.max_send_wr + 1, 0);
-	if (IS_ERR(ep->rep_cq)) {
-		rc = PTR_ERR(ep->rep_cq);
-		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
+	if (IS_ERR(sendcq)) {
+		rc = PTR_ERR(sendcq);
+		dprintk("RPC:       %s: failed to create send CQ: %i\n",
 			__func__, rc);
 		goto out1;
 	}
 
-	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
+	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
+	if (rc) {
+		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
+			__func__, rc);
+		goto out2;
+	}
+
+	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
+				  rpcrdma_cq_async_error_upcall, ep,
+				  ep->rep_attr.cap.max_recv_wr + 1, 0);
+	if (IS_ERR(recvcq)) {
+		rc = PTR_ERR(recvcq);
+		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
+			__func__, rc);
+		goto out2;
+	}
+
+	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
 	if (rc) {
 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
 			__func__, rc);
+		ib_destroy_cq(recvcq);
 		goto out2;
 	}
 
-	ep->rep_attr.send_cq = ep->rep_cq;
-	ep->rep_attr.recv_cq = ep->rep_cq;
+	ep->rep_attr.send_cq = sendcq;
+	ep->rep_attr.recv_cq = recvcq;
 
 	/* Initialize cma parameters */
 
@@ -754,9 +788,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
 	/* Client offers RDMA Read but does not initiate */
 	ep->rep_remote_cma.initiator_depth = 0;
-	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
-		ep->rep_remote_cma.responder_resources = 0;
-	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
+	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
 		ep->rep_remote_cma.responder_resources = 32;
 	else
 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
@@ -768,7 +800,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	return 0;
 
 out2:
-	err = ib_destroy_cq(ep->rep_cq);
+	err = ib_destroy_cq(sendcq);
 	if (err)
 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
 			__func__, err);
@@ -782,11 +814,8 @@ out1:
  * Disconnect and destroy endpoint. After this, the only
  * valid operations on the ep are to free it (if dynamically
  * allocated) or re-create it.
- *
- * The caller's error handling must be sure to not leak the endpoint
- * if this function fails.
  */
-int
+void
 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
 	int rc;
@@ -794,6 +823,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	dprintk("RPC:       %s: entering, connected is %d\n",
 	dprintk("RPC:       %s: entering, connected is %d\n",
 		__func__, ep->rep_connected);
 		__func__, ep->rep_connected);
 
 
+	cancel_delayed_work_sync(&ep->rep_connect_worker);
+
 	if (ia->ri_id->qp) {
 		rc = rpcrdma_ep_disconnect(ep, ia);
 		if (rc)
@@ -809,13 +840,17 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 		ep->rep_pad_mr = NULL;
 	}
 
-	rpcrdma_clean_cq(ep->rep_cq);
-	rc = ib_destroy_cq(ep->rep_cq);
+	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
 	if (rc)
 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
 			__func__, rc);
 
-	return rc;
+	rpcrdma_clean_cq(ep->rep_attr.send_cq);
+	rc = ib_destroy_cq(ep->rep_attr.send_cq);
+	if (rc)
+		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
+			__func__, rc);
 }
 }
 
 /*
@@ -831,17 +866,20 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	if (ep->rep_connected != 0) {
 		struct rpcrdma_xprt *xprt;
 retry:
+		dprintk("RPC:       %s: reconnecting...\n", __func__);
 		rc = rpcrdma_ep_disconnect(ep, ia);
 		if (rc && rc != -ENOTCONN)
 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
 				" status %i\n", __func__, rc);
-		rpcrdma_clean_cq(ep->rep_cq);
+
+		rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+		rpcrdma_clean_cq(ep->rep_attr.send_cq);
 
 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 		id = rpcrdma_create_id(xprt, ia,
 				(struct sockaddr *)&xprt->rx_data.addr);
 		if (IS_ERR(id)) {
-			rc = PTR_ERR(id);
+			rc = -EHOSTUNREACH;
 			goto out;
 		}
 		/* TEMP TEMP TEMP - fail if new device:
@@ -855,35 +893,32 @@ retry:
 			printk("RPC:       %s: can't reconnect on "
 			printk("RPC:       %s: can't reconnect on "
 				"different device!\n", __func__);
 				"different device!\n", __func__);
 			rdma_destroy_id(id);
 			rdma_destroy_id(id);
-			rc = -ENETDOWN;
+			rc = -ENETUNREACH;
 			goto out;
 		}
 		/* END TEMP */
+		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+		if (rc) {
+			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
+				__func__, rc);
+			rdma_destroy_id(id);
+			rc = -ENETUNREACH;
+			goto out;
+		}
 		rdma_destroy_qp(ia->ri_id);
 		rdma_destroy_id(ia->ri_id);
 		ia->ri_id = id;
+	} else {
+		dprintk("RPC:       %s: connecting...\n", __func__);
+		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+		if (rc) {
+			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
+				__func__, rc);
+			/* do not update ep->rep_connected */
+			return -ENETUNREACH;
+		}
 	}
 
-	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
-	if (rc) {
-		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
-			__func__, rc);
-		goto out;
-	}
-
-/* XXX Tavor device performs badly with 2K MTU! */
-if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
-	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
-	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
-	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
-	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
-		struct ib_qp_attr attr = {
-			.path_mtu = IB_MTU_1024
-		};
-		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
-	}
-}
-
 	ep->rep_connected = 0;
 
 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
@@ -944,7 +979,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
 	int rc;
 
-	rpcrdma_clean_cq(ep->rep_cq);
+	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+	rpcrdma_clean_cq(ep->rep_attr.send_cq);
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc) {
 		/* returns without wait if not connected */
@@ -967,7 +1003,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
 {
 	char *p;
-	size_t len;
+	size_t len, rlen, wlen;
 	int i, rc;
 	struct rpcrdma_mw *r;
 
@@ -997,11 +1033,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
 				sizeof(struct rpcrdma_mw);
 		break;
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
-		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
-				sizeof(struct rpcrdma_mw);
-		break;
 	default:
 		break;
 	}
@@ -1032,32 +1063,29 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	}
 	p += cdata->padding;
 
-	/*
-	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
-	 * We "cycle" the mw's in order to minimize rkey reuse,
-	 * and also reduce unbind-to-bind collision.
-	 */
 	INIT_LIST_HEAD(&buf->rb_mws);
 	r = (struct rpcrdma_mw *)p;
 	switch (ia->ri_memreg_strategy) {
 	case RPCRDMA_FRMR:
 		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
 			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
-							 RPCRDMA_MAX_SEGS);
+						ia->ri_max_frmr_depth);
 			if (IS_ERR(r->r.frmr.fr_mr)) {
 				rc = PTR_ERR(r->r.frmr.fr_mr);
 				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
 					" failed %i\n", __func__, rc);
 				goto out;
 			}
-			r->r.frmr.fr_pgl =
-				ib_alloc_fast_reg_page_list(ia->ri_id->device,
-							    RPCRDMA_MAX_SEGS);
+			r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
+						ia->ri_id->device,
+						ia->ri_max_frmr_depth);
 			if (IS_ERR(r->r.frmr.fr_pgl)) {
 				rc = PTR_ERR(r->r.frmr.fr_pgl);
 				dprintk("RPC:       %s: "
 					"ib_alloc_fast_reg_page_list "
 					"failed %i\n", __func__, rc);
+
+				ib_dereg_mr(r->r.frmr.fr_mr);
 				goto out;
 			}
 			list_add(&r->mw_list, &buf->rb_mws);
@@ -1082,21 +1110,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 			++r;
 		}
 		break;
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
-		/* Allocate one extra request's worth, for full cycling */
-		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
-			r->r.mw = ib_alloc_mw(ia->ri_pd, IB_MW_TYPE_1);
-			if (IS_ERR(r->r.mw)) {
-				rc = PTR_ERR(r->r.mw);
-				dprintk("RPC:       %s: ib_alloc_mw"
-					" failed %i\n", __func__, rc);
-				goto out;
-			}
-			list_add(&r->mw_list, &buf->rb_mws);
-			++r;
-		}
-		break;
 	default:
 		break;
 	}
@@ -1105,16 +1118,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	 * Allocate/init the request/reply buffers. Doing this
 	 * using kmalloc for now -- one for each buf.
 	 */
+	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
+	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
+	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
+		__func__, wlen, rlen);
+
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
 		struct rpcrdma_rep *rep;
 
-		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
-		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
-		/* Typical ~2400b, so rounding up saves work later */
-		if (len < 4096)
-			len = 4096;
-		req = kmalloc(len, GFP_KERNEL);
+		req = kmalloc(wlen, GFP_KERNEL);
 		if (req == NULL) {
 			dprintk("RPC:       %s: request buffer %d alloc"
 				" failed\n", __func__, i);
@@ -1126,16 +1139,16 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 		buf->rb_send_bufs[i]->rl_buffer = buf;
 
 		rc = rpcrdma_register_internal(ia, req->rl_base,
-				len - offsetof(struct rpcrdma_req, rl_base),
+				wlen - offsetof(struct rpcrdma_req, rl_base),
 				&buf->rb_send_bufs[i]->rl_handle,
 				&buf->rb_send_bufs[i]->rl_iov);
 		if (rc)
 			goto out;
 
-		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
+		buf->rb_send_bufs[i]->rl_size = wlen -
+						sizeof(struct rpcrdma_req);
 
-		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
-		rep = kmalloc(len, GFP_KERNEL);
+		rep = kmalloc(rlen, GFP_KERNEL);
 		if (rep == NULL) {
 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
 				__func__, i);
@@ -1145,10 +1158,9 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 		memset(rep, 0, sizeof(struct rpcrdma_rep));
 		buf->rb_recv_bufs[i] = rep;
 		buf->rb_recv_bufs[i]->rr_buffer = buf;
-		init_waitqueue_head(&rep->rr_unbind);
 
 		rc = rpcrdma_register_internal(ia, rep->rr_base,
-				len - offsetof(struct rpcrdma_rep, rr_base),
+				rlen - offsetof(struct rpcrdma_rep, rr_base),
 				&buf->rb_recv_bufs[i]->rr_handle,
 				&buf->rb_recv_bufs[i]->rr_handle,
 				&buf->rb_recv_bufs[i]->rr_iov);
 				&buf->rb_recv_bufs[i]->rr_iov);
 		if (rc)
 		if (rc)
@@ -1179,7 +1191,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 
 	/* clean up in reverse order from create
 	 *   1.  recv mr memory (mr free, then kfree)
-	 *   1a. bind mw memory
 	 *   2.  send mr memory (mr free, then kfree)
 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
 	 *   4.  arrays
@@ -1194,41 +1205,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 			kfree(buf->rb_recv_bufs[i]);
 		}
 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
-			while (!list_empty(&buf->rb_mws)) {
-				r = list_entry(buf->rb_mws.next,
-					struct rpcrdma_mw, mw_list);
-				list_del(&r->mw_list);
-				switch (ia->ri_memreg_strategy) {
-				case RPCRDMA_FRMR:
-					rc = ib_dereg_mr(r->r.frmr.fr_mr);
-					if (rc)
-						dprintk("RPC:       %s:"
-							" ib_dereg_mr"
-							" failed %i\n",
-							__func__, rc);
-					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
-					break;
-				case RPCRDMA_MTHCAFMR:
-					rc = ib_dealloc_fmr(r->r.fmr);
-					if (rc)
-						dprintk("RPC:       %s:"
-							" ib_dealloc_fmr"
-							" failed %i\n",
-							__func__, rc);
-					break;
-				case RPCRDMA_MEMWINDOWS_ASYNC:
-				case RPCRDMA_MEMWINDOWS:
-					rc = ib_dealloc_mw(r->r.mw);
-					if (rc)
-						dprintk("RPC:       %s:"
-							" ib_dealloc_mw"
-							" failed %i\n",
-							__func__, rc);
-					break;
-				default:
-					break;
-				}
-			}
 			rpcrdma_deregister_internal(ia,
 					buf->rb_send_bufs[i]->rl_handle,
 					&buf->rb_send_bufs[i]->rl_iov);
@@ -1236,6 +1212,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 		}
 	}
 
+	while (!list_empty(&buf->rb_mws)) {
+		r = list_entry(buf->rb_mws.next,
+			struct rpcrdma_mw, mw_list);
+		list_del(&r->mw_list);
+		switch (ia->ri_memreg_strategy) {
+		case RPCRDMA_FRMR:
+			rc = ib_dereg_mr(r->r.frmr.fr_mr);
+			if (rc)
+				dprintk("RPC:       %s:"
+					" ib_dereg_mr"
+					" failed %i\n",
+					__func__, rc);
+			ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
+			break;
+		case RPCRDMA_MTHCAFMR:
+			rc = ib_dealloc_fmr(r->r.fmr);
+			if (rc)
+				dprintk("RPC:       %s:"
+					" ib_dealloc_fmr"
+					" failed %i\n",
+					__func__, rc);
+			break;
+		default:
+			break;
+		}
+	}
+
 	kfree(buf->rb_pool);
 }
 
@@ -1299,21 +1302,17 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
 	int i;
 	unsigned long flags;
 
-	BUG_ON(req->rl_nchunks != 0);
 	spin_lock_irqsave(&buffers->rb_lock, flags);
 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
 	req->rl_niovs = 0;
 	if (req->rl_reply) {
 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
-		init_waitqueue_head(&req->rl_reply->rr_unbind);
 		req->rl_reply->rr_func = NULL;
 		req->rl_reply = NULL;
 	}
 	switch (ia->ri_memreg_strategy) {
 	case RPCRDMA_FRMR:
 	case RPCRDMA_MTHCAFMR:
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
 		/*
 		 * Cycle mw's back in reverse order, and "spin" them.
 		 * This delays and scrambles reuse as much as possible.
@@ -1358,8 +1357,7 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
 
 /*
  * Put reply buffers back into pool when not attached to
- * request. This happens in error conditions, and when
- * aborting unbinds. Pre-decrement counter/array index.
+ * request. This happens in error conditions.
  */
 void
 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
@@ -1498,8 +1496,8 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
 	seg1->mr_offset -= pageoff;	/* start of page */
 	seg1->mr_len += pageoff;
 	len = -pageoff;
-	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
-		*nsegs = RPCRDMA_MAX_DATA_SEGS;
+	if (*nsegs > ia->ri_max_frmr_depth)
+		*nsegs = ia->ri_max_frmr_depth;
 	for (page_no = i = 0; i < *nsegs;) {
 		rpcrdma_map_one(ia, seg, writing);
 		pa = seg->mr_dma;
@@ -1536,10 +1534,6 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
 	} else
 		post_wr = &frmr_wr;
 
-	/* Bump the key */
-	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
-	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
-
 	/* Prepare FRMR WR */
 	memset(&frmr_wr, 0, sizeof frmr_wr);
 	frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
@@ -1550,7 +1544,16 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
 	frmr_wr.wr.fast_reg.page_list_len = page_no;
 	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
 	frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
-	BUG_ON(frmr_wr.wr.fast_reg.length < len);
+	if (frmr_wr.wr.fast_reg.length < len) {
+		while (seg1->mr_nsegs--)
+			rpcrdma_unmap_one(ia, seg++);
+		return -EIO;
+	}
+
+	/* Bump the key */
+	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
+	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
+
 	frmr_wr.wr.fast_reg.access_flags = (writing ?
 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
 				IB_ACCESS_REMOTE_READ);
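
With the BUG_ON() replaced by an explicit length check, a bad fast_reg length now unmaps the segments and fails with -EIO, and the rkey bump happens only after the check passes, so a failed registration no longer burns a key generation. The bump itself only increments the low-order "key" byte of the 32-bit rkey; a stand-alone sketch of that arithmetic (next_rkey() is a hypothetical helper, not a kernel function):

#include <stdint.h>
#include <stdio.h>

/* next_rkey() is illustrative only: it advances the low-order "key"
 * byte of an rkey and leaves the upper 24 index bits alone, which is
 * the update ib_update_fast_reg_key() pushes to the adapter. */
static uint32_t next_rkey(uint32_t rkey)
{
	uint8_t key = (uint8_t)(rkey & 0x000000FF);

	++key;			/* wraps naturally at 0xFF */
	return (rkey & 0xFFFFFF00) | key;
}

int main(void)
{
	uint32_t rkey = 0x12345678;	/* arbitrary sample rkey */

	printf("0x%08x -> 0x%08x\n", rkey, next_rkey(rkey));
	printf("0x%08x -> 0x%08x\n", 0x123456FFu, next_rkey(0x123456FFu));
	return 0;	/* 0x12345679, then 0x123456FF -> 0x12345600 */
}
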
@@ -1661,135 +1664,6 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
 	return rc;
 }
 
-static int
-rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
-			int *nsegs, int writing, struct rpcrdma_ia *ia,
-			struct rpcrdma_xprt *r_xprt)
-{
-	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
-				  IB_ACCESS_REMOTE_READ);
-	struct ib_mw_bind param;
-	int rc;
-
-	*nsegs = 1;
-	rpcrdma_map_one(ia, seg, writing);
-	param.bind_info.mr = ia->ri_bind_mem;
-	param.wr_id = 0ULL;	/* no send cookie */
-	param.bind_info.addr = seg->mr_dma;
-	param.bind_info.length = seg->mr_len;
-	param.send_flags = 0;
-	param.bind_info.mw_access_flags = mem_priv;
-
-	DECR_CQCOUNT(&r_xprt->rx_ep);
-	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
-	if (rc) {
-		dprintk("RPC:       %s: failed ib_bind_mw "
-			"%u@0x%llx status %i\n",
-			__func__, seg->mr_len,
-			(unsigned long long)seg->mr_dma, rc);
-		rpcrdma_unmap_one(ia, seg);
-	} else {
-		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
-		seg->mr_base = param.bind_info.addr;
-		seg->mr_nsegs = 1;
-	}
-	return rc;
-}
-
-static int
-rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
-			struct rpcrdma_ia *ia,
-			struct rpcrdma_xprt *r_xprt, void **r)
-{
-	struct ib_mw_bind param;
-	LIST_HEAD(l);
-	int rc;
-
-	BUG_ON(seg->mr_nsegs != 1);
-	param.bind_info.mr = ia->ri_bind_mem;
-	param.bind_info.addr = 0ULL;	/* unbind */
-	param.bind_info.length = 0;
-	param.bind_info.mw_access_flags = 0;
-	if (*r) {
-		param.wr_id = (u64) (unsigned long) *r;
-		param.send_flags = IB_SEND_SIGNALED;
-		INIT_CQCOUNT(&r_xprt->rx_ep);
-	} else {
-		param.wr_id = 0ULL;
-		param.send_flags = 0;
-		DECR_CQCOUNT(&r_xprt->rx_ep);
-	}
-	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
-	rpcrdma_unmap_one(ia, seg);
-	if (rc)
-		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
-			" status %i\n", __func__, rc);
-	else
-		*r = NULL;	/* will upcall on completion */
-	return rc;
-}
-
-static int
-rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
-			int *nsegs, int writing, struct rpcrdma_ia *ia)
-{
-	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
-				  IB_ACCESS_REMOTE_READ);
-	struct rpcrdma_mr_seg *seg1 = seg;
-	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
-	int len, i, rc = 0;
-
-	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
-		*nsegs = RPCRDMA_MAX_DATA_SEGS;
-	for (len = 0, i = 0; i < *nsegs;) {
-		rpcrdma_map_one(ia, seg, writing);
-		ipb[i].addr = seg->mr_dma;
-		ipb[i].size = seg->mr_len;
-		len += seg->mr_len;
-		++seg;
-		++i;
-		/* Check for holes */
-		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
-		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
-			break;
-	}
-	seg1->mr_base = seg1->mr_dma;
-	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
-				ipb, i, mem_priv, &seg1->mr_base);
-	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
-		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
-		dprintk("RPC:       %s: failed ib_reg_phys_mr "
-			"%u@0x%llx (%d)... status %i\n",
-			__func__, len,
-			(unsigned long long)seg1->mr_dma, i, rc);
-		while (i--)
-			rpcrdma_unmap_one(ia, --seg);
-	} else {
-		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
-		seg1->mr_nsegs = i;
-		seg1->mr_len = len;
-	}
-	*nsegs = i;
-	return rc;
-}
-
-static int
-rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
-			struct rpcrdma_ia *ia)
-{
-	struct rpcrdma_mr_seg *seg1 = seg;
-	int rc;
-
-	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
-	seg1->mr_chunk.rl_mr = NULL;
-	while (seg1->mr_nsegs--)
-		rpcrdma_unmap_one(ia, seg++);
-	if (rc)
-		dprintk("RPC:       %s: failed ib_dereg_mr,"
-			" status %i\n", __func__, rc);
-	return rc;
-}
-
 int
 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
@@ -1819,16 +1693,8 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
 		break;
 
-	/* Registration using memory windows */
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
-		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
-		break;
-
-	/* Default registration each time */
 	default:
-		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
-		break;
+		return -1;
 	}
 	if (rc)
 		return -1;
@@ -1838,7 +1704,7 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
 
 int
 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
-		struct rpcrdma_xprt *r_xprt, void *r)
+		struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	int nsegs = seg->mr_nsegs, rc;
@@ -1847,9 +1713,7 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
 
 #if RPCRDMA_PERSISTENT_REGISTRATION
 	case RPCRDMA_ALLPHYSICAL:
-		BUG_ON(nsegs != 1);
 		rpcrdma_unmap_one(ia, seg);
-		rc = 0;
 		break;
 #endif
 
@@ -1861,21 +1725,9 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
 		rc = rpcrdma_deregister_fmr_external(seg, ia);
 		break;
 
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-	case RPCRDMA_MEMWINDOWS:
-		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
-		break;
-
 	default:
-		rc = rpcrdma_deregister_default_external(seg, ia);
 		break;
 	}
-	if (r) {
-		struct rpcrdma_rep *rep = r;
-		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
-		rep->rr_func = NULL;
-		func(rep);	/* dereg done, callback now */
-	}
 	return nsegs;
 }
 
@@ -1950,7 +1802,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
 
-	DECR_CQCOUNT(ep);
 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
 
 	if (rc)
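
DECR_CQCOUNT(ep) disappears from the receive path because the completion queue is now split: receive completions are always signaled, and only posted sends draw down the countdown that batches send-side completion interrupts. A user-space sketch of the countdown pattern behind INIT_CQCOUNT/DECR_CQCOUNT, assuming the usual signal-every-Nth-send policy (struct and names here are illustrative, not the kernel's):

#include <stdio.h>

/* User-space analogue of the INIT_CQCOUNT/DECR_CQCOUNT pattern:
 * request a signaled completion only every `cqinit` sends, so the
 * send CQ interrupts at a fraction of the send rate. */
struct ep {
	int cqcount;	/* sends left until the next signaled WR */
	int cqinit;	/* refill value, derived from CQ depth */
};

static void post_send(struct ep *ep, int wr)
{
	int signaled = 0;

	if (--ep->cqcount <= 0) {	/* DECR_CQCOUNT reached zero */
		ep->cqcount = ep->cqinit;	/* INIT_CQCOUNT refill */
		signaled = 1;		/* would set IB_SEND_SIGNALED */
	}
	printf("wr %2d: %s\n", wr, signaled ? "signaled" : "unsignaled");
}

int main(void)
{
	struct ep ep = { .cqcount = 4, .cqinit = 4 };

	for (int wr = 0; wr < 10; wr++)
		post_send(&ep, wr);	/* every 4th send is signaled */
	return 0;
}
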

+ 11 - 6
net/sunrpc/xprtrdma/xprt_rdma.h

@@ -43,6 +43,7 @@
 #include <linux/wait.h> 		/* wait_queue_head_t, etc */
 #include <linux/spinlock.h> 		/* spinlock_t, etc */
 #include <linux/atomic.h>			/* atomic_t, etc */
+#include <linux/workqueue.h>		/* struct work_struct */
 
 #include <rdma/rdma_cm.h>		/* RDMA connection api */
 #include <rdma/ib_verbs.h>		/* RDMA verbs api */
@@ -66,18 +67,21 @@ struct rpcrdma_ia {
 	struct completion	ri_done;
 	int			ri_async_rc;
 	enum rpcrdma_memreg	ri_memreg_strategy;
+	unsigned int		ri_max_frmr_depth;
 };
 
 /*
  * RDMA Endpoint -- one per transport instance
  */
 
+#define RPCRDMA_WC_BUDGET	(128)
+#define RPCRDMA_POLLSIZE	(16)
+
 struct rpcrdma_ep {
 	atomic_t		rep_cqcount;
 	int			rep_cqinit;
 	int			rep_connected;
 	struct rpcrdma_ia	*rep_ia;
-	struct ib_cq		*rep_cq;
 	struct ib_qp_init_attr	rep_attr;
 	wait_queue_head_t 	rep_connect_wait;
 	struct ib_sge		rep_pad;	/* holds zeroed pad */
@@ -86,6 +90,9 @@ struct rpcrdma_ep {
 	struct rpc_xprt		*rep_xprt;	/* for rep_func */
 	struct rdma_conn_param	rep_remote_cma;
 	struct sockaddr_storage	rep_remote_addr;
+	struct delayed_work	rep_connect_worker;
+	struct ib_wc		rep_send_wcs[RPCRDMA_POLLSIZE];
+	struct ib_wc		rep_recv_wcs[RPCRDMA_POLLSIZE];
 };
 
 #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
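
The rep_send_wcs[] and rep_recv_wcs[] arrays give each of the split completion handlers a scratch buffer so one ib_poll_cq() call can drain up to RPCRDMA_POLLSIZE completions, with RPCRDMA_WC_BUDGET capping the total handled per upcall. A user-space sketch of a budgeted batch-poll loop of that shape (poll_cq() is a stub standing in for ib_poll_cq(); the numbers mirror the new macros):

#include <stdio.h>

#define POLLSIZE 16	/* completions fetched per poll, like RPCRDMA_POLLSIZE */
#define BUDGET	128	/* cap per handler upcall, like RPCRDMA_WC_BUDGET */

/* Stub standing in for ib_poll_cq(): moves up to `max` completions
 * out of a fake queue holding `*pending` entries. */
static int poll_cq(int *pending, int *wcs, int max)
{
	int n = *pending < max ? *pending : max;

	*pending -= n;
	for (int i = 0; i < n; i++)
		wcs[i] = i;	/* stand-in for a struct ib_wc */
	return n;
}

/* Budgeted handler: drain in POLLSIZE batches, stopping once the
 * budget is spent; any leftover backlog is picked up by the next
 * completion event rather than monopolizing this context. */
static void completion_handler(int *pending)
{
	int wcs[POLLSIZE];
	int budget = BUDGET / POLLSIZE;		/* 8 full batches */
	int count, total = 0;

	do {
		count = poll_cq(pending, wcs, POLLSIZE);
		total += count;		/* each wc would be handled here */
	} while (count == POLLSIZE && --budget);
	printf("handled %d, backlog %d\n", total, *pending);
}

int main(void)
{
	int pending = 200;

	completion_handler(&pending);	/* handled 128, backlog 72 */
	return 0;
}
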
@@ -124,7 +131,6 @@ struct rpcrdma_rep {
 	struct rpc_xprt	*rr_xprt;	/* needed for request/reply matching */
 	void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */
 	struct list_head rr_list;	/* tasklet list */
-	wait_queue_head_t rr_unbind;	/* optional unbind wait */
 	struct ib_sge	rr_iov;		/* for posting */
 	struct ib_mr	*rr_handle;	/* handle for mem in rr_iov */
 	char	rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */
@@ -159,7 +165,6 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
 		struct ib_mr	*rl_mr;		/* if registered directly */
 		struct rpcrdma_mw {		/* if registered from region */
 			union {
-				struct ib_mw	*mw;
 				struct ib_fmr	*fmr;
 				struct {
 					struct ib_fast_reg_page_list *fr_pgl;
@@ -207,7 +212,6 @@ struct rpcrdma_req {
 struct rpcrdma_buffer {
 	spinlock_t	rb_lock;	/* protects indexes */
 	atomic_t	rb_credits;	/* most recent server credits */
-	unsigned long	rb_cwndscale;	/* cached framework rpc_cwndscale */
 	int		rb_max_requests;/* client max requests */
 	struct list_head rb_mws;	/* optional memory windows/fmrs/frmrs */
 	int		rb_send_index;
@@ -300,7 +304,7 @@ void rpcrdma_ia_close(struct rpcrdma_ia *);
  */
 int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
 				struct rpcrdma_create_data_internal *);
-int rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
 int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 int rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 
@@ -330,11 +334,12 @@ int rpcrdma_deregister_internal(struct rpcrdma_ia *,
 int rpcrdma_register_external(struct rpcrdma_mr_seg *,
 				int, int, struct rpcrdma_xprt *);
 int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,
-				struct rpcrdma_xprt *, void *);
+				struct rpcrdma_xprt *);
 
 /*
  * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
  */
+void rpcrdma_connect_worker(struct work_struct *);
 void rpcrdma_conn_func(struct rpcrdma_ep *);
 void rpcrdma_reply_handler(struct rpcrdma_rep *);