浏览代码

Merge tag 'nfsd-4.12' of git://linux-nfs.org/~bfields/linux

Pull nfsd updates from Bruce Fields:
 "Another RDMA update from Chuck Lever, and a bunch of miscellaneous
  bugfixes"

* tag 'nfsd-4.12' of git://linux-nfs.org/~bfields/linux: (26 commits)
  nfsd: Fix up the "supattr_exclcreat" attributes
  nfsd: encoders mustn't use unitialized values in error cases
  nfsd: fix undefined behavior in nfsd4_layout_verify
  lockd: fix lockd shutdown race
  NFSv4: Fix callback server shutdown
  SUNRPC: Refactor svc_set_num_threads()
  NFSv4.x/callback: Create the callback service through svc_create_pooled
  lockd: remove redundant check on block
  svcrdma: Clean out old XDR encoders
  svcrdma: Remove the req_map cache
  svcrdma: Remove unused RDMA Write completion handler
  svcrdma: Reduce size of sge array in struct svc_rdma_op_ctxt
  svcrdma: Clean up RPC-over-RDMA backchannel reply processing
  svcrdma: Report Write/Reply chunk overruns
  svcrdma: Clean up RDMA_ERROR path
  svcrdma: Use rdma_rw API in RPC reply path
  svcrdma: Introduce local rdma_rw API helpers
  svcrdma: Clean up svc_rdma_get_inv_rkey()
  svcrdma: Add helper to save pages under I/O
  svcrdma: Eliminate RPCRDMA_SQ_DEPTH_MULT
  ...
Linus Torvalds 8 年之前
父节点
当前提交
c70422f760

+ 4 - 2
fs/lockd/svc.c

@@ -132,6 +132,8 @@ lockd(void *vrqstp)
 {
 {
 	int		err = 0;
 	int		err = 0;
 	struct svc_rqst *rqstp = vrqstp;
 	struct svc_rqst *rqstp = vrqstp;
+	struct net *net = &init_net;
+	struct lockd_net *ln = net_generic(net, lockd_net_id);
 
 
 	/* try_to_freeze() is called from svc_recv() */
 	/* try_to_freeze() is called from svc_recv() */
 	set_freezable();
 	set_freezable();
@@ -176,6 +178,8 @@ lockd(void *vrqstp)
 	if (nlmsvc_ops)
 	if (nlmsvc_ops)
 		nlmsvc_invalidate_all();
 		nlmsvc_invalidate_all();
 	nlm_shutdown_hosts();
 	nlm_shutdown_hosts();
+	cancel_delayed_work_sync(&ln->grace_period_end);
+	locks_end_grace(&ln->lockd_manager);
 	return 0;
 	return 0;
 }
 }
 
 
@@ -270,8 +274,6 @@ static void lockd_down_net(struct svc_serv *serv, struct net *net)
 	if (ln->nlmsvc_users) {
 	if (ln->nlmsvc_users) {
 		if (--ln->nlmsvc_users == 0) {
 		if (--ln->nlmsvc_users == 0) {
 			nlm_shutdown_hosts_net(net);
 			nlm_shutdown_hosts_net(net);
-			cancel_delayed_work_sync(&ln->grace_period_end);
-			locks_end_grace(&ln->lockd_manager);
 			svc_shutdown_net(serv, net);
 			svc_shutdown_net(serv, net);
 			dprintk("lockd_down_net: per-net data destroyed; net=%p\n", net);
 			dprintk("lockd_down_net: per-net data destroyed; net=%p\n", net);
 		}
 		}

+ 9 - 9
fs/lockd/svclock.c

@@ -870,15 +870,15 @@ nlmsvc_grant_reply(struct nlm_cookie *cookie, __be32 status)
 	if (!(block = nlmsvc_find_block(cookie)))
 	if (!(block = nlmsvc_find_block(cookie)))
 		return;
 		return;
 
 
-	if (block) {
-		if (status == nlm_lck_denied_grace_period) {
-			/* Try again in a couple of seconds */
-			nlmsvc_insert_block(block, 10 * HZ);
-		} else {
-			/* Lock is now held by client, or has been rejected.
-			 * In both cases, the block should be removed. */
-			nlmsvc_unlink_block(block);
-		}
+	if (status == nlm_lck_denied_grace_period) {
+		/* Try again in a couple of seconds */
+		nlmsvc_insert_block(block, 10 * HZ);
+	} else {
+		/*
+		 * Lock is now held by client, or has been rejected.
+		 * In both cases, the block should be removed.
+		 */
+		nlmsvc_unlink_block(block);
 	}
 	}
 	nlmsvc_release_block(block);
 	nlmsvc_release_block(block);
 }
 }

+ 17 - 9
fs/nfs/callback.c

@@ -76,7 +76,10 @@ nfs4_callback_svc(void *vrqstp)
 
 
 	set_freezable();
 	set_freezable();
 
 
-	while (!kthread_should_stop()) {
+	while (!kthread_freezable_should_stop(NULL)) {
+
+		if (signal_pending(current))
+			flush_signals(current);
 		/*
 		/*
 		 * Listen for a request on the socket
 		 * Listen for a request on the socket
 		 */
 		 */
@@ -85,6 +88,8 @@ nfs4_callback_svc(void *vrqstp)
 			continue;
 			continue;
 		svc_process(rqstp);
 		svc_process(rqstp);
 	}
 	}
+	svc_exit_thread(rqstp);
+	module_put_and_exit(0);
 	return 0;
 	return 0;
 }
 }
 
 
@@ -103,9 +108,10 @@ nfs41_callback_svc(void *vrqstp)
 
 
 	set_freezable();
 	set_freezable();
 
 
-	while (!kthread_should_stop()) {
-		if (try_to_freeze())
-			continue;
+	while (!kthread_freezable_should_stop(NULL)) {
+
+		if (signal_pending(current))
+			flush_signals(current);
 
 
 		prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_INTERRUPTIBLE);
 		prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_INTERRUPTIBLE);
 		spin_lock_bh(&serv->sv_cb_lock);
 		spin_lock_bh(&serv->sv_cb_lock);
@@ -121,11 +127,13 @@ nfs41_callback_svc(void *vrqstp)
 				error);
 				error);
 		} else {
 		} else {
 			spin_unlock_bh(&serv->sv_cb_lock);
 			spin_unlock_bh(&serv->sv_cb_lock);
-			schedule();
+			if (!kthread_should_stop())
+				schedule();
 			finish_wait(&serv->sv_cb_waitq, &wq);
 			finish_wait(&serv->sv_cb_waitq, &wq);
 		}
 		}
-		flush_signals(current);
 	}
 	}
+	svc_exit_thread(rqstp);
+	module_put_and_exit(0);
 	return 0;
 	return 0;
 }
 }
 
 
@@ -221,14 +229,14 @@ err_bind:
 static struct svc_serv_ops nfs40_cb_sv_ops = {
 static struct svc_serv_ops nfs40_cb_sv_ops = {
 	.svo_function		= nfs4_callback_svc,
 	.svo_function		= nfs4_callback_svc,
 	.svo_enqueue_xprt	= svc_xprt_do_enqueue,
 	.svo_enqueue_xprt	= svc_xprt_do_enqueue,
-	.svo_setup		= svc_set_num_threads,
+	.svo_setup		= svc_set_num_threads_sync,
 	.svo_module		= THIS_MODULE,
 	.svo_module		= THIS_MODULE,
 };
 };
 #if defined(CONFIG_NFS_V4_1)
 #if defined(CONFIG_NFS_V4_1)
 static struct svc_serv_ops nfs41_cb_sv_ops = {
 static struct svc_serv_ops nfs41_cb_sv_ops = {
 	.svo_function		= nfs41_callback_svc,
 	.svo_function		= nfs41_callback_svc,
 	.svo_enqueue_xprt	= svc_xprt_do_enqueue,
 	.svo_enqueue_xprt	= svc_xprt_do_enqueue,
-	.svo_setup		= svc_set_num_threads,
+	.svo_setup		= svc_set_num_threads_sync,
 	.svo_module		= THIS_MODULE,
 	.svo_module		= THIS_MODULE,
 };
 };
 
 
@@ -280,7 +288,7 @@ static struct svc_serv *nfs_callback_create_svc(int minorversion)
 		printk(KERN_WARNING "nfs_callback_create_svc: no kthread, %d users??\n",
 		printk(KERN_WARNING "nfs_callback_create_svc: no kthread, %d users??\n",
 			cb_info->users);
 			cb_info->users);
 
 
-	serv = svc_create(&nfs4_callback_program, NFS4_CALLBACK_BUFSIZE, sv_ops);
+	serv = svc_create_pooled(&nfs4_callback_program, NFS4_CALLBACK_BUFSIZE, sv_ops);
 	if (!serv) {
 	if (!serv) {
 		printk(KERN_ERR "nfs_callback_create_svc: create service failed\n");
 		printk(KERN_ERR "nfs_callback_create_svc: create service failed\n");
 		return ERR_PTR(-ENOMEM);
 		return ERR_PTR(-ENOMEM);

+ 17 - 6
fs/nfsd/nfs3xdr.c

@@ -334,8 +334,11 @@ nfs3svc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
 	if (!p)
 	if (!p)
 		return 0;
 		return 0;
 	p = xdr_decode_hyper(p, &args->offset);
 	p = xdr_decode_hyper(p, &args->offset);
-
 	args->count = ntohl(*p++);
 	args->count = ntohl(*p++);
+
+	if (!xdr_argsize_check(rqstp, p))
+		return 0;
+
 	len = min(args->count, max_blocksize);
 	len = min(args->count, max_blocksize);
 
 
 	/* set up the kvec */
 	/* set up the kvec */
@@ -349,7 +352,7 @@ nfs3svc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
 		v++;
 		v++;
 	}
 	}
 	args->vlen = v;
 	args->vlen = v;
-	return xdr_argsize_check(rqstp, p);
+	return 1;
 }
 }
 
 
 int
 int
@@ -541,9 +544,11 @@ nfs3svc_decode_readlinkargs(struct svc_rqst *rqstp, __be32 *p,
 	p = decode_fh(p, &args->fh);
 	p = decode_fh(p, &args->fh);
 	if (!p)
 	if (!p)
 		return 0;
 		return 0;
+	if (!xdr_argsize_check(rqstp, p))
+		return 0;
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 
 
-	return xdr_argsize_check(rqstp, p);
+	return 1;
 }
 }
 
 
 int
 int
@@ -569,10 +574,14 @@ nfs3svc_decode_readdirargs(struct svc_rqst *rqstp, __be32 *p,
 	args->verf   = p; p += 2;
 	args->verf   = p; p += 2;
 	args->dircount = ~0;
 	args->dircount = ~0;
 	args->count  = ntohl(*p++);
 	args->count  = ntohl(*p++);
+
+	if (!xdr_argsize_check(rqstp, p))
+		return 0;
+
 	args->count  = min_t(u32, args->count, PAGE_SIZE);
 	args->count  = min_t(u32, args->count, PAGE_SIZE);
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 
 
-	return xdr_argsize_check(rqstp, p);
+	return 1;
 }
 }
 
 
 int
 int
@@ -590,6 +599,9 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, __be32 *p,
 	args->dircount = ntohl(*p++);
 	args->dircount = ntohl(*p++);
 	args->count    = ntohl(*p++);
 	args->count    = ntohl(*p++);
 
 
+	if (!xdr_argsize_check(rqstp, p))
+		return 0;
+
 	len = args->count = min(args->count, max_blocksize);
 	len = args->count = min(args->count, max_blocksize);
 	while (len > 0) {
 	while (len > 0) {
 		struct page *p = *(rqstp->rq_next_page++);
 		struct page *p = *(rqstp->rq_next_page++);
@@ -597,8 +609,7 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, __be32 *p,
 			args->buffer = page_address(p);
 			args->buffer = page_address(p);
 		len -= PAGE_SIZE;
 		len -= PAGE_SIZE;
 	}
 	}
-
-	return xdr_argsize_check(rqstp, p);
+	return 1;
 }
 }
 
 
 int
 int

+ 2 - 1
fs/nfsd/nfs4proc.c

@@ -1259,7 +1259,8 @@ nfsd4_layout_verify(struct svc_export *exp, unsigned int layout_type)
 		return NULL;
 		return NULL;
 	}
 	}
 
 
-	if (!(exp->ex_layout_types & (1 << layout_type))) {
+	if (layout_type >= LAYOUT_TYPE_MAX ||
+	    !(exp->ex_layout_types & (1 << layout_type))) {
 		dprintk("%s: layout type %d not supported\n",
 		dprintk("%s: layout type %d not supported\n",
 			__func__, layout_type);
 			__func__, layout_type);
 		return NULL;
 		return NULL;

+ 6 - 19
fs/nfsd/nfs4state.c

@@ -1912,28 +1912,15 @@ static void copy_clid(struct nfs4_client *target, struct nfs4_client *source)
 	target->cl_clientid.cl_id = source->cl_clientid.cl_id; 
 	target->cl_clientid.cl_id = source->cl_clientid.cl_id; 
 }
 }
 
 
