
Merge tag 'nfs-rdma-for-4.14-1' of git://git.linux-nfs.org/projects/anna/linux-nfs into linux-next

NFS-over-RDMA client updates for Linux 4.14

Bugfixes and cleanups:
- Constify rpc_xprt_ops
- Harden RPC call encoding and decoding
- Clean up RPC call decoding to use xdr_streams
- Remove unused variables from various structures
- Refactor code to remove imul instructions
- Rearrange rx_stats structure for better cacheline sharing
Trond Myklebust 8 years ago
parent
current commit
f9773b22a2

+ 13 - 0
include/linux/sunrpc/xdr.h

@@ -239,6 +239,19 @@ extern unsigned int xdr_read_pages(struct xdr_stream *xdr, unsigned int len);
 extern void xdr_enter_page(struct xdr_stream *xdr, unsigned int len);
 extern int xdr_process_buf(struct xdr_buf *buf, unsigned int offset, unsigned int len, int (*actor)(struct scatterlist *, void *), void *data);
 
+/**
+ * xdr_stream_remaining - Return the number of bytes remaining in the stream
+ * @xdr: pointer to struct xdr_stream
+ *
+ * Return value:
+ *   Number of bytes remaining in @xdr before xdr->end
+ */
+static inline size_t
+xdr_stream_remaining(const struct xdr_stream *xdr)
+{
+	return xdr->nwords << 2;
+}
+
 ssize_t xdr_stream_decode_string_dup(struct xdr_stream *xdr, char **str,
 		size_t maxlen, gfp_t gfp_flags);
 /**
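The shift in xdr_stream_remaining() is what the "remove imul instructions" item above refers to: xdr->nwords counts 4-byte XDR words, so `nwords << 2` converts words to bytes without a multiply. A minimal caller sketch; the helper name and its bounds policy are illustrative, not part of this patch:

```c
/* Hypothetical helper: copy at most @maxlen bytes of the inline
 * data left in @xdr into @dst, bounded by xdr_stream_remaining().
 */
static size_t copy_inline_payload(struct xdr_stream *xdr,
				  void *dst, size_t maxlen)
{
	size_t count = min_t(size_t, maxlen, xdr_stream_remaining(xdr));
	__be32 *p;

	p = xdr_inline_decode(xdr, count);
	if (unlikely(!p))
		return 0;
	memcpy(dst, p, count);
	return count;
}
```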

+ 1 - 1
include/linux/sunrpc/xprt.h

@@ -174,7 +174,7 @@ enum xprt_transports {
 
 struct rpc_xprt {
 	struct kref		kref;		/* Reference count */
-	struct rpc_xprt_ops *	ops;		/* transport methods */
+	const struct rpc_xprt_ops *ops;		/* transport methods */
 
 	const struct rpc_timeout *timeout;	/* timeout parms */
 	struct sockaddr_storage	addr;		/* server address */
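With the ops pointer const-qualified, every transport's method table can itself be const and live in read-only memory; the xprt_rdma_procs and xs_*_ops tables below are converted this way. A minimal sketch with a hypothetical transport:

```c
/* Hypothetical transport: a const method table lands in .rodata
 * and cannot be patched at run time.
 */
static const struct rpc_xprt_ops example_ops = {
	.reserve_xprt	= xprt_reserve_xprt,
	.release_xprt	= xprt_release_xprt,
	.alloc_slot	= xprt_alloc_slot,
};

static void example_setup(struct rpc_xprt *xprt)
{
	xprt->ops = &example_ops;	/* pointer-to-const assignment */
}
```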

+ 27 - 44
net/sunrpc/xprtrdma/backchannel.c

@@ -49,6 +49,7 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
 	if (IS_ERR(rb))
 		goto out_fail;
 	req->rl_rdmabuf = rb;
+	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
 
 	size = r_xprt->rx_data.inline_rsize;
 	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
@@ -202,20 +203,24 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
  */
 int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 {
-	struct rpc_xprt *xprt = rqst->rq_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	struct rpcrdma_msg *headerp;
-
-	headerp = rdmab_to_msg(req->rl_rdmabuf);
-	headerp->rm_xid = rqst->rq_xid;
-	headerp->rm_vers = rpcrdma_version;
-	headerp->rm_credit =
-			cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
-	headerp->rm_type = rdma_msg;
-	headerp->rm_body.rm_chunks[0] = xdr_zero;
-	headerp->rm_body.rm_chunks[1] = xdr_zero;
-	headerp->rm_body.rm_chunks[2] = xdr_zero;
+	__be32 *p;
+
+	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
+	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
+			req->rl_rdmabuf->rg_base);
+
+	p = xdr_reserve_space(&req->rl_stream, 28);
+	if (unlikely(!p))
+		return -EIO;
+	*p++ = rqst->rq_xid;
+	*p++ = rpcrdma_version;
+	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+	*p++ = rdma_msg;
+	*p++ = xdr_zero;
+	*p++ = xdr_zero;
+	*p = xdr_zero;
 
 	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
 				       &rqst->rq_snd_buf, rpcrdma_noch))
@@ -271,9 +276,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
  * @xprt: transport receiving the call
  * @rep: receive buffer containing the call
  *
- * Called in the RPC reply handler, which runs in a tasklet.
- * Be quick about it.
- *
  * Operational assumptions:
  *    o Backchannel credits are ignored, just as the NFS server
  *      forechannel currently does
@@ -284,7 +286,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 			     struct rpcrdma_rep *rep)
 {
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	struct rpcrdma_msg *headerp;
 	struct svc_serv *bc_serv;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
@@ -292,24 +293,15 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	size_t size;
 	__be32 *p;
 
-	headerp = rdmab_to_msg(rep->rr_rdmabuf);
+	p = xdr_inline_decode(&rep->rr_stream, 0);
+	size = xdr_stream_remaining(&rep->rr_stream);
+
 #ifdef RPCRDMA_BACKCHANNEL_DEBUG
 	pr_info("RPC:       %s: callback XID %08x, length=%u\n",
-		__func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
-	pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
+		__func__, be32_to_cpup(p), (unsigned int)size);
+	pr_info("RPC:       %s: %*ph\n", __func__, (int)size, p);
 #endif
 
-	/* Sanity check:
-	 * Need at least enough bytes for RPC/RDMA header, as code
-	 * here references the header fields by array offset. Also,
-	 * backward calls are always inline, so ensure there
-	 * are some bytes beyond the RPC/RDMA header.
-	 */
-	if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
-		goto out_short;
-	p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
-	size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
-
 	/* Grab a free bc rqst */
 	spin_lock(&xprt->bc_pa_lock);
 	if (list_empty(&xprt->bc_pa_list)) {
@@ -325,7 +317,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	/* Prepare rqst */
 	rqst->rq_reply_bytes_recvd = 0;
 	rqst->rq_bytes_sent = 0;
-	rqst->rq_xid = headerp->rm_xid;
+	rqst->rq_xid = *p;
 
 	rqst->rq_private_buf.len = size;
 	set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
@@ -337,9 +329,9 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 	buf->len = size;
 
 	/* The receive buffer has to be hooked to the rpcrdma_req
-	 * so that it can be reposted after the server is done
-	 * parsing it but just before sending the backward
-	 * direction reply.
+	 * so that it is not released while the req is pointing
+	 * to its buffer, and so that it can be reposted after
+	 * the Upper Layer is done decoding it.
 	 */
 	req = rpcr_to_rdmar(rqst);
 	dprintk("RPC:       %s: attaching rep %p to req %p\n",
@@ -367,13 +359,4 @@ out_overflow:
 	 * when the connection is re-established.
 	 */
 	return;
-
-out_short:
-	pr_warn("RPC/RDMA short backward direction call\n");
-
-	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
-		xprt_disconnect_done(xprt);
-	else
-		pr_warn("RPC:       %s: reposting rep %p\n",
-			__func__, rep);
 }
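The 28 bytes reserved in rpcrdma_bc_marshal_reply() are the seven XDR words of a chunk-less RPC-over-RDMA header. A sketch of that arithmetic; the layout comment is illustrative, RPCRDMA_HDRLEN_MIN is the real constant:

```c
/* Backchannel replies always carry empty chunk lists, so the whole
 * transport header is fixed-size and can be reserved in one call:
 *
 *	xid | vers | credits | proc | rlist=0 | wlist=0 | rchunk=0
 *
 * 7 words * sizeof(__be32) == 28 == RPCRDMA_HDRLEN_MIN.
 */
p = xdr_reserve_space(&req->rl_stream, RPCRDMA_HDRLEN_MIN);
```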

+ 5 - 5
net/sunrpc/xprtrdma/fmr_ops.c

@@ -177,7 +177,7 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 /* Use the ib_map_phys_fmr() verb to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
-static int
+static struct rpcrdma_mr_seg *
 fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	   int nsegs, bool writing, struct rpcrdma_mw **out)
 {
@@ -188,7 +188,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
 	mw = rpcrdma_get_mw(r_xprt);
 	if (!mw)
-		return -ENOBUFS;
+		return ERR_PTR(-ENOBUFS);
 
 	pageoff = offset_in_page(seg1->mr_offset);
 	seg1->mr_offset -= pageoff;	/* start of page */
@@ -232,13 +232,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	mw->mw_offset = dma_pages[0] + pageoff;
 
 	*out = mw;
-	return mw->mw_nents;
+	return seg;
 
 out_dmamap_err:
 	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
 	       mw->mw_sg, i);
 	rpcrdma_put_mw(r_xprt, mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 
 out_maperr:
 	pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
@@ -247,7 +247,7 @@ out_maperr:
 	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
 			mw->mw_sg, mw->mw_nents, mw->mw_dir);
 	rpcrdma_put_mw(r_xprt, mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 }
 
 /* Invalidate all memory regions that were registered for "req".
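Both registration modes now follow the ERR_PTR convention: on success ro_map returns the first unmapped segment, and the number of segments consumed is reported through mw->mw_nents. A hedged sketch of the new calling convention, mirroring the loops in rpc_rdma.c below:

```c
/* Register all segments of one chunk. @seg advances as segments
 * are consumed; errors come back as ERR_PTR-encoded pointers.
 */
do {
	seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
					   writing, &mw);
	if (IS_ERR(seg))
		return PTR_ERR(seg);	/* -EIO, -ENOBUFS, -ENOTCONN */
	rpcrdma_push_mw(mw, &req->rl_registered);
	nsegs -= mw->mw_nents;		/* segments just registered */
} while (nsegs);
```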

+ 6 - 6
net/sunrpc/xprtrdma/frwr_ops.c

@@ -344,7 +344,7 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 /* Post a REG_MR Work Request to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
-static int
+static struct rpcrdma_mr_seg *
 frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	    int nsegs, bool writing, struct rpcrdma_mw **out)
 {
@@ -364,7 +364,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 			rpcrdma_defer_mr_recovery(mw);
 		mw = rpcrdma_get_mw(r_xprt);
 		if (!mw)
-			return -ENOBUFS;
+			return ERR_PTR(-ENOBUFS);
 	} while (mw->frmr.fr_state != FRMR_IS_INVALID);
 	frmr = &mw->frmr;
 	frmr->fr_state = FRMR_IS_VALID;
@@ -429,25 +429,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	mw->mw_offset = mr->iova;
 
 	*out = mw;
-	return mw->mw_nents;
+	return seg;
 
 out_dmamap_err:
 	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
 	       mw->mw_sg, i);
 	frmr->fr_state = FRMR_IS_INVALID;
 	rpcrdma_put_mw(r_xprt, mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 
 out_mapmr_err:
 	pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
 	       frmr->fr_mr, n, mw->mw_nents);
 	rpcrdma_defer_mr_recovery(mw);
-	return -EIO;
+	return ERR_PTR(-EIO);
 
 out_senderr:
 	pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc);
 	rpcrdma_defer_mr_recovery(mw);
-	return -ENOTCONN;
+	return ERR_PTR(-ENOTCONN);
 }
 
 /* Invalidate all memory regions that were registered for "req".

+ 505 - 339
net/sunrpc/xprtrdma/rpc_rdma.c

@@ -169,40 +169,41 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
-/* Split "vec" on page boundaries into segments. FMR registers pages,
- * not a byte range. Other modes coalesce these segments into a single
- * MR when they can.
+/* Split @vec on page boundaries into SGEs. FMR registers pages, not
+ * a byte range. Other modes coalesce these SGEs into a single MR
+ * when they can.
+ *
+ * Returns pointer to next available SGE, and bumps the total number
+ * of SGEs consumed.
  */
-static int
-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
+static struct rpcrdma_mr_seg *
+rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
+		     unsigned int *n)
 {
-	size_t page_offset;
-	u32 remaining;
+	u32 remaining, page_offset;
 	char *base;
 
 	base = vec->iov_base;
 	page_offset = offset_in_page(base);
 	remaining = vec->iov_len;
-	while (remaining && n < RPCRDMA_MAX_SEGS) {
-		seg[n].mr_page = NULL;
-		seg[n].mr_offset = base;
-		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
-		remaining -= seg[n].mr_len;
-		base += seg[n].mr_len;
-		++n;
+	while (remaining) {
+		seg->mr_page = NULL;
+		seg->mr_offset = base;
+		seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
+		remaining -= seg->mr_len;
+		base += seg->mr_len;
+		++seg;
+		++(*n);
 		page_offset = 0;
 	}
-	return n;
+	return seg;
 }
 
-/*
- * Chunk assembly from upper layer xdr_buf.
+/* Convert @xdrbuf into SGEs no larger than a page each. As they
+ * are registered, these SGEs are then coalesced into RDMA segments
+ * when the selected memreg mode supports it.
  *
- * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
- * elements. Segments are then coalesced when registered, if possible
- * within the selected memreg mode.
- *
- * Returns positive number of segments converted, or a negative errno.
+ * Returns positive number of SGEs consumed, or a negative errno.
  */
 
 static int
@@ -210,47 +211,41 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 		     unsigned int pos, enum rpcrdma_chunktype type,
 		     struct rpcrdma_mr_seg *seg)
 {
-	int len, n, p, page_base;
+	unsigned long page_base;
+	unsigned int len, n;
 	struct page **ppages;
 
 	n = 0;
-	if (pos == 0) {
-		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
-		if (n == RPCRDMA_MAX_SEGS)
-			goto out_overflow;
-	}
+	if (pos == 0)
+		seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
 
 	len = xdrbuf->page_len;
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
 	page_base = offset_in_page(xdrbuf->page_base);
-	p = 0;
-	while (len && n < RPCRDMA_MAX_SEGS) {
-		if (!ppages[p]) {
-			/* alloc the pagelist for receiving buffer */
-			ppages[p] = alloc_page(GFP_ATOMIC);
-			if (!ppages[p])
+	while (len) {
+		if (unlikely(!*ppages)) {
+			/* XXX: Certain upper layer operations do
+			 *	not provide receive buffer pages.
+			 */
+			*ppages = alloc_page(GFP_ATOMIC);
+			if (!*ppages)
 				return -EAGAIN;
 		}
-		seg[n].mr_page = ppages[p];
-		seg[n].mr_offset = (void *)(unsigned long) page_base;
-		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-		if (seg[n].mr_len > PAGE_SIZE)
-			goto out_overflow;
-		len -= seg[n].mr_len;
+		seg->mr_page = *ppages;
+		seg->mr_offset = (char *)page_base;
+		seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
+		len -= seg->mr_len;
+		++ppages;
+		++seg;
 		++n;
-		++p;
-		page_base = 0;	/* page offset only applies to first page */
+		page_base = 0;
 	}
 
-	/* Message overflows the seg array */
-	if (len && n == RPCRDMA_MAX_SEGS)
-		goto out_overflow;
-
 	/* When encoding a Read chunk, the tail iovec contains an
 	 * XDR pad and may be omitted.
 	 */
 	if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
-		return n;
+		goto out;
 
 	/* When encoding a Write chunk, some servers need to see an
 	 * extra segment for non-XDR-aligned Write chunks. The upper
@@ -258,30 +253,81 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 	 * for this purpose.
 	 */
 	if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
-		return n;
+		goto out;
 
-	if (xdrbuf->tail[0].iov_len) {
-		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
-		if (n == RPCRDMA_MAX_SEGS)
-			goto out_overflow;
-	}
+	if (xdrbuf->tail[0].iov_len)
+		seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
 
+out:
+	if (unlikely(n > RPCRDMA_MAX_SEGS))
+		return -EIO;
 	return n;
+}
+
+static inline int
+encode_item_present(struct xdr_stream *xdr)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
+
+	*p = xdr_one;
+	return 0;
+}
+
+static inline int
+encode_item_not_present(struct xdr_stream *xdr)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
 
-out_overflow:
-	pr_err("rpcrdma: segment array overflow\n");
-	return -EIO;
+	*p = xdr_zero;
+	return 0;
 }
 
-static inline __be32 *
+static void
 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
 {
 	*iptr++ = cpu_to_be32(mw->mw_handle);
 	*iptr++ = cpu_to_be32(mw->mw_length);
-	return xdr_encode_hyper(iptr, mw->mw_offset);
+	xdr_encode_hyper(iptr, mw->mw_offset);
+}
+
+static int
+encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
+
+	xdr_encode_rdma_segment(p, mw);
+	return 0;
+}
+
+static int
+encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mw *mw,
+		    u32 position)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, 6 * sizeof(*p));
+	if (unlikely(!p))
+		return -EMSGSIZE;
+
+	*p++ = xdr_one;			/* Item present */
+	*p++ = cpu_to_be32(position);
+	xdr_encode_rdma_segment(p, mw);
+	return 0;
 }
 
-/* XDR-encode the Read list. Supports encoding a list of read
+/* Register and XDR encode the Read list. Supports encoding a list of read
  * segments that belong to a single read chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
@@ -290,23 +336,20 @@ xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
  *   N elements, position P (same P for all chunks of same arg!):
  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Read list, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
+ *
+ * Only a single @pos value is currently supported.
  */
-static __be32 *
-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
-			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
-			 __be32 *iptr, enum rpcrdma_chunktype rtype)
+static noinline int
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+			 struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
 {
+	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
 	unsigned int pos;
-	int n, nsegs;
-
-	if (rtype == rpcrdma_noch) {
-		*iptr++ = xdr_zero;	/* item not present */
-		return iptr;
-	}
+	int nsegs;
 
 	pos = rqst->rq_snd_buf.head[0].iov_len;
 	if (rtype == rpcrdma_areadch)
@@ -315,40 +358,33 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
 				     rtype, seg);
 	if (nsegs < 0)
-		return ERR_PTR(nsegs);
+		return nsegs;
 
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						 false, &mw);
-		if (n < 0)
-			return ERR_PTR(n);
+		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						   false, &mw);
+		if (IS_ERR(seg))
+			return PTR_ERR(seg);
 		rpcrdma_push_mw(mw, &req->rl_registered);
 
-		*iptr++ = xdr_one;	/* item present */
-
-		/* All read segments in this chunk
-		 * have the same "position".
-		 */
-		*iptr++ = cpu_to_be32(pos);
-		iptr = xdr_encode_rdma_segment(iptr, mw);
+		if (encode_read_segment(xdr, mw, pos) < 0)
+			return -EMSGSIZE;
 
 		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__, pos,
 			mw->mw_length, (unsigned long long)mw->mw_offset,
-			mw->mw_handle, n < nsegs ? "more" : "last");
+			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.read_chunk_count++;
-		seg += n;
-		nsegs -= n;
+		nsegs -= mw->mw_nents;
 	} while (nsegs);
 
-	/* Finish Read list */
-	*iptr++ = xdr_zero;	/* Next item not present */
-	return iptr;
+	return 0;
 }
 
-/* XDR-encode the Write list. Supports encoding a list containing
- * one array of plain segments that belong to a single write chunk.
+/* Register and XDR encode the Write list. Supports encoding a list
+ * containing one array of plain segments that belong to a single
+ * write chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -356,66 +392,65 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO - 0
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Write list, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
+ *
+ * Only a single Write chunk is currently supported.
  */
-static __be32 *
+static noinline int
 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
-			  struct rpc_rqst *rqst, __be32 *iptr,
-			  enum rpcrdma_chunktype wtype)
+			  struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 {
+	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
-	int n, nsegs, nchunks;
+	int nsegs, nchunks;
 	__be32 *segcount;
 
-	if (wtype != rpcrdma_writech) {
-		*iptr++ = xdr_zero;	/* no Write list present */
-		return iptr;
-	}
-
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
 				     wtype, seg);
 	if (nsegs < 0)
-		return ERR_PTR(nsegs);
+		return nsegs;
 
-	*iptr++ = xdr_one;	/* Write list present */
-	segcount = iptr++;	/* save location of segment count */
+	if (encode_item_present(xdr) < 0)
+		return -EMSGSIZE;
+	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
+	if (unlikely(!segcount))
+		return -EMSGSIZE;
+	/* Actual value encoded below */
 
 	nchunks = 0;
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						 true, &mw);
-		if (n < 0)
-			return ERR_PTR(n);
+		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						   true, &mw);
+		if (IS_ERR(seg))
+			return PTR_ERR(seg);
 		rpcrdma_push_mw(mw, &req->rl_registered);
 
-		iptr = xdr_encode_rdma_segment(iptr, mw);
+		if (encode_rdma_segment(xdr, mw) < 0)
+			return -EMSGSIZE;
 
 		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__,
 			mw->mw_length, (unsigned long long)mw->mw_offset,
-			mw->mw_handle, n < nsegs ? "more" : "last");
+			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.write_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
 		nchunks++;
-		seg   += n;
-		nsegs -= n;
+		nsegs -= mw->mw_nents;
 	} while (nsegs);
 
 	/* Update count of segments in this Write chunk */
 	*segcount = cpu_to_be32(nchunks);
 
-	/* Finish Write list */
-	*iptr++ = xdr_zero;	/* Next item not present */
-	return iptr;
+	return 0;
 }
 
-/* XDR-encode the Reply chunk. Supports encoding an array of plain
- * segments that belong to a single write (reply) chunk.
+/* Register and XDR encode the Reply chunk. Supports encoding an array
+ * of plain segments that belong to a single write (reply) chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -423,58 +458,57 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
  *
- * Returns a pointer to the XDR word in the RDMA header following
- * the end of the Reply chunk, or an error pointer.
+ * Returns zero on success, or a negative errno if a failure occurred.
+ * @xdr is advanced to the next position in the stream.
  */
-static __be32 *
-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
-			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
-			   __be32 *iptr, enum rpcrdma_chunktype wtype)
+static noinline int
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+			   struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
 {
+	struct xdr_stream *xdr = &req->rl_stream;
 	struct rpcrdma_mr_seg *seg;
 	struct rpcrdma_mw *mw;
-	int n, nsegs, nchunks;
+	int nsegs, nchunks;
 	__be32 *segcount;
 
-	if (wtype != rpcrdma_replych) {
-		*iptr++ = xdr_zero;	/* no Reply chunk present */
-		return iptr;
-	}
-
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
 	if (nsegs < 0)
-		return ERR_PTR(nsegs);
+		return nsegs;
 
-	*iptr++ = xdr_one;	/* Reply chunk present */
-	segcount = iptr++;	/* save location of segment count */
+	if (encode_item_present(xdr) < 0)
+		return -EMSGSIZE;
+	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
+	if (unlikely(!segcount))
+		return -EMSGSIZE;
+	/* Actual value encoded below */
 
 	nchunks = 0;
 	do {
-		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
-						 true, &mw);
-		if (n < 0)
-			return ERR_PTR(n);
+		seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
+						   true, &mw);
+		if (IS_ERR(seg))
+			return PTR_ERR(seg);
 		rpcrdma_push_mw(mw, &req->rl_registered);
 
-		iptr = xdr_encode_rdma_segment(iptr, mw);
+		if (encode_rdma_segment(xdr, mw) < 0)
+			return -EMSGSIZE;
 
 		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
 			rqst->rq_task->tk_pid, __func__,
 			mw->mw_length, (unsigned long long)mw->mw_offset,
-			mw->mw_handle, n < nsegs ? "more" : "last");
+			mw->mw_handle, mw->mw_nents < nsegs ? "more" : "last");
 
 		r_xprt->rx_stats.reply_chunk_count++;
 		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
 		nchunks++;
-		seg   += n;
-		nsegs -= n;
+		nsegs -= mw->mw_nents;
 	} while (nsegs);
 
 	/* Update count of segments in the Reply chunk */
 	*segcount = cpu_to_be32(nchunks);
 
-	return iptr;
+	return 0;
 }
 
 /* Prepare the RPC-over-RDMA header SGE.
@@ -651,37 +685,52 @@ rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 	req->rl_mapped_sges = 0;
 }
 
-/*
- * Marshal a request: the primary job of this routine is to choose
- * the transfer modes. See comments below.
+/**
+ * rpcrdma_marshal_req - Marshal and send one RPC request
+ * @r_xprt: controlling transport
+ * @rqst: RPC request to be marshaled
+ *
+ * For the RPC in "rqst", this function:
+ *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
+ *  - Registers Read, Write, and Reply chunks
+ *  - Constructs the transport header
+ *  - Posts a Send WR to send the transport header and request
  *
- * Returns zero on success, otherwise a negative errno.
+ * Returns:
+ *	%0 if the RPC was sent successfully,
+ *	%-ENOTCONN if the connection was lost,
+ *	%-EAGAIN if not enough pages are available for on-demand reply buffer,
+ *	%-ENOBUFS if no MRs are available to register chunks,
+ *	%-EMSGSIZE if the transport header is too small,
+ *	%-EIO if a permanent problem occurred while marshaling.
  */
-
 int
-rpcrdma_marshal_req(struct rpc_rqst *rqst)
+rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 {
-	struct rpc_xprt *xprt = rqst->rq_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct xdr_stream *xdr = &req->rl_stream;
 	enum rpcrdma_chunktype rtype, wtype;
-	struct rpcrdma_msg *headerp;
 	bool ddp_allowed;
-	ssize_t hdrlen;
-	size_t rpclen;
-	__be32 *iptr;
+	__be32 *p;
+	int ret;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
 		return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-	headerp = rdmab_to_msg(req->rl_rdmabuf);
-	/* don't byte-swap XID, it's already done in request */
-	headerp->rm_xid = rqst->rq_xid;
-	headerp->rm_vers = rpcrdma_version;
-	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
-	headerp->rm_type = rdma_msg;
+	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
+	xdr_init_encode(xdr, &req->rl_hdrbuf,
+			req->rl_rdmabuf->rg_base);
+
+	/* Fixed header fields */
+	ret = -EMSGSIZE;
+	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
+	if (!p)
+		goto out_err;
+	*p++ = rqst->rq_xid;
+	*p++ = rpcrdma_version;
+	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
 
 	/* When the ULP employs a GSS flavor that guarantees integrity
 	 * or privacy, direct data placement of individual data items
@@ -721,17 +770,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * by themselves are larger than the inline threshold.
 	 */
 	if (rpcrdma_args_inline(r_xprt, rqst)) {
+		*p++ = rdma_msg;
 		rtype = rpcrdma_noch;
-		rpclen = rqst->rq_snd_buf.len;
 	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
+		*p++ = rdma_msg;
 		rtype = rpcrdma_readch;
-		rpclen = rqst->rq_snd_buf.head[0].iov_len +
-			 rqst->rq_snd_buf.tail[0].iov_len;
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
-		headerp->rm_type = htonl(RDMA_NOMSG);
+		*p++ = rdma_nomsg;
 		rtype = rpcrdma_areadch;
-		rpclen = 0;
 	}
 
 	req->rl_xid = rqst->rq_xid;
@@ -759,79 +806,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * send a Call message with a Position Zero Read chunk and a
 	 * regular Read chunk at the same time.
 	 */
-	iptr = headerp->rm_body.rm_chunks;
-	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
-	if (IS_ERR(iptr))
+	if (rtype != rpcrdma_noch) {
+		ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
+		if (ret)
+			goto out_err;
+	}
+	ret = encode_item_not_present(xdr);
+	if (ret)
 		goto out_err;
-	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
-	if (IS_ERR(iptr))
+
+	if (wtype == rpcrdma_writech) {
+		ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
+		if (ret)
+			goto out_err;
+	}
+	ret = encode_item_not_present(xdr);
+	if (ret)
 		goto out_err;
-	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
-	if (IS_ERR(iptr))
+
+	if (wtype != rpcrdma_replych)
+		ret = encode_item_not_present(xdr);
+	else
+		ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
+	if (ret)
 		goto out_err;
-	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
-	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+	dprintk("RPC: %5u %s: %s/%s: hdrlen %u\n",
 		rqst->rq_task->tk_pid, __func__,
 		transfertypes[rtype], transfertypes[wtype],
-		hdrlen, rpclen);
+		xdr_stream_pos(xdr));
 
-	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req,
+				       xdr_stream_pos(xdr),
 				       &rqst->rq_snd_buf, rtype)) {
-		iptr = ERR_PTR(-EIO);
+		ret = -EIO;
 		goto out_err;
 	}
 	return 0;
 
 out_err:
-	if (PTR_ERR(iptr) != -ENOBUFS) {
-		pr_err("rpcrdma: rpcrdma_marshal_req failed, status %ld\n",
-		       PTR_ERR(iptr));
+	if (ret != -ENOBUFS) {
+		pr_err("rpcrdma: header marshaling failed (%d)\n", ret);
 		r_xprt->rx_stats.failed_marshal_count++;
 	}
-	return PTR_ERR(iptr);
-}
-
-/*
- * Chase down a received write or reply chunklist to get length
- * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
- */
-static int
-rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
-{
-	unsigned int i, total_len;
-	struct rpcrdma_write_chunk *cur_wchunk;
-	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
-
-	i = be32_to_cpu(**iptrp);
-	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
-	total_len = 0;
-	while (i--) {
-		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
-		ifdebug(FACILITY) {
-			u64 off;
-			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
-			dprintk("RPC:       %s: chunk %d@0x%016llx:0x%08x\n",
-				__func__,
-				be32_to_cpu(seg->rs_length),
-				(unsigned long long)off,
-				be32_to_cpu(seg->rs_handle));
-		}
-		total_len += be32_to_cpu(seg->rs_length);
-		++cur_wchunk;
-	}
-	/* check and adjust for properly terminated write chunk */
-	if (wrchunk) {
-		__be32 *w = (__be32 *) cur_wchunk;
-		if (*w++ != xdr_zero)
-			return -1;
-		cur_wchunk = (struct rpcrdma_write_chunk *) w;
-	}
-	if ((char *)cur_wchunk > base + rep->rr_len)
-		return -1;
-
-	*iptrp = (__be32 *) cur_wchunk;
-	return total_len;
+	return ret;
 }
 
 /**
@@ -949,37 +967,254 @@ rpcrdma_mark_remote_invalidation(struct list_head *mws,
 		}
 }
 
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
  * the RPC/RDMA header small and fixed in size, so it is
  * straightforward to check the RPC header's direction field.
  */
 static bool
-rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+		 __be32 xid, __be32 proc)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 {
-	__be32 *p = (__be32 *)headerp;
+	struct xdr_stream *xdr = &rep->rr_stream;
+	__be32 *p;
 
-	if (headerp->rm_type != rdma_msg)
+	if (proc != rdma_msg)
 		return false;
-	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+
+	/* Peek at stream contents without advancing. */
+	p = xdr_inline_decode(xdr, 0);
+
+	/* Chunk lists */
+	if (*p++ != xdr_zero)
 		return false;
-	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
-	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
 
-	/* sanity */
-	if (p[7] != headerp->rm_xid)
+	/* RPC header */
+	if (*p++ != xid)
 		return false;
-	/* call direction */
-	if (p[8] != cpu_to_be32(RPC_CALL))
+	if (*p != cpu_to_be32(RPC_CALL))
 		return false;
 
+	/* Now that we are sure this is a backchannel call,
+	 * advance to the RPC header.
+	 */
+	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
+	if (unlikely(!p))
+		goto out_short;
+
+	rpcrdma_bc_receive_call(r_xprt, rep);
+	return true;
+
+out_short:
+	pr_warn("RPC/RDMA short backward direction call\n");
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
+		xprt_disconnect_done(&r_xprt->rx_xprt);
 	return true;
 }
+#else	/* CONFIG_SUNRPC_BACKCHANNEL */
+{
+	return false;
+}
 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 
+static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
+{
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	ifdebug(FACILITY) {
+		u64 offset;
+		u32 handle;
+
+		handle = be32_to_cpup(p++);
+		*length = be32_to_cpup(p++);
+		xdr_decode_hyper(p, &offset);
+		dprintk("RPC:       %s:   segment %u@0x%016llx:0x%08x\n",
+			__func__, *length, (unsigned long long)offset,
+			handle);
+	} else {
+		*length = be32_to_cpup(p + 1);
+	}
+
+	return 0;
+}
+
+static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
+{
+	u32 segcount, seglength;
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	*length = 0;
+	segcount = be32_to_cpup(p);
+	while (segcount--) {
+		if (decode_rdma_segment(xdr, &seglength))
+			return -EIO;
+		*length += seglength;
+	}
+
+	dprintk("RPC:       %s: segcount=%u, %u bytes\n",
+		__func__, be32_to_cpup(p), *length);
+	return 0;
+}
+
+/* In RPC-over-RDMA Version One replies, a Read list is never
+ * expected. This decoder is a stub that returns an error if
+ * a Read list is present.
+ */
+static int decode_read_list(struct xdr_stream *xdr)
+{
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+	if (unlikely(*p != xdr_zero))
+		return -EIO;
+	return 0;
+}
+
+/* Supports only one Write chunk in the Write list
+ */
+static int decode_write_list(struct xdr_stream *xdr, u32 *length)
+{
+	u32 chunklen;
+	bool first;
+	__be32 *p;
+
+	*length = 0;
+	first = true;
+	do {
+		p = xdr_inline_decode(xdr, sizeof(*p));
+		if (unlikely(!p))
+			return -EIO;
+		if (*p == xdr_zero)
+			break;
+		if (!first)
+			return -EIO;
+
+		if (decode_write_chunk(xdr, &chunklen))
+			return -EIO;
+		*length += chunklen;
+		first = false;
+	} while (true);
+	return 0;
+}
+
+static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
+{
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	*length = 0;
+	if (*p != xdr_zero)
+		if (decode_write_chunk(xdr, length))
+			return -EIO;
+	return 0;
+}
+
+static int
+rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+		   struct rpc_rqst *rqst)
+{
+	struct xdr_stream *xdr = &rep->rr_stream;
+	u32 writelist, replychunk, rpclen;
+	char *base;
+
+	/* Decode the chunk lists */
+	if (decode_read_list(xdr))
+		return -EIO;
+	if (decode_write_list(xdr, &writelist))
+		return -EIO;
+	if (decode_reply_chunk(xdr, &replychunk))
+		return -EIO;
+
+	/* RDMA_MSG sanity checks */
+	if (unlikely(replychunk))
+		return -EIO;
+
+	/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
+	base = (char *)xdr_inline_decode(xdr, 0);
+	rpclen = xdr_stream_remaining(xdr);
+	r_xprt->rx_stats.fixup_copy_count +=
+		rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
+
+	r_xprt->rx_stats.total_rdma_reply += writelist;
+	return rpclen + xdr_align_size(writelist);
+}
+
+static noinline int
+rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
+{
+	struct xdr_stream *xdr = &rep->rr_stream;
+	u32 writelist, replychunk;
+
+	/* Decode the chunk lists */
+	if (decode_read_list(xdr))
+		return -EIO;
+	if (decode_write_list(xdr, &writelist))
+		return -EIO;
+	if (decode_reply_chunk(xdr, &replychunk))
+		return -EIO;
+
+	/* RDMA_NOMSG sanity checks */
+	if (unlikely(writelist))
+		return -EIO;
+	if (unlikely(!replychunk))
+		return -EIO;
+
+	/* Reply chunk buffer already is the reply vector */
+	r_xprt->rx_stats.total_rdma_reply += replychunk;
+	return replychunk;
+}
+
+static noinline int
+rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
+		     struct rpc_rqst *rqst)
+{
+	struct xdr_stream *xdr = &rep->rr_stream;
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (unlikely(!p))
+		return -EIO;
+
+	switch (*p) {
+	case err_vers:
+		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+		if (!p)
+			break;
+		dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
+			rqst->rq_task->tk_pid, __func__,
+			be32_to_cpup(p), be32_to_cpup(p + 1));
+		break;
+	case err_chunk:
+		dprintk("RPC: %5u: %s: server reports header decoding error\n",
+			rqst->rq_task->tk_pid, __func__);
+		break;
+	default:
+		dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
+			rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
+	}
+
+	r_xprt->rx_stats.bad_reply_count++;
+	return -EREMOTEIO;
+}
+
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
@@ -993,33 +1228,39 @@ rpcrdma_reply_handler(struct work_struct *work)
 	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-	struct rpcrdma_msg *headerp;
+	struct xdr_stream *xdr = &rep->rr_stream;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	__be32 *iptr;
-	int rdmalen, status, rmerr;
+	__be32 *p, xid, vers, proc;
 	unsigned long cwnd;
 	struct list_head mws;
+	int status;
 
 	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
-	if (rep->rr_len == RPCRDMA_BAD_LEN)
+	if (rep->rr_hdrbuf.head[0].iov_len == 0)
 		goto out_badstatus;
-	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
+
+	xdr_init_decode(xdr, &rep->rr_hdrbuf,
+			rep->rr_hdrbuf.head[0].iov_base);
+
+	/* Fixed transport header fields */
+	p = xdr_inline_decode(xdr, 4 * sizeof(*p));
+	if (unlikely(!p))
 		goto out_shortreply;
+	xid = *p++;
+	vers = *p++;
+	p++;	/* credits */
+	proc = *p++;
 
-	headerp = rdmab_to_msg(rep->rr_rdmabuf);
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-	if (rpcrdma_is_bcall(headerp))
-		goto out_bcall;
-#endif
+	if (rpcrdma_is_bcall(r_xprt, rep, xid, proc))
+		return;
 
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
 	spin_lock(&buf->rb_lock);
-	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
-					headerp->rm_xid);
+	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf, xid);
 	if (!req)
 		goto out_nomatch;
 	if (req->rl_reply)
@@ -1035,7 +1276,7 @@ rpcrdma_reply_handler(struct work_struct *work)
 	spin_unlock(&buf->rb_lock);
 
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
-		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
+		__func__, rep, req, be32_to_cpu(xid));
 
 	/* Invalidate and unmap the data payloads before waking the
 	 * waiting application. This guarantees the memory regions
@@ -1052,82 +1293,28 @@ rpcrdma_reply_handler(struct work_struct *work)
 	 * the rep, rqst, and rq_task pointers remain stable.
 	 */
 	spin_lock(&xprt->recv_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	rqst = xprt_lookup_rqst(xprt, xid);
 	if (!rqst)
 		goto out_norqst;
 	xprt->reestablish_timeout = 0;
-	if (headerp->rm_vers != rpcrdma_version)
+	if (vers != rpcrdma_version)
 		goto out_badversion;
 
-	/* check for expected message types */
-	/* The order of some of these tests is important. */
-	switch (headerp->rm_type) {
+	switch (proc) {
 	case rdma_msg:
-		/* never expect read chunks */
-		/* never expect reply chunks (two ways to check) */
-		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
-		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
-		     headerp->rm_body.rm_chunks[2] != xdr_zero))
-			goto badheader;
-		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
-			/* count any expected write chunks in read reply */
-			/* start at write chunk array count */
-			iptr = &headerp->rm_body.rm_chunks[2];
-			rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
-			/* check for validity, and no reply chunk after */
-			if (rdmalen < 0 || *iptr++ != xdr_zero)
-				goto badheader;
-			rep->rr_len -=
-			    ((unsigned char *)iptr - (unsigned char *)headerp);
-			status = rep->rr_len + rdmalen;
-			r_xprt->rx_stats.total_rdma_reply += rdmalen;
-			/* special case - last chunk may omit padding */
-			if (rdmalen &= 3) {
-				rdmalen = 4 - rdmalen;
-				status += rdmalen;
-			}
-		} else {
-			/* else ordinary inline */
-			rdmalen = 0;
-			iptr = (__be32 *)((unsigned char *)headerp +
-							RPCRDMA_HDRLEN_MIN);
-			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
-			status = rep->rr_len;
-		}
-
-		r_xprt->rx_stats.fixup_copy_count +=
-			rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
-					     rdmalen);
+		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
 		break;
-
 	case rdma_nomsg:
-		/* never expect read or write chunks, always reply chunks */
-		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
-		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
-		    headerp->rm_body.rm_chunks[2] != xdr_one)
-			goto badheader;
-		iptr = (__be32 *)((unsigned char *)headerp +
-							RPCRDMA_HDRLEN_MIN);
-		rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
-		if (rdmalen < 0)
-			goto badheader;
-		r_xprt->rx_stats.total_rdma_reply += rdmalen;
-		/* Reply chunk buffer already is the reply vector - no fixup. */
-		status = rdmalen;
+		status = rpcrdma_decode_nomsg(r_xprt, rep);
 		break;
-
 	case rdma_error:
-		goto out_rdmaerr;
-
-badheader:
+		status = rpcrdma_decode_error(r_xprt, rep, rqst);
+		break;
 	default:
-		dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
-			rqst->rq_task->tk_pid, __func__,
-			be32_to_cpu(headerp->rm_type));
 		status = -EIO;
-		r_xprt->rx_stats.bad_reply_count++;
-		break;
 	}
+	if (status < 0)
+		goto out_badheader;
 
 out:
 	cwnd = xprt->cwnd;
@@ -1149,42 +1336,22 @@ out_badstatus:
 	}
 	return;
 
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-out_bcall:
-	rpcrdma_bc_receive_call(r_xprt, rep);
-	return;
-#endif
-
 /* If the incoming reply terminated a pending RPC, the next
  * RPC call will post a replacement receive buffer as it is
  * being marshaled.
  */
 out_badversion:
 	dprintk("RPC:       %s: invalid version %d\n",
-		__func__, be32_to_cpu(headerp->rm_vers));
+		__func__, be32_to_cpu(vers));
 	status = -EIO;
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-out_rdmaerr:
-	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
-	switch (rmerr) {
-	case ERR_VERS:
-		pr_err("%s: server reports header version error (%u-%u)\n",
-		       __func__,
-		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
-		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
-		break;
-	case ERR_CHUNK:
-		pr_err("%s: server reports header decoding error\n",
-		       __func__);
-		break;
-	default:
-		pr_err("%s: server reports unknown error %d\n",
-		       __func__, rmerr);
-	}
-	status = -EREMOTEIO;
+out_badheader:
+	dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
+		rqst->rq_task->tk_pid, __func__, be32_to_cpu(proc));
 	r_xprt->rx_stats.bad_reply_count++;
+	status = -EIO;
 	goto out;
 
 /* The req was still available, but by the time the recv_lock
@@ -1204,16 +1371,15 @@ out_shortreply:
 
 out_nomatch:
 	spin_unlock(&buf->rb_lock);
-	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
-		__func__, be32_to_cpu(headerp->rm_xid),
-		rep->rr_len);
+	dprintk("RPC:       %s: no match for incoming xid 0x%08x\n",
+		__func__, be32_to_cpu(xid));
 	goto repost;
 
 out_duplicate:
 	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: "
 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
-		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
+		__func__, rep, req, be32_to_cpu(xid));
 
 /* If no pending RPC transaction was matched, post a replacement
  * receive buffer before returning.
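The new decoders walk the reply's chunk lists with xdr_inline_decode() instead of raw pointer arithmetic. A worked example of what decode_write_list() consumes; the call site shown is hypothetical:

```c
/* A Write list holding a single one-segment chunk is seven XDR
 * words (28 bytes) on the wire:
 *
 *	1        list item present
 *	1        segment count (N = 1)
 *	H L OO   handle, length, 64-bit offset (one HLOO)
 *	0        list terminator
 *
 * decode_write_list() returns the sum of the segment lengths.
 */
u32 writelen;

if (decode_write_list(&rep->rr_stream, &writelen))
	return -EIO;
dprintk("RPC:       %s: Write list moved %u bytes\n",
	__func__, writelen);
```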

+ 1 - 1
net/sunrpc/xprtrdma/svc_rdma_backchannel.c

@@ -269,7 +269,7 @@ xprt_rdma_bc_put(struct rpc_xprt *xprt)
 	module_put(THIS_MODULE);
 }
 
-static struct rpc_xprt_ops xprt_rdma_bc_procs = {
+static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
 	.reserve_xprt		= xprt_reserve_xprt_cong,
 	.release_xprt		= xprt_release_xprt_cong,
 	.alloc_slot		= xprt_alloc_slot,

+ 4 - 3
net/sunrpc/xprtrdma/transport.c

@@ -149,7 +149,7 @@ static struct ctl_table sunrpc_table[] = {
 
 #endif
 
-static struct rpc_xprt_ops xprt_rdma_procs;	/*forward reference */
+static const struct rpc_xprt_ops xprt_rdma_procs;
 
 static void
 xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
@@ -559,6 +559,7 @@ rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 
 	r_xprt->rx_stats.hardway_register_count += size;
 	req->rl_rdmabuf = rb;
+	xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
 	return true;
 }
 
@@ -730,7 +731,7 @@ xprt_rdma_send_request(struct rpc_task *task)
 	if (unlikely(!list_empty(&req->rl_registered)))
 		r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
 
-	rc = rpcrdma_marshal_req(rqst);
+	rc = rpcrdma_marshal_req(r_xprt, rqst);
 	if (rc < 0)
 		goto failed_marshal;
 
@@ -811,7 +812,7 @@ xprt_rdma_disable_swap(struct rpc_xprt *xprt)
  * Plumbing for rpc transport switch and kernel module
  */
 
-static struct rpc_xprt_ops xprt_rdma_procs = {
+static const struct rpc_xprt_ops xprt_rdma_procs = {
 	.reserve_xprt		= xprt_reserve_xprt_cong,
 	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
 	.alloc_slot		= xprt_alloc_slot,

+ 9 - 12
net/sunrpc/xprtrdma/verbs.c

@@ -139,14 +139,11 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 static void
 rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 {
-	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
 	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
+	__be32 *p = rep->rr_rdmabuf->rg_base;
 	u32 credits;
 
-	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
-		return;
-
-	credits = be32_to_cpu(rmsgp->rm_credit);
+	credits = be32_to_cpup(p + 2);
 	if (credits == 0)
 		credits = 1;	/* don't deadlock */
 	else if (credits > buffer->rb_max_requests)
@@ -173,21 +170,19 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 		goto out_fail;
 
 	/* status == SUCCESS means all fields in wc are trustworthy */
-	if (wc->opcode != IB_WC_RECV)
-		return;
-
 	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
 		__func__, rep, wc->byte_len);
 
-	rep->rr_len = wc->byte_len;
+	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
 	rep->rr_wc_flags = wc->wc_flags;
 	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
 
 	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
 				   rdmab_addr(rep->rr_rdmabuf),
-				   rep->rr_len, DMA_FROM_DEVICE);
+				   wc->byte_len, DMA_FROM_DEVICE);
 
-	rpcrdma_update_granted_credits(rep);
+	if (wc->byte_len >= RPCRDMA_HDRLEN_ERR)
+		rpcrdma_update_granted_credits(rep);
 
 out_schedule:
 	queue_work(rpcrdma_receive_wq, &rep->rr_work);
@@ -198,7 +193,7 @@ out_fail:
 		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 		       ib_wc_status_msg(wc->status),
 		       wc->status, wc->vendor_err);
-	rep->rr_len = RPCRDMA_BAD_LEN;
+	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
 	goto out_schedule;
 }
 
@@ -974,6 +969,8 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 		rc = PTR_ERR(rep->rr_rdmabuf);
 		goto out_free;
 	}
+	xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
+		     rdmab_length(rep->rr_rdmabuf));
 
 	rep->rr_cqe.done = rpcrdma_wc_receive;
 	rep->rr_rxprt = r_xprt;
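rpcrdma_update_granted_credits() now reads the credits field directly as the third XDR word of the transport header, and the caller skips the update for runt receives. A sketch of the assumed word layout:

```c
/* Fixed header prefix as __be32 words, trustworthy once at least
 * RPCRDMA_HDRLEN_ERR bytes have arrived:
 *
 *	p[0] = xid, p[1] = vers, p[2] = credits, p[3] = proc
 */
__be32 *p = rep->rr_rdmabuf->rg_base;
u32 credits = be32_to_cpup(p + 2);	/* only word 2 is needed */
```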

+ 22 - 11
net/sunrpc/xprtrdma/xprt_rdma.h

@@ -218,18 +218,17 @@ enum {
 
 struct rpcrdma_rep {
 	struct ib_cqe		rr_cqe;
-	unsigned int		rr_len;
 	int			rr_wc_flags;
 	u32			rr_inv_rkey;
+	struct rpcrdma_regbuf	*rr_rdmabuf;
 	struct rpcrdma_xprt	*rr_rxprt;
 	struct work_struct	rr_work;
+	struct xdr_buf		rr_hdrbuf;
+	struct xdr_stream	rr_stream;
 	struct list_head	rr_list;
 	struct ib_recv_wr	rr_recv_wr;
-	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
 
-#define RPCRDMA_BAD_LEN		(~0U)
-
 /*
  * struct rpcrdma_mw - external memory region metadata
  *
@@ -346,6 +345,8 @@ struct rpcrdma_req {
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;
+	struct xdr_stream	rl_stream;
+	struct xdr_buf		rl_hdrbuf;
 	struct ib_send_wr	rl_send_wr;
 	struct ib_sge		rl_send_sge[RPCRDMA_MAX_SEND_SGES];
 	struct rpcrdma_regbuf	*rl_rdmabuf;	/* xprt header */
@@ -440,24 +441,27 @@ struct rpcrdma_create_data_internal {
  * Statistics for RPCRDMA
  */
 struct rpcrdma_stats {
+	/* accessed when sending a call */
 	unsigned long		read_chunk_count;
 	unsigned long		write_chunk_count;
 	unsigned long		reply_chunk_count;
-
 	unsigned long long	total_rdma_request;
-	unsigned long long	total_rdma_reply;
 
+	/* rarely accessed error counters */
 	unsigned long long	pullup_copy_count;
-	unsigned long long	fixup_copy_count;
 	unsigned long		hardway_register_count;
 	unsigned long		failed_marshal_count;
 	unsigned long		bad_reply_count;
-	unsigned long		nomsg_call_count;
-	unsigned long		bcall_count;
 	unsigned long		mrs_recovered;
 	unsigned long		mrs_orphaned;
 	unsigned long		mrs_allocated;
+
+	/* accessed when receiving a reply */
+	unsigned long long	total_rdma_reply;
+	unsigned long long	fixup_copy_count;
 	unsigned long		local_inv_needed;
+	unsigned long		nomsg_call_count;
+	unsigned long		bcall_count;
 };
 
 /*
@@ -465,7 +469,8 @@ struct rpcrdma_stats {
  */
 struct rpcrdma_xprt;
 struct rpcrdma_memreg_ops {
-	int		(*ro_map)(struct rpcrdma_xprt *,
+	struct rpcrdma_mr_seg *
+			(*ro_map)(struct rpcrdma_xprt *,
 				  struct rpcrdma_mr_seg *, int, bool,
 				  struct rpcrdma_mw **);
 	void		(*ro_unmap_sync)(struct rpcrdma_xprt *,
@@ -638,10 +643,16 @@ enum rpcrdma_chunktype {
 bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
 			       u32, struct xdr_buf *, enum rpcrdma_chunktype);
 void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
-int rpcrdma_marshal_req(struct rpc_rqst *);
+int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_reply_handler(struct work_struct *work);
 
+static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
+{
+	xdr->head[0].iov_len = len;
+	xdr->len = len;
+}
+
 /* RPC/RDMA module init - xprtrdma/transport.c
  */
 extern unsigned int xprt_rdma_max_inline_read;
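The new rl_hdrbuf/rl_stream and rr_hdrbuf/rr_stream pairs share one lifecycle: the xdr_buf is bound to the header regbuf once at allocation, then a stream is re-initialized over it for each message. A condensed sketch of the receive side, assembled from the verbs.c and rpc_rdma.c hunks above (wc_byte_len stands in for the completion's byte count):

```c
/* Once, at rep allocation time: */
xdr_buf_init(&rep->rr_hdrbuf, rep->rr_rdmabuf->rg_base,
	     rdmab_length(rep->rr_rdmabuf));

/* Per received message: record the actual length, then decode. */
rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc_byte_len);
xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
		rep->rr_hdrbuf.head[0].iov_base);
```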

+ 4 - 4
net/sunrpc/xprtsock.c

@@ -2728,7 +2728,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
 	module_put(THIS_MODULE);
 }
 
-static struct rpc_xprt_ops xs_local_ops = {
+static const struct rpc_xprt_ops xs_local_ops = {
 	.reserve_xprt		= xprt_reserve_xprt,
 	.release_xprt		= xs_tcp_release_xprt,
 	.alloc_slot		= xprt_alloc_slot,
@@ -2746,7 +2746,7 @@ static struct rpc_xprt_ops xs_local_ops = {
 	.disable_swap		= xs_disable_swap,
 };
 
-static struct rpc_xprt_ops xs_udp_ops = {
+static const struct rpc_xprt_ops xs_udp_ops = {
 	.set_buffer_size	= xs_udp_set_buffer_size,
 	.reserve_xprt		= xprt_reserve_xprt_cong,
 	.release_xprt		= xprt_release_xprt_cong,
@@ -2768,7 +2768,7 @@ static struct rpc_xprt_ops xs_udp_ops = {
 	.inject_disconnect	= xs_inject_disconnect,
 };
 
-static struct rpc_xprt_ops xs_tcp_ops = {
+static const struct rpc_xprt_ops xs_tcp_ops = {
 	.reserve_xprt		= xprt_reserve_xprt,
 	.release_xprt		= xs_tcp_release_xprt,
 	.alloc_slot		= xprt_lock_and_alloc_slot,
@@ -2799,7 +2799,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
  * The rpc_xprt_ops for the server backchannel
  */
 
-static struct rpc_xprt_ops bc_tcp_ops = {
+static const struct rpc_xprt_ops bc_tcp_ops = {
 	.reserve_xprt		= xprt_reserve_xprt,
 	.release_xprt		= xprt_release_xprt,
 	.alloc_slot		= xprt_alloc_slot,