@@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		rpclen = 0;
 	}
 
+	req->rl_xid = rqst->rq_xid;
+	rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
 	 *
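The two helpers called by the new marshaling code are introduced elsewhere in this patch. A minimal sketch of what they might look like, assuming struct rpcrdma_buffer grows a list of pending reqs protected by rb_lock (called rb_pending below; the list and entry names, unlike rl_xid, are assumptions inferred from the call sites):

/* Sketch only: rb_pending and rl_list are assumed names, inferred
 * from how these helpers are called in rpcrdma_marshal_req() and
 * rpcrdma_reply_handler().
 */
static inline void
rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	spin_lock(&buffers->rb_lock);
	if (list_empty(&req->rl_list))
		list_add_tail(&req->rl_list, &buffers->rb_pending);
	spin_unlock(&buffers->rb_lock);
}

/* Caller must hold buffers->rb_lock. */
static inline struct rpcrdma_req *
rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
{
	struct rpcrdma_req *pos;

	list_for_each_entry(pos, &buffers->rb_pending, rl_list)
		if (pos->rl_xid == xid)
			return pos;
	return NULL;
}

Matching by rl_xid under rb_lock means a reply can be paired with its req even after the RPC layer has released the rqst, which is the crux of the reply-handler changes below.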
@@ -987,11 +990,12 @@ rpcrdma_reply_handler(struct work_struct *work)
 {
 	struct rpcrdma_rep *rep =
 			container_of(work, struct rpcrdma_rep, rr_work);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	__be32 *iptr;
 	int rdmalen, status, rmerr;
 	unsigned long cwnd;
@@ -1013,28 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock_bh(&xprt->transport_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (!rqst)
+	spin_lock(&buf->rb_lock);
+	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+					headerp->rm_xid);
+	if (!req)
 		goto out_nomatch;
-
-	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply)
 		goto out_duplicate;
 
-	/* Sanity checking has passed. We are now committed
-	 * to complete this transaction.
-	 */
 	list_replace_init(&req->rl_registered, &mws);
 	rpcrdma_mark_remote_invalidation(&mws, rep);
-	list_del_init(&rqst->rq_list);
+
+	/* Avoid races with signals and duplicate replies
+	 * by marking this req as matched.
+	 */
 	req->rl_reply = rep;
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
+
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-	xprt->reestablish_timeout = 0;
+	/* Invalidate and unmap the data payloads before waking the
+	 * waiting application. This guarantees the memory regions
+	 * are properly fenced from the server before the application
+	 * accesses the data. It also ensures proper send flow control:
+	 * waking the next RPC waits until this RPC has relinquished
+	 * all its Send Queue entries.
+	 */
+	if (!list_empty(&mws))
+		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+	/* Perform XID lookup, reconstruction of the RPC reply, and
+	 * RPC completion while holding the transport lock to ensure
+	 * the rep, rqst, and rq_task pointers remain stable.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (!rqst)
+		goto out_norqst;
+	xprt->reestablish_timeout = 0;
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
 
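Taken together, this hunk replaces one long transport_lock critical section with two short ones, and moves memory invalidation between them with no lock held. A condensed sketch of the resulting control flow, assuming the helper names above (error paths and reply parsing omitted; not the verbatim function):

/* Condensed control flow of the reply handler after this patch. */
static void
rpcrdma_reply_handler_sketch(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep, __be32 xid)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct list_head mws;

	/* Step 1: match and claim the req, under rb_lock only. The
	 * req is owned by the transport, so a signal cannot free it.
	 */
	spin_lock(&buf->rb_lock);
	req = rpcrdma_lookup_req_locked(buf, xid);
	if (!req) {
		spin_unlock(&buf->rb_lock);
		return;
	}
	list_replace_init(&req->rl_registered, &mws);
	req->rl_reply = rep;
	spin_unlock(&buf->rb_lock);

	/* Step 2: fence the payload MRs from the server with no lock
	 * held; ro_unmap_sync() blocks until invalidation completes.
	 */
	if (!list_empty(&mws))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);

	/* Step 3: complete the RPC under the transport lock, which
	 * keeps rqst and rq_task stable across xprt_complete_rqst().
	 */
	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, xid);
	if (rqst)
		xprt_complete_rqst(rqst->rq_task, 0);
	spin_unlock_bh(&xprt->transport_lock);
}

The ordering matters twice over: invalidation must finish before xprt_complete_rqst() wakes the caller, yet it cannot run inside the transport_lock critical section, since waiting for invalidation to complete may sleep.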
@@ -1109,17 +1130,6 @@ badheader:
 	}
 
 out:
-	/* Invalidate and flush the data payloads before waking the
-	 * waiting application. This guarantees the memory region is
-	 * properly fenced from the server before the application
-	 * accesses the data. It also ensures proper send flow
-	 * control: waking the next RPC waits until this RPC has
-	 * relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&mws))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
-
-	spin_lock_bh(&xprt->transport_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
@@ -1128,7 +1138,7 @@ out:
 	xprt_complete_rqst(rqst->rq_task, status);
 	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-			__func__, xprt, rqst, status);
+		__func__, xprt, rqst, status);
 	return;
 
 out_badstatus:
@@ -1177,26 +1187,37 @@ out_rdmaerr:
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+	spin_unlock_bh(&xprt->transport_lock);
+	rpcrdma_buffer_put(req);
+	dprintk("RPC:       %s: race, no rqst left for req %p\n",
+		__func__, req);
+	return;
+
 out_shortreply:
 	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 	goto repost;
 
 out_nomatch:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
 		__func__, be32_to_cpu(headerp->rm_xid),
 		rep->rr_len);
 	goto repost;
 
 out_duplicate:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: "
 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
 	r_xprt->rx_stats.bad_reply_count++;
 	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
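The out_norqst exit is the new case this reordering creates. A hedged illustration of the race it handles, based on the comment above it (the timing is illustrative, not taken from the patch):

/* Application thread                     Reply handler
 * ------------------                     -------------
 * sends RPC, waits for reply
 * signal fires; task exits
 * RPC layer releases the rqst
 *                                        Receive completes
 *                                        rb_lock: req found via
 *                                          rl_xid, reply claimed
 *                                        chunks invalidated (fenced)
 *                                        transport_lock:
 *                                          xprt_lookup_rqst() -> NULL
 *                                        out_norqst: req recycled via
 *                                          rpcrdma_buffer_put()
 */

Either way the payload MRs are invalidated before anything else can touch them; whether a waiting task still exists only determines whether the reply completes an RPC or the req is simply put back.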