-int strdup_if_nonnull(char **target, char *source)
-{
-	if (source) {
-		*target = kstrdup(source, GFP_KERNEL);
-		if (!*target)
-			return -ENOMEM;
-	} else
-		*target = NULL;
-	return 0;
-}
-
 static int copy_cred(struct svc_cred *target, struct svc_cred *source)
 static int copy_cred(struct svc_cred *target, struct svc_cred *source)
 {
 {
-	int ret;
+	target->cr_principal = kstrdup(source->cr_principal, GFP_KERNEL);
+	target->cr_raw_principal = kstrdup(source->cr_raw_principal,
+								GFP_KERNEL);
+	if ((source->cr_principal && ! target->cr_principal) ||
+	    (source->cr_raw_principal && ! target->cr_raw_principal))
+		return -ENOMEM;
 
 
-	ret = strdup_if_nonnull(&target->cr_principal, source->cr_principal);
-	if (ret)
-		return ret;
-	ret = strdup_if_nonnull(&target->cr_raw_principal,
-					source->cr_raw_principal);
-	if (ret)
-		return ret;
 	target->cr_flavor = source->cr_flavor;
 	target->cr_flavor = source->cr_flavor;
 	target->cr_uid = source->cr_uid;
 	target->cr_uid = source->cr_uid;
 	target->cr_gid = source->cr_gid;
 	target->cr_gid = source->cr_gid;

+ 12 - 7
fs/nfsd/nfs4xdr.c

@@ -2831,9 +2831,14 @@ out_acl:
 	}
 	}
 #endif /* CONFIG_NFSD_PNFS */
 #endif /* CONFIG_NFSD_PNFS */
 	if (bmval2 & FATTR4_WORD2_SUPPATTR_EXCLCREAT) {
 	if (bmval2 & FATTR4_WORD2_SUPPATTR_EXCLCREAT) {
-		status = nfsd4_encode_bitmap(xdr, NFSD_SUPPATTR_EXCLCREAT_WORD0,
-						  NFSD_SUPPATTR_EXCLCREAT_WORD1,
-						  NFSD_SUPPATTR_EXCLCREAT_WORD2);
+		u32 supp[3];
+
+		memcpy(supp, nfsd_suppattrs[minorversion], sizeof(supp));
+		supp[0] &= NFSD_SUPPATTR_EXCLCREAT_WORD0;
+		supp[1] &= NFSD_SUPPATTR_EXCLCREAT_WORD1;
+		supp[2] &= NFSD_SUPPATTR_EXCLCREAT_WORD2;
+
+		status = nfsd4_encode_bitmap(xdr, supp[0], supp[1], supp[2]);
 		if (status)
 		if (status)
 			goto out;
 			goto out;
 	}
 	}
@@ -4119,8 +4124,7 @@ nfsd4_encode_getdeviceinfo(struct nfsd4_compoundres *resp, __be32 nfserr,
 		struct nfsd4_getdeviceinfo *gdev)
 		struct nfsd4_getdeviceinfo *gdev)
 {
 {
 	struct xdr_stream *xdr = &resp->xdr;
 	struct xdr_stream *xdr = &resp->xdr;
-	const struct nfsd4_layout_ops *ops =
-		nfsd4_layout_ops[gdev->gd_layout_type];
+	const struct nfsd4_layout_ops *ops;
 	u32 starting_len = xdr->buf->len, needed_len;
 	u32 starting_len = xdr->buf->len, needed_len;
 	__be32 *p;
 	__be32 *p;
 
 
@@ -4137,6 +4141,7 @@ nfsd4_encode_getdeviceinfo(struct nfsd4_compoundres *resp, __be32 nfserr,
 
 
 	/* If maxcount is 0 then just update notifications */
 	/* If maxcount is 0 then just update notifications */
 	if (gdev->gd_maxcount != 0) {
 	if (gdev->gd_maxcount != 0) {
+		ops = nfsd4_layout_ops[gdev->gd_layout_type];
 		nfserr = ops->encode_getdeviceinfo(xdr, gdev);
 		nfserr = ops->encode_getdeviceinfo(xdr, gdev);
 		if (nfserr) {
 		if (nfserr) {
 			/*
 			/*
@@ -4189,8 +4194,7 @@ nfsd4_encode_layoutget(struct nfsd4_compoundres *resp, __be32 nfserr,
 		struct nfsd4_layoutget *lgp)
 		struct nfsd4_layoutget *lgp)
 {
 {
 	struct xdr_stream *xdr = &resp->xdr;
 	struct xdr_stream *xdr = &resp->xdr;
-	const struct nfsd4_layout_ops *ops =
-		nfsd4_layout_ops[lgp->lg_layout_type];
+	const struct nfsd4_layout_ops *ops;
 	__be32 *p;
 	__be32 *p;
 
 
 	dprintk("%s: err %d\n", __func__, nfserr);
 	dprintk("%s: err %d\n", __func__, nfserr);
@@ -4213,6 +4217,7 @@ nfsd4_encode_layoutget(struct nfsd4_compoundres *resp, __be32 nfserr,
 	*p++ = cpu_to_be32(lgp->lg_seg.iomode);
 	*p++ = cpu_to_be32(lgp->lg_seg.iomode);
 	*p++ = cpu_to_be32(lgp->lg_layout_type);
 	*p++ = cpu_to_be32(lgp->lg_layout_type);
 
 
+	ops = nfsd4_layout_ops[lgp->lg_layout_type];
 	nfserr = ops->encode_layoutget(xdr, lgp);
 	nfserr = ops->encode_layoutget(xdr, lgp);
 out:
 out:
 	kfree(lgp->lg_content);
 	kfree(lgp->lg_content);

+ 10 - 3
fs/nfsd/nfsxdr.c

@@ -257,6 +257,9 @@ nfssvc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
 	len = args->count     = ntohl(*p++);
 	len = args->count     = ntohl(*p++);
 	p++; /* totalcount - unused */
 	p++; /* totalcount - unused */
 
 
+	if (!xdr_argsize_check(rqstp, p))
+		return 0;
+
 	len = min_t(unsigned int, len, NFSSVC_MAXBLKSIZE_V2);
 	len = min_t(unsigned int, len, NFSSVC_MAXBLKSIZE_V2);
 
 
 	/* set up somewhere to store response.
 	/* set up somewhere to store response.
@@ -272,7 +275,7 @@ nfssvc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
 		v++;
 		v++;
 	}
 	}
 	args->vlen = v;
 	args->vlen = v;
-	return xdr_argsize_check(rqstp, p);
+	return 1;
 }
 }
 
 
 int
 int
@@ -362,9 +365,11 @@ nfssvc_decode_readlinkargs(struct svc_rqst *rqstp, __be32 *p, struct nfsd_readli
 	p = decode_fh(p, &args->fh);
 	p = decode_fh(p, &args->fh);
 	if (!p)
 	if (!p)
 		return 0;
 		return 0;
+	if (!xdr_argsize_check(rqstp, p))
+		return 0;
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 
 
-	return xdr_argsize_check(rqstp, p);
+	return 1;
 }
 }
 
 
 int
 int
@@ -402,9 +407,11 @@ nfssvc_decode_readdirargs(struct svc_rqst *rqstp, __be32 *p,
 	args->cookie = ntohl(*p++);
 	args->cookie = ntohl(*p++);
 	args->count  = ntohl(*p++);
 	args->count  = ntohl(*p++);
 	args->count  = min_t(u32, args->count, PAGE_SIZE);
 	args->count  = min_t(u32, args->count, PAGE_SIZE);
+	if (!xdr_argsize_check(rqstp, p))
+		return 0;
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 	args->buffer = page_address(*(rqstp->rq_next_page++));
 
 
-	return xdr_argsize_check(rqstp, p);
+	return 1;
 }
 }
 
 
 /*
 /*

+ 20 - 4
fs/nfsd/vfs.c

@@ -94,6 +94,12 @@ nfsd_cross_mnt(struct svc_rqst *rqstp, struct dentry **dpp,
 	err = follow_down(&path);
 	err = follow_down(&path);
 	if (err < 0)
 	if (err < 0)
 		goto out;
 		goto out;
+	if (path.mnt == exp->ex_path.mnt && path.dentry == dentry &&
+	    nfsd_mountpoint(dentry, exp) == 2) {
+		/* This is only a mountpoint in some other namespace */
+		path_put(&path);
+		goto out;
+	}
 
 
 	exp2 = rqst_exp_get_by_name(rqstp, &path);
 	exp2 = rqst_exp_get_by_name(rqstp, &path);
 	if (IS_ERR(exp2)) {
 	if (IS_ERR(exp2)) {
@@ -167,16 +173,26 @@ static int nfsd_lookup_parent(struct svc_rqst *rqstp, struct dentry *dparent, st
 /*
 /*
  * For nfsd purposes, we treat V4ROOT exports as though there was an
  * For nfsd purposes, we treat V4ROOT exports as though there was an
  * export at *every* directory.
  * export at *every* directory.
+ * We return:
+ * '1' if this dentry *must* be an export point,
+ * '2' if it might be, if there is really a mount here, and
+ * '0' if there is no chance of an export point here.
  */
  */
 int nfsd_mountpoint(struct dentry *dentry, struct svc_export *exp)
 int nfsd_mountpoint(struct dentry *dentry, struct svc_export *exp)
 {
 {
-	if (d_mountpoint(dentry))
+	if (!d_inode(dentry))
+		return 0;
+	if (exp->ex_flags & NFSEXP_V4ROOT)
 		return 1;
 		return 1;
 	if (nfsd4_is_junction(dentry))
 	if (nfsd4_is_junction(dentry))
 		return 1;
 		return 1;
-	if (!(exp->ex_flags & NFSEXP_V4ROOT))
-		return 0;
-	return d_inode(dentry) != NULL;
+	if (d_mountpoint(dentry))
+		/*
+		 * Might only be a mountpoint in a different namespace,
+		 * but we need to check.
+		 */
+		return 2;
+	return 0;
 }
 }
 
 
 __be32
 __be32

+ 3 - 0
include/linux/sunrpc/rpc_rdma.h

@@ -143,6 +143,9 @@ enum rpcrdma_proc {
 #define rdma_done	cpu_to_be32(RDMA_DONE)
 #define rdma_done	cpu_to_be32(RDMA_DONE)
 #define rdma_error	cpu_to_be32(RDMA_ERROR)
 #define rdma_error	cpu_to_be32(RDMA_ERROR)
 
 
+#define err_vers	cpu_to_be32(ERR_VERS)
+#define err_chunk	cpu_to_be32(ERR_CHUNK)
+
 /*
 /*
  * Private extension to RPC-over-RDMA Version One.
  * Private extension to RPC-over-RDMA Version One.
  * Message passed during RDMA-CM connection set-up.
  * Message passed during RDMA-CM connection set-up.

+ 2 - 2
include/linux/sunrpc/svc.h

@@ -336,8 +336,7 @@ xdr_argsize_check(struct svc_rqst *rqstp, __be32 *p)
 {
 {
 	char *cp = (char *)p;
 	char *cp = (char *)p;
 	struct kvec *vec = &rqstp->rq_arg.head[0];
 	struct kvec *vec = &rqstp->rq_arg.head[0];
-	return cp >= (char*)vec->iov_base
-		&& cp <= (char*)vec->iov_base + vec->iov_len;
+	return cp == (char *)vec->iov_base + vec->iov_len;
 }
 }
 
 
 static inline int
 static inline int
@@ -474,6 +473,7 @@ void		   svc_pool_map_put(void);
 struct svc_serv *  svc_create_pooled(struct svc_program *, unsigned int,
 struct svc_serv *  svc_create_pooled(struct svc_program *, unsigned int,
 			struct svc_serv_ops *);
 			struct svc_serv_ops *);
 int		   svc_set_num_threads(struct svc_serv *, struct svc_pool *, int);
 int		   svc_set_num_threads(struct svc_serv *, struct svc_pool *, int);
+int		   svc_set_num_threads_sync(struct svc_serv *, struct svc_pool *, int);
 int		   svc_pool_stats_open(struct svc_serv *serv, struct file *file);
 int		   svc_pool_stats_open(struct svc_serv *serv, struct file *file);
 void		   svc_destroy(struct svc_serv *);
 void		   svc_destroy(struct svc_serv *);
 void		   svc_shutdown_net(struct svc_serv *, struct net *);
 void		   svc_shutdown_net(struct svc_serv *, struct net *);

+ 27 - 48
include/linux/sunrpc/svc_rdma.h

@@ -48,6 +48,12 @@
 #include <rdma/rdma_cm.h>
 #include <rdma/rdma_cm.h>
 #define SVCRDMA_DEBUG
 #define SVCRDMA_DEBUG
 
 
+/* Default and maximum inline threshold sizes */
+enum {
+	RPCRDMA_DEF_INLINE_THRESH = 4096,
+	RPCRDMA_MAX_INLINE_THRESH = 65536
+};
+
 /* RPC/RDMA parameters and stats */
 /* RPC/RDMA parameters and stats */
 extern unsigned int svcrdma_ord;
 extern unsigned int svcrdma_ord;
 extern unsigned int svcrdma_max_requests;
 extern unsigned int svcrdma_max_requests;
@@ -85,27 +91,11 @@ struct svc_rdma_op_ctxt {
 	enum dma_data_direction direction;
 	enum dma_data_direction direction;
 	int count;
 	int count;
 	unsigned int mapped_sges;
 	unsigned int mapped_sges;
-	struct ib_sge sge[RPCSVC_MAXPAGES];
+	struct ib_send_wr send_wr;
+	struct ib_sge sge[1 + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE];
 	struct page *pages[RPCSVC_MAXPAGES];
 	struct page *pages[RPCSVC_MAXPAGES];
 };
 };
 
 
-/*
- * NFS_ requests are mapped on the client side by the chunk lists in
- * the RPCRDMA header. During the fetching of the RPC from the client
- * and the writing of the reply to the client, the memory in the
- * client and the memory in the server must be mapped as contiguous
- * vaddr/len for access by the hardware. These data strucures keep
- * these mappings.
- *
- * For an RDMA_WRITE, the 'sge' maps the RPC REPLY. For RDMA_READ, the
- * 'sge' in the svc_rdma_req_map maps the server side RPC reply and the
- * 'ch' field maps the read-list of the RPCRDMA header to the 'sge'
- * mapping of the reply.
- */
-struct svc_rdma_chunk_sge {
-	int start;		/* sge no for this chunk */
-	int count;		/* sge count for this chunk */
-};
 struct svc_rdma_fastreg_mr {
 struct svc_rdma_fastreg_mr {
 	struct ib_mr *mr;
 	struct ib_mr *mr;
 	struct scatterlist *sg;
 	struct scatterlist *sg;
@@ -114,15 +104,7 @@ struct svc_rdma_fastreg_mr {
 	enum dma_data_direction direction;
 	enum dma_data_direction direction;
 	struct list_head frmr_list;
 	struct list_head frmr_list;
 };
 };
-struct svc_rdma_req_map {
-	struct list_head free;
-	unsigned long count;
-	union {
-		struct kvec sge[RPCSVC_MAXPAGES];
-		struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
-		unsigned long lkey[RPCSVC_MAXPAGES];
-	};
-};
+
 #define RDMACTXT_F_LAST_CTXT	2
 #define RDMACTXT_F_LAST_CTXT	2
 
 
 #define	SVCRDMA_DEVCAP_FAST_REG		1	/* fast mr registration */
 #define	SVCRDMA_DEVCAP_FAST_REG		1	/* fast mr registration */
@@ -144,14 +126,15 @@ struct svcxprt_rdma {
 	u32		     sc_max_requests;	/* Max requests */
 	u32		     sc_max_requests;	/* Max requests */
 	u32		     sc_max_bc_requests;/* Backward credits */
 	u32		     sc_max_bc_requests;/* Backward credits */
 	int                  sc_max_req_size;	/* Size of each RQ WR buf */
 	int                  sc_max_req_size;	/* Size of each RQ WR buf */
+	u8		     sc_port_num;
 
 
 	struct ib_pd         *sc_pd;
 	struct ib_pd         *sc_pd;
 
 
 	spinlock_t	     sc_ctxt_lock;
 	spinlock_t	     sc_ctxt_lock;
 	struct list_head     sc_ctxts;
 	struct list_head     sc_ctxts;
 	int		     sc_ctxt_used;
 	int		     sc_ctxt_used;
-	spinlock_t	     sc_map_lock;
-	struct list_head     sc_maps;
+	spinlock_t	     sc_rw_ctxt_lock;
+	struct list_head     sc_rw_ctxts;
 
 
 	struct list_head     sc_rq_dto_q;
 	struct list_head     sc_rq_dto_q;
 	spinlock_t	     sc_rq_dto_lock;
 	spinlock_t	     sc_rq_dto_lock;
@@ -181,9 +164,7 @@ struct svcxprt_rdma {
 /* The default ORD value is based on two outstanding full-size writes with a
 /* The default ORD value is based on two outstanding full-size writes with a
  * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ.  */
  * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ.  */
 #define RPCRDMA_ORD             (64/4)
 #define RPCRDMA_ORD             (64/4)
-#define RPCRDMA_SQ_DEPTH_MULT   8
 #define RPCRDMA_MAX_REQUESTS    32
 #define RPCRDMA_MAX_REQUESTS    32
-#define RPCRDMA_MAX_REQ_SIZE    4096
 
 
 /* Typical ULP usage of BC requests is NFSv4.1 backchannel. Our
 /* Typical ULP usage of BC requests is NFSv4.1 backchannel. Our
  * current NFSv4.1 implementation supports one backchannel slot.
  * current NFSv4.1 implementation supports one backchannel slot.
@@ -201,19 +182,11 @@ static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma,
 
 
 /* svc_rdma_backchannel.c */
 /* svc_rdma_backchannel.c */
 extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
 extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
-				    struct rpcrdma_msg *rmsgp,
+				    __be32 *rdma_resp,
 				    struct xdr_buf *rcvbuf);
 				    struct xdr_buf *rcvbuf);
 
 
 /* svc_rdma_marshal.c */
 /* svc_rdma_marshal.c */
 extern int svc_rdma_xdr_decode_req(struct xdr_buf *);
 extern int svc_rdma_xdr_decode_req(struct xdr_buf *);
-extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
-				     struct rpcrdma_msg *,
-				     enum rpcrdma_errcode, __be32 *);
-extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
-extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
-extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
-					    __be32, __be64, u32);
-extern unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp);
 
 
 /* svc_rdma_recvfrom.c */
 /* svc_rdma_recvfrom.c */
 extern int svc_rdma_recvfrom(struct svc_rqst *);
 extern int svc_rdma_recvfrom(struct svc_rqst *);
@@ -224,16 +197,25 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
 				struct svc_rdma_op_ctxt *, int *, u32 *,
 				struct svc_rdma_op_ctxt *, int *, u32 *,
 				u32, u32, u64, bool);
 				u32, u32, u64, bool);
 
 
+/* svc_rdma_rw.c */
+extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
+extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
+				     __be32 *wr_ch, struct xdr_buf *xdr);
+extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
+				     __be32 *rp_ch, bool writelist,
+				     struct xdr_buf *xdr);
+
 /* svc_rdma_sendto.c */
 /* svc_rdma_sendto.c */
-extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
-			    struct svc_rdma_req_map *, bool);
+extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
+				  struct svc_rdma_op_ctxt *ctxt,
+				  __be32 *rdma_resp, unsigned int len);
+extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
+				 struct svc_rdma_op_ctxt *ctxt,
+				 int num_sge, u32 inv_rkey);
 extern int svc_rdma_sendto(struct svc_rqst *);
 extern int svc_rdma_sendto(struct svc_rqst *);
-extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
-				int);
 
 
 /* svc_rdma_transport.c */
 /* svc_rdma_transport.c */
 extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
@@ -244,9 +226,6 @@ extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
 extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
 extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
 extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
 extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
 extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
 extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
-extern struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *);
-extern void svc_rdma_put_req_map(struct svcxprt_rdma *,
-				 struct svc_rdma_req_map *);
 extern struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *);
 extern struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *);
 extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
 extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
 			      struct svc_rdma_fastreg_mr *);
 			      struct svc_rdma_fastreg_mr *);

+ 8 - 6
include/uapi/linux/nfsd/cld.h

@@ -22,6 +22,8 @@
 #ifndef _NFSD_CLD_H
 #ifndef _NFSD_CLD_H
 #define _NFSD_CLD_H
 #define _NFSD_CLD_H
 
 
+#include <linux/types.h>
+
 /* latest upcall version available */
 /* latest upcall version available */
 #define CLD_UPCALL_VERSION 1
 #define CLD_UPCALL_VERSION 1
 
 
@@ -37,18 +39,18 @@ enum cld_command {
 
 
 /* representation of long-form NFSv4 client ID */
 /* representation of long-form NFSv4 client ID */
 struct cld_name {
 struct cld_name {
-	uint16_t	cn_len;				/* length of cm_id */
+	__u16		cn_len;				/* length of cm_id */
 	unsigned char	cn_id[NFS4_OPAQUE_LIMIT];	/* client-provided */
 	unsigned char	cn_id[NFS4_OPAQUE_LIMIT];	/* client-provided */
 } __attribute__((packed));
 } __attribute__((packed));
 
 
 /* message struct for communication with userspace */
 /* message struct for communication with userspace */
 struct cld_msg {
 struct cld_msg {
-	uint8_t		cm_vers;		/* upcall version */
-	uint8_t		cm_cmd;			/* upcall command */
-	int16_t		cm_status;		/* return code */
-	uint32_t	cm_xid;			/* transaction id */
+	__u8		cm_vers;		/* upcall version */
+	__u8		cm_cmd;			/* upcall command */
+	__s16		cm_status;		/* return code */
+	__u32		cm_xid;			/* transaction id */
 	union {
 	union {
-		int64_t		cm_gracetime;	/* grace period start time */
+		__s64		cm_gracetime;	/* grace period start time */
 		struct cld_name	cm_name;
 		struct cld_name	cm_name;
 	} __attribute__((packed)) cm_u;
 	} __attribute__((packed)) cm_u;
 } __attribute__((packed));
 } __attribute__((packed));

+ 1 - 0
net/sunrpc/Kconfig

@@ -52,6 +52,7 @@ config SUNRPC_XPRT_RDMA
 	tristate "RPC-over-RDMA transport"
 	tristate "RPC-over-RDMA transport"
 	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS
 	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS
 	default SUNRPC && INFINIBAND
 	default SUNRPC && INFINIBAND
+	select SG_POOL
 	help
 	help
 	  This option allows the NFS client and server to use RDMA
 	  This option allows the NFS client and server to use RDMA
 	  transports (InfiniBand, iWARP, or RoCE).
 	  transports (InfiniBand, iWARP, or RoCE).

+ 96 - 38
net/sunrpc/svc.c

@@ -702,59 +702,32 @@ found_pool:
 	return task;
 	return task;
 }
 }
 
 
-/*
- * Create or destroy enough new threads to make the number
- * of threads the given number.  If `pool' is non-NULL, applies
- * only to threads in that pool, otherwise round-robins between
- * all pools.  Caller must ensure that mutual exclusion between this and
- * server startup or shutdown.
- *
- * Destroying threads relies on the service threads filling in
- * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
- * has been created using svc_create_pooled().
- *
- * Based on code that used to be in nfsd_svc() but tweaked
- * to be pool-aware.
- */
-int
-svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+/* create new threads */
+static int
+svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 {
 {
 	struct svc_rqst	*rqstp;
 	struct svc_rqst	*rqstp;
 	struct task_struct *task;
 	struct task_struct *task;
 	struct svc_pool *chosen_pool;
 	struct svc_pool *chosen_pool;
-	int error = 0;
 	unsigned int state = serv->sv_nrthreads-1;
 	unsigned int state = serv->sv_nrthreads-1;
 	int node;
 	int node;
 
 
-	if (pool == NULL) {
-		/* The -1 assumes caller has done a svc_get() */
-		nrservs -= (serv->sv_nrthreads-1);
-	} else {
-		spin_lock_bh(&pool->sp_lock);
-		nrservs -= pool->sp_nrthreads;
-		spin_unlock_bh(&pool->sp_lock);
-	}
-
-	/* create new threads */
-	while (nrservs > 0) {
+	do {
 		nrservs--;
 		nrservs--;
 		chosen_pool = choose_pool(serv, pool, &state);
 		chosen_pool = choose_pool(serv, pool, &state);
 
 
 		node = svc_pool_map_get_node(chosen_pool->sp_id);
 		node = svc_pool_map_get_node(chosen_pool->sp_id);
 		rqstp = svc_prepare_thread(serv, chosen_pool, node);
 		rqstp = svc_prepare_thread(serv, chosen_pool, node);
-		if (IS_ERR(rqstp)) {
-			error = PTR_ERR(rqstp);
-			break;
-		}
+		if (IS_ERR(rqstp))
+			return PTR_ERR(rqstp);
 
 
 		__module_get(serv->sv_ops->svo_module);
 		__module_get(serv->sv_ops->svo_module);
 		task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
 		task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
 					      node, "%s", serv->sv_name);
 					      node, "%s", serv->sv_name);
 		if (IS_ERR(task)) {
 		if (IS_ERR(task)) {
-			error = PTR_ERR(task);
 			module_put(serv->sv_ops->svo_module);
 			module_put(serv->sv_ops->svo_module);
 			svc_exit_thread(rqstp);
 			svc_exit_thread(rqstp);
-			break;
+			return PTR_ERR(task);
 		}
 		}
 
 
 		rqstp->rq_task = task;
 		rqstp->rq_task = task;
@@ -763,18 +736,103 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 
 
 		svc_sock_update_bufs(serv);
 		svc_sock_update_bufs(serv);
 		wake_up_process(task);
 		wake_up_process(task);
-	}
+	} while (nrservs > 0);
+
+	return 0;
+}
+
+
+/* destroy old threads */
+static int
+svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	struct task_struct *task;
+	unsigned int state = serv->sv_nrthreads-1;
+
 	/* destroy old threads */
 	/* destroy old threads */
-	while (nrservs < 0 &&
-	       (task = choose_victim(serv, pool, &state)) != NULL) {
+	do {
+		task = choose_victim(serv, pool, &state);
+		if (task == NULL)
+			break;
 		send_sig(SIGINT, task, 1);
 		send_sig(SIGINT, task, 1);
 		nrservs++;
 		nrservs++;
+	} while (nrservs < 0);
+
+	return 0;
+}
+
+/*
+ * Create or destroy enough new threads to make the number
+ * of threads the given number.  If `pool' is non-NULL, applies
+ * only to threads in that pool, otherwise round-robins between
+ * all pools.  Caller must ensure that mutual exclusion between this and
+ * server startup or shutdown.
+ *
+ * Destroying threads relies on the service threads filling in
+ * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
+ * has been created using svc_create_pooled().
+ *
+ * Based on code that used to be in nfsd_svc() but tweaked
+ * to be pool-aware.
+ */
+int
+svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	if (pool == NULL) {
+		/* The -1 assumes caller has done a svc_get() */
+		nrservs -= (serv->sv_nrthreads-1);
+	} else {
+		spin_lock_bh(&pool->sp_lock);
+		nrservs -= pool->sp_nrthreads;
+		spin_unlock_bh(&pool->sp_lock);
 	}
 	}
 
 
-	return error;
+	if (nrservs > 0)
+		return svc_start_kthreads(serv, pool, nrservs);
+	if (nrservs < 0)
+		return svc_signal_kthreads(serv, pool, nrservs);
+	return 0;
 }
 }
 EXPORT_SYMBOL_GPL(svc_set_num_threads);
 EXPORT_SYMBOL_GPL(svc_set_num_threads);
 
 
