
Merge tag 'nfsd-4.6' of git://linux-nfs.org/~bfields/linux

Pull nfsd updates from Bruce Fields:
 "Various bugfixes, a RDMA update from Chuck Lever, and support for a
  new pnfs layout type from Christoph Hellwig.  The new layout type is a
  variant of the block layout which uses SCSI features to offer improved
  fencing and device identification.

  (Note: this pull request also includes the client side of SCSI
  layout, with Trond's permission.)"

* tag 'nfsd-4.6' of git://linux-nfs.org/~bfields/linux:
  sunrpc/cache: drop reference when sunrpc_cache_pipe_upcall() detects a race
  nfsd: recover: fix memory leak
  nfsd: fix deadlock secinfo+readdir compound
  nfsd4: resfh unused in nfsd4_secinfo
  svcrdma: Use new CQ API for RPC-over-RDMA server send CQs
  svcrdma: Use new CQ API for RPC-over-RDMA server receive CQs
  svcrdma: Remove close_out exit path
  svcrdma: Hook up the logic to return ERR_CHUNK
  svcrdma: Use correct XID in error replies
  svcrdma: Make RDMA_ERROR messages work
  rpcrdma: Add RPCRDMA_HDRLEN_ERR
  svcrdma: svc_rdma_post_recv() should close connection on error
  svcrdma: Close connection when a send error occurs
  nfsd: Lower NFSv4.1 callback message size limit
  svcrdma: Do not send Write chunk XDR pad with inline content
  svcrdma: Do not write xdr_buf::tail in a Write chunk
  svcrdma: Find client-provided write and reply chunks once per reply
  nfsd: Update NFS server comments related to RDMA support
  nfsd: Fix a memory leak when meeting unsupported state_protect_how4
  nfsd4: fix bad bounds checking
Linus Torvalds, 9 years ago
parent commit 5b1e167d8d

+ 1 - 2
fs/nfsd/nfs4proc.c

@@ -864,12 +864,10 @@ static __be32
 nfsd4_secinfo(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 	      struct nfsd4_secinfo *secinfo)
 {
-	struct svc_fh resfh;
 	struct svc_export *exp;
 	struct dentry *dentry;
 	__be32 err;
 
-	fh_init(&resfh, NFS4_FHSIZE);
 	err = fh_verify(rqstp, &cstate->current_fh, S_IFDIR, NFSD_MAY_EXEC);
 	if (err)
 		return err;
@@ -878,6 +876,7 @@ nfsd4_secinfo(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 				    &exp, &dentry);
 	if (err)
 		return err;
+	fh_unlock(&cstate->current_fh);
 	if (d_really_is_negative(dentry)) {
 		exp_put(exp);
 		err = nfserr_noent;

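On the two hunks above: the first finishes the "resfh unused in nfsd4_secinfo" cleanup, and the second is the deadlock fix. nfsd_lookup_dentry() returns with the directory in current_fh locked via fh_lock(), so a READDIR of that same directory later in the COMPOUND would block on a lock its own thread already holds; fh_unlock() releases it as soon as the lookup result is in hand.
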
+ 1 - 0
fs/nfsd/nfs4recover.c

@@ -1266,6 +1266,7 @@ nfsd4_umh_cltrack_init(struct net *net)
 	/* XXX: The usermode helper is not working in a container yet. */
 	if (net != &init_net) {
 		pr_warn("NFSD: attempt to initialize umh client tracking in a container ignored.\n");
+		kfree(grace_start);
 		return -EINVAL;
 	}
 

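grace_start is allocated with kasprintf() earlier in nfsd4_umh_cltrack_init(); before this one-liner, the early return taken when running in a non-init network namespace leaked it on every init attempt.
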
+ 17 - 12
fs/nfsd/nfs4state.c

@@ -2408,7 +2408,8 @@ nfsd4_exchange_id(struct svc_rqst *rqstp,
 	default:				/* checked by xdr code */
 		WARN_ON_ONCE(1);
 	case SP4_SSV:
-		return nfserr_encr_alg_unsupp;
+		status = nfserr_encr_alg_unsupp;
+		goto out_nolock;
 	}
 
 	/* Cases below refer to rfc 5661 section 18.35.4: */
@@ -2586,21 +2587,26 @@ static __be32 check_forechannel_attrs(struct nfsd4_channel_attrs *ca, struct nfs
 	return nfs_ok;
 }
 
+/*
+ * Server's NFSv4.1 backchannel support is AUTH_SYS-only for now.
+ * These are based on similar macros in linux/sunrpc/msg_prot.h.
+ */
+#define RPC_MAX_HEADER_WITH_AUTH_SYS \
+	(RPC_CALLHDRSIZE + 2 * (2 + UNX_CALLSLACK))
+
+#define RPC_MAX_REPHEADER_WITH_AUTH_SYS \
+	(RPC_REPHDRSIZE + (2 + NUL_REPLYSLACK))
+
 #define NFSD_CB_MAX_REQ_SZ	((NFS4_enc_cb_recall_sz + \
-				 RPC_MAX_HEADER_WITH_AUTH) * sizeof(__be32))
+				 RPC_MAX_HEADER_WITH_AUTH_SYS) * sizeof(__be32))
 #define NFSD_CB_MAX_RESP_SZ	((NFS4_dec_cb_recall_sz + \
-				 RPC_MAX_REPHEADER_WITH_AUTH) * sizeof(__be32))
+				 RPC_MAX_REPHEADER_WITH_AUTH_SYS) * \
+				 sizeof(__be32))
 
 static __be32 check_backchannel_attrs(struct nfsd4_channel_attrs *ca)
 {
 	ca->headerpadsz = 0;
 
-	/*
-	 * These RPC_MAX_HEADER macros are overkill, especially since we
-	 * don't even do gss on the backchannel yet.  But this is still
-	 * less than 1k.  Tighten up this estimate in the unlikely event
-	 * it turns out to be a problem for some client:
-	 */
 	if (ca->maxreq_sz < NFSD_CB_MAX_REQ_SZ)
 		return nfserr_toosmall;
 	if (ca->maxresp_sz < NFSD_CB_MAX_RESP_SZ)
@@ -2710,10 +2716,9 @@ nfsd4_create_session(struct svc_rqst *rqstp,
 		goto out_free_conn;
 	}
 	status = nfs_ok;
-	/*
-	 * We do not support RDMA or persistent sessions
-	 */
+	/* Persistent sessions are not supported */
 	cr_ses->flags &= ~SESSION4_PERSIST;
+	/* Upshifting from TCP to RDMA is not supported */
 	cr_ses->flags &= ~SESSION4_RDMA;
 
 	init_session(rqstp, new, conf, cr_ses);

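A worked example of the new size floor (my arithmetic, using RPC_CALLHDRSIZE = 6 and RPC_REPHDRSIZE = 4 XDR words from linux/sunrpc/msg_prot.h, __NEW_UTS_LEN = 64, and the UNX_CALLSLACK/NUL_REPLYSLACK constants added to include/linux/sunrpc/auth.h below):

	UNX_CALLSLACK                   = 21 + XDR_QUADLEN(64) = 37 words
	RPC_MAX_HEADER_WITH_AUTH_SYS    = 6 + 2 * (2 + 37)     = 84 words
	RPC_MAX_REPHEADER_WITH_AUTH_SYS = 4 + (2 + 2)          =  8 words

So a client need only advertise a backchannel maxreq_sz of (NFS4_enc_cb_recall_sz + 84) * 4 bytes, rather than a limit padded out for GSS headers the backchannel does not use.
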
+ 9 - 6
fs/nfsd/nfs4xdr.c

@@ -1072,8 +1072,9 @@ nfsd4_decode_rename(struct nfsd4_compoundargs *argp, struct nfsd4_rename *rename
 
 	READ_BUF(4);
 	rename->rn_snamelen = be32_to_cpup(p++);
-	READ_BUF(rename->rn_snamelen + 4);
+	READ_BUF(rename->rn_snamelen);
 	SAVEMEM(rename->rn_sname, rename->rn_snamelen);
+	READ_BUF(4);
 	rename->rn_tnamelen = be32_to_cpup(p++);
 	READ_BUF(rename->rn_tnamelen);
 	SAVEMEM(rename->rn_tname, rename->rn_tnamelen);
@@ -1155,13 +1156,14 @@ nfsd4_decode_setclientid(struct nfsd4_compoundargs *argp, struct nfsd4_setclient
 	READ_BUF(8);
 	setclientid->se_callback_prog = be32_to_cpup(p++);
 	setclientid->se_callback_netid_len = be32_to_cpup(p++);
-
-	READ_BUF(setclientid->se_callback_netid_len + 4);
+	READ_BUF(setclientid->se_callback_netid_len);
 	SAVEMEM(setclientid->se_callback_netid_val, setclientid->se_callback_netid_len);
+	READ_BUF(4);
 	setclientid->se_callback_addr_len = be32_to_cpup(p++);
 
-	READ_BUF(setclientid->se_callback_addr_len + 4);
+	READ_BUF(setclientid->se_callback_addr_len);
 	SAVEMEM(setclientid->se_callback_addr_val, setclientid->se_callback_addr_len);
+	READ_BUF(4);
 	setclientid->se_callback_ident = be32_to_cpup(p++);
 
 	DECODE_TAIL;
@@ -1835,8 +1837,9 @@ nfsd4_decode_compound(struct nfsd4_compoundargs *argp)
 
 	READ_BUF(4);
 	argp->taglen = be32_to_cpup(p++);
-	READ_BUF(argp->taglen + 8);
+	READ_BUF(argp->taglen);
 	SAVEMEM(argp->tag, argp->taglen);
+	READ_BUF(8);
 	argp->minorversion = be32_to_cpup(p++);
 	argp->opcnt = be32_to_cpup(p++);
 	max_reply += 4 + (XDR_QUADLEN(argp->taglen) << 2);
@@ -3060,7 +3063,7 @@ static __be32 nfsd4_encode_bind_conn_to_session(struct nfsd4_compoundres *resp,
 		p = xdr_encode_opaque_fixed(p, bcts->sessionid.data,
 						NFS4_MAX_SESSIONID_LEN);
 		*p++ = cpu_to_be32(bcts->dir);
-		/* Sorry, we do not yet support RDMA over 4.1: */
+		/* Upshifting from TCP to RDMA is not supported */
 		*p++ = cpu_to_be32(0);
 	}
 	return nfserr;

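The pattern behind these hunks: READ_BUF(n) bounds-checks n against what is left in the receive buffer, but n was computed as len + 4 where len is an untrusted u32 off the wire, so a huge len wraps the sum past the check and SAVEMEM then copies len bytes. Splitting the check in two keeps every checked quantity un-wrapped. A standalone sketch of the wraparound (illustrative names, not kernel code):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint32_t snamelen = UINT32_MAX - 2;	/* length from the wire */
	uint32_t want = snamelen + 4;		/* wraps around to 1 */

	/* A check of the form "if (want > remaining) fail;" passes
	 * with want == 1, yet the decoder would then try to copy
	 * ~4GB of name.  Checking snamelen and the following 4-byte
	 * word separately can never wrap. */
	printf("snamelen=%u snamelen+4=%u\n",
	       (unsigned int)snamelen, (unsigned int)want);
	return 0;
}
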
+ 7 - 0
include/linux/sunrpc/auth.h

@@ -20,11 +20,18 @@
 #include <linux/uidgid.h>
 #include <linux/utsname.h>
 
+/*
+ * Maximum size of AUTH_NONE authentication information, in XDR words.
+ */
+#define NUL_CALLSLACK	(4)
+#define NUL_REPLYSLACK	(2)
+
 /*
  * Size of the nodename buffer. RFC1831 specifies a hard limit of 255 bytes,
  * but Linux hostnames are actually limited to __NEW_UTS_LEN bytes.
  */
 #define UNX_MAXNODENAME	__NEW_UTS_LEN
+#define UNX_CALLSLACK	(21 + XDR_QUADLEN(UNX_MAXNODENAME))
 
 struct rpcsec_gss_info;
 

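For reference, the 21 here is, by my accounting (the patch does not spell it out), the worst-case AUTH_UNIX credential body in XDR words: stamp (1) + machinename length (1) + uid (1) + gid (1) + gid count (1) + 16 supplementary gids, plus XDR_QUADLEN(UNX_MAXNODENAME) for the name itself. NUL_CALLSLACK's 4 words are the two (flavor, length) pairs of an empty AUTH_NONE credential and verifier, and NUL_REPLYSLACK's 2 are the NULL verifier in a reply; auth_null.c and auth_unix.c below switch their au_cslack/au_rslack over to these names.
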
+ 13 - 7
include/linux/sunrpc/svc_rdma.h

@@ -75,8 +75,10 @@ struct svc_rdma_op_ctxt {
 	struct svc_rdma_fastreg_mr *frmr;
 	int hdr_count;
 	struct xdr_buf arg;
+	struct ib_cqe cqe;
+	struct ib_cqe reg_cqe;
+	struct ib_cqe inv_cqe;
 	struct list_head dto_q;
-	enum ib_wr_opcode wr_op;
 	enum ib_wc_status wc_status;
 	u32 byte_len;
 	u32 position;
@@ -174,8 +176,6 @@ struct svcxprt_rdma {
 	struct work_struct   sc_work;
 };
 /* sc_flags */
-#define RDMAXPRT_RQ_PENDING	1
-#define RDMAXPRT_SQ_PENDING	2
 #define RDMAXPRT_CONN_PENDING	3
 
 #define RPCRDMA_LISTEN_BACKLOG  10
@@ -199,7 +199,7 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
 				    struct xdr_buf *rcvbuf);
 
 /* svc_rdma_marshal.c */
-extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg **, struct svc_rqst *);
+extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg *, struct svc_rqst *);
 extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
 				     struct rpcrdma_msg *,
 				     enum rpcrdma_errcode, __be32 *);
@@ -224,16 +224,22 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
 
 /* svc_rdma_sendto.c */
 extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
-			    struct svc_rdma_req_map *);
+			    struct svc_rdma_req_map *, bool);
 extern int svc_rdma_sendto(struct svc_rqst *);
 extern struct rpcrdma_read_chunk *
 	svc_rdma_get_read_chunk(struct rpcrdma_msg *);
+extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
+				int);
 
 /* svc_rdma_transport.c */
+extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
+extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
 extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
-extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
-				enum rpcrdma_errcode);
 extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
+extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
 extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
 extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);

+ 2 - 2
net/sunrpc/auth_null.c

@@ -113,8 +113,8 @@ const struct rpc_authops authnull_ops = {
 
 static
 struct rpc_auth null_auth = {
-	.au_cslack	= 4,
-	.au_rslack	= 2,
+	.au_cslack	= NUL_CALLSLACK,
+	.au_rslack	= NUL_REPLYSLACK,
 	.au_ops		= &authnull_ops,
 	.au_flavor	= RPC_AUTH_NULL,
 	.au_count	= ATOMIC_INIT(0),

+ 2 - 4
net/sunrpc/auth_unix.c

@@ -23,8 +23,6 @@ struct unx_cred {
 };
 #define uc_uid			uc_base.cr_uid
 
-#define UNX_WRITESLACK		(21 + XDR_QUADLEN(UNX_MAXNODENAME))
-
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 # define RPCDBG_FACILITY	RPCDBG_AUTH
 #endif
@@ -228,8 +226,8 @@ const struct rpc_authops authunix_ops = {
 
 static
 struct rpc_auth		unix_auth = {
-	.au_cslack	= UNX_WRITESLACK,
-	.au_rslack	= 2,			/* assume AUTH_NULL verf */
+	.au_cslack	= UNX_CALLSLACK,
+	.au_rslack	= NUL_REPLYSLACK,
 	.au_ops		= &authunix_ops,
 	.au_flavor	= RPC_AUTH_UNIX,
 	.au_count	= ATOMIC_INIT(0),

+ 3 - 3
net/sunrpc/cache.c

@@ -1182,14 +1182,14 @@ int sunrpc_cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
 	}
 
 	crq->q.reader = 0;
-	crq->item = cache_get(h);
 	crq->buf = buf;
 	crq->len = 0;
 	crq->readers = 0;
 	spin_lock(&queue_lock);
-	if (test_bit(CACHE_PENDING, &h->flags))
+	if (test_bit(CACHE_PENDING, &h->flags)) {
+		crq->item = cache_get(h);
 		list_add_tail(&crq->q.list, &detail->queue);
-	else
+	} else
 		/* Lost a race, no longer PENDING, so don't enqueue */
 		ret = -EAGAIN;
 	spin_unlock(&queue_lock);

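The race this closes: cache_get(h) used to run before queue_lock was taken, so when the CACHE_PENDING test lost the race, the -EAGAIN path returned with a reference on h that nothing ever dropped. Taking the reference only on the branch that actually enqueues the request ties reference ownership to queue membership.
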
+ 5 - 12
net/sunrpc/xprtrdma/svc_rdma_backchannel.c

@@ -107,26 +107,18 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	int ret;
 
 	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, sndbuf, vec);
+	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
 	if (ret)
 		goto out_err;
 
-	/* Post a recv buffer to handle the reply for this request. */
-	ret = svc_rdma_post_recv(rdma, GFP_NOIO);
-	if (ret) {
-		pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n",
-		       ret);
-		pr_err("svcrdma: closing transport %p.\n", rdma);
-		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		ret = -ENOTCONN;
+	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
+	if (ret)
 		goto out_err;
-	}
 
 	ctxt = svc_rdma_get_context(rdma);
 	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
 	ctxt->count = 1;
 
-	ctxt->wr_op = IB_WR_SEND;
 	ctxt->direction = DMA_TO_DEVICE;
 	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
 	ctxt->sge[0].length = sndbuf->len;
@@ -140,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	atomic_inc(&rdma->sc_dma_used);
 
 	memset(&send_wr, 0, sizeof(send_wr));
-	send_wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr.wr_cqe = &ctxt->cqe;
 	send_wr.sg_list = ctxt->sge;
 	send_wr.num_sge = 1;
 	send_wr.opcode = IB_WR_SEND;

+ 44 - 20
net/sunrpc/xprtrdma/svc_rdma_marshal.c

@@ -145,29 +145,44 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
 	return (__be32 *)&ary->wc_array[nchunks];
 }
 
-int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
-			    struct svc_rqst *rqstp)
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp)
 {
-	struct rpcrdma_msg *rmsgp = NULL;
 	__be32 *va, *vaend;
+	unsigned int len;
 	u32 hdr_len;
 
-	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
-
 	/* Verify that there's enough bytes for header + something */
-	if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+	if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) {
 		dprintk("svcrdma: header too short = %d\n",
 			rqstp->rq_arg.len);
 		return -EINVAL;
 	}
 
-	if (rmsgp->rm_vers != rpcrdma_version)
-		return -ENOSYS;
-
-	/* Pull in the extra for the padded case and bump our pointer */
-	if (rmsgp->rm_type == rdma_msgp) {
-		int hdrlen;
+	if (rmsgp->rm_vers != rpcrdma_version) {
+		dprintk("%s: bad version %u\n", __func__,
+			be32_to_cpu(rmsgp->rm_vers));
+		return -EPROTONOSUPPORT;
+	}
 
+	switch (be32_to_cpu(rmsgp->rm_type)) {
+	case RDMA_MSG:
+	case RDMA_NOMSG:
+		break;
+
+	case RDMA_DONE:
+		/* Just drop it */
+		dprintk("svcrdma: dropping RDMA_DONE message\n");
+		return 0;
+
+	case RDMA_ERROR:
+		/* Possible if this is a backchannel reply.
+		 * XXX: We should cancel this XID, though.
+		 */
+		dprintk("svcrdma: dropping RDMA_ERROR message\n");
+		return 0;
+
+	case RDMA_MSGP:
+		/* Pull in the extra for the padded case, bump our pointer */
 		rmsgp->rm_body.rm_padded.rm_align =
 			be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
 		rmsgp->rm_body.rm_padded.rm_thresh =
@@ -175,11 +190,15 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
 
 		va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
 		rqstp->rq_arg.head[0].iov_base = va;
-		hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
-		rqstp->rq_arg.head[0].iov_len -= hdrlen;
-		if (hdrlen > rqstp->rq_arg.len)
+		len = (u32)((unsigned long)va - (unsigned long)rmsgp);
+		rqstp->rq_arg.head[0].iov_len -= len;
+		if (len > rqstp->rq_arg.len)
 			return -EINVAL;
-		return hdrlen;
+		return len;
+	default:
+		dprintk("svcrdma: bad rdma procedure (%u)\n",
+			be32_to_cpu(rmsgp->rm_type));
+		return -EINVAL;
 	}
 
 	/* The chunk list may contain either a read chunk list or a write
@@ -188,20 +207,25 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
 	va = &rmsgp->rm_body.rm_chunks[0];
 	vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
 	va = decode_read_list(va, vaend);
-	if (!va)
+	if (!va) {
+		dprintk("svcrdma: failed to decode read list\n");
 		return -EINVAL;
+	}
 	va = decode_write_list(va, vaend);
-	if (!va)
+	if (!va) {
+		dprintk("svcrdma: failed to decode write list\n");
 		return -EINVAL;
+	}
 	va = decode_reply_array(va, vaend);
-	if (!va)
+	if (!va) {
+		dprintk("svcrdma: failed to decode reply chunk\n");
 		return -EINVAL;
+	}
 
 	rqstp->rq_arg.head[0].iov_base = va;
 	hdr_len = (unsigned long)va - (unsigned long)rmsgp;
 	rqstp->rq_arg.head[0].iov_len -= hdr_len;
 
-	*rdma_req = rmsgp;
 	return hdr_len;
 }
 

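svc_rdma_xdr_decode_req() now has a three-way contract that the recvfrom changes below depend on: a positive return is the length of the RPC-over-RDMA header consumed, 0 means the message was handled and should be silently dropped (RDMA_DONE, RDMA_ERROR), and a negative errno asks the caller to send an error reply, where -EPROTONOSUPPORT selects ERR_VERS and anything else ERR_CHUNK.
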
+ 24 - 36
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c

@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
 		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
 
 	memset(&read_wr, 0, sizeof(read_wr));
-	read_wr.wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_read;
+	read_wr.wr.wr_cqe = &ctxt->cqe;
 	read_wr.wr.opcode = IB_WR_RDMA_READ;
-	ctxt->wr_op = read_wr.wr.opcode;
 	read_wr.wr.send_flags = IB_SEND_SIGNALED;
 	read_wr.rkey = rs_handle;
 	read_wr.remote_addr = rs_offset;
@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 	ctxt->read_hdr = head;
 
 	/* Prepare REG WR */
+	ctxt->reg_cqe.done = svc_rdma_wc_reg;
+	reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
 	reg_wr.wr.opcode = IB_WR_REG_MR;
-	reg_wr.wr.wr_id = 0;
 	reg_wr.wr.send_flags = IB_SEND_SIGNALED;
 	reg_wr.wr.num_sge = 0;
 	reg_wr.mr = frmr->mr;
@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 
 	/* Prepare RDMA_READ */
 	memset(&read_wr, 0, sizeof(read_wr));
+	ctxt->cqe.done = svc_rdma_wc_read;
+	read_wr.wr.wr_cqe = &ctxt->cqe;
 	read_wr.wr.send_flags = IB_SEND_SIGNALED;
 	read_wr.rkey = rs_handle;
 	read_wr.remote_addr = rs_offset;
@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
 	read_wr.wr.num_sge = 1;
 	if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
 		read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
-		read_wr.wr.wr_id = (unsigned long)ctxt;
 		read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
 	} else {
 		read_wr.wr.opcode = IB_WR_RDMA_READ;
 		read_wr.wr.next = &inv_wr;
 		/* Prepare invalidate */
 		memset(&inv_wr, 0, sizeof(inv_wr));
-		inv_wr.wr_id = (unsigned long)ctxt;
+		ctxt->inv_cqe.done = svc_rdma_wc_inv;
+		inv_wr.wr_cqe = &ctxt->inv_cqe;
 		inv_wr.opcode = IB_WR_LOCAL_INV;
 		inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
 		inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
 	}
-	ctxt->wr_op = read_wr.wr.opcode;
 
 	/* Post the chain */
 	ret = svc_rdma_send(xprt, &reg_wr.wr);
@@ -612,7 +614,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	struct svc_rdma_op_ctxt *ctxt = NULL;
 	struct rpcrdma_msg *rmsgp;
 	int ret = 0;
-	int len;
 
 	dprintk("svcrdma: rqstp=%p\n", rqstp);
 
@@ -642,8 +643,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		 * transport list
 		 */
 		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
-			goto close_out;
-
+			goto defer;
 		goto out;
 	}
 	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -654,15 +654,13 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
 
 	/* Decode the RDMA header. */
-	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
-	rqstp->rq_xprt_hlen = len;
-
-	/* If the request is invalid, reply with an error */
-	if (len < 0) {
-		if (len == -ENOSYS)
-			svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
-		goto close_out;
-	}
+	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+	ret = svc_rdma_xdr_decode_req(rmsgp, rqstp);
+	if (ret < 0)
+		goto out_err;
+	if (ret == 0)
+		goto out_drop;
+	rqstp->rq_xprt_hlen = ret;
 
 	if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
 		ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
@@ -698,26 +696,16 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	svc_xprt_copy_addrs(rqstp, xprt);
 	return ret;
 
- close_out:
-	if (ctxt)
-		svc_rdma_put_context(ctxt, 1);
-	dprintk("svcrdma: transport %p is closing\n", xprt);
-	/*
-	 * Set the close bit and enqueue it. svc_recv will see the
-	 * close bit and call svc_xprt_delete
-	 */
-	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+out_err:
+	svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+	svc_rdma_put_context(ctxt, 0);
+	return 0;
+
 defer:
 	return 0;
 
+out_drop:
+	svc_rdma_put_context(ctxt, 1);
 repost:
-	ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL);
-	if (ret) {
-		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
-		       ret);
-		pr_err("svcrdma: closing transport %p.\n", rdma_xprt);
-		set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags);
-		ret = -ENOTCONN;
-	}
-	return ret;
+	return svc_rdma_repost_recv(rdma_xprt, GFP_KERNEL);
 }

+ 133 - 63
net/sunrpc/xprtrdma/svc_rdma_sendto.c

@@ -50,9 +50,15 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static u32 xdr_padsize(u32 len)
+{
+	return (len & 3) ? (4 - (len & 3)) : 0;
+}
+
 int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
 		     struct xdr_buf *xdr,
-		     struct svc_rdma_req_map *vec)
+		     struct svc_rdma_req_map *vec,
+		     bool write_chunk_present)
 {
 	int sge_no;
 	u32 sge_bytes;
@@ -92,9 +98,20 @@ int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
 
 	/* Tail SGE */
 	if (xdr->tail[0].iov_len) {
-		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
-		vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
-		sge_no++;
+		unsigned char *base = xdr->tail[0].iov_base;
+		size_t len = xdr->tail[0].iov_len;
+		u32 xdr_pad = xdr_padsize(xdr->page_len);
+
+		if (write_chunk_present && xdr_pad) {
+			base += xdr_pad;
+			len -= xdr_pad;
+		}
+
+		if (len) {
+			vec->sge[sge_no].iov_base = base;
+			vec->sge[sge_no].iov_len = len;
+			sge_no++;
+		}
 	}
 
 	dprintk("svcrdma: %s: sge_no %d page_no %d "
@@ -166,10 +183,10 @@ svc_rdma_get_write_array(struct rpcrdma_msg *rmsgp)
  * reply array is present
  */
 static struct rpcrdma_write_array *
-svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
+svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp,
+			 struct rpcrdma_write_array *wr_ary)
 {
 	struct rpcrdma_read_chunk *rch;
-	struct rpcrdma_write_array *wr_ary;
 	struct rpcrdma_write_array *rp_ary;
 
 	/* XXX: Need to fix when reply chunk may occur with read list
@@ -191,7 +208,6 @@ svc_rdma_get_reply_array(struct rpcrdma_msg *rmsgp)
 		goto found_it;
 	}
 
-	wr_ary = svc_rdma_get_write_array(rmsgp);
 	if (wr_ary) {
 		int chunk = be32_to_cpu(wr_ary->wc_nchunks);
 
@@ -281,8 +297,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 
 	/* Prepare WRITE WR */
 	memset(&write_wr, 0, sizeof write_wr);
-	ctxt->wr_op = IB_WR_RDMA_WRITE;
-	write_wr.wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_write;
+	write_wr.wr.wr_cqe = &ctxt->cqe;
 	write_wr.wr.sg_list = &sge[0];
 	write_wr.wr.num_sge = sge_no;
 	write_wr.wr.opcode = IB_WR_RDMA_WRITE;
@@ -298,41 +314,37 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
  err:
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 0);
-	/* Fatal error, close transport */
 	return -EIO;
 }
 
+noinline
 static int send_write_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_write_array *wr_ary,
 			     struct rpcrdma_msg *rdma_resp,
 			     struct svc_rqst *rqstp,
 			     struct svc_rdma_req_map *vec)
 {
-	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+	u32 xfer_len = rqstp->rq_res.page_len;
 	int write_len;
 	u32 xdr_off;
 	int chunk_off;
 	int chunk_no;
 	int nchunks;
-	struct rpcrdma_write_array *arg_ary;
 	struct rpcrdma_write_array *res_ary;
 	int ret;
 
-	arg_ary = svc_rdma_get_write_array(rdma_argp);
-	if (!arg_ary)
-		return 0;
 	res_ary = (struct rpcrdma_write_array *)
 		&rdma_resp->rm_body.rm_chunks[1];
 
 	/* Write chunks start at the pagelist */
-	nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+	nchunks = be32_to_cpu(wr_ary->wc_nchunks);
 	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
 	     xfer_len && chunk_no < nchunks;
 	     chunk_no++) {
 		struct rpcrdma_segment *arg_ch;
 		u64 rs_offset;
 
-		arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+		arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
 		write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
 
 		/* Prepare the response chunk given the length actually
@@ -350,11 +362,8 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
 					 xdr_off,
 					 write_len,
 					 vec);
-			if (ret <= 0) {
-				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
-					ret);
-				return -EIO;
-			}
+			if (ret <= 0)
+				goto out_err;
 			chunk_off += ret;
 			xdr_off += ret;
 			xfer_len -= ret;
@@ -364,11 +373,16 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
 	/* Update the req with the number of chunks actually used */
 	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
 
-	return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+	return rqstp->rq_res.page_len;
+
+out_err:
+	pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
+	return -EIO;
 }
 
+noinline
 static int send_reply_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_write_array *rp_ary,
 			     struct rpcrdma_msg *rdma_resp,
 			     struct svc_rqst *rqstp,
 			     struct svc_rdma_req_map *vec)
@@ -380,25 +394,21 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 	int chunk_off;
 	int nchunks;
 	struct rpcrdma_segment *ch;
-	struct rpcrdma_write_array *arg_ary;
 	struct rpcrdma_write_array *res_ary;
 	int ret;
 
-	arg_ary = svc_rdma_get_reply_array(rdma_argp);
-	if (!arg_ary)
-		return 0;
 	/* XXX: need to fix when reply lists occur with read-list and or
 	 * write-list */
 	res_ary = (struct rpcrdma_write_array *)
 		&rdma_resp->rm_body.rm_chunks[2];
 
 	/* xdr offset starts at RPC message */
-	nchunks = be32_to_cpu(arg_ary->wc_nchunks);
+	nchunks = be32_to_cpu(rp_ary->wc_nchunks);
 	for (xdr_off = 0, chunk_no = 0;
 	     xfer_len && chunk_no < nchunks;
 	     chunk_no++) {
 		u64 rs_offset;
-		ch = &arg_ary->wc_array[chunk_no].wc_target;
+		ch = &rp_ary->wc_array[chunk_no].wc_target;
 		write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
 
 		/* Prepare the reply chunk given the length actually
@@ -415,11 +425,8 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 					 xdr_off,
 					 write_len,
 					 vec);
-			if (ret <= 0) {
-				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
-					ret);
-				return -EIO;
-			}
+			if (ret <= 0)
+				goto out_err;
 			chunk_off += ret;
 			xdr_off += ret;
 			xfer_len -= ret;
@@ -430,6 +437,10 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
 
 	return rqstp->rq_res.len;
+
+out_err:
+	pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
+	return -EIO;
 }
 
 /* This function prepares the portion of the RPCRDMA message to be
@@ -464,13 +475,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
 	int pages;
 	int ret;
 
-	/* Post a recv buffer to handle another request. */
-	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+	ret = svc_rdma_repost_recv(rdma, GFP_KERNEL);
 	if (ret) {
-		printk(KERN_INFO
-		       "svcrdma: could not post a receive buffer, err=%d."
-		       "Closing transport %p.\n", ret, rdma);
-		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
 		svc_rdma_put_context(ctxt, 0);
 		return -ENOTCONN;
 	}
@@ -543,8 +549,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		goto err;
 	}
 	memset(&send_wr, 0, sizeof send_wr);
-	ctxt->wr_op = IB_WR_SEND;
-	send_wr.wr_id = (unsigned long)ctxt;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr.wr_cqe = &ctxt->cqe;
 	send_wr.sg_list = ctxt->sge;
 	send_wr.num_sge = sge_no;
 	send_wr.opcode = IB_WR_SEND;
@@ -559,6 +565,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
  err:
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 1);
+	pr_err("svcrdma: failed to send reply, rc=%d\n", ret);
 	return -EIO;
 }
 
@@ -573,7 +580,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 	struct rpcrdma_msg *rdma_argp;
 	struct rpcrdma_msg *rdma_resp;
-	struct rpcrdma_write_array *reply_ary;
+	struct rpcrdma_write_array *wr_ary, *rp_ary;
 	enum rpcrdma_proc reply_type;
 	int ret;
 	int inline_bytes;
@@ -587,12 +594,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	 * places this at the start of page 0.
 	 */
 	rdma_argp = page_address(rqstp->rq_pages[0]);
+	wr_ary = svc_rdma_get_write_array(rdma_argp);
+	rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary);
 
 	/* Build an req vec for the XDR */
 	ctxt = svc_rdma_get_context(rdma);
 	ctxt->direction = DMA_TO_DEVICE;
 	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec);
+	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
 	if (ret)
 		goto err0;
 	inline_bytes = rqstp->rq_res.len;
@@ -603,8 +612,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	if (!res_page)
 		goto err0;
 	rdma_resp = page_address(res_page);
-	reply_ary = svc_rdma_get_reply_array(rdma_argp);
-	if (reply_ary)
+	if (rp_ary)
 		reply_type = RDMA_NOMSG;
 	else
 		reply_type = RDMA_MSG;
@@ -612,27 +620,26 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 					 rdma_resp, reply_type);
 
 	/* Send any write-chunk data and build resp write-list */
-	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
-				rqstp, vec);
-	if (ret < 0) {
-		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
-		       ret);
-		goto err1;
+	if (wr_ary) {
+		ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+		if (ret < 0)
+			goto err1;
+		inline_bytes -= ret + xdr_padsize(ret);
 	}
-	inline_bytes -= ret;
 
 	/* Send any reply-list data and update resp reply-list */
-	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
-				rqstp, vec);
-	if (ret < 0) {
-		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
-		       ret);
-		goto err1;
+	if (rp_ary) {
+		ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+		if (ret < 0)
+			goto err1;
+		inline_bytes -= ret;
 	}
-	inline_bytes -= ret;
 
 	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
 			 inline_bytes);
+	if (ret < 0)
+		goto err1;
+
 	svc_rdma_put_req_map(rdma, vec);
 	dprintk("svcrdma: send_reply returns %d\n", ret);
 	return ret;
@@ -642,5 +649,68 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
  err0:
 	svc_rdma_put_req_map(rdma, vec);
 	svc_rdma_put_context(ctxt, 0);
-	return ret;
+	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	return -ENOTCONN;
+}
+
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+			 int status)
+{
+	struct ib_send_wr err_wr;
+	struct page *p;
+	struct svc_rdma_op_ctxt *ctxt;
+	enum rpcrdma_errcode err;
+	__be32 *va;
+	int length;
+	int ret;
+
+	ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+	if (ret)
+		return;
+
+	p = alloc_page(GFP_KERNEL);
+	if (!p)
+		return;
+	va = page_address(p);
+
+	/* XDR encode an error reply */
+	err = ERR_CHUNK;
+	if (status == -EPROTONOSUPPORT)
+		err = ERR_VERS;
+	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+	ctxt = svc_rdma_get_context(xprt);
+	ctxt->direction = DMA_TO_DEVICE;
+	ctxt->count = 1;
+	ctxt->pages[0] = p;
+
+	/* Prepare SGE for local address */
+	ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
+	ctxt->sge[0].length = length;
+	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
+					    p, 0, length, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
+		dprintk("svcrdma: Error mapping buffer for protocol error\n");
+		svc_rdma_put_context(ctxt, 1);
+		return;
+	}
+	atomic_inc(&xprt->sc_dma_used);
+
+	/* Prepare SEND WR */
+	memset(&err_wr, 0, sizeof(err_wr));
+	ctxt->cqe.done = svc_rdma_wc_send;
+	err_wr.wr_cqe = &ctxt->cqe;
+	err_wr.sg_list = ctxt->sge;
+	err_wr.num_sge = 1;
+	err_wr.opcode = IB_WR_SEND;
+	err_wr.send_flags = IB_SEND_SIGNALED;
+
+	/* Post It */
+	ret = svc_rdma_send(xprt, &err_wr);
+	if (ret) {
+		dprintk("svcrdma: Error %d posting send for protocol error\n",
+			ret);
+		svc_rdma_unmap_dma(ctxt);
+		svc_rdma_put_context(ctxt, 1);
+	}
 }

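XDR rounds opaque payloads up to a 4-byte boundary, and when rq_res.page_len is unaligned those pad bytes sit at the front of tail[0]; the new helper just computes the pad. A quick check of the formula:

	xdr_padsize(0) == 0
	xdr_padsize(5) == 3	/* 5 data bytes + 3 pad = 8 */
	xdr_padsize(8) == 0

When the client provided a write chunk, the pagelist payload travels by RDMA Write, so the pad must be neither written into the chunk nor re-sent inline: svc_rdma_map_xdr() now skips it in the tail when write_chunk_present is set, and svc_rdma_sendto() subtracts ret + xdr_padsize(ret) from inline_bytes.
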
+ 146 - 299
net/sunrpc/xprtrdma/svc_rdma_transport.c

@@ -63,17 +63,10 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 					int flags);
 static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
 static void svc_rdma_release_rqst(struct svc_rqst *);
-static void dto_tasklet_func(unsigned long data);
 static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
 static int svc_rdma_secure_port(struct svc_rqst *);
-static void rq_cq_reap(struct svcxprt_rdma *xprt);
-static void sq_cq_reap(struct svcxprt_rdma *xprt);
-
-static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
-static DEFINE_SPINLOCK(dto_lock);
-static LIST_HEAD(dto_xprt_q);
 
 static struct svc_xprt_ops svc_rdma_ops = {
 	.xpo_create = svc_rdma_create,
@@ -352,15 +345,6 @@ static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
 	}
 }
 
-/* ib_cq event handler */
-static void cq_event_handler(struct ib_event *event, void *context)
-{
-	struct svc_xprt *xprt = context;
-	dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
-		ib_event_msg(event->event), event->event, context);
-	set_bit(XPT_CLOSE, &xprt->xpt_flags);
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -392,251 +376,171 @@ static void qp_event_handler(struct ib_event *event, void *context)
 	}
 }
 
-/*
- * Data Transfer Operation Tasklet
+/**
+ * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
+ * @cq:        completion queue
+ * @wc:        completed WR
  *
- * Walks a list of transports with I/O pending, removing entries as
- * they are added to the server's I/O pending list. Two bits indicate
- * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
- * spinlock that serializes access to the transport list with the RQ
- * and SQ interrupt handlers.
  */
-static void dto_tasklet_func(unsigned long data)
+static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svcxprt_rdma *xprt;
-	unsigned long flags;
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	spin_lock_irqsave(&dto_lock, flags);
-	while (!list_empty(&dto_xprt_q)) {
-		xprt = list_entry(dto_xprt_q.next,
-				  struct svcxprt_rdma, sc_dto_q);
-		list_del_init(&xprt->sc_dto_q);
-		spin_unlock_irqrestore(&dto_lock, flags);
+	/* WARNING: Only wc->wr_cqe and wc->status are reliable */
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	ctxt->wc_status = wc->status;
+	svc_rdma_unmap_dma(ctxt);
 
-		rq_cq_reap(xprt);
-		sq_cq_reap(xprt);
+	if (wc->status != IB_WC_SUCCESS)
+		goto flushed;
 
-		svc_xprt_put(&xprt->sc_xprt);
-		spin_lock_irqsave(&dto_lock, flags);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
+	/* All wc fields are now known to be valid */
+	ctxt->byte_len = wc->byte_len;
+	spin_lock(&xprt->sc_rq_dto_lock);
+	list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+	spin_unlock(&xprt->sc_rq_dto_lock);
+
+	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+	if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		goto out;
+	svc_xprt_enqueue(&xprt->sc_xprt);
+	goto out;
+
+flushed:
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_warn("svcrdma: receive: %s (%u/0x%x)\n",
+			ib_wc_status_msg(wc->status),
+			wc->status, wc->vendor_err);
+	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	svc_rdma_put_context(ctxt, 1);
+
+out:
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * Receive Queue Completion Handler
- *
- * Since an RQ completion handler is called on interrupt context, we
- * need to defer the handling of the I/O to a tasklet
- */
-static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
+				    struct ib_wc *wc,
+				    const char *opname)
 {
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
+	if (wc->status != IB_WC_SUCCESS)
+		goto err;
 
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an SQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+out:
+	atomic_dec(&xprt->sc_sq_count);
+	wake_up(&xprt->sc_send_wait);
+	return;
+
+err:
+	set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("svcrdma: %s: %s (%u/0x%x)\n",
+		       opname, ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
+	goto out;
+}
 
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
+static void svc_rdma_send_wc_common_put(struct ib_cq *cq, struct ib_wc *wc,
+					const char *opname)
+{
+	struct svcxprt_rdma *xprt = cq->cq_context;
 
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
+	svc_rdma_send_wc_common(xprt, wc, opname);
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * rq_cq_reap - Process the RQ CQ.
- *
- * Take all completing WC off the CQE and enqueue the associated DTO
- * context on the dto_q for the transport.
+/**
+ * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
+ * @cq:        completion queue
+ * @wc:        completed WR
  *
- * Note that caller must hold a transport reference.
  */
-static void rq_cq_reap(struct svcxprt_rdma *xprt)
+void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
-	int ret;
-	struct ib_wc wc;
-	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
-		return;
+	svc_rdma_send_wc_common_put(cq, wc, "send");
 
-	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-	atomic_inc(&rdma_stat_rq_poll);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 1);
+}
 
-	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
-		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
-		ctxt->wc_status = wc.status;
-		ctxt->byte_len = wc.byte_len;
-		svc_rdma_unmap_dma(ctxt);
-		if (wc.status != IB_WC_SUCCESS) {
-			/* Close the transport */
-			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
-			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			svc_rdma_put_context(ctxt, 1);
-			svc_xprt_put(&xprt->sc_xprt);
-			continue;
-		}
-		spin_lock_bh(&xprt->sc_rq_dto_lock);
-		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
-		spin_unlock_bh(&xprt->sc_rq_dto_lock);
-		svc_xprt_put(&xprt->sc_xprt);
-	}
+/**
+ * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
+ */
+void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	if (ctxt)
-		atomic_inc(&rdma_stat_rq_prod);
+	svc_rdma_send_wc_common_put(cq, wc, "write");
 
-	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-	/*
-	 * If data arrived before established event,
-	 * don't enqueue. This defers RPC I/O until the
-	 * RDMA connection is complete.
-	 */
-	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-		svc_xprt_enqueue(&xprt->sc_xprt);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 0);
 }
 
-/*
- * Process a completion context
+/**
+ * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
  */
-static void process_context(struct svcxprt_rdma *xprt,
-			    struct svc_rdma_op_ctxt *ctxt)
+void svc_rdma_wc_reg(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svc_rdma_op_ctxt *read_hdr;
-	int free_pages = 0;
-
-	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_send_wc_common_put(cq, wc, "fastreg");
+}
 
-	switch (ctxt->wr_op) {
-	case IB_WR_SEND:
-		free_pages = 1;
-		break;
+/**
+ * svc_rdma_wc_read - Invoked by RDMA provider for each polled Read WC
+ * @cq:        completion queue
+ * @wc:        completed WR
+ *
+ */
+void svc_rdma_wc_read(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct svcxprt_rdma *xprt = cq->cq_context;
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_op_ctxt *ctxt;
 
-	case IB_WR_RDMA_WRITE:
-		break;
+	svc_rdma_send_wc_common(xprt, wc, "read");
 
-	case IB_WR_RDMA_READ:
-	case IB_WR_RDMA_READ_WITH_INV:
-		svc_rdma_put_frmr(xprt, ctxt->frmr);
+	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_frmr(xprt, ctxt->frmr);
 
-		if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags))
-			break;
+	if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+		struct svc_rdma_op_ctxt *read_hdr;
 
 		read_hdr = ctxt->read_hdr;
-		svc_rdma_put_context(ctxt, 0);
-
-		spin_lock_bh(&xprt->sc_rq_dto_lock);
-		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+		spin_lock(&xprt->sc_rq_dto_lock);
 		list_add_tail(&read_hdr->dto_q,
 			      &xprt->sc_read_complete_q);
-		spin_unlock_bh(&xprt->sc_rq_dto_lock);
-		svc_xprt_enqueue(&xprt->sc_xprt);
-		return;
+		spin_unlock(&xprt->sc_rq_dto_lock);
 
-	default:
-		dprintk("svcrdma: unexpected completion opcode=%d\n",
-			ctxt->wr_op);
-		break;
+		set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+		svc_xprt_enqueue(&xprt->sc_xprt);
 	}
 
-	svc_rdma_put_context(ctxt, free_pages);
+	svc_rdma_put_context(ctxt, 0);
+	svc_xprt_put(&xprt->sc_xprt);
 }
 
-/*
- * Send Queue Completion Handler - potentially called on interrupt context.
+/**
+ * svc_rdma_wc_inv - Invoked by RDMA provider for each polled LOCAL_INV WC
+ * @cq:        completion queue
+ * @wc:        completed WR
  *
- * Note that caller must hold a transport reference.
  */
-static void sq_cq_reap(struct svcxprt_rdma *xprt)
-{
-	struct svc_rdma_op_ctxt *ctxt = NULL;
-	struct ib_wc wc_a[6];
-	struct ib_wc *wc;
-	struct ib_cq *cq = xprt->sc_sq_cq;
-	int ret;
-
-	memset(wc_a, 0, sizeof(wc_a));
-
-	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
-		return;
-
-	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	atomic_inc(&rdma_stat_sq_poll);
-	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
-		int i;
-
-		for (i = 0; i < ret; i++) {
-			wc = &wc_a[i];
-			if (wc->status != IB_WC_SUCCESS) {
-				dprintk("svcrdma: sq wc err status %s (%d)\n",
-					ib_wc_status_msg(wc->status),
-					wc->status);
-
-				/* Close the transport */
-				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			}
-
-			/* Decrement used SQ WR count */
-			atomic_dec(&xprt->sc_sq_count);
-			wake_up(&xprt->sc_send_wait);
-
-			ctxt = (struct svc_rdma_op_ctxt *)
-				(unsigned long)wc->wr_id;
-			if (ctxt)
-				process_context(xprt, ctxt);
-
-			svc_xprt_put(&xprt->sc_xprt);
-		}
-	}
-
-	if (ctxt)
-		atomic_inc(&rdma_stat_sq_prod);
-}
-
-static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+void svc_rdma_wc_inv(struct ib_cq *cq, struct ib_wc *wc)
 {
-	struct svcxprt_rdma *xprt = cq_context;
-	unsigned long flags;
-
-	/* Guard against unconditional flush call for destroyed QP */
-	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
-		return;
-
-	/*
-	 * Set the bit regardless of whether or not it's on the list
-	 * because it may be on the list already due to an RQ
-	 * completion.
-	 */
-	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
-
-	/*
-	 * If this transport is not already on the DTO transport queue,
-	 * add it
-	 */
-	spin_lock_irqsave(&dto_lock, flags);
-	if (list_empty(&xprt->sc_dto_q)) {
-		svc_xprt_get(&xprt->sc_xprt);
-		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
-	}
-	spin_unlock_irqrestore(&dto_lock, flags);
-
-	/* Tasklet does all the work to avoid irqsave locks. */
-	tasklet_schedule(&dto_tasklet);
+	svc_rdma_send_wc_common_put(cq, wc, "localInv");
 }
 
 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -681,6 +585,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	ctxt = svc_rdma_get_context(xprt);
 	buflen = 0;
 	ctxt->direction = DMA_FROM_DEVICE;
+	ctxt->cqe.done = svc_rdma_wc_receive;
 	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
 		if (sge_no >= xprt->sc_max_sge) {
 			pr_err("svcrdma: Too many sges (%d)\n", sge_no);
@@ -705,7 +610,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	recv_wr.next = NULL;
 	recv_wr.sg_list = &ctxt->sge[0];
 	recv_wr.num_sge = ctxt->count;
-	recv_wr.wr_id = (u64)(unsigned long)ctxt;
+	recv_wr.wr_cqe = &ctxt->cqe;
 
 	svc_xprt_get(&xprt->sc_xprt);
 	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
@@ -722,6 +627,21 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags)
 	return -ENOMEM;
 }
 
+int svc_rdma_repost_recv(struct svcxprt_rdma *xprt, gfp_t flags)
+{
+	int ret = 0;
+
+	ret = svc_rdma_post_recv(xprt, flags);
+	if (ret) {
+		pr_err("svcrdma: could not post a receive buffer, err=%d.\n",
+		       ret);
+		pr_err("svcrdma: closing transport %p.\n", xprt);
+		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+		ret = -ENOTCONN;
+	}
+	return ret;
+}
+
 /*
  * This function handles the CONNECT_REQUEST event on a listening
  * endpoint. It is passed the cma_id for the _new_ connection. The context in
@@ -1011,7 +931,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	struct svcxprt_rdma *listen_rdma;
 	struct svcxprt_rdma *newxprt = NULL;
 	struct rdma_conn_param conn_param;
-	struct ib_cq_init_attr cq_attr = {};
 	struct ib_qp_init_attr qp_attr;
 	struct ib_device *dev;
 	unsigned int i;
@@ -1069,22 +988,14 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		dprintk("svcrdma: error creating PD for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_sq_depth;
-	newxprt->sc_sq_cq = ib_create_cq(dev,
-					 sq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_sq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_sq_depth,
+					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_sq_cq)) {
 		dprintk("svcrdma: error creating SQ CQ for connect request\n");
 		goto errout;
 	}
-	cq_attr.cqe = newxprt->sc_rq_depth;
-	newxprt->sc_rq_cq = ib_create_cq(dev,
-					 rq_comp_handler,
-					 cq_event_handler,
-					 newxprt,
-					 &cq_attr);
+	newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth,
+					0, IB_POLL_SOFTIRQ);
 	if (IS_ERR(newxprt->sc_rq_cq)) {
 		dprintk("svcrdma: error creating RQ CQ for connect request\n");
 		goto errout;
@@ -1173,13 +1084,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	/* Swap out the handler */
 	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
 
-	/*
-	 * Arm the CQs for the SQ and RQ before accepting so we can't
-	 * miss the first message
-	 */
-	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
 	memset(&conn_param, 0, sizeof conn_param);
@@ -1319,10 +1223,10 @@ static void __svc_rdma_free(struct work_struct *work)
 		ib_destroy_qp(rdma->sc_qp);
 
 	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
-		ib_destroy_cq(rdma->sc_sq_cq);
+		ib_free_cq(rdma->sc_sq_cq);
 
 	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
-		ib_destroy_cq(rdma->sc_rq_cq);
+		ib_free_cq(rdma->sc_rq_cq);
 
 	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
 		ib_dealloc_pd(rdma->sc_pd);
@@ -1383,9 +1287,6 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 			spin_unlock_bh(&xprt->sc_lock);
 			atomic_inc(&rdma_stat_sq_starve);
 
-			/* See if we can opportunistically reap SQ WR to make room */
-			sq_cq_reap(xprt);
-
 			/* Wait until SQ WR available if SQ still full */
 			wait_event(xprt->sc_send_wait,
 				   atomic_read(&xprt->sc_sq_count) <
@@ -1418,57 +1319,3 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 	}
 	return ret;
 }
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
-			 enum rpcrdma_errcode err)
-{
-	struct ib_send_wr err_wr;
-	struct page *p;
-	struct svc_rdma_op_ctxt *ctxt;
-	__be32 *va;
-	int length;
-	int ret;
-
-	p = alloc_page(GFP_KERNEL);
-	if (!p)
-		return;
-	va = page_address(p);
-
-	/* XDR encode error */
-	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
-	ctxt = svc_rdma_get_context(xprt);
-	ctxt->direction = DMA_FROM_DEVICE;
-	ctxt->count = 1;
-	ctxt->pages[0] = p;
-
-	/* Prepare SGE for local address */
-	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
-					    p, 0, length, DMA_FROM_DEVICE);
-	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
-		put_page(p);
-		svc_rdma_put_context(ctxt, 1);
-		return;
-	}
-	atomic_inc(&xprt->sc_dma_used);
-	ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length = length;
-
-	/* Prepare SEND WR */
-	memset(&err_wr, 0, sizeof err_wr);
-	ctxt->wr_op = IB_WR_SEND;
-	err_wr.wr_id = (unsigned long)ctxt;
-	err_wr.sg_list = ctxt->sge;
-	err_wr.num_sge = 1;
-	err_wr.opcode = IB_WR_SEND;
-	err_wr.send_flags = IB_SEND_SIGNALED;
-
-	/* Post It */
-	ret = svc_rdma_send(xprt, &err_wr);
-	if (ret) {
-		dprintk("svcrdma: Error %d posting send for protocol error\n",
-			ret);
-		svc_rdma_unmap_dma(ctxt);
-		svc_rdma_put_context(ctxt, 1);
-	}
-}
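
Taken together, the transport rewrite above is a conversion to the new core CQ API: ib_alloc_cq() replaces ib_create_cq() plus the hand-rolled tasklet polling (dto_tasklet_func, rq_cq_reap and sq_cq_reap are gone), and each work request carries a struct ib_cqe whose .done callback the IB core invokes, instead of a context pointer cast into wr_id. A minimal sketch of the pattern, with illustrative my_* names standing in for the real svc_rdma_wc_* handlers:

struct my_ctxt {
	struct ib_cqe	cqe;	/* embedded; replaces the old wr_id */
	/* ... per-WR state ... */
};

static void my_send_done(struct ib_cq *cq, struct ib_wc *wc)
{
	/* For flushed completions, only wr_cqe and status are valid */
	struct my_ctxt *ctxt =
		container_of(wc->wr_cqe, struct my_ctxt, cqe);

	if (wc->status != IB_WC_SUCCESS &&
	    wc->status != IB_WC_WR_FLUSH_ERR)
		pr_err("my: send: %s (%u)\n",
		       ib_wc_status_msg(wc->status), wc->status);
	/* ... unmap DMA and release ctxt ... */
}

	/* At transport setup: the core allocates, arms and polls it */
	cq = ib_alloc_cq(device, priv, depth, 0, IB_POLL_SOFTIRQ);

	/* Per work request: point the WR at the embedded cqe */
	ctxt->cqe.done = my_send_done;
	send_wr.wr_cqe = &ctxt->cqe;
	ret = ib_post_send(qp, &send_wr, &bad_wr);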