@@ -449,77 +449,72 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 /*
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
  */
 static void *
 xprt_rdma_allocate(struct rpc_task *task, size_t size)
 {
 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-	struct rpcrdma_req *req, *nreq;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_regbuf *rb;
+	struct rpcrdma_req *req;
+	size_t min_size;
+	gfp_t flags = task->tk_flags & RPC_TASK_SWAPPER ?
+				GFP_ATOMIC : GFP_NOFS;
 
-	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 	if (req == NULL)
 		return NULL;
 
-	if (size > req->rl_size) {
-		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
-			"prog %d vers %d proc %d\n",
-			__func__, size, req->rl_size,
-			task->tk_client->cl_prog, task->tk_client->cl_vers,
-			task->tk_msg.rpc_proc->p_proc);
-		/*
-		 * Outgoing length shortage. Our inline write max must have
-		 * been configured to perform direct i/o.
-		 *
-		 * This is therefore a large metadata operation, and the
-		 * allocate call was made on the maximum possible message,
-		 * e.g. containing long filename(s) or symlink data. In
-		 * fact, while these metadata operations *might* carry
-		 * large outgoing payloads, they rarely *do*. However, we
-		 * have to commit to the request here, so reallocate and
-		 * register it now. The data path will never require this
-		 * reallocation.
-		 *
-		 * If the allocation or registration fails, the RPC framework
-		 * will (doggedly) retry.
-		 */
-		if (task->tk_flags & RPC_TASK_SWAPPER)
-			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
-		else
-			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
-		if (nreq == NULL)
-			goto outfail;
-
-		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
-				nreq->rl_base, size + sizeof(struct rpcrdma_req)
-				- offsetof(struct rpcrdma_req, rl_base),
-				&nreq->rl_handle, &nreq->rl_iov)) {
-			kfree(nreq);
-			goto outfail;
-		}
-		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
-		nreq->rl_size = size;
-		nreq->rl_niovs = 0;
-		nreq->rl_nchunks = 0;
-		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
-		nreq->rl_reply = req->rl_reply;
-		memcpy(nreq->rl_segments,
-			req->rl_segments, sizeof nreq->rl_segments);
-		/* flag the swap with an unused field */
-		nreq->rl_iov.length = 0;
-		req->rl_reply = NULL;
-		req = nreq;
-	}
+	if (req->rl_sendbuf == NULL)
+		goto out_sendbuf;
+	if (size > req->rl_sendbuf->rg_size)
+		goto out_sendbuf;
+
+out:
 	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
 	req->rl_connect_cookie = 0;	/* our reserved value */
-	return req->rl_xdr_buf;
-
-outfail:
+	return req->rl_sendbuf->rg_base;
+
+out_sendbuf:
+	/* XDR encoding and RPC/RDMA marshaling of this request has not
+	 * yet occurred. Thus a lower bound is needed to prevent buffer
+	 * overrun during marshaling.
+	 *
+	 * RPC/RDMA marshaling may choose to send payload bearing ops
+	 * inline, if the result is smaller than the inline threshold.
+	 * The value of the "size" argument accounts for header
+	 * requirements but not for the payload in these cases.
+	 *
+	 * Likewise, allocate enough space to receive a reply up to the
+	 * size of the inline threshold.
+	 *
+	 * It's unlikely that both the send header and the received
+	 * reply will be large, but slush is provided here to allow
+	 * flexibility when marshaling.
+	 */
+	min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+	min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+	if (size < min_size)
+		size = min_size;
+
+	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+	if (IS_ERR(rb))
+		goto out_fail;
+	rb->rg_owner = req;
+
+	r_xprt->rx_stats.hardway_register_count += size;
+	rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+	req->rl_sendbuf = rb;
+	goto out;
+
+out_fail:
 	rpcrdma_buffer_put(req);
-	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+	r_xprt->rx_stats.failed_marshal_count++;
 	return NULL;
 }
 
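The rewritten allocate path above reduces to two cases: reuse the send regbuf already attached to the rpcrdma_req when it is large enough, otherwise allocate a larger buffer sized to cover both inline thresholds and swap it in. The following standalone sketch mirrors that control flow; the sketch_* names, the threshold constants, and plain malloc()/free() are illustrative stand-ins for the kernel's struct rpcrdma_regbuf, RPCRDMA_INLINE_*_THRESHOLD() macros, and rpcrdma_alloc_regbuf()/rpcrdma_free_regbuf(), not the real API.

/* Hypothetical mirror of the xprt_rdma_allocate() fast/slow path above.
 * All sketch_* names and the threshold constants are stand-ins, not
 * kernel definitions.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_INLINE_READ_THRESHOLD	1024	/* stand-in for RPCRDMA_INLINE_READ_THRESHOLD() */
#define SKETCH_INLINE_WRITE_THRESHOLD	1024	/* stand-in for RPCRDMA_INLINE_WRITE_THRESHOLD() */

struct sketch_regbuf {
	size_t	rg_size;		/* usable bytes in rg_base[] */
	char	rg_base[];		/* buffer handed to the upper layer */
};

struct sketch_req {
	struct sketch_regbuf *rl_sendbuf;
};

static void *sketch_allocate(struct sketch_req *req, size_t size)
{
	struct sketch_regbuf *rb;
	size_t min_size;

	/* Fast path: the send buffer already attached to the request
	 * is present and large enough. */
	if (req->rl_sendbuf && size <= req->rl_sendbuf->rg_size)
		return req->rl_sendbuf->rg_base;

	/* Slow path: marshaling has not happened yet, so enforce a
	 * floor large enough for an inline send and an inline reply. */
	min_size = SKETCH_INLINE_READ_THRESHOLD + SKETCH_INLINE_WRITE_THRESHOLD;
	if (size < min_size)
		size = min_size;

	rb = malloc(sizeof(*rb) + size);	/* rpcrdma_alloc_regbuf() analogue */
	if (rb == NULL)
		return NULL;
	rb->rg_size = size;

	free(req->rl_sendbuf);			/* rpcrdma_free_regbuf() analogue */
	req->rl_sendbuf = rb;
	return rb->rg_base;
}

int main(void)
{
	struct sketch_req req = { NULL };
	char *buf = sketch_allocate(&req, 128);

	if (buf != NULL) {
		strcpy(buf, "RPC call header and inline payload go here");
		printf("buffer of %zu bytes\n", req.rl_sendbuf->rg_size);
	}
	free(req.rl_sendbuf);
	return 0;
}

The min_size floor serves the same purpose as the comment in the patch: at allocate time the request has not been marshaled, so the buffer must already be big enough for the header plus a payload that marshaling may choose to send inline, and for an inline-sized reply.
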
@@ -531,47 +526,24 @@ xprt_rdma_free(void *buffer)
 {
 	struct rpcrdma_req *req;
 	struct rpcrdma_xprt *r_xprt;
-	struct rpcrdma_rep *rep;
+	struct rpcrdma_regbuf *rb;
 	int i;
 
 	if (buffer == NULL)
 		return;
 
-	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
-	if (req->rl_iov.length == 0) {	/* see allocate above */
-		r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
-				      struct rpcrdma_xprt, rx_buf);
-	} else
-		r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-	rep = req->rl_reply;
+	rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+	req = rb->rg_owner;
+	r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
 
-	dprintk("RPC:       %s: called on 0x%p%s\n",
-		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	/*
-	 * Finish the deregistration. The process is considered
-	 * complete when the rr_func vector becomes NULL - this
-	 * was put in place during rpcrdma_reply_handler() - the wait
-	 * call below will not block if the dereg is "done". If
-	 * interrupted, our framework will clean up.
-	 */
 	for (i = 0; req->rl_nchunks;) {
 		--req->rl_nchunks;
 		i += rpcrdma_deregister_external(
 			&req->rl_segments[i], r_xprt);
 	}
 
-	if (req->rl_iov.length == 0) {	/* see allocate above */
-		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
-		oreq->rl_reply = req->rl_reply;
-		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
-						   req->rl_handle,
-						   &req->rl_iov);
-		kfree(req);
-		req = oreq;
-	}
-
-	/* Put back request+reply buffers */
 	rpcrdma_buffer_put(req);
 }
 
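The new free path depends on the pointer handed to the RPC layer being the rg_base[] member of a struct rpcrdma_regbuf, so container_of() can recover the regbuf and, through the rg_owner back-pointer set by the allocate path, the rpcrdma_req that must be returned to the buffer pool. A minimal, self-contained illustration of that recovery, again using hypothetical sketch_* types rather than the kernel structures:

/* Hypothetical illustration of the container_of() recovery in
 * xprt_rdma_free(): visible buffer -> enclosing regbuf -> owning request.
 * sketch_* types are stand-ins, not the kernel structures.
 */
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

#define sketch_container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct sketch_req;

struct sketch_regbuf {
	struct sketch_req *rg_owner;	/* back-pointer set at allocation time */
	char rg_base[64];		/* only this address escapes to callers */
};

struct sketch_req {
	struct sketch_regbuf *rl_sendbuf;
};

static void sketch_free(void *buffer)
{
	struct sketch_regbuf *rb;
	struct sketch_req *req;

	if (buffer == NULL)
		return;

	/* The kernel code passes rg_base[0] as the member designator;
	 * rg_base here has the same offset. */
	rb = sketch_container_of(buffer, struct sketch_regbuf, rg_base);
	req = rb->rg_owner;

	printf("request %p goes back to the buffer pool\n", (void *)req);
}

int main(void)
{
	struct sketch_req *req = malloc(sizeof(*req));
	struct sketch_regbuf *rb = malloc(sizeof(*rb));

	if (req == NULL || rb == NULL)
		return 1;
	rb->rg_owner = req;
	req->rl_sendbuf = rb;

	sketch_free(rb->rg_base);	/* upper layers only ever see rg_base */

	free(rb);
	free(req);
	return 0;
}

Because the buffer returned to the RPC layer no longer lives inside struct rpcrdma_req itself, the regbuf needs the rg_owner back-pointer; that is what lets xprt_rdma_free() drop the old rl_iov.length == 0 special case entirely.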