+/* destroy old threads */
+static int
+svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	struct task_struct *task;
+	unsigned int state = serv->sv_nrthreads-1;
+
+	/* destroy old threads */
+	do {
+		task = choose_victim(serv, pool, &state);
+		if (task == NULL)
+			break;
+		kthread_stop(task);
+		nrservs++;
+	} while (nrservs < 0);
+	return 0;
+}
+
+int
+svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	if (pool == NULL) {
+		/* The -1 assumes caller has done a svc_get() */
+		nrservs -= (serv->sv_nrthreads-1);
+	} else {
+		spin_lock_bh(&pool->sp_lock);
+		nrservs -= pool->sp_nrthreads;
+		spin_unlock_bh(&pool->sp_lock);
+	}
+
+	if (nrservs > 0)
+		return svc_start_kthreads(serv, pool, nrservs);
+	if (nrservs < 0)
+		return svc_stop_kthreads(serv, pool, nrservs);
+	return 0;
+}
+EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);
+
 /*
 /*
  * Called from a server thread as it's exiting. Caller must hold the "service
  * Called from a server thread as it's exiting. Caller must hold the "service
  * mutex" for the service.
  * mutex" for the service.

+ 1 - 1
net/sunrpc/xprtrdma/Makefile

@@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	fmr_ops.o frwr_ops.o \
 	fmr_ops.o frwr_ops.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
-	module.o
+	svc_rdma_rw.o module.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o

+ 3 - 5
net/sunrpc/xprtrdma/svc_rdma.c

@@ -58,9 +58,9 @@ unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
 unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS;
 unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS;
 static unsigned int min_max_requests = 4;
 static unsigned int min_max_requests = 4;
 static unsigned int max_max_requests = 16384;
 static unsigned int max_max_requests = 16384;
-unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
-static unsigned int min_max_inline = 4096;
-static unsigned int max_max_inline = 65536;
+unsigned int svcrdma_max_req_size = RPCRDMA_DEF_INLINE_THRESH;
+static unsigned int min_max_inline = RPCRDMA_DEF_INLINE_THRESH;
+static unsigned int max_max_inline = RPCRDMA_MAX_INLINE_THRESH;
 
 
 atomic_t rdma_stat_recv;
 atomic_t rdma_stat_recv;
 atomic_t rdma_stat_read;
 atomic_t rdma_stat_read;
@@ -247,8 +247,6 @@ int svc_rdma_init(void)
 	dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
 	dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
 	dprintk("\tsvcrdma_ord      : %d\n", svcrdma_ord);
 	dprintk("\tsvcrdma_ord      : %d\n", svcrdma_ord);
 	dprintk("\tmax_requests     : %u\n", svcrdma_max_requests);
 	dprintk("\tmax_requests     : %u\n", svcrdma_max_requests);
-	dprintk("\tsq_depth         : %u\n",
-		svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
 	dprintk("\tmax_bc_requests  : %u\n", svcrdma_max_bc_requests);
 	dprintk("\tmax_bc_requests  : %u\n", svcrdma_max_bc_requests);
 	dprintk("\tmax_inline       : %d\n", svcrdma_max_req_size);
 	dprintk("\tmax_inline       : %d\n", svcrdma_max_req_size);
 
 

+ 29 - 42
net/sunrpc/xprtrdma/svc_rdma_backchannel.c

@@ -12,7 +12,17 @@
 
 
 #undef SVCRDMA_BACKCHANNEL_DEBUG
 #undef SVCRDMA_BACKCHANNEL_DEBUG
 
 
-int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
+/**
+ * svc_rdma_handle_bc_reply - Process incoming backchannel reply
+ * @xprt: controlling backchannel transport
+ * @rdma_resp: pointer to incoming transport header
+ * @rcvbuf: XDR buffer into which to decode the reply
+ *
+ * Returns:
+ *	%0 if @rcvbuf is filled in, xprt_complete_rqst called,
+ *	%-EAGAIN if server should call ->recvfrom again.
+ */
+int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 			     struct xdr_buf *rcvbuf)
 			     struct xdr_buf *rcvbuf)
 {
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -27,13 +37,13 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
 
 
 	p = (__be32 *)src->iov_base;
 	p = (__be32 *)src->iov_base;
 	len = src->iov_len;
 	len = src->iov_len;
-	xid = rmsgp->rm_xid;
+	xid = *rdma_resp;
 
 
 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
 	pr_info("%s: xid=%08x, length=%zu\n",
 	pr_info("%s: xid=%08x, length=%zu\n",
 		__func__, be32_to_cpu(xid), len);
 		__func__, be32_to_cpu(xid), len);
 	pr_info("%s: RPC/RDMA: %*ph\n",
 	pr_info("%s: RPC/RDMA: %*ph\n",
-		__func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
+		__func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp);
 	pr_info("%s:      RPC: %*ph\n",
 	pr_info("%s:      RPC: %*ph\n",
 		__func__, (int)len, p);
 		__func__, (int)len, p);
 #endif
 #endif
@@ -53,7 +63,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
 		goto out_unlock;
 		goto out_unlock;
 	memcpy(dst->iov_base, p, len);
 	memcpy(dst->iov_base, p, len);
 
 
-	credits = be32_to_cpu(rmsgp->rm_credit);
+	credits = be32_to_cpup(rdma_resp + 2);
 	if (credits == 0)
 	if (credits == 0)
 		credits = 1;	/* don't deadlock */
 		credits = 1;	/* don't deadlock */
 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
@@ -90,9 +100,9 @@ out_notfound:
  * Caller holds the connection's mutex and has already marshaled
  * Caller holds the connection's mutex and has already marshaled
  * the RPC/RDMA request.
  * the RPC/RDMA request.
  *
  *
- * This is similar to svc_rdma_reply, but takes an rpc_rqst
- * instead, does not support chunks, and avoids blocking memory
- * allocation.
+ * This is similar to svc_rdma_send_reply_msg, but takes a struct
+ * rpc_rqst instead, does not support chunks, and avoids blocking
+ * memory allocation.
  *
  *
  * XXX: There is still an opportunity to block in svc_rdma_send()
  * XXX: There is still an opportunity to block in svc_rdma_send()
  * if there are no SQ entries to post the Send. This may occur if
  * if there are no SQ entries to post the Send. This may occur if
@@ -101,59 +111,36 @@ out_notfound:
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 			      struct rpc_rqst *rqst)
 			      struct rpc_rqst *rqst)
 {
 {
-	struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
 	struct svc_rdma_op_ctxt *ctxt;
 	struct svc_rdma_op_ctxt *ctxt;
-	struct svc_rdma_req_map *vec;
-	struct ib_send_wr send_wr;
 	int ret;
 	int ret;
 
 
-	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
-	if (ret)
+	ctxt = svc_rdma_get_context(rdma);
+
+	/* rpcrdma_bc_send_request builds the transport header and
+	 * the backchannel RPC message in the same buffer. Thus only
+	 * one SGE is needed to send both.
+	 */
+	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer,
+				     rqst->rq_snd_buf.len);
+	if (ret < 0)
 		goto out_err;
 		goto out_err;
 
 
 	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
 	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
 	if (ret)
 	if (ret)
 		goto out_err;
 		goto out_err;
 
 
-	ctxt = svc_rdma_get_context(rdma);
-	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
-	ctxt->count = 1;
-
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length = sndbuf->len;
-	ctxt->sge[0].addr =
-	    ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
-			    sndbuf->len, DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
-		ret = -EIO;
-		goto out_unmap;
-	}
-	svc_rdma_count_mappings(rdma, ctxt);
-
-	memset(&send_wr, 0, sizeof(send_wr));
-	ctxt->cqe.done = svc_rdma_wc_send;
-	send_wr.wr_cqe = &ctxt->cqe;
-	send_wr.sg_list = ctxt->sge;
-	send_wr.num_sge = 1;
-	send_wr.opcode = IB_WR_SEND;
-	send_wr.send_flags = IB_SEND_SIGNALED;
-
-	ret = svc_rdma_send(rdma, &send_wr);
-	if (ret) {
-		ret = -EIO;
+	ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0);
+	if (ret)
 		goto out_unmap;
 		goto out_unmap;
-	}
 
 
 out_err:
 out_err:
-	svc_rdma_put_req_map(rdma, vec);
 	dprintk("svcrdma: %s returns %d\n", __func__, ret);
 	dprintk("svcrdma: %s returns %d\n", __func__, ret);
 	return ret;
 	return ret;
 
 
 out_unmap:
 out_unmap:
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 1);
 	svc_rdma_put_context(ctxt, 1);
+	ret = -EIO;
 	goto out_err;
 	goto out_err;
 }
 }
 
 

+ 0 - 89
net/sunrpc/xprtrdma/svc_rdma_marshal.c

@@ -166,92 +166,3 @@ out_inval:
 	dprintk("svcrdma: failed to parse transport header\n");
 	dprintk("svcrdma: failed to parse transport header\n");
 	return -EINVAL;
 	return -EINVAL;
 }
 }
-
-int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
-			      struct rpcrdma_msg *rmsgp,
-			      enum rpcrdma_errcode err, __be32 *va)
-{
-	__be32 *startp = va;
-
-	*va++ = rmsgp->rm_xid;
-	*va++ = rmsgp->rm_vers;
-	*va++ = xprt->sc_fc_credits;
-	*va++ = rdma_error;
-	*va++ = cpu_to_be32(err);
-	if (err == ERR_VERS) {
-		*va++ = rpcrdma_version;
-		*va++ = rpcrdma_version;
-	}
-
-	return (int)((unsigned long)va - (unsigned long)startp);
-}
-
-/**
- * svc_rdma_xdr_get_reply_hdr_length - Get length of Reply transport header
- * @rdma_resp: buffer containing Reply transport header
- *
- * Returns length of transport header, in bytes.
- */
-unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp)
-{
-	unsigned int nsegs;
-	__be32 *p;
-
-	p = rdma_resp;
-
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p += rpcrdma_fixed_maxsz + 1;
-
-	/* Skip Write list. */
-	while (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	/* Skip Reply chunk. */
-	if (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	return (unsigned long)p - (unsigned long)rdma_resp;
-}
-
-void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
-{
-	struct rpcrdma_write_array *ary;
-
-	/* no read-list */
-	rmsgp->rm_body.rm_chunks[0] = xdr_zero;
-
-	/* write-array discrim */
-	ary = (struct rpcrdma_write_array *)
-		&rmsgp->rm_body.rm_chunks[1];
-	ary->wc_discrim = xdr_one;
-	ary->wc_nchunks = cpu_to_be32(chunks);
-
-	/* write-list terminator */
-	ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
-
-	/* reply-array discriminator */
-	ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
-}
-
-void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
-				 int chunks)
-{
-	ary->wc_discrim = xdr_one;
-	ary->wc_nchunks = cpu_to_be32(chunks);
-}
-
-void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
-				     int chunk_no,
-				     __be32 rs_handle,
-				     __be64 rs_offset,
-				     u32 write_len)
-{
-	struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
-	seg->rs_handle = rs_handle;
-	seg->rs_offset = rs_offset;
-	seg->rs_length = cpu_to_be32(write_len);
-}

+ 66 - 13
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c

@@ -558,33 +558,85 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
 	rqstp->rq_arg.buflen = head->arg.buflen;
 	rqstp->rq_arg.buflen = head->arg.buflen;
 }
 }
 
 
+static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
+				__be32 *rdma_argp, int status)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+	__be32 *p, *err_msgp;
+	unsigned int length;
+	struct page *page;
+	int ret;
+
+	ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+	if (ret)
+		return;
+
+	page = alloc_page(GFP_KERNEL);
+	if (!page)
+		return;
+	err_msgp = page_address(page);
+
+	p = err_msgp;
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
+	*p++ = xprt->sc_fc_credits;
+	*p++ = rdma_error;
+	if (status == -EPROTONOSUPPORT) {
+		*p++ = err_vers;
+		*p++ = rpcrdma_version;
+		*p++ = rpcrdma_version;
+	} else {
+		*p++ = err_chunk;
+	}
+	length = (unsigned long)p - (unsigned long)err_msgp;
+
+	/* Map transport header; no RPC message payload */
+	ctxt = svc_rdma_get_context(xprt);
+	ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
+	if (ret) {
+		dprintk("svcrdma: Error %d mapping send for protocol error\n",
+			ret);
+		return;
+	}
+
+	ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
+	if (ret) {
+		dprintk("svcrdma: Error %d posting send for protocol error\n",
+			ret);
+		svc_rdma_unmap_dma(ctxt);
+		svc_rdma_put_context(ctxt, 1);
+	}
+}
+
 /* By convention, backchannel calls arrive via rdma_msg type
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
  * messages, and never populate the chunk lists. This makes
  * the RPC/RDMA header small and fixed in size, so it is
  * the RPC/RDMA header small and fixed in size, so it is
  * straightforward to check the RPC header's direction field.
  * straightforward to check the RPC header's direction field.
  */
  */
