
SUNRPC: Add a server side per-connection limit

Allow the user to limit the number of requests serviced through a single
connection, to help prevent faster clients from starving slower clients.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
Trond Myklebust
commit ff3ac5c3dc

+ 6 - 0
Documentation/kernel-parameters.txt

@@ -3832,6 +3832,12 @@ bytes respectively. Such letter suffixes can also be entirely omitted.
 			using these two parameters to set the minimum and
 			maximum port values.
 
+	sunrpc.svc_rpc_per_connection_limit=
+			[NFS,SUNRPC]
+			Limit the number of requests that the server will
+			process in parallel from a single connection.
+			The default value is 0 (no limit).
+
 	sunrpc.pool_mode=
 			[NFS]
 			Control how the NFS server code allocates CPUs to
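
Usage note: the parameter can be given on the kernel command line at boot,
for example (the value 16 is only illustrative):

	sunrpc.svc_rpc_per_connection_limit=16

Because it is registered with mode 0644 (see the module_param() call in
net/sunrpc/svc_xprt.c below), it can also be changed at runtime by writing
to /sys/module/sunrpc/parameters/svc_rpc_per_connection_limit; a new value
applies to subsequently scheduled requests, since it is re-read on every
slot check.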

+ 1 - 0
include/linux/sunrpc/svc.h

@@ -268,6 +268,7 @@ struct svc_rqst {
 						 * cache pages */
 #define	RQ_VICTIM	(5)			/* about to be shut down */
 #define	RQ_BUSY		(6)			/* request is busy */
+#define	RQ_DATA		(7)			/* request has data */
 	unsigned long		rq_flags;	/* flags field */
 
 	void *			rq_argp;	/* decoded arguments */
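
The new RQ_DATA bit records that this particular request currently holds
one of its connection's slots, so a request is counted at most once no
matter how many times it passes through the receive path. A minimal sketch
of the pattern the patch relies on (names as in the patch; the fragment is
illustrative, not a literal excerpt):

	/* Reserve at most once per request: the flag guards the counter. */
	if (!test_bit(RQ_DATA, &rqstp->rq_flags)) {
		atomic_inc(&xprt->xpt_nr_rqsts);	/* take a slot */
		set_bit(RQ_DATA, &rqstp->rq_flags);
	}

	/* Release exactly once: test_and_clear_bit() is atomic, so only
	 * the caller that actually clears the bit decrements the counter. */
	if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags))
		atomic_dec(&xprt->xpt_nr_rqsts);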

+ 1 - 0
include/linux/sunrpc/svc_xprt.h

@@ -69,6 +69,7 @@ struct svc_xprt {
 
 	struct svc_serv		*xpt_server;	/* service for transport */
 	atomic_t    	    	xpt_reserved;	/* space on outq that is rsvd */
+	atomic_t		xpt_nr_rqsts;	/* Number of requests */
 	struct mutex		xpt_mutex;	/* to serialize sending data */
 	spinlock_t		xpt_lock;	/* protects sk_deferred
 						 * and xpt_auth_cache */

+ 36 - 3
net/sunrpc/svc_xprt.c

@@ -21,6 +21,10 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static unsigned int svc_rpc_per_connection_limit __read_mostly;
+module_param(svc_rpc_per_connection_limit, uint, 0644);
+
+
 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
 static int svc_deferred_recv(struct svc_rqst *rqstp);
 static struct cache_deferred_req *svc_defer(struct cache_req *req);
@@ -329,12 +333,41 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
+static bool svc_xprt_slots_in_range(struct svc_xprt *xprt)
+{
+	unsigned int limit = svc_rpc_per_connection_limit;
+	int nrqsts = atomic_read(&xprt->xpt_nr_rqsts);
+
+	return limit == 0 || (nrqsts >= 0 && nrqsts < limit);
+}
+
+static bool svc_xprt_reserve_slot(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+	if (!test_bit(RQ_DATA, &rqstp->rq_flags)) {
+		if (!svc_xprt_slots_in_range(xprt))
+			return false;
+		atomic_inc(&xprt->xpt_nr_rqsts);
+		set_bit(RQ_DATA, &rqstp->rq_flags);
+	}
+	return true;
+}
+
+static void svc_xprt_release_slot(struct svc_rqst *rqstp)
+{
+	struct svc_xprt	*xprt = rqstp->rq_xprt;
+	if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) {
+		atomic_dec(&xprt->xpt_nr_rqsts);
+		svc_xprt_enqueue(xprt);
+	}
+}
+
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
 	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
 		return true;
 	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) {
-		if (xprt->xpt_ops->xpo_has_wspace(xprt))
+		if (xprt->xpt_ops->xpo_has_wspace(xprt) &&
+		    svc_xprt_slots_in_range(xprt))
 			return true;
 		trace_svc_xprt_no_write_space(xprt);
 		return false;
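
With this change, a transport that has pending data but has already reached
the per-connection limit is treated as having nothing to do, so it is not
handed to a server thread until svc_xprt_release_slot() frees a slot and
re-enqueues it. The range check itself is worth a close look: limit == 0
disables the mechanism entirely, and the nrqsts >= 0 guard keeps a
transiently negative counter from being promoted to a huge unsigned value
in the comparison. A standalone user-space rendering of the same logic
(for illustration only, not kernel code):

	#include <stdbool.h>
	#include <stdio.h>

	static bool slots_in_range(unsigned int limit, int nrqsts)
	{
		/* Same expression as svc_xprt_slots_in_range() */
		return limit == 0 || (nrqsts >= 0 && nrqsts < limit);
	}

	int main(void)
	{
		printf("%d\n", slots_in_range(0, 100)); /* 1: limit disabled */
		printf("%d\n", slots_in_range(4, 3));   /* 1: below the limit */
		printf("%d\n", slots_in_range(4, 4));   /* 0: limit reached */
		printf("%d\n", slots_in_range(4, -1));  /* 0: defensive guard */
		return 0;
	}
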
@@ -516,8 +549,8 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
 
 	rqstp->rq_res.head[0].iov_len = 0;
 	svc_reserve(rqstp, 0);
+	svc_xprt_release_slot(rqstp);
 	rqstp->rq_xprt = NULL;
-
 	svc_xprt_put(xprt);
 }
 
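svc_xprt_release() runs once a thread is finished with a request, so the
slot is held from the moment the request's data is read until the reply has
been sent and the request is released. A simplified view of the slot's
lifetime (call flow condensed for illustration; intermediate helpers and
error paths omitted):

	svc_recv()
	   -> svc_handle_xprt()
	        -> svc_xprt_reserve_slot()   /* sets RQ_DATA, xpt_nr_rqsts++ */
	... request is decoded, processed, and the reply sent ...
	svc_xprt_release()
	   -> svc_xprt_release_slot()        /* clears RQ_DATA, xpt_nr_rqsts-- */
	        -> svc_xprt_enqueue()        /* re-queue if more work is ready */
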
@@ -785,7 +818,7 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
 			svc_add_new_temp_xprt(serv, newxpt);
 		else
 			module_put(xprt->xpt_class->xcl_owner);
-	} else {
+	} else if (svc_xprt_reserve_slot(rqstp, xprt)) {
 		/* XPT_DATA|XPT_DEFERRED case: */
 		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
 			rqstp, rqstp->rq_pool->sp_id, xprt,
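
When svc_xprt_reserve_slot() fails here, the thread simply does not read
from this connection: the data stays queued on the socket and the transport
is picked up again once a completing request on the same connection
releases its slot (svc_xprt_release_slot() calls svc_xprt_enqueue() for
exactly this reason). The net effect is the one described in the changelog:
a single fast connection can no longer monopolize all of the server's
threads at the expense of slower clients.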