@@ -220,9 +220,11 @@ static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
  */
 static void svc_xprt_received(struct svc_xprt *xprt)
 {
-	WARN_ON_ONCE(!test_bit(XPT_BUSY, &xprt->xpt_flags));
-	if (!test_bit(XPT_BUSY, &xprt->xpt_flags))
+	if (!test_bit(XPT_BUSY, &xprt->xpt_flags)) {
+		WARN_ONCE(1, "xprt=0x%p already busy!", xprt);
 		return;
+	}
+
 	/* As soon as we clear busy, the xprt could be closed and
 	 * 'put', so we need a reference to call svc_xprt_do_enqueue with:
 	 */
@@ -310,25 +312,6 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
-/*
- * Queue up an idle server thread. Must have pool->sp_lock held.
- * Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't pollute
- * the cache.
- */
-static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_add(&rqstp->rq_list, &pool->sp_threads);
-}
-
-/*
- * Dequeue an nfsd thread. Must have pool->sp_lock held.
- */
-static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_del(&rqstp->rq_list);
-}
-
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
 	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
@@ -341,11 +324,12 @@ static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 {
 	struct svc_pool *pool;
-	struct svc_rqst *rqstp;
+	struct svc_rqst *rqstp = NULL;
 	int cpu;
+	bool queued = false;
 
 	if (!svc_xprt_has_something_to_do(xprt))
-		return;
+		goto out;
 
 	/* Mark transport as busy. It will remain in this state until
 	 * the provider calls svc_xprt_received. We update XPT_BUSY
@@ -355,43 +339,69 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
 		/* Don't enqueue transport while already enqueued */
 		dprintk("svc: transport %p busy, not enqueued\n", xprt);
-		return;
+		goto out;
 	}
 
 	cpu = get_cpu();
 	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
-	spin_lock_bh(&pool->sp_lock);
 
-	pool->sp_stats.packets++;
-
-	if (!list_empty(&pool->sp_threads)) {
-		rqstp = list_entry(pool->sp_threads.next,
-				   struct svc_rqst,
-				   rq_list);
-		dprintk("svc: transport %p served by daemon %p\n",
-			xprt, rqstp);
-		svc_thread_dequeue(pool, rqstp);
-		if (rqstp->rq_xprt)
-			printk(KERN_ERR
-				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
-				rqstp, rqstp->rq_xprt);
-		/* Note the order of the following 3 lines:
-		 * We want to assign xprt to rqstp->rq_xprt only _after_
-		 * we've woken up the process, so that we don't race with
-		 * the lockless check in svc_get_next_xprt().
+	atomic_long_inc(&pool->sp_stats.packets);
+
+redo_search:
+	/* find a thread for this xprt */
+	rcu_read_lock();
+	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+		/* Do a lockless check first */
+		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+			continue;
+
+		/*
+		 * Once the xprt has been queued, it can only be dequeued by
+		 * the task that intends to service it. All we can do at that
+		 * point is to try to wake this thread back up so that it can
+		 * do so.
 		 */
-		svc_xprt_get(xprt);
+		if (!queued) {
+			spin_lock_bh(&rqstp->rq_lock);
+			if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
+				/* already busy, move on... */
+				spin_unlock_bh(&rqstp->rq_lock);
+				continue;
+			}
+
+			/* this one will do */
+			rqstp->rq_xprt = xprt;
+			svc_xprt_get(xprt);
+			spin_unlock_bh(&rqstp->rq_lock);
+		}
+		rcu_read_unlock();
+
+		atomic_long_inc(&pool->sp_stats.threads_woken);
 		wake_up_process(rqstp->rq_task);
-		rqstp->rq_xprt = xprt;
-		pool->sp_stats.threads_woken++;
-	} else {
+		put_cpu();
+		goto out;
+	}
+	rcu_read_unlock();
+
+	/*
+	 * We didn't find an idle thread to use, so we need to queue the xprt.
+	 * Do so and then search again. If we find one, we can't hook this one
+	 * up to it directly but we can wake the thread up in the hopes that it
+	 * will pick it up once it searches for a xprt to service.
+	 */
+	if (!queued) {
+		queued = true;
 		dprintk("svc: transport %p put into queue\n", xprt);
+		spin_lock_bh(&pool->sp_lock);
 		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
 		pool->sp_stats.sockets_queued++;
+		spin_unlock_bh(&pool->sp_lock);
+		goto redo_search;
 	}
-
-	spin_unlock_bh(&pool->sp_lock);
+	rqstp = NULL;
 	put_cpu();
+out:
+	trace_svc_xprt_do_enqueue(xprt, rqstp);
 }
 
 /*
@@ -408,22 +418,28 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
 EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
 
 /*
- * Dequeue the first transport. Must be called with the pool->sp_lock held.
+ * Dequeue the first transport, if there is one.
  */
 static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
 {
-	struct svc_xprt *xprt;
+	struct svc_xprt *xprt = NULL;
 
 	if (list_empty(&pool->sp_sockets))
-		return NULL;
-
-	xprt = list_entry(pool->sp_sockets.next,
-			  struct svc_xprt, xpt_ready);
-	list_del_init(&xprt->xpt_ready);
+		goto out;
 
-	dprintk("svc: transport %p dequeued, inuse=%d\n",
-		xprt, atomic_read(&xprt->xpt_ref.refcount));
+	spin_lock_bh(&pool->sp_lock);
+	if (likely(!list_empty(&pool->sp_sockets))) {
+		xprt = list_first_entry(&pool->sp_sockets,
+					struct svc_xprt, xpt_ready);
+		list_del_init(&xprt->xpt_ready);
+		svc_xprt_get(xprt);
 
+		dprintk("svc: transport %p dequeued, inuse=%d\n",
+			xprt, atomic_read(&xprt->xpt_ref.refcount));
+	}
+	spin_unlock_bh(&pool->sp_lock);
+out:
+	trace_svc_xprt_dequeue(xprt);
 	return xprt;
 }
 
@@ -484,34 +500,36 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
 }
 
 /*
- * External function to wake up a server waiting for data
- * This really only makes sense for services like lockd
- * which have exactly one thread anyway.
+ * Some svc_serv's will have occasional work to do, even when a xprt is not
+ * waiting to be serviced. This function is there to "kick" a task in one of
+ * those services so that it can wake up and do that work. Note that we only
+ * bother with pool 0 as we don't need to wake up more than one thread for
+ * this purpose.
  */
 void svc_wake_up(struct svc_serv *serv)
 {
 	struct svc_rqst *rqstp;
-	unsigned int i;
 	struct svc_pool *pool;
 
-	for (i = 0; i < serv->sv_nrpools; i++) {
-		pool = &serv->sv_pools[i];
+	pool = &serv->sv_pools[0];
 
-		spin_lock_bh(&pool->sp_lock);
-		if (!list_empty(&pool->sp_threads)) {
-			rqstp = list_entry(pool->sp_threads.next,
-					   struct svc_rqst,
-					   rq_list);
-			dprintk("svc: daemon %p woken up.\n", rqstp);
-			/*
-			svc_thread_dequeue(pool, rqstp);
-			rqstp->rq_xprt = NULL;
-			 */
-			wake_up_process(rqstp->rq_task);
-		} else
-			pool->sp_task_pending = 1;
-		spin_unlock_bh(&pool->sp_lock);
+	rcu_read_lock();
+	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+		/* skip any that aren't queued */
+		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+			continue;
+		rcu_read_unlock();
+		dprintk("svc: daemon %p woken up.\n", rqstp);
+		wake_up_process(rqstp->rq_task);
+		trace_svc_wake_up(rqstp->rq_task->pid);
+		return;
 	}
+	rcu_read_unlock();
+
+	/* No free entries available */
+	set_bit(SP_TASK_PENDING, &pool->sp_flags);
+	smp_wmb();
+	trace_svc_wake_up(0);
 }
 EXPORT_SYMBOL_GPL(svc_wake_up);
 
@@ -622,75 +640,86 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
 	return 0;
 }
 
+static bool
+rqst_should_sleep(struct svc_rqst *rqstp)
+{
+	struct svc_pool *pool = rqstp->rq_pool;
+
+	/* did someone call svc_wake_up? */
+	if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
+		return false;
+
+	/* was a socket queued? */
+	if (!list_empty(&pool->sp_sockets))
+		return false;
+
+	/* are we shutting down? */
+	if (signalled() || kthread_should_stop())
+		return false;
+
+	/* are we freezing? */
+	if (freezing(current))
+		return false;
+
+	return true;
+}
+
 static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
 	struct svc_xprt *xprt;
 	struct svc_pool *pool = rqstp->rq_pool;
 	long time_left = 0;
 
+	/* rq_xprt should be clear on entry */
+	WARN_ON_ONCE(rqstp->rq_xprt);
+
 	/* Normally we will wait up to 5 seconds for any required
 	 * cache information to be provided.
 	 */
 	rqstp->rq_chandle.thread_wait = 5*HZ;
 
-	spin_lock_bh(&pool->sp_lock);
 	xprt = svc_xprt_dequeue(pool);
 	if (xprt) {
 		rqstp->rq_xprt = xprt;
-		svc_xprt_get(xprt);
 
 		/* As there is a shortage of threads and this request
 		 * had to be queued, don't allow the thread to wait so
 		 * long for cache updates.
 		 */
 		rqstp->rq_chandle.thread_wait = 1*HZ;
-		pool->sp_task_pending = 0;
-	} else {
-		if (pool->sp_task_pending) {
-			pool->sp_task_pending = 0;
-			xprt = ERR_PTR(-EAGAIN);
-			goto out;
-		}
-		/*
-		 * We have to be able to interrupt this wait
-		 * to bring down the daemons ...
-		 */
-		set_current_state(TASK_INTERRUPTIBLE);
+		clear_bit(SP_TASK_PENDING, &pool->sp_flags);
+		return xprt;
+	}
 
-		/* No data pending. Go to sleep */
-		svc_thread_enqueue(pool, rqstp);
-		spin_unlock_bh(&pool->sp_lock);
+	/*
+	 * We have to be able to interrupt this wait
+	 * to bring down the daemons ...
+	 */
+	set_current_state(TASK_INTERRUPTIBLE);
+	clear_bit(RQ_BUSY, &rqstp->rq_flags);
+	smp_mb();
 
-		if (!(signalled() || kthread_should_stop())) {
-			time_left = schedule_timeout(timeout);
-			__set_current_state(TASK_RUNNING);
+	if (likely(rqst_should_sleep(rqstp)))
+		time_left = schedule_timeout(timeout);
+	else
+		__set_current_state(TASK_RUNNING);
 
-			try_to_freeze();
+	try_to_freeze();
 
-			xprt = rqstp->rq_xprt;
-			if (xprt != NULL)
-				return xprt;
-		} else
-			__set_current_state(TASK_RUNNING);
+	spin_lock_bh(&rqstp->rq_lock);
+	set_bit(RQ_BUSY, &rqstp->rq_flags);
+	spin_unlock_bh(&rqstp->rq_lock);
 
-		spin_lock_bh(&pool->sp_lock);
-		if (!time_left)
-			pool->sp_stats.threads_timedout++;
+	xprt = rqstp->rq_xprt;
+	if (xprt != NULL)
+		return xprt;
 
-		xprt = rqstp->rq_xprt;
-		if (!xprt) {
-			svc_thread_dequeue(pool, rqstp);
-			spin_unlock_bh(&pool->sp_lock);
-			dprintk("svc: server %p, no data yet\n", rqstp);
-			if (signalled() || kthread_should_stop())
-				return ERR_PTR(-EINTR);
-			else
-				return ERR_PTR(-EAGAIN);
-		}
-	}
-out:
-	spin_unlock_bh(&pool->sp_lock);
-	return xprt;
+	if (!time_left)
+		atomic_long_inc(&pool->sp_stats.threads_timedout);
+
+	if (signalled() || kthread_should_stop())
+		return ERR_PTR(-EINTR);
+	return ERR_PTR(-EAGAIN);
 }
 
 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
@@ -719,7 +748,7 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
 		dprintk("svc_recv: found XPT_CLOSE\n");
 		svc_delete_xprt(xprt);
 		/* Leave XPT_BUSY set on the dead xprt: */
-		return 0;
+		goto out;
 	}
 	if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
 		struct svc_xprt *newxpt;
@@ -750,6 +779,8 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
 	}
 	/* clear XPT_BUSY: */
 	svc_xprt_received(xprt);
+out:
+	trace_svc_handle_xprt(xprt, len);
 	return len;
 }
 
@@ -797,7 +828,10 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
 
 	clear_bit(XPT_OLD, &xprt->xpt_flags);
 
-	rqstp->rq_secure = xprt->xpt_ops->xpo_secure_port(rqstp);
+	if (xprt->xpt_ops->xpo_secure_port(rqstp))
+		set_bit(RQ_SECURE, &rqstp->rq_flags);
+	else
+		clear_bit(RQ_SECURE, &rqstp->rq_flags);
 	rqstp->rq_chandle.defer = svc_defer;
 	rqstp->rq_xid = svc_getu32(&rqstp->rq_arg.head[0]);
 
@@ -895,7 +929,6 @@ static void svc_age_temp_xprts(unsigned long closure)
 			continue;
 		list_del_init(le);
 		set_bit(XPT_CLOSE, &xprt->xpt_flags);
-		set_bit(XPT_DETACHED, &xprt->xpt_flags);
 		dprintk("queuing xprt %p for closing\n", xprt);
 
 		/* a thread will dequeue and close it soon */
@@ -935,8 +968,7 @@ static void svc_delete_xprt(struct svc_xprt *xprt)
 	xprt->xpt_ops->xpo_detach(xprt);
 
 	spin_lock_bh(&serv->sv_lock);
-	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
-		list_del_init(&xprt->xpt_list);
+	list_del_init(&xprt->xpt_list);
 	WARN_ON_ONCE(!list_empty(&xprt->xpt_ready));
 	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
 		serv->sv_tmpcnt--;
@@ -1080,7 +1112,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req)
 	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
 	struct svc_deferred_req *dr;
 
-	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
+	if (rqstp->rq_arg.page_len || !test_bit(RQ_USEDEFERRAL, &rqstp->rq_flags))
 		return NULL; /* if more than a page, give up FIXME */
 	if (rqstp->rq_deferred) {
 		dr = rqstp->rq_deferred;
@@ -1109,7 +1141,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req)
 	}
 	svc_xprt_get(rqstp->rq_xprt);
 	dr->xprt = rqstp->rq_xprt;
-	rqstp->rq_dropme = true;
+	set_bit(RQ_DROPME, &rqstp->rq_flags);
 
 	dr->handle.revisit = svc_revisit;
 	return &dr->handle;
@@ -1311,10 +1343,10 @@ static int svc_pool_stats_show(struct seq_file *m, void *p)
 
 	seq_printf(m, "%u %lu %lu %lu %lu\n",
 		pool->sp_id,
-		pool->sp_stats.packets,
+		(unsigned long)atomic_long_read(&pool->sp_stats.packets),
 		pool->sp_stats.sockets_queued,
-		pool->sp_stats.threads_woken,
-		pool->sp_stats.threads_timedout);
+		(unsigned long)atomic_long_read(&pool->sp_stats.threads_woken),
+		(unsigned long)atomic_long_read(&pool->sp_stats.threads_timedout));
 
 	return 0;
 }
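
The heart of the change above is the claim protocol in svc_xprt_do_enqueue(): skip threads whose RQ_BUSY bit is already set using a cheap lockless test_bit(), claim a candidate with test_and_set_bit() under its rq_lock, and fall back to queueing the xprt on pool->sp_sockets and re-searching when no idle thread is found. The standalone userspace sketch below, which is not part of the patch, illustrates only that claim pattern with C11 atomics; the names fake_thread and claim_idle_thread, and the plain array standing in for the RCU-protected sp_all_threads list, are invented for illustration.

/*
 * Userspace sketch of the "lockless check, then atomic claim" pattern.
 * Kernel specifics (RCU traversal, rq_lock, wake_up_process) are replaced
 * with C11 atomics over a fixed array; this is an analogy, not kernel code.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct fake_thread {
	atomic_bool busy;	/* stands in for RQ_BUSY */
	void *work;		/* stands in for rqstp->rq_xprt */
};

/* Return the thread we claimed, or NULL if every thread was busy. */
static struct fake_thread *claim_idle_thread(struct fake_thread *pool,
					     size_t n, void *work)
{
	for (size_t i = 0; i < n; i++) {
		struct fake_thread *t = &pool[i];

		/* cheap lockless check first, like test_bit(RQ_BUSY, ...) */
		if (atomic_load_explicit(&t->busy, memory_order_relaxed))
			continue;

		/* claim it, like test_and_set_bit(RQ_BUSY, ...) */
		if (atomic_exchange_explicit(&t->busy, true,
					     memory_order_acq_rel))
			continue;	/* lost the race, keep searching */

		t->work = work;		/* hand off work before waking it */
		return t;
	}
	return NULL;	/* caller queues the work and searches again */
}

int main(void)
{
	struct fake_thread pool[4] = { { .busy = true }, { 0 }, { 0 }, { 0 } };
	int job = 42;

	struct fake_thread *t = claim_idle_thread(pool, 4, &job);
	printf("claimed slot %td\n", t ? t - pool : (ptrdiff_t)-1);
	return 0;
}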