-static bool
-svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp)
+static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
+					  __be32 *rdma_resp)
 {
 {
-	__be32 *p = (__be32 *)rmsgp;
+	__be32 *p;
 
 
 	if (!xprt->xpt_bc_xprt)
 	if (!xprt->xpt_bc_xprt)
 		return false;
 		return false;
 
 
-	if (rmsgp->rm_type != rdma_msg)
+	p = rdma_resp + 3;
+	if (*p++ != rdma_msg)
 		return false;
 		return false;
-	if (rmsgp->rm_body.rm_chunks[0] != xdr_zero)
+
+	if (*p++ != xdr_zero)
 		return false;
 		return false;
-	if (rmsgp->rm_body.rm_chunks[1] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
 		return false;
-	if (rmsgp->rm_body.rm_chunks[2] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
 		return false;
 
 
-	/* sanity */
-	if (p[7] != rmsgp->rm_xid)
+	/* XID sanity */
+	if (*p++ != *rdma_resp)
 		return false;
 		return false;
 	/* call direction */
 	/* call direction */
-	if (p[8] == cpu_to_be32(RPC_CALL))
+	if (*p == cpu_to_be32(RPC_CALL))
 		return false;
 		return false;
 
 
 	return true;
 	return true;
@@ -650,8 +702,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		goto out_drop;
 		goto out_drop;
 	rqstp->rq_xprt_hlen = ret;
 	rqstp->rq_xprt_hlen = ret;
 
 
-	if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
-		ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
+	if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) {
+		ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt,
+					       &rmsgp->rm_xid,
 					       &rqstp->rq_arg);
 					       &rqstp->rq_arg);
 		svc_rdma_put_context(ctxt, 0);
 		svc_rdma_put_context(ctxt, 0);
 		if (ret)
 		if (ret)
@@ -686,7 +739,7 @@ complete:
 	return ret;
 	return ret;
 
 
 out_err:
 out_err:
-	svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+	svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret);
 	svc_rdma_put_context(ctxt, 0);
 	svc_rdma_put_context(ctxt, 0);
 	return 0;
 	return 0;
 
 

+ 512 - 0
net/sunrpc/xprtrdma/svc_rdma_rw.c

@@ -0,0 +1,512 @@
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
+ */
+
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/sunrpc/debug.h>
+
+#include <rdma/rw.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/* Each R/W context contains state for one chain of RDMA Read or
+ * Write Work Requests.
+ *
+ * Each WR chain handles a single contiguous server-side buffer,
+ * because scatterlist entries after the first have to start on
+ * page alignment. xdr_buf iovecs cannot guarantee alignment.
+ *
+ * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
+ * from a client may contain a unique R_key, so each WR chain moves
+ * up to one segment at a time.
+ *
+ * The scatterlist makes this data structure over 4KB in size. To
+ * make it less likely to fail, and to handle the allocation for
+ * smaller I/O requests without disabling bottom-halves, these
+ * contexts are created on demand, but cached and reused until the
+ * controlling svcxprt_rdma is destroyed.
+ */
+struct svc_rdma_rw_ctxt {
+	struct list_head	rw_list;
+	struct rdma_rw_ctx	rw_ctx;
+	int			rw_nents;
+	struct sg_table		rw_sg_table;
+	struct scatterlist	rw_first_sgl[0];
+};
+
+static inline struct svc_rdma_rw_ctxt *
+svc_rdma_next_ctxt(struct list_head *list)
+{
+	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
+					rw_list);
+}
+
+static struct svc_rdma_rw_ctxt *
+svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
+{
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	spin_lock(&rdma->sc_rw_ctxt_lock);
+
+	ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
+	if (ctxt) {
+		list_del(&ctxt->rw_list);
+		spin_unlock(&rdma->sc_rw_ctxt_lock);
+	} else {
+		spin_unlock(&rdma->sc_rw_ctxt_lock);
+		ctxt = kmalloc(sizeof(*ctxt) +
+			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
+			       GFP_KERNEL);
+		if (!ctxt)
+			goto out;
+		INIT_LIST_HEAD(&ctxt->rw_list);
+	}
+
+	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
+	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
+				   ctxt->rw_sg_table.sgl)) {
+		kfree(ctxt);
+		ctxt = NULL;
+	}
+out:
+	return ctxt;
+}
+
+static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
+				 struct svc_rdma_rw_ctxt *ctxt)
+{
+	sg_free_table_chained(&ctxt->rw_sg_table, true);
+
+	spin_lock(&rdma->sc_rw_ctxt_lock);
+	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
+	spin_unlock(&rdma->sc_rw_ctxt_lock);
+}
+
+/**
+ * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
+ * @rdma: transport about to be destroyed
+ *
+ */
+void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
+		list_del(&ctxt->rw_list);
+		kfree(ctxt);
+	}
+}
+
+/* A chunk context tracks all I/O for moving one Read or Write
+ * chunk. This is a a set of rdma_rw's that handle data movement
+ * for all segments of one chunk.
+ *
+ * These are small, acquired with a single allocator call, and
+ * no more than one is needed per chunk. They are allocated on
+ * demand, and not cached.
+ */
+struct svc_rdma_chunk_ctxt {
+	struct ib_cqe		cc_cqe;
+	struct svcxprt_rdma	*cc_rdma;
+	struct list_head	cc_rwctxts;
+	int			cc_sqecount;
+	enum dma_data_direction cc_dir;
+};
+
+static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
+			     struct svc_rdma_chunk_ctxt *cc,
+			     enum dma_data_direction dir)
+{
+	cc->cc_rdma = rdma;
+	svc_xprt_get(&rdma->sc_xprt);
+
+	INIT_LIST_HEAD(&cc->cc_rwctxts);
+	cc->cc_sqecount = 0;
+	cc->cc_dir = dir;
+}
+
+static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
+{
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
+		list_del(&ctxt->rw_list);
+
+		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
+				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+				    ctxt->rw_nents, cc->cc_dir);
+		svc_rdma_put_rw_ctxt(rdma, ctxt);
+	}
+	svc_xprt_put(&rdma->sc_xprt);
+}
+
+/* State for sending a Write or Reply chunk.
+ *  - Tracks progress of writing one chunk over all its segments
+ *  - Stores arguments for the SGL constructor functions
+ */
+struct svc_rdma_write_info {
+	/* write state of this chunk */
+	unsigned int		wi_seg_off;
+	unsigned int		wi_seg_no;
+	unsigned int		wi_nsegs;
+	__be32			*wi_segs;
+
+	/* SGL constructor arguments */
+	struct xdr_buf		*wi_xdr;
+	unsigned char		*wi_base;
+	unsigned int		wi_next_off;
+
+	struct svc_rdma_chunk_ctxt	wi_cc;
+};
+
+static struct svc_rdma_write_info *
+svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
+{
+	struct svc_rdma_write_info *info;
+
+	info = kmalloc(sizeof(*info), GFP_KERNEL);
+	if (!info)
+		return info;
+
+	info->wi_seg_off = 0;
+	info->wi_seg_no = 0;
+	info->wi_nsegs = be32_to_cpup(++chunk);
+	info->wi_segs = ++chunk;
+	svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+	return info;
+}
+
+static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
+{
+	svc_rdma_cc_release(&info->wi_cc);
+	kfree(info);
+}
+
+/**
+ * svc_rdma_write_done - Write chunk completion
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ * Pages under I/O are freed by a subsequent Send completion.
+ */
+static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_chunk_ctxt *cc =
+			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_rdma_write_info *info =
+			container_of(cc, struct svc_rdma_write_info, wi_cc);
+
+	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		if (wc->status != IB_WC_WR_FLUSH_ERR)
+			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
+			       ib_wc_status_msg(wc->status),
+			       wc->status, wc->vendor_err);
+	}
+
+	svc_rdma_write_info_free(info);
+}
+
+/* This function sleeps when the transport's Send Queue is congested.
+ *
+ * Assumptions:
+ * - If ib_post_send() succeeds, only one completion is expected,
+ *   even if one or more WRs are flushed. This is true when posting
+ *   an rdma_rw_ctx or when posting a single signaled WR.
+ */
+static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
+{
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_xprt *xprt = &rdma->sc_xprt;
+	struct ib_send_wr *first_wr, *bad_wr;
+	struct list_head *tmp;
+	struct ib_cqe *cqe;
+	int ret;
+
+	first_wr = NULL;
+	cqe = &cc->cc_cqe;
+	list_for_each(tmp, &cc->cc_rwctxts) {
+		struct svc_rdma_rw_ctxt *ctxt;
+
+		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
+		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
+					   rdma->sc_port_num, cqe, first_wr);
+		cqe = NULL;
+	}
+
+	do {
+		if (atomic_sub_return(cc->cc_sqecount,
+				      &rdma->sc_sq_avail) > 0) {
+			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+			if (ret)
+				break;
+			return 0;
+		}
+
+		atomic_inc(&rdma_stat_sq_starve);
+		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+		wait_event(rdma->sc_send_wait,
+			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
+	} while (1);
+
+	pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+
+	/* If even one was posted, there will be a completion. */
+	if (bad_wr != first_wr)
+		return 0;
+
+	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+	return -ENOTCONN;
+}
+
+/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
+ */
+static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
+			       unsigned int len,
+			       struct svc_rdma_rw_ctxt *ctxt)
+{
+	struct scatterlist *sg = ctxt->rw_sg_table.sgl;
+
+	sg_set_buf(&sg[0], info->wi_base, len);
+	info->wi_base += len;
+
+	ctxt->rw_nents = 1;
+}
+
+/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
+ */
+static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
+				    unsigned int remaining,
+				    struct svc_rdma_rw_ctxt *ctxt)
+{
+	unsigned int sge_no, sge_bytes, page_off, page_no;
+	struct xdr_buf *xdr = info->wi_xdr;
+	struct scatterlist *sg;
+	struct page **page;
+
+	page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
+	page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
+	page = xdr->pages + page_no;
+	info->wi_next_off += remaining;
+	sg = ctxt->rw_sg_table.sgl;
+	sge_no = 0;
+	do {
+		sge_bytes = min_t(unsigned int, remaining,
+				  PAGE_SIZE - page_off);
+		sg_set_page(sg, *page, sge_bytes, page_off);
+
+		remaining -= sge_bytes;
+		sg = sg_next(sg);
+		page_off = 0;
+		sge_no++;
+		page++;
+	} while (remaining);
+
+	ctxt->rw_nents = sge_no;
+}
+
+/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
+ * an RPC Reply.
+ */
+static int
+svc_rdma_build_writes(struct svc_rdma_write_info *info,
+		      void (*constructor)(struct svc_rdma_write_info *info,
+					  unsigned int len,
+					  struct svc_rdma_rw_ctxt *ctxt),
+		      unsigned int remaining)
+{
+	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_rdma_rw_ctxt *ctxt;
+	__be32 *seg;
+	int ret;
+
+	cc->cc_cqe.done = svc_rdma_write_done;
+	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
+	do {
+		unsigned int write_len;
+		u32 seg_length, seg_handle;
+		u64 seg_offset;
+
+		if (info->wi_seg_no >= info->wi_nsegs)
+			goto out_overflow;
+
+		seg_handle = be32_to_cpup(seg);
+		seg_length = be32_to_cpup(seg + 1);
+		xdr_decode_hyper(seg + 2, &seg_offset);
+		seg_offset += info->wi_seg_off;
+
+		write_len = min(remaining, seg_length - info->wi_seg_off);
+		ctxt = svc_rdma_get_rw_ctxt(rdma,
+					    (write_len >> PAGE_SHIFT) + 2);
+		if (!ctxt)
+			goto out_noctx;
+
+		constructor(info, write_len, ctxt);
+		ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
+				       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+				       ctxt->rw_nents, 0, seg_offset,
+				       seg_handle, DMA_TO_DEVICE);
+		if (ret < 0)
+			goto out_initerr;
+
+		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+		cc->cc_sqecount += ret;
+		if (write_len == seg_length - info->wi_seg_off) {
+			seg += 4;
+			info->wi_seg_no++;
+			info->wi_seg_off = 0;
+		} else {
+			info->wi_seg_off += write_len;
+		}
+		remaining -= write_len;
+	} while (remaining);
+
+	return 0;
+
+out_overflow:
+	dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
+		info->wi_nsegs);
+	return -E2BIG;
+
+out_noctx:
+	dprintk("svcrdma: no R/W ctxs available\n");
+	return -ENOMEM;
+
+out_initerr:
+	svc_rdma_put_rw_ctxt(rdma, ctxt);
+	pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
+	return -EIO;
+}
+
+/* Send one of an xdr_buf's kvecs by itself. To send a Reply
+ * chunk, the whole RPC Reply is written back to the client.
+ * This function writes either the head or tail of the xdr_buf
+ * containing the Reply.
+ */
+static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
+				  struct kvec *vec)
+{
+	info->wi_base = vec->iov_base;
+	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
+				     vec->iov_len);
+}
+
+/* Send an xdr_buf's page list by itself. A Write chunk is
+ * just the page list. a Reply chunk is the head, page list,
+ * and tail. This function is shared between the two types
+ * of chunk.
+ */
+static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
+				      struct xdr_buf *xdr)
+{
+	info->wi_xdr = xdr;
+	info->wi_next_off = 0;
+	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
+				     xdr->page_len);
+}
+
+/**
+ * svc_rdma_send_write_chunk - Write all segments in a Write chunk
+ * @rdma: controlling RDMA transport
+ * @wr_ch: Write chunk provided by client
+ * @xdr: xdr_buf containing the data payload
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *	%-E2BIG if the payload was larger than the Write chunk,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
+			      struct xdr_buf *xdr)
+{
+	struct svc_rdma_write_info *info;
+	int ret;
+
+	if (!xdr->page_len)
+		return 0;
+
+	info = svc_rdma_write_info_alloc(rdma, wr_ch);
+	if (!info)
+		return -ENOMEM;
+
+	ret = svc_rdma_send_xdr_pagelist(info, xdr);
+	if (ret < 0)
+		goto out_err;
+
+	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+	if (ret < 0)
+		goto out_err;
+	return xdr->page_len;
+
+out_err:
+	svc_rdma_write_info_free(info);
+	return ret;
+}
+
+/**
+ * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
+ * @rdma: controlling RDMA transport
+ * @rp_ch: Reply chunk provided by client
+ * @writelist: true if client provided a Write list
+ * @xdr: xdr_buf containing an RPC Reply
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *	%-E2BIG if the payload was larger than the Reply chunk,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
+			      bool writelist, struct xdr_buf *xdr)
+{
+	struct svc_rdma_write_info *info;
+	int consumed, ret;
+
+	info = svc_rdma_write_info_alloc(rdma, rp_ch);
+	if (!info)
+		return -ENOMEM;
+
+	ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
+	if (ret < 0)
+		goto out_err;
+	consumed = xdr->head[0].iov_len;
+
+	/* Send the page list in the Reply chunk only if the
+	 * client did not provide Write chunks.
+	 */
+	if (!writelist && xdr->page_len) {
+		ret = svc_rdma_send_xdr_pagelist(info, xdr);
+		if (ret < 0)
+			goto out_err;
+		consumed += xdr->page_len;
+	}
+
+	if (xdr->tail[0].iov_len) {
+		ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
+		if (ret < 0)
+			goto out_err;
+		consumed += xdr->tail[0].iov_len;
+	}
+
+	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+	if (ret < 0)
+		goto out_err;
+	return consumed;
+
+out_err:
+	svc_rdma_write_info_free(info);
+	return ret;
+}

+ 482 - 496
net/sunrpc/xprtrdma/svc_rdma_sendto.c

@@ -1,4 +1,5 @@
 /*
 /*
+ * Copyright (c) 2016 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
  *
@@ -40,6 +41,63 @@
  * Author: Tom Tucker <tom@opengridcomputing.com>
  * Author: Tom Tucker <tom@opengridcomputing.com>
  */
  */
 
 
+/* Operation
+ *
+ * The main entry point is svc_rdma_sendto. This is called by the
+ * RPC server when an RPC Reply is ready to be transmitted to a client.
+ *
+ * The passed-in svc_rqst contains a struct xdr_buf which holds an
+ * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
+ * transport header, post all Write WRs needed for this Reply, then post
+ * a Send WR conveying the transport header and the RPC message itself to
+ * the client.
+ *
+ * svc_rdma_sendto must fully transmit the Reply before returning, as
+ * the svc_rqst will be recycled as soon as sendto returns. Remaining
+ * resources referred to by the svc_rqst are also recycled at that time.
+ * Therefore any resources that must remain longer must be detached
+ * from the svc_rqst and released later.
+ *
+ * Page Management
+ *
+ * The I/O that performs Reply transmission is asynchronous, and may
+ * complete well after sendto returns. Thus pages under I/O must be
+ * removed from the svc_rqst before sendto returns.
+ *
+ * The logic here depends on Send Queue and completion ordering. Since
+ * the Send WR is always posted last, it will always complete last. Thus
+ * when it completes, it is guaranteed that all previous Write WRs have
+ * also completed.
+ *
+ * Write WRs are constructed and posted. Each Write segment gets its own
+ * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
+ * DMA-unmap the pages under I/O for that Write segment. The Write
+ * completion handler does not release any pages.
+ *
+ * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * The ownership of all of the Reply's pages are transferred into that
+ * ctxt, the Send WR is posted, and sendto returns.
+ *
+ * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * Send completion handler finally releases the Reply's pages.
+ *
+ * This mechanism also assumes that completions on the transport's Send
+ * Completion Queue do not run in parallel. Otherwise a Write completion
+ * and Send completion running at the same time could release pages that
+ * are still DMA-mapped.
+ *
+ * Error Handling
+ *
+ * - If the Send WR is posted successfully, it will either complete
+ *   successfully, or get flushed. Either way, the Send completion
+ *   handler releases the Reply's pages.
+ * - If the Send WR cannot be not posted, the forward path releases
+ *   the Reply's pages.
+ *
+ * This handles the case, without the use of page reference counting,
+ * where two different Write segments send portions of the same page.
+ */
+
 #include <linux/sunrpc/debug.h>
 #include <linux/sunrpc/debug.h>
 #include <linux/sunrpc/rpc_rdma.h>
 #include <linux/sunrpc/rpc_rdma.h>
 #include <linux/spinlock.h>
 #include <linux/spinlock.h>
@@ -55,113 +113,141 @@ static u32 xdr_padsize(u32 len)
 	return (len & 3) ? (4 - (len & 3)) : 0;
 	return (len & 3) ? (4 - (len & 3)) : 0;
 }
 }
 
 
-int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
-		     struct xdr_buf *xdr,
-		     struct svc_rdma_req_map *vec,
-		     bool write_chunk_present)
+/* Returns length of transport header, in bytes.
+ */
+static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
 {
 {
-	int sge_no;
-	u32 sge_bytes;
-	u32 page_bytes;
-	u32 page_off;
-	int page_no;
-
-	if (xdr->len !=
-	    (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
-		pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
-		return -EIO;
-	}
+	unsigned int nsegs;
+	__be32 *p;
 
 
-	/* Skip the first sge, this is for the RPCRDMA header */
-	sge_no = 1;
+	p = rdma_resp;
+
+	/* RPC-over-RDMA V1 replies never have a Read list. */
+	p += rpcrdma_fixed_maxsz + 1;
 
 
-	/* Head SGE */
-	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
-	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
-	sge_no++;
-
-	/* pages SGE */
-	page_no = 0;
-	page_bytes = xdr->page_len;
-	page_off = xdr->page_base;
-	while (page_bytes) {
-		vec->sge[sge_no].iov_base =
-			page_address(xdr->pages[page_no]) + page_off;
-		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
-		page_bytes -= sge_bytes;
-		vec->sge[sge_no].iov_len = sge_bytes;
-
-		sge_no++;
-		page_no++;
-		page_off = 0; /* reset for next time through loop */
+	/* Skip Write list. */
+	while (*p++ != xdr_zero) {
+		nsegs = be32_to_cpup(p++);
+		p += nsegs * rpcrdma_segment_maxsz;
 	}
 	}
 
 
-	/* Tail SGE */
-	if (xdr->tail[0].iov_len) {
-		unsigned char *base = xdr->tail[0].iov_base;
-		size_t len = xdr->tail[0].iov_len;
-		u32 xdr_pad = xdr_padsize(xdr->page_len);
+	/* Skip Reply chunk. */
+	if (*p++ != xdr_zero) {
+		nsegs = be32_to_cpup(p++);
+		p += nsegs * rpcrdma_segment_maxsz;
+	}
 
 
-		if (write_chunk_present && xdr_pad) {
-			base += xdr_pad;
-			len -= xdr_pad;
-		}
+	return (unsigned long)p - (unsigned long)rdma_resp;
+}
 
 
-		if (len) {
-			vec->sge[sge_no].iov_base = base;
-			vec->sge[sge_no].iov_len = len;
-			sge_no++;
+/* One Write chunk is copied from Call transport header to Reply
+ * transport header. Each segment's length field is updated to
+ * reflect number of bytes consumed in the segment.
+ *
+ * Returns number of segments in this chunk.
+ */
+static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+					   unsigned int remaining)
+{
+	unsigned int i, nsegs;
+	u32 seg_len;
+
+	/* Write list discriminator */
+	*dst++ = *src++;
+
+	/* number of segments in this chunk */
+	nsegs = be32_to_cpup(src);
+	*dst++ = *src++;
+
+	for (i = nsegs; i; i--) {
+		/* segment's RDMA handle */
+		*dst++ = *src++;
+
+		/* bytes returned in this segment */
+		seg_len = be32_to_cpu(*src);
+		if (remaining >= seg_len) {
+			/* entire segment was consumed */
+			*dst = *src;
+			remaining -= seg_len;
+		} else {
+			/* segment only partly filled */
+			*dst = cpu_to_be32(remaining);
+			remaining = 0;
 		}
 		}
-	}
+		dst++; src++;
 
 
-	dprintk("svcrdma: %s: sge_no %d page_no %d "
-		"page_base %u page_len %u head_len %zu tail_len %zu\n",
-		__func__, sge_no, page_no, xdr->page_base, xdr->page_len,
-		xdr->head[0].iov_len, xdr->tail[0].iov_len);
+		/* segment's RDMA offset */
+		*dst++ = *src++;
+		*dst++ = *src++;
+	}
 
 
-	vec->count = sge_no;
-	return 0;
+	return nsegs;
 }
 }
 
 
-static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
-			      struct xdr_buf *xdr,
-			      u32 xdr_off, size_t len, int dir)
+/* The client provided a Write list in the Call message. Fill in
+ * the segments in the first Write chunk in the Reply's transport
+ * header with the number of bytes consumed in each segment.
+ * Remaining chunks are returned unused.
+ *
+ * Assumptions:
+ *  - Client has provided only one Write chunk
+ */
+static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
+					   unsigned int consumed)
 {
 {
-	struct page *page;
-	dma_addr_t dma_addr;
-	if (xdr_off < xdr->head[0].iov_len) {
-		/* This offset is in the head */
-		xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
-		page = virt_to_page(xdr->head[0].iov_base);
-	} else {
-		xdr_off -= xdr->head[0].iov_len;
-		if (xdr_off < xdr->page_len) {
-			/* This offset is in the page list */
-			xdr_off += xdr->page_base;
-			page = xdr->pages[xdr_off >> PAGE_SHIFT];
-			xdr_off &= ~PAGE_MASK;
-		} else {
-			/* This offset is in the tail */
-			xdr_off -= xdr->page_len;
-			xdr_off += (unsigned long)
-				xdr->tail[0].iov_base & ~PAGE_MASK;
-			page = virt_to_page(xdr->tail[0].iov_base);
-		}
+	unsigned int nsegs;
+	__be32 *p, *q;
+
+	/* RPC-over-RDMA V1 replies never have a Read list. */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	q = wr_ch;
+	while (*q != xdr_zero) {
+		nsegs = xdr_encode_write_chunk(p, q, consumed);
+		q += 2 + nsegs * rpcrdma_segment_maxsz;
+		p += 2 + nsegs * rpcrdma_segment_maxsz;
+		consumed = 0;
 	}
 	}
-	dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
-				   min_t(size_t, PAGE_SIZE, len), dir);
-	return dma_addr;
+
+	/* Terminate Write list */
+	*p++ = xdr_zero;
+
+	/* Reply chunk discriminator; may be replaced later */
+	*p = xdr_zero;
+}
+
+/* The client provided a Reply chunk in the Call message. Fill in
+ * the segments in the Reply chunk in the Reply message with the
+ * number of bytes consumed in each segment.
+ *
+ * Assumptions:
+ * - Reply can always fit in the provided Reply chunk
+ */
+static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
+					    unsigned int consumed)
+{
+	__be32 *p;
+
+	/* Find the Reply chunk in the Reply's xprt header.
+	 * RPC-over-RDMA V1 replies never have a Read list.
+	 */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	/* Skip past Write list */
+	while (*p++ != xdr_zero)
+		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+
+	xdr_encode_write_chunk(p, rp_ch, consumed);
 }
 }
 
 
 /* Parse the RPC Call's transport header.
 /* Parse the RPC Call's transport header.
  */
  */
-static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
-				      struct rpcrdma_write_array **write,
-				      struct rpcrdma_write_array **reply)
+static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
+				      __be32 **write, __be32 **reply)
 {
 {
 	__be32 *p;
 	__be32 *p;
 
 
-	p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];
+	p = rdma_argp + rpcrdma_fixed_maxsz;
 
 
 	/* Read list */
 	/* Read list */
 	while (*p++ != xdr_zero)
 	while (*p++ != xdr_zero)
@@ -169,7 +255,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
 
 	/* Write list */
 	/* Write list */
 	if (*p != xdr_zero) {
 	if (*p != xdr_zero) {
-		*write = (struct rpcrdma_write_array *)p;
+		*write = p;
 		while (*p++ != xdr_zero)
 		while (*p++ != xdr_zero)
 			p += 1 + be32_to_cpu(*p) * 4;
 			p += 1 + be32_to_cpu(*p) * 4;
 	} else {
 	} else {
@@ -179,7 +265,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
 
 	/* Reply chunk */
 	/* Reply chunk */
 	if (*p != xdr_zero)
 	if (*p != xdr_zero)
-		*reply = (struct rpcrdma_write_array *)p;
+		*reply = p;
 	else
 	else
 		*reply = NULL;
 		*reply = NULL;
 }
 }
@@ -189,360 +275,321 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
  * Invalidate, and responder chooses one rkey to invalidate.
  * Invalidate, and responder chooses one rkey to invalidate.
  *
  *
  * Find a candidate rkey to invalidate when sending a reply.  Picks the
  * Find a candidate rkey to invalidate when sending a reply.  Picks the
- * first rkey it finds in the chunks lists.
+ * first R_key it finds in the chunk lists.
  *
  *
  * Returns zero if RPC's chunk lists are empty.
  * Returns zero if RPC's chunk lists are empty.
  */
  */
-static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp,
-				 struct rpcrdma_write_array *wr_ary,
-				 struct rpcrdma_write_array *rp_ary)
+static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
+				 __be32 *wr_lst, __be32 *rp_ch)
 {
 {
-	struct rpcrdma_read_chunk *rd_ary;
-	struct rpcrdma_segment *arg_ch;
+	__be32 *p;
 
 
-	rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0];
-	if (rd_ary->rc_discrim != xdr_zero)
-		return be32_to_cpu(rd_ary->rc_target.rs_handle);
+	p = rdma_argp + rpcrdma_fixed_maxsz;
+	if (*p != xdr_zero)
+		p += 2;
+	else if (wr_lst && be32_to_cpup(wr_lst + 1))
+		p = wr_lst + 2;
+	else if (rp_ch && be32_to_cpup(rp_ch + 1))
+		p = rp_ch + 2;
+	else
+		return 0;
+	return be32_to_cpup(p);
+}
 
 
-	if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) {
-		arg_ch = &wr_ary->wc_array[0].wc_target;
-		return be32_to_cpu(arg_ch->rs_handle);
-	}
+/* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
+ * is used during completion to DMA-unmap this memory, and
+ * it uses ib_dma_unmap_page() exclusively.
+ */
+static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
+				struct svc_rdma_op_ctxt *ctxt,
+				unsigned int sge_no,
+				unsigned char *base,
+				unsigned int len)
+{
+	unsigned long offset = (unsigned long)base & ~PAGE_MASK;
+	struct ib_device *dev = rdma->sc_cm_id->device;
+	dma_addr_t dma_addr;
 
 
-	if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) {
-		arg_ch = &rp_ary->wc_array[0].wc_target;
-		return be32_to_cpu(arg_ch->rs_handle);
-	}
+	dma_addr = ib_dma_map_page(dev, virt_to_page(base),
+				   offset, len, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(dev, dma_addr))
+		return -EIO;
 
 
+	ctxt->sge[sge_no].addr = dma_addr;
+	ctxt->sge[sge_no].length = len;
+	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+	svc_rdma_count_mappings(rdma, ctxt);
 	return 0;
 	return 0;
 }
 }
 
 
-/* Assumptions:
- * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
- */
-static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
-		      u32 rmr, u64 to,
-		      u32 xdr_off, int write_len,
-		      struct svc_rdma_req_map *vec)
+static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
+				 struct svc_rdma_op_ctxt *ctxt,
+				 unsigned int sge_no,
+				 struct page *page,
+				 unsigned int offset,
+				 unsigned int len)
 {
 {
-	struct ib_rdma_wr write_wr;
-	struct ib_sge *sge;
-	int xdr_sge_no;
-	int sge_no;
-	int sge_bytes;
-	int sge_off;
-	int bc;
-	struct svc_rdma_op_ctxt *ctxt;
+	struct ib_device *dev = rdma->sc_cm_id->device;
+	dma_addr_t dma_addr;
 
 
-	if (vec->count > RPCSVC_MAXPAGES) {
-		pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
+	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(dev, dma_addr))
 		return -EIO;
 		return -EIO;
-	}
 
 
-	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
-		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
-		rmr, (unsigned long long)to, xdr_off,
-		write_len, vec->sge, vec->count);
+	ctxt->sge[sge_no].addr = dma_addr;
+	ctxt->sge[sge_no].length = len;
+	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+	svc_rdma_count_mappings(rdma, ctxt);
+	return 0;
+}
 
 
-	ctxt = svc_rdma_get_context(xprt);
+/**
+ * svc_rdma_map_reply_hdr - DMA map the transport header buffer
+ * @rdma: controlling transport
+ * @ctxt: op_ctxt for the Send WR
+ * @rdma_resp: buffer containing transport header
+ * @len: length of transport header
+ *
+ * Returns:
+ *	%0 if the header is DMA mapped,
+ *	%-EIO if DMA mapping failed.
+ */
+int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
+			   struct svc_rdma_op_ctxt *ctxt,
+			   __be32 *rdma_resp,
+			   unsigned int len)
+{
 	ctxt->direction = DMA_TO_DEVICE;
 	ctxt->direction = DMA_TO_DEVICE;
-	sge = ctxt->sge;
-
-	/* Find the SGE associated with xdr_off */
-	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
-	     xdr_sge_no++) {
-		if (vec->sge[xdr_sge_no].iov_len > bc)
-			break;
-		bc -= vec->sge[xdr_sge_no].iov_len;
-	}
-
-	sge_off = bc;
-	bc = write_len;
-	sge_no = 0;
-
-	/* Copy the remaining SGE */
-	while (bc != 0) {
-		sge_bytes = min_t(size_t,
-			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
-		sge[sge_no].length = sge_bytes;
-		sge[sge_no].addr =
-			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
-				    sge_bytes, DMA_TO_DEVICE);
-		xdr_off += sge_bytes;
-		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-					 sge[sge_no].addr))
-			goto err;
-		svc_rdma_count_mappings(xprt, ctxt);
-		sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
-		ctxt->count++;
-		sge_off = 0;
-		sge_no++;
-		xdr_sge_no++;
-		if (xdr_sge_no > vec->count) {
-			pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
-			goto err;
-		}
-		bc -= sge_bytes;
-		if (sge_no == xprt->sc_max_sge)
-			break;
-	}
-
-	/* Prepare WRITE WR */
-	memset(&write_wr, 0, sizeof write_wr);
-	ctxt->cqe.done = svc_rdma_wc_write;
-	write_wr.wr.wr_cqe = &ctxt->cqe;
-	write_wr.wr.sg_list = &sge[0];
-	write_wr.wr.num_sge = sge_no;
-	write_wr.wr.opcode = IB_WR_RDMA_WRITE;
-	write_wr.wr.send_flags = IB_SEND_SIGNALED;
-	write_wr.rkey = rmr;
-	write_wr.remote_addr = to;
-
-	/* Post It */
-	atomic_inc(&rdma_stat_write);
-	if (svc_rdma_send(xprt, &write_wr.wr))
-		goto err;
-	return write_len - bc;
- err:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 0);
-	return -EIO;
+	ctxt->pages[0] = virt_to_page(rdma_resp);
+	ctxt->count = 1;
+	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
 }
 }
 
 
-noinline
-static int send_write_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_write_array *wr_ary,
-			     struct rpcrdma_msg *rdma_resp,
-			     struct svc_rqst *rqstp,
-			     struct svc_rdma_req_map *vec)
+/* Load the xdr_buf into the ctxt's sge array, and DMA map each
+ * element as it is added.
+ *
+ * Returns the number of sge elements loaded on success, or
+ * a negative errno on failure.
+ */
+static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
+				  struct svc_rdma_op_ctxt *ctxt,
+				  struct xdr_buf *xdr, __be32 *wr_lst)
 {
 {
-	u32 xfer_len = rqstp->rq_res.page_len;
-	int write_len;
-	u32 xdr_off;
-	int chunk_off;
-	int chunk_no;
-	int nchunks;
-	struct rpcrdma_write_array *res_ary;
+	unsigned int len, sge_no, remaining, page_off;
+	struct page **ppages;
+	unsigned char *base;
+	u32 xdr_pad;
 	int ret;
 	int ret;
 
 
-	res_ary = (struct rpcrdma_write_array *)
-		&rdma_resp->rm_body.rm_chunks[1];
-
-	/* Write chunks start at the pagelist */
-	nchunks = be32_to_cpu(wr_ary->wc_nchunks);
-	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
-	     xfer_len && chunk_no < nchunks;
-	     chunk_no++) {
-		struct rpcrdma_segment *arg_ch;
-		u64 rs_offset;
-
-		arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
-		write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
-
-		/* Prepare the response chunk given the length actually
-		 * written */
-		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
-		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-						arg_ch->rs_handle,
-						arg_ch->rs_offset,
-						write_len);
-		chunk_off = 0;
-		while (write_len) {
-			ret = send_write(xprt, rqstp,
-					 be32_to_cpu(arg_ch->rs_handle),
-					 rs_offset + chunk_off,
-					 xdr_off,
-					 write_len,
-					 vec);
-			if (ret <= 0)
-				goto out_err;
-			chunk_off += ret;
-			xdr_off += ret;
-			xfer_len -= ret;
-			write_len -= ret;
+	sge_no = 1;
+
+	ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++,
+				   xdr->head[0].iov_base,
+				   xdr->head[0].iov_len);
+	if (ret < 0)
+		return ret;
+
+	/* If a Write chunk is present, the xdr_buf's page list
+	 * is not included inline. However the Upper Layer may
+	 * have added XDR padding in the tail buffer, and that
+	 * should not be included inline.
+	 */
+	if (wr_lst) {
+		base = xdr->tail[0].iov_base;
+		len = xdr->tail[0].iov_len;
+		xdr_pad = xdr_padsize(xdr->page_len);
+
+		if (len && xdr_pad) {
+			base += xdr_pad;
+			len -= xdr_pad;
 		}
 		}
+
+		goto tail;
+	}
+
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	page_off = xdr->page_base & ~PAGE_MASK;
+	remaining = xdr->page_len;
+	while (remaining) {
+		len = min_t(u32, PAGE_SIZE - page_off, remaining);
+
+		ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++,
+					    *ppages++, page_off, len);
+		if (ret < 0)
+			return ret;
+
+		remaining -= len;
+		page_off = 0;
 	}
 	}
-	/* Update the req with the number of chunks actually used */
-	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
 
 
-	return rqstp->rq_res.page_len;
+	base = xdr->tail[0].iov_base;
+	len = xdr->tail[0].iov_len;
+tail:
+	if (len) {
+		ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len);
+		if (ret < 0)
+			return ret;
+	}
 
 
-out_err:
-	pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
-	return -EIO;
+	return sge_no - 1;
 }
 }
 
 
-noinline
-static int send_reply_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_write_array *rp_ary,
-			     struct rpcrdma_msg *rdma_resp,
-			     struct svc_rqst *rqstp,
-			     struct svc_rdma_req_map *vec)
+/* The svc_rqst and all resources it owns are released as soon as
+ * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
+ * so they are released by the Send completion handler.
+ */
+static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
+				   struct svc_rdma_op_ctxt *ctxt)
 {
 {
-	u32 xfer_len = rqstp->rq_res.len;
-	int write_len;
-	u32 xdr_off;
-	int chunk_no;
-	int chunk_off;
-	int nchunks;
-	struct rpcrdma_segment *ch;
-	struct rpcrdma_write_array *res_ary;
-	int ret;
+	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
 
 
-	/* XXX: need to fix when reply lists occur with read-list and or
-	 * write-list */
-	res_ary = (struct rpcrdma_write_array *)
-		&rdma_resp->rm_body.rm_chunks[2];
-
-	/* xdr offset starts at RPC message */
-	nchunks = be32_to_cpu(rp_ary->wc_nchunks);
-	for (xdr_off = 0, chunk_no = 0;
-	     xfer_len && chunk_no < nchunks;
-	     chunk_no++) {
-		u64 rs_offset;
-		ch = &rp_ary->wc_array[chunk_no].wc_target;
-		write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
-
-		/* Prepare the reply chunk given the length actually
-		 * written */
-		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
-		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-						ch->rs_handle, ch->rs_offset,
-						write_len);
-		chunk_off = 0;
-		while (write_len) {
-			ret = send_write(xprt, rqstp,
-					 be32_to_cpu(ch->rs_handle),
-					 rs_offset + chunk_off,
-					 xdr_off,
-					 write_len,
-					 vec);
-			if (ret <= 0)
-				goto out_err;
-			chunk_off += ret;
-			xdr_off += ret;
-			xfer_len -= ret;
-			write_len -= ret;
-		}
+	ctxt->count += pages;
+	for (i = 0; i < pages; i++) {
+		ctxt->pages[i + 1] = rqstp->rq_respages[i];
+		rqstp->rq_respages[i] = NULL;
 	}
 	}
-	/* Update the req with the number of chunks actually used */
-	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+	rqstp->rq_next_page = rqstp->rq_respages + 1;
+}
 
 
-	return rqstp->rq_res.len;
+/**
+ * svc_rdma_post_send_wr - Set up and post one Send Work Request
+ * @rdma: controlling transport
+ * @ctxt: op_ctxt for transmitting the Send WR
+ * @num_sge: number of SGEs to send
+ * @inv_rkey: R_key argument to Send With Invalidate, or zero
+ *
+ * Returns:
+ *	%0 if the Send* was posted successfully,
+ *	%-ENOTCONN if the connection was lost or dropped,
+ *	%-EINVAL if there was a problem with the Send we built,
+ *	%-ENOMEM if ib_post_send failed.
+ */
+int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
+			  struct svc_rdma_op_ctxt *ctxt, int num_sge,
+			  u32 inv_rkey)
+{
+	struct ib_send_wr *send_wr = &ctxt->send_wr;
 
 
-out_err:
-	pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
-	return -EIO;
+	dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge);
+
+	send_wr->next = NULL;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr->wr_cqe = &ctxt->cqe;
+	send_wr->sg_list = ctxt->sge;
+	send_wr->num_sge = num_sge;
+	send_wr->send_flags = IB_SEND_SIGNALED;
+	if (inv_rkey) {
+		send_wr->opcode = IB_WR_SEND_WITH_INV;
+		send_wr->ex.invalidate_rkey = inv_rkey;
+	} else {
+		send_wr->opcode = IB_WR_SEND;
+	}
+
+	return svc_rdma_send(rdma, send_wr);
 }
 }
 
 
-/* This function prepares the portion of the RPCRDMA message to be
- * sent in the RDMA_SEND. This function is called after data sent via
- * RDMA has already been transmitted. There are three cases:
- * - The RPCRDMA header, RPC header, and payload are all sent in a
- *   single RDMA_SEND. This is the "inline" case.
- * - The RPCRDMA header and some portion of the RPC header and data
- *   are sent via this RDMA_SEND and another portion of the data is
- *   sent via RDMA.
- * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
- *   header and data are all transmitted via RDMA.
- * In all three cases, this function prepares the RPCRDMA header in
- * sge[0], the 'type' parameter indicates the type to place in the
- * RPCRDMA header, and the 'byte_count' field indicates how much of
- * the XDR to include in this RDMA_SEND. NB: The offset of the payload
- * to send is zero in the XDR.
+/* Prepare the portion of the RPC Reply that will be transmitted
+ * via RDMA Send. The RPC-over-RDMA transport header is prepared
+ * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ *
+ * Depending on whether a Write list or Reply chunk is present,
+ * the server may send all, a portion of, or none of the xdr_buf.
+ * In the latter case, only the transport header (sge[0]) is
+ * transmitted.
+ *
+ * RDMA Send is the last step of transmitting an RPC reply. Pages
+ * involved in the earlier RDMA Writes are here transferred out
+ * of the rqstp and into the ctxt's page array. These pages are
+ * DMA unmapped by each Write completion, but the subsequent Send
+ * completion finally releases these pages.
+ *
+ * Assumptions:
+ * - The Reply's transport header will never be larger than a page.
  */
  */
-static int send_reply(struct svcxprt_rdma *rdma,
-		      struct svc_rqst *rqstp,
-		      struct page *page,
-		      struct rpcrdma_msg *rdma_resp,
-		      struct svc_rdma_req_map *vec,
-		      int byte_count,
-		      u32 inv_rkey)
+static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
+				   __be32 *rdma_argp, __be32 *rdma_resp,
+				   struct svc_rqst *rqstp,
+				   __be32 *wr_lst, __be32 *rp_ch)
 {
 {
 	struct svc_rdma_op_ctxt *ctxt;
 	struct svc_rdma_op_ctxt *ctxt;
-	struct ib_send_wr send_wr;
-	u32 xdr_off;
-	int sge_no;
-	int sge_bytes;
-	int page_no;
-	int pages;
-	int ret = -EIO;
-
-	/* Prepare the context */
+	u32 inv_rkey;
+	int ret;
+
+	dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n",
+		(rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"),
+		rqstp->rq_res.head[0].iov_len,
+		rqstp->rq_res.page_len,
+		rqstp->rq_res.tail[0].iov_len);
+
 	ctxt = svc_rdma_get_context(rdma);
 	ctxt = svc_rdma_get_context(rdma);
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->pages[0] = page;
-	ctxt->count = 1;
 
 
-	/* Prepare the SGE for the RPCRDMA Header */
-	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length =
-	    svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
-	ctxt->sge[0].addr =
-	    ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
-			    ctxt->sge[0].length, DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
+	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
+				     svc_rdma_reply_hdr_len(rdma_resp));
+	if (ret < 0)
 		goto err;
 		goto err;
-	svc_rdma_count_mappings(rdma, ctxt);
-
-	ctxt->direction = DMA_TO_DEVICE;
 
 
-	/* Map the payload indicated by 'byte_count' */
-	xdr_off = 0;
-	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
-		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
-		byte_count -= sge_bytes;
-		ctxt->sge[sge_no].addr =
-			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
-				    sge_bytes, DMA_TO_DEVICE);
-		xdr_off += sge_bytes;
-		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
-					 ctxt->sge[sge_no].addr))
+	if (!rp_ch) {
+		ret = svc_rdma_map_reply_msg(rdma, ctxt,
+					     &rqstp->rq_res, wr_lst);
+		if (ret < 0)
 			goto err;
 			goto err;
-		svc_rdma_count_mappings(rdma, ctxt);
-		ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
-		ctxt->sge[sge_no].length = sge_bytes;
 	}
 	}
-	if (byte_count != 0) {
-		pr_err("svcrdma: Could not map %d bytes\n", byte_count);
+
+	svc_rdma_save_io_pages(rqstp, ctxt);
+
+	inv_rkey = 0;
+	if (rdma->sc_snd_w_inv)
+		inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
+	ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey);
+	if (ret)
 		goto err;
 		goto err;
-	}
 
 
-	/* Save all respages in the ctxt and remove them from the
-	 * respages array. They are our pages until the I/O
-	 * completes.
+	return 0;
+
+err:
+	pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 1);
+	return ret;
+}
+
+/* Given the client-provided Write and Reply chunks, the server was not
+ * able to form a complete reply. Return an RDMA_ERROR message so the
+ * client can retire this RPC transaction. As above, the Send completion
+ * routine releases payload pages that were part of a previous RDMA Write.
+ *
+ * Remote Invalidation is skipped for simplicity.
+ */
+static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
+				   __be32 *rdma_resp, struct svc_rqst *rqstp)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+	__be32 *p;
+	int ret;
+
+	ctxt = svc_rdma_get_context(rdma);
+
+	/* Replace the original transport header with an
+	 * RDMA_ERROR response. XID etc are preserved.
 	 */
 	 */
-	pages = rqstp->rq_next_page - rqstp->rq_respages;
-	for (page_no = 0; page_no < pages; page_no++) {
-		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
-		ctxt->count++;
-		rqstp->rq_respages[page_no] = NULL;
-	}
-	rqstp->rq_next_page = rqstp->rq_respages + 1;
+	p = rdma_resp + 3;
+	*p++ = rdma_error;
+	*p   = err_chunk;
 
 
-	if (sge_no > rdma->sc_max_sge) {
-		pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20);
+	if (ret < 0)
 		goto err;
 		goto err;
-	}
-	memset(&send_wr, 0, sizeof send_wr);
-	ctxt->cqe.done = svc_rdma_wc_send;
-	send_wr.wr_cqe = &ctxt->cqe;
-	send_wr.sg_list = ctxt->sge;
-	send_wr.num_sge = sge_no;
-	if (inv_rkey) {
-		send_wr.opcode = IB_WR_SEND_WITH_INV;
-		send_wr.ex.invalidate_rkey = inv_rkey;
-	} else
-		send_wr.opcode = IB_WR_SEND;
-	send_wr.send_flags =  IB_SEND_SIGNALED;
 
 
-	ret = svc_rdma_send(rdma, &send_wr);
+	svc_rdma_save_io_pages(rqstp, ctxt);
+
+	ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0);
 	if (ret)
 	if (ret)
 		goto err;
 		goto err;
 
 
 	return 0;
 	return 0;
 
 
- err:
+err:
+	pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 1);
 	svc_rdma_put_context(ctxt, 1);
 	return ret;
 	return ret;
@@ -552,39 +599,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
 {
 {
 }
 }
 
 
+/**
+ * svc_rdma_sendto - Transmit an RPC reply
+ * @rqstp: processed RPC request, reply XDR already in ::rq_res
+ *
+ * Any resources still associated with @rqstp are released upon return.
+ * If no reply message was possible, the connection is closed.
+ *
+ * Returns:
+ *	%0 if an RPC reply has been successfully posted,
+ *	%-ENOMEM if a resource shortage occurred (connection is lost),
+ *	%-ENOTCONN if posting failed (connection is lost).
+ */
 int svc_rdma_sendto(struct svc_rqst *rqstp)
 int svc_rdma_sendto(struct svc_rqst *rqstp)
 {
 {
 	struct svc_xprt *xprt = rqstp->rq_xprt;
 	struct svc_xprt *xprt = rqstp->rq_xprt;
 	struct svcxprt_rdma *rdma =
 	struct svcxprt_rdma *rdma =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
-	struct rpcrdma_msg *rdma_argp;
-	struct rpcrdma_msg *rdma_resp;
-	struct rpcrdma_write_array *wr_ary, *rp_ary;
-	int ret;
-	int inline_bytes;
+	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct page *res_page;
 	struct page *res_page;
-	struct svc_rdma_req_map *vec;
-	u32 inv_rkey;
-	__be32 *p;
-
-	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+	int ret;
 
 
-	/* Get the RDMA request header. The receive logic always
-	 * places this at the start of page 0.
+	/* Find the call's chunk lists to decide how to send the reply.
+	 * Receive places the Call's xprt header at the start of page 0.
 	 */
 	 */
 	rdma_argp = page_address(rqstp->rq_pages[0]);
 	rdma_argp = page_address(rqstp->rq_pages[0]);
-	svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);
-
-	inv_rkey = 0;
-	if (rdma->sc_snd_w_inv)
-		inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary);
+	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
 
 
-	/* Build an req vec for the XDR */
-	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
-	if (ret)
-		goto err0;
-	inline_bytes = rqstp->rq_res.len;
+	dprintk("svcrdma: preparing response for XID 0x%08x\n",
+		be32_to_cpup(rdma_argp));
 
 
 	/* Create the RDMA response header. xprt->xpt_mutex,
 	/* Create the RDMA response header. xprt->xpt_mutex,
 	 * acquired in svc_send(), serializes RPC replies. The
 	 * acquired in svc_send(), serializes RPC replies. The
@@ -598,115 +642,57 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		goto err0;
 		goto err0;
 	rdma_resp = page_address(res_page);
 	rdma_resp = page_address(res_page);
 
 
-	p = &rdma_resp->rm_xid;
-	*p++ = rdma_argp->rm_xid;
-	*p++ = rdma_argp->rm_vers;
+	p = rdma_resp;
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
 	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ary ? rdma_nomsg : rdma_msg;
+	*p++ = rp_ch ? rdma_nomsg : rdma_msg;
 
 
 	/* Start with empty chunks */
 	/* Start with empty chunks */
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p   = xdr_zero;
 	*p   = xdr_zero;
 
 
-	/* Send any write-chunk data and build resp write-list */
-	if (wr_ary) {
-		ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+	if (wr_lst) {
+		/* XXX: Presume the client sent only one Write chunk */
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
 		if (ret < 0)
 		if (ret < 0)
-			goto err1;
-		inline_bytes -= ret + xdr_padsize(ret);
+			goto err2;
+		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
 	}
 	}
-
-	/* Send any reply-list data and update resp reply-list */
-	if (rp_ary) {
-		ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+	if (rp_ch) {
+		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
 		if (ret < 0)
 		if (ret < 0)
-			goto err1;
-		inline_bytes -= ret;
+			goto err2;
+		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
 	}
 	}
 
 
-	/* Post a fresh Receive buffer _before_ sending the reply */
 	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
 	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
 	if (ret)
 	if (ret)
 		goto err1;
 		goto err1;
-
-	ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
-			 inline_bytes, inv_rkey);
+	ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
+				      wr_lst, rp_ch);
 	if (ret < 0)
 	if (ret < 0)
 		goto err0;
 		goto err0;
+	return 0;
 
 
-	svc_rdma_put_req_map(rdma, vec);
-	dprintk("svcrdma: send_reply returns %d\n", ret);
-	return ret;
+ err2:
+	if (ret != -E2BIG)
+		goto err1;
+
+	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+	if (ret)
+		goto err1;
+	ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp);
+	if (ret < 0)
+		goto err0;
+	return 0;
 
 
  err1:
  err1:
 	put_page(res_page);
 	put_page(res_page);
  err0:
  err0:
-	svc_rdma_put_req_map(rdma, vec);
 	pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
 	pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
 	       ret);
 	       ret);
-	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 	return -ENOTCONN;
 	return -ENOTCONN;
 }
 }
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
-			 int status)
-{
-	struct ib_send_wr err_wr;
-	struct page *p;
-	struct svc_rdma_op_ctxt *ctxt;
-	enum rpcrdma_errcode err;
-	__be32 *va;
-	int length;
-	int ret;
-
-	ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
-	if (ret)
-		return;
-
-	p = alloc_page(GFP_KERNEL);
-	if (!p)
-		return;
-	va = page_address(p);
-
-	/* XDR encode an error reply */
-	err = ERR_CHUNK;
-	if (status == -EPROTONOSUPPORT)
-		err = ERR_VERS;
-	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
-	ctxt = svc_rdma_get_context(xprt);
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->count = 1;
-	ctxt->pages[0] = p;
-
-	/* Prepare SGE for local address */
-	ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length = length;
-	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
-					    p, 0, length, DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
-		dprintk("svcrdma: Error mapping buffer for protocol error\n");
-		svc_rdma_put_context(ctxt, 1);
-		return;
-	}
-	svc_rdma_count_mappings(xprt, ctxt);
-
-	/* Prepare SEND WR */
-	memset(&err_wr, 0, sizeof(err_wr));
-	ctxt->cqe.done = svc_rdma_wc_send;
-	err_wr.wr_cqe = &ctxt->cqe;
-	err_wr.sg_list = ctxt->sge;
-	err_wr.num_sge = 1;
-	err_wr.opcode = IB_WR_SEND;
-	err_wr.send_flags = IB_SEND_SIGNALED;
-
-	/* Post It */
-	ret = svc_rdma_send(xprt, &err_wr);
-	if (ret) {
-		dprintk("svcrdma: Error %d posting send for protocol error\n",
-			ret);
-		svc_rdma_unmap_dma(ctxt);
-		svc_rdma_put_context(ctxt, 1);
-	}
-}

+ 7 - 103
net/sunrpc/xprtrdma/svc_rdma_transport.c

@@ -272,85 +272,6 @@ static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
 	}
 	}
 }
 }
 
 
-static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
-{
-	struct svc_rdma_req_map *map;
-
-	map = kmalloc(sizeof(*map), flags);
-	if (map)
-		INIT_LIST_HEAD(&map->free);
-	return map;
-}
-
-static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
-{
-	unsigned int i;
-
-	/* One for each receive buffer on this connection. */
-	i = xprt->sc_max_requests;
-
-	while (i--) {
-		struct svc_rdma_req_map *map;
-
-		map = alloc_req_map(GFP_KERNEL);
-		if (!map) {
-			dprintk("svcrdma: No memory for request map\n");
-			return false;
-		}
-		list_add(&map->free, &xprt->sc_maps);
-	}
-	return true;
-}
-
-struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
-{
-	struct svc_rdma_req_map *map = NULL;
-
-	spin_lock(&xprt->sc_map_lock);
-	if (list_empty(&xprt->sc_maps))
-		goto out_empty;
-
-	map = list_first_entry(&xprt->sc_maps,
-			       struct svc_rdma_req_map, free);
-	list_del_init(&map->free);
-	spin_unlock(&xprt->sc_map_lock);
-
-out:
-	map->count = 0;
-	return map;
-
-out_empty:
-	spin_unlock(&xprt->sc_map_lock);
-
-	/* Pre-allocation amount was incorrect */
-	map = alloc_req_map(GFP_NOIO);
-	if (map)
-		goto out;
-
-	WARN_ONCE(1, "svcrdma: empty request map list?\n");
-	return NULL;
-}
-
-void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
-			  struct svc_rdma_req_map *map)
-{
-	spin_lock(&xprt->sc_map_lock);
-	list_add(&map->free, &xprt->sc_maps);
-	spin_unlock(&xprt->sc_map_lock);
-}
-
-static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
-{
-	while (!list_empty(&xprt->sc_maps)) {
-		struct svc_rdma_req_map *map;
-
-		map = list_first_entry(&xprt->sc_maps,
-				       struct svc_rdma_req_map, free);
-		list_del(&map->free);
-		kfree(map);
-	}
-}
-
 /* QP event handler */
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 static void qp_event_handler(struct ib_event *event, void *context)
 {
 {
@@ -473,24 +394,6 @@ void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 	svc_rdma_put_context(ctxt, 1);
 	svc_rdma_put_context(ctxt, 1);
 }
 }
 
 
-/**
- * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
- * @cq:        completion queue
- * @wc:        completed WR
- *
- */
-void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct svc_rdma_op_ctxt *ctxt;
-
-	svc_rdma_send_wc_common_put(cq, wc, "write");
-
-	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 0);
-}
-
 /**
 /**
  * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
  * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
  * @cq:        completion queue
  * @cq:        completion queue
@@ -561,14 +464,14 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
-	INIT_LIST_HEAD(&cma_xprt->sc_maps);
+	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
 
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
 	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
 	spin_lock_init(&cma_xprt->sc_ctxt_lock);
 	spin_lock_init(&cma_xprt->sc_ctxt_lock);
-	spin_lock_init(&cma_xprt->sc_map_lock);
+	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
 
 	/*
 	/*
 	 * Note that this implies that the underlying transport support
 	 * Note that this implies that the underlying transport support
@@ -999,6 +902,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		newxprt, newxprt->sc_cm_id);
 		newxprt, newxprt->sc_cm_id);
 
 
 	dev = newxprt->sc_cm_id->device;
 	dev = newxprt->sc_cm_id->device;
+	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
 
 
 	/* Qualify the transport resource defaults with the
 	/* Qualify the transport resource defaults with the
 	 * capabilities of this particular device */
 	 * capabilities of this particular device */
@@ -1014,13 +918,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 					    svcrdma_max_bc_requests);
 					    svcrdma_max_bc_requests);
 	newxprt->sc_rq_depth = newxprt->sc_max_requests +
 	newxprt->sc_rq_depth = newxprt->sc_max_requests +
 			       newxprt->sc_max_bc_requests;
 			       newxprt->sc_max_bc_requests;
-	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
+	newxprt->sc_sq_depth = newxprt->sc_rq_depth;
 	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
 
 	if (!svc_rdma_prealloc_ctxts(newxprt))
 	if (!svc_rdma_prealloc_ctxts(newxprt))
 		goto errout;
 		goto errout;
-	if (!svc_rdma_prealloc_maps(newxprt))
-		goto errout;
 
 
 	/*
 	/*
 	 * Limit ORD based on client limit, local device limit, and
 	 * Limit ORD based on client limit, local device limit, and
@@ -1050,6 +952,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	memset(&qp_attr, 0, sizeof qp_attr);
 	memset(&qp_attr, 0, sizeof qp_attr);
 	qp_attr.event_handler = qp_event_handler;
 	qp_attr.event_handler = qp_event_handler;
 	qp_attr.qp_context = &newxprt->sc_xprt;
 	qp_attr.qp_context = &newxprt->sc_xprt;
+	qp_attr.port_num = newxprt->sc_cm_id->port_num;
+	qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
 	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
 	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
 	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
 	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
 	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
 	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
@@ -1248,8 +1152,8 @@ static void __svc_rdma_free(struct work_struct *work)
 	}
 	}
 
 
 	rdma_dealloc_frmr_q(rdma);
 	rdma_dealloc_frmr_q(rdma);
+	svc_rdma_destroy_rw_ctxts(rdma);
 	svc_rdma_destroy_ctxts(rdma);
 	svc_rdma_destroy_ctxts(rdma);
-	svc_rdma_destroy_maps(rdma);
 
 
 	/* Destroy the QP if present (not a listener) */
 	/* Destroy the QP if present (not a listener) */
 	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
 	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))