Merge branch 'for_4.11/net-next/rds_v3' of git://git.kernel.org/pub/scm/linux/kernel/git/ssantosh/linux

Santosh Shilimkar says:

====================
net: RDS updates

v2->v3:
- Rebased against the latest net-next head.
- Dropped a user-visible change after discussing with David Miller.
  It needs some more work to fully support the old/new tools matrix.
- Addressed Dave's comment about bool usage in patch
  "RDS: IB: track and log active side..."

v1->v2:
Re-aligned indentation in patch "RDS: mark few internal functions..."

The series consists of:
 - RDMA transport fixes for map failure, listen sequence, handler panic and
   composite message notification.
 - A couple of sparse fixes.
 - Message logging improvements for bind failure, use-once MR semantics,
   connection remote address and the active end point.
 - A performance improvement for the RDMA transport, reducing the post-send
   pressure on the queue and spreading the CQ vectors.
 - Useful statistics for socket send/recv usage and receive cache usage.
 - An additional RDS CMSG that applications can use to track the stages of
   an RDS message for certain types of traffic and locate latency spots.
   It can be enabled/disabled per socket.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
commit 303b0d4be1

+ 33 - 0
include/uapi/linux/rds.h

@@ -52,6 +52,13 @@
 #define RDS_GET_MR_FOR_DEST		7
 #define SO_RDS_TRANSPORT		8
 
+/* Socket option to tap receive path latency
+ *	SO_RDS: SO_RDS_MSG_RXPATH_LATENCY
+ *	Format used: struct rds_rx_trace_so
+ */
+#define SO_RDS_MSG_RXPATH_LATENCY	10
+
 /* supported values for SO_RDS_TRANSPORT */
 #define	RDS_TRANS_IB	0
 #define	RDS_TRANS_IWARP	1
@@ -77,6 +84,12 @@
  *	the same as for the GET_MR setsockopt.
  * RDS_CMSG_RDMA_STATUS (recvmsg)
  *	Returns the status of a completed RDMA operation.
+ * RDS_CMSG_RXPATH_LATENCY (recvmsg)
+ *	Returns RDS message latencies in various stages of the receive
+ *	path in ns. It's set per socket using the SO_RDS_MSG_RXPATH_LATENCY
+ *	socket option. Legitimate points are defined in
+ *	enum rds_message_rxpath_latency. More points can be added in
+ *	the future. CMSG format is struct rds_cmsg_rx_trace.
  */
 #define RDS_CMSG_RDMA_ARGS		1
 #define RDS_CMSG_RDMA_DEST		2
@@ -87,6 +100,7 @@
 #define RDS_CMSG_ATOMIC_CSWP		7
 #define RDS_CMSG_MASKED_ATOMIC_FADD	8
 #define RDS_CMSG_MASKED_ATOMIC_CSWP	9
+#define RDS_CMSG_RXPATH_LATENCY		11
 
 #define RDS_INFO_FIRST			10000
 #define RDS_INFO_COUNTERS		10000
@@ -171,6 +185,25 @@ struct rds_info_rdma_connection {
 	uint32_t	rdma_mr_size;
 };
 
+/* RDS message Receive Path Latency points */
+enum rds_message_rxpath_latency {
+	RDS_MSG_RX_HDR_TO_DGRAM_START = 0,
+	RDS_MSG_RX_DGRAM_REASSEMBLE,
+	RDS_MSG_RX_DGRAM_DELIVERED,
+	RDS_MSG_RX_DGRAM_TRACE_MAX
+};
+
+struct rds_rx_trace_so {
+	__u8 rx_traces;
+	__u8 rx_trace_pos[RDS_MSG_RX_DGRAM_TRACE_MAX];
+};
+
+struct rds_cmsg_rx_trace {
+	__u8 rx_traces;
+	__u8 rx_trace_pos[RDS_MSG_RX_DGRAM_TRACE_MAX];
+	__u64 rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+};
+
 /*
  * Congestion monitoring.
  * Congestion control in RDS happens at the host connection

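For reference, a minimal userspace sketch of enabling the new option with the
structures above (the helper name and the choice of trace points are
illustrative, not part of this patch set):

#include <string.h>
#include <sys/socket.h>
#include <linux/rds.h>

/* Illustrative helper: ask the kernel to report two of the defined
 * receive-path latency points on an RDS socket.  'fd' is assumed to be
 * a PF_RDS socket; returns 0 on success, -1 with errno set on failure.
 */
static int enable_rx_latency_tracing(int fd)
{
	struct rds_rx_trace_so trace;

	memset(&trace, 0, sizeof(trace));
	trace.rx_traces = 2;
	trace.rx_trace_pos[0] = RDS_MSG_RX_HDR_TO_DGRAM_START;
	trace.rx_trace_pos[1] = RDS_MSG_RX_DGRAM_DELIVERED;

	return setsockopt(fd, SOL_RDS, SO_RDS_MSG_RXPATH_LATENCY,
			  &trace, sizeof(trace));
}

The kernel then attaches an RDS_CMSG_RXPATH_LATENCY control message to each
recvmsg() on that socket; a matching parsing sketch follows the net/rds/recv.c
section below.
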
+ 28 - 0
net/rds/af_rds.c

@@ -298,6 +298,30 @@ static int rds_enable_recvtstamp(struct sock *sk, char __user *optval,
 	return 0;
 }
 
+static int rds_recv_track_latency(struct rds_sock *rs, char __user *optval,
+				  int optlen)
+{
+	struct rds_rx_trace_so trace;
+	int i;
+
+	if (optlen != sizeof(struct rds_rx_trace_so))
+		return -EFAULT;
+
+	if (copy_from_user(&trace, optval, sizeof(trace)))
+		return -EFAULT;
+
+	if (trace.rx_traces > RDS_MSG_RX_DGRAM_TRACE_MAX)
+		return -EFAULT;
+
+	rs->rs_rx_traces = trace.rx_traces;
+	for (i = 0; i < rs->rs_rx_traces; i++) {
+		if (trace.rx_trace_pos[i] >= RDS_MSG_RX_DGRAM_TRACE_MAX) {
+			rs->rs_rx_traces = 0;
+			return -EFAULT;
+		}
+		rs->rs_rx_trace[i] = trace.rx_trace_pos[i];
+	}
+
+	return 0;
+}
+
 static int rds_setsockopt(struct socket *sock, int level, int optname,
 			  char __user *optval, unsigned int optlen)
 {
@@ -338,6 +362,9 @@ static int rds_setsockopt(struct socket *sock, int level, int optname,
 		ret = rds_enable_recvtstamp(sock->sk, optval, optlen);
 		release_sock(sock->sk);
 		break;
+	case SO_RDS_MSG_RXPATH_LATENCY:
+		ret = rds_recv_track_latency(rs, optval, optlen);
+		break;
 	default:
 		ret = -ENOPROTOOPT;
 	}
@@ -484,6 +511,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	INIT_LIST_HEAD(&rs->rs_cong_list);
 	spin_lock_init(&rs->rs_rdma_lock);
 	rs->rs_rdma_keys = RB_ROOT;
+	rs->rs_rx_traces = 0;
 
 	spin_lock_bh(&rds_sock_lock);
 	list_add_tail(&rs->rs_item, &rds_sock_list);

+ 2 - 2
net/rds/bind.c

@@ -176,8 +176,8 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
 	if (!trans) {
 		ret = -EADDRNOTAVAIL;
 		rds_remove_bound(rs);
-		printk_ratelimited(KERN_INFO "RDS: rds_bind() could not find a transport, "
-				"load rds_tcp or rds_rdma?\n");
+		pr_info_ratelimited("RDS: %s could not find a transport for %pI4, load rds_tcp or rds_rdma?\n",
+				    __func__, &sin->sin_addr.s_addr);
 		goto out;
 	}
 

+ 5 - 5
net/rds/connection.c

@@ -545,11 +545,11 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 }
 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
 
-void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
-			     struct rds_info_iterator *iter,
-			     struct rds_info_lengths *lens,
-			     int (*visitor)(struct rds_conn_path *, void *),
-			     size_t item_len)
+static void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
+				    struct rds_info_iterator *iter,
+				    struct rds_info_lengths *lens,
+				    int (*visitor)(struct rds_conn_path *, void *),
+				    size_t item_len)
 {
 	u64  buffer[(item_len + 7) / 8];
 	struct hlist_head *head;

+ 11 - 0
net/rds/ib.c

@@ -111,6 +111,9 @@ static void rds_ib_dev_free(struct work_struct *work)
 		kfree(i_ipaddr);
 	}
 
+	if (rds_ibdev->vector_load)
+		kfree(rds_ibdev->vector_load);
+
 	kfree(rds_ibdev);
 }
 
@@ -159,6 +162,14 @@ static void rds_ib_add_one(struct ib_device *device)
 	rds_ibdev->max_initiator_depth = device->attrs.max_qp_init_rd_atom;
 	rds_ibdev->max_responder_resources = device->attrs.max_qp_rd_atom;
 
+	rds_ibdev->vector_load = kzalloc(sizeof(int) * device->num_comp_vectors,
+					 GFP_KERNEL);
+	if (!rds_ibdev->vector_load) {
+		pr_err("RDS/IB: %s failed to allocate vector memory\n",
+			__func__);
+		goto put_dev;
+	}
+
 	rds_ibdev->dev = device;
 	rds_ibdev->pd = ib_alloc_pd(device, 0);
 	if (IS_ERR(rds_ibdev->pd)) {

+ 20 - 2
net/rds/ib.h

@@ -14,9 +14,10 @@
 
 #define RDS_IB_DEFAULT_RECV_WR		1024
 #define RDS_IB_DEFAULT_SEND_WR		256
-#define RDS_IB_DEFAULT_FR_WR		512
+#define RDS_IB_DEFAULT_FR_WR		256
+#define RDS_IB_DEFAULT_FR_INV_WR	256
 
-#define RDS_IB_DEFAULT_RETRY_COUNT	2
+#define RDS_IB_DEFAULT_RETRY_COUNT	1
 
 #define RDS_IB_SUPPORTED_PROTOCOLS	0x00000003	/* minor versions supported */
 
@@ -125,6 +126,7 @@ struct rds_ib_connection {
 
 	/* To control the number of wrs from fastreg */
 	atomic_t		i_fastreg_wrs;
+	atomic_t		i_fastunreg_wrs;
 
 	/* interrupt handling */
 	struct tasklet_struct	i_send_tasklet;
@@ -149,6 +151,7 @@ struct rds_ib_connection {
 	u64			i_ack_recv;	/* last ACK received */
 	struct rds_ib_refill_cache i_cache_incs;
 	struct rds_ib_refill_cache i_cache_frags;
+	atomic_t		i_cache_allocs;
 
 	/* sending acks */
 	unsigned long		i_ack_flags;
@@ -179,6 +182,14 @@ struct rds_ib_connection {
 
 	/* Batched completions */
 	unsigned int		i_unsignaled_wrs;
+
+	/* Endpoint role in connection */
+	bool			i_active_side;
+	atomic_t		i_cq_quiesce;
+
+	/* Send/Recv vectors */
+	int			i_scq_vector;
+	int			i_rcq_vector;
 };
 
 /* This assumes that atomic_t is at least 32 bits */
@@ -221,6 +232,7 @@ struct rds_ib_device {
 	spinlock_t		spinlock;	/* protect the above */
 	atomic_t		refcount;
 	struct work_struct	free_work;
+	int			*vector_load;
 };
 
 #define ibdev_to_node(ibdev) dev_to_node(ibdev->dma_device)
@@ -249,6 +261,8 @@ struct rds_ib_statistics {
 	uint64_t	s_ib_rx_refill_from_cq;
 	uint64_t	s_ib_rx_refill_from_thread;
 	uint64_t	s_ib_rx_alloc_limit;
+	uint64_t	s_ib_rx_total_frags;
+	uint64_t	s_ib_rx_total_incs;
 	uint64_t	s_ib_rx_credit_updates;
 	uint64_t	s_ib_ack_sent;
 	uint64_t	s_ib_ack_send_failure;
@@ -271,6 +285,8 @@ struct rds_ib_statistics {
 	uint64_t	s_ib_rdma_mr_1m_reused;
 	uint64_t	s_ib_atomic_cswp;
 	uint64_t	s_ib_atomic_fadd;
+	uint64_t	s_ib_recv_added_to_cache;
+	uint64_t	s_ib_recv_removed_from_cache;
 };
 
 extern struct workqueue_struct *rds_ib_wq;
@@ -401,6 +417,8 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
 /* ib_stats.c */
 DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
 #define rds_ib_stats_inc(member) rds_stats_inc_which(rds_ib_stats, member)
+#define rds_ib_stats_add(member, count) \
+		rds_stats_add_which(rds_ib_stats, member, count)
 unsigned int rds_ib_stats_info_copy(struct rds_info_iterator *iter,
 				    unsigned int avail);
 

+ 71 - 18
net/rds/ib_cm.c

@@ -113,24 +113,26 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
 	}
 
 	if (conn->c_version < RDS_PROTOCOL(3, 1)) {
-		printk(KERN_NOTICE "RDS/IB: Connection to %pI4 version %u.%u failed,"
-		       " no longer supported\n",
-		       &conn->c_faddr,
-		       RDS_PROTOCOL_MAJOR(conn->c_version),
-		       RDS_PROTOCOL_MINOR(conn->c_version));
+		pr_notice("RDS/IB: Connection <%pI4,%pI4> version %u.%u no longer supported\n",
+			  &conn->c_laddr, &conn->c_faddr,
+			  RDS_PROTOCOL_MAJOR(conn->c_version),
+			  RDS_PROTOCOL_MINOR(conn->c_version));
 		rds_conn_destroy(conn);
 		return;
 	} else {
-		printk(KERN_NOTICE "RDS/IB: connected to %pI4 version %u.%u%s\n",
-		       &conn->c_faddr,
-		       RDS_PROTOCOL_MAJOR(conn->c_version),
-		       RDS_PROTOCOL_MINOR(conn->c_version),
-		       ic->i_flowctl ? ", flow control" : "");
+		pr_notice("RDS/IB: %s conn connected <%pI4,%pI4> version %u.%u%s\n",
+			  ic->i_active_side ? "Active" : "Passive",
+			  &conn->c_laddr, &conn->c_faddr,
+			  RDS_PROTOCOL_MAJOR(conn->c_version),
+			  RDS_PROTOCOL_MINOR(conn->c_version),
+			  ic->i_flowctl ? ", flow control" : "");
 	}
 
-	/*
-	 * Init rings and fill recv. this needs to wait until protocol negotiation
-	 * is complete, since ring layout is different from 3.0 to 3.1.
+	atomic_set(&ic->i_cq_quiesce, 0);
+
+	/* Init rings and fill recv. this needs to wait until protocol
+	 * negotiation is complete, since ring layout is different
+	 * from 3.0 to 3.1.
 	 */
 	rds_ib_send_init_ring(ic);
 	rds_ib_recv_init_ring(ic);
@@ -267,6 +269,10 @@ static void rds_ib_tasklet_fn_send(unsigned long data)
 
 	rds_ib_stats_inc(s_ib_tasklet_call);
 
+	/* if the CQ has already been reaped, ignore incoming CQ events */
+	if (atomic_read(&ic->i_cq_quiesce))
+		return;
+
 	poll_scq(ic, ic->i_send_cq, ic->i_send_wc);
 	ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
 	poll_scq(ic, ic->i_send_cq, ic->i_send_wc);
@@ -308,6 +314,10 @@ static void rds_ib_tasklet_fn_recv(unsigned long data)
 
 	rds_ib_stats_inc(s_ib_tasklet_call);
 
+	/* if the CQ has already been reaped, ignore incoming CQ events */
+	if (atomic_read(&ic->i_cq_quiesce))
+		return;
+
 	memset(&state, 0, sizeof(state));
 	poll_rcq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
 	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
@@ -358,6 +368,28 @@ static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
 	tasklet_schedule(&ic->i_send_tasklet);
 }
 
+static inline int ibdev_get_unused_vector(struct rds_ib_device *rds_ibdev)
+{
+	int min = rds_ibdev->vector_load[rds_ibdev->dev->num_comp_vectors - 1];
+	int index = rds_ibdev->dev->num_comp_vectors - 1;
+	int i;
+
+	for (i = rds_ibdev->dev->num_comp_vectors - 1; i >= 0; i--) {
+		if (rds_ibdev->vector_load[i] < min) {
+			index = i;
+			min = rds_ibdev->vector_load[i];
+		}
+	}
+
+	rds_ibdev->vector_load[index]++;
+	return index;
+}
+
+static inline void ibdev_put_vector(struct rds_ib_device *rds_ibdev, int index)
+{
+	rds_ibdev->vector_load[index]--;
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
@@ -383,7 +415,10 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	 * completion queue and send queue. This extra space is used for FRMR
 	 * registration and invalidation work requests
 	 */
-	fr_queue_space = (rds_ibdev->use_fastreg ? RDS_IB_DEFAULT_FR_WR : 0);
+	fr_queue_space = rds_ibdev->use_fastreg ?
+			 (RDS_IB_DEFAULT_FR_WR + 1) +
+			 (RDS_IB_DEFAULT_FR_INV_WR + 1)
+			 : 0;
 
 	/* add the conn now so that connection establishment has the dev */
 	rds_ib_add_conn(rds_ibdev, conn);
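
With RDS_IB_DEFAULT_FR_WR and RDS_IB_DEFAULT_FR_INV_WR both at 256 (ib.h
above), fr_queue_space works out to (256 + 1) + (256 + 1) = 514 entries when
fastreg is in use: one reserved pool for registration work requests and one
for invalidation work requests, replacing the single 512-entry reservation
both paths previously shared.
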
@@ -396,25 +431,30 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	/* Protection domain and memory range */
 	ic->i_pd = rds_ibdev->pd;
 
+	ic->i_scq_vector = ibdev_get_unused_vector(rds_ibdev);
 	cq_attr.cqe = ic->i_send_ring.w_nr + fr_queue_space + 1;
-
+	cq_attr.comp_vector = ic->i_scq_vector;
 	ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_send_cq)) {
 		ret = PTR_ERR(ic->i_send_cq);
 		ic->i_send_cq = NULL;
+		ibdev_put_vector(rds_ibdev, ic->i_scq_vector);
 		rdsdebug("ib_create_cq send failed: %d\n", ret);
 		goto out;
 	}
 
+	ic->i_rcq_vector = ibdev_get_unused_vector(rds_ibdev);
 	cq_attr.cqe = ic->i_recv_ring.w_nr;
+	cq_attr.comp_vector = ic->i_rcq_vector;
 	ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_recv_cq)) {
 		ret = PTR_ERR(ic->i_recv_cq);
 		ic->i_recv_cq = NULL;
+		ibdev_put_vector(rds_ibdev, ic->i_rcq_vector);
 		rdsdebug("ib_create_cq recv failed: %d\n", ret);
 		goto out;
 	}
@@ -445,6 +485,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	attr.send_cq = ic->i_send_cq;
 	attr.recv_cq = ic->i_recv_cq;
 	atomic_set(&ic->i_fastreg_wrs, RDS_IB_DEFAULT_FR_WR);
+	atomic_set(&ic->i_fastunreg_wrs, RDS_IB_DEFAULT_FR_INV_WR);
 
 	/*
 	 * XXX this can fail if max_*_wr is too large?  Are we supposed
@@ -682,6 +723,7 @@ out:
 		if (ic->i_cm_id == cm_id)
 			ret = 0;
 	}
+	ic->i_active_side = true;
 	return ret;
 }
 
@@ -767,17 +809,27 @@ void rds_ib_conn_path_shutdown(struct rds_conn_path *cp)
 		wait_event(rds_ib_ring_empty_wait,
 			   rds_ib_ring_empty(&ic->i_recv_ring) &&
 			   (atomic_read(&ic->i_signaled_sends) == 0) &&
-			   (atomic_read(&ic->i_fastreg_wrs) == RDS_IB_DEFAULT_FR_WR));
+			   (atomic_read(&ic->i_fastreg_wrs) == RDS_IB_DEFAULT_FR_WR) &&
+			   (atomic_read(&ic->i_fastunreg_wrs) == RDS_IB_DEFAULT_FR_INV_WR));
 		tasklet_kill(&ic->i_send_tasklet);
 		tasklet_kill(&ic->i_recv_tasklet);
 
+		atomic_set(&ic->i_cq_quiesce, 1);
+
 		/* first destroy the ib state that generates callbacks */
 		if (ic->i_cm_id->qp)
 			rdma_destroy_qp(ic->i_cm_id);
-		if (ic->i_send_cq)
+		if (ic->i_send_cq) {
+			if (ic->rds_ibdev)
+				ibdev_put_vector(ic->rds_ibdev, ic->i_scq_vector);
 			ib_destroy_cq(ic->i_send_cq);
-		if (ic->i_recv_cq)
+		}
+
+		if (ic->i_recv_cq) {
+			if (ic->rds_ibdev)
+				ibdev_put_vector(ic->rds_ibdev, ic->i_rcq_vector);
 			ib_destroy_cq(ic->i_recv_cq);
+		}
 
 		/* then free the resources that ib callbacks use */
 		if (ic->i_send_hdrs)
@@ -855,6 +907,7 @@ void rds_ib_conn_path_shutdown(struct rds_conn_path *cp)
 	ic->i_sends = NULL;
 	vfree(ic->i_recvs);
 	ic->i_recvs = NULL;
+	ic->i_active_side = false;
 }
 
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)

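A note on ibdev_get_unused_vector() above: the scan starts at the
highest-numbered completion vector and only moves on a strictly smaller load,
so ties resolve to the highest-numbered least-loaded vector. For example, on a
three-vector device with loads {1, 0, 0}, the send CQ takes vector 2 (loads
become {1, 0, 1}) and the recv CQ allocated right after takes vector 1,
spreading a connection's two CQs across vectors.
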
+ 9 - 7
net/rds/ib_frmr.c

@@ -104,14 +104,15 @@ static int rds_ib_post_reg_frmr(struct rds_ib_mr *ibmr)
 	struct rds_ib_frmr *frmr = &ibmr->u.frmr;
 	struct ib_send_wr *failed_wr;
 	struct ib_reg_wr reg_wr;
-	int ret;
+	int ret, off = 0;
 
 	while (atomic_dec_return(&ibmr->ic->i_fastreg_wrs) <= 0) {
 		atomic_inc(&ibmr->ic->i_fastreg_wrs);
 		cpu_relax();
 	}
 
-	ret = ib_map_mr_sg_zbva(frmr->mr, ibmr->sg, ibmr->sg_len, 0, PAGE_SIZE);
+	ret = ib_map_mr_sg_zbva(frmr->mr, ibmr->sg, ibmr->sg_len,
+				&off, PAGE_SIZE);
 	if (unlikely(ret != ibmr->sg_len))
 		return ret < 0 ? ret : -EINVAL;
 
@@ -240,8 +241,8 @@ static int rds_ib_post_inv(struct rds_ib_mr *ibmr)
 	if (frmr->fr_state != FRMR_IS_INUSE)
 		goto out;
 
-	while (atomic_dec_return(&ibmr->ic->i_fastreg_wrs) <= 0) {
-		atomic_inc(&ibmr->ic->i_fastreg_wrs);
+	while (atomic_dec_return(&ibmr->ic->i_fastunreg_wrs) <= 0) {
+		atomic_inc(&ibmr->ic->i_fastunreg_wrs);
 		cpu_relax();
 	}
 
@@ -260,7 +261,7 @@ static int rds_ib_post_inv(struct rds_ib_mr *ibmr)
 	if (unlikely(ret)) {
 		frmr->fr_state = FRMR_IS_STALE;
 		frmr->fr_inv = false;
-		atomic_inc(&ibmr->ic->i_fastreg_wrs);
+		atomic_inc(&ibmr->ic->i_fastunreg_wrs);
 		pr_err("RDS/IB: %s returned error(%d)\n", __func__, ret);
 		goto out;
 	}
@@ -288,9 +289,10 @@ void rds_ib_mr_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 	if (frmr->fr_inv) {
 		frmr->fr_state = FRMR_IS_FREE;
 		frmr->fr_inv = false;
+		atomic_inc(&ic->i_fastreg_wrs);
+	} else {
+		atomic_inc(&ic->i_fastunreg_wrs);
 	}
-
-	atomic_inc(&ic->i_fastreg_wrs);
 }
 
 void rds_ib_unreg_frmr(struct list_head *list, unsigned int *nfreed,

+ 12 - 2
net/rds/ib_recv.c

@@ -194,6 +194,8 @@ static void rds_ib_frag_free(struct rds_ib_connection *ic,
 	rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
 
 	rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
+	atomic_add(RDS_FRAG_SIZE / SZ_1K, &ic->i_cache_allocs);
+	rds_ib_stats_add(s_ib_recv_added_to_cache, RDS_FRAG_SIZE);
 }
 
 /* Recycle inc after freeing attached frags */
@@ -261,6 +263,7 @@ static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *i
 			atomic_dec(&rds_ib_allocation);
 			return NULL;
 		}
+		rds_ib_stats_inc(s_ib_rx_total_incs);
 	}
 	INIT_LIST_HEAD(&ibinc->ii_frags);
 	rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
@@ -278,6 +281,8 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic
 	cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
 	if (cache_item) {
 		frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
+		atomic_sub(RDS_FRAG_SIZE / SZ_1K, &ic->i_cache_allocs);
+		rds_ib_stats_add(s_ib_recv_removed_from_cache, RDS_FRAG_SIZE);
 	} else {
 		frag = kmem_cache_alloc(rds_ib_frag_slab, slab_mask);
 		if (!frag)
@@ -290,6 +295,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic
 			kmem_cache_free(rds_ib_frag_slab, frag);
 			return NULL;
 		}
+		rds_ib_stats_inc(s_ib_rx_total_frags);
 	}
 
 	INIT_LIST_HEAD(&frag->f_item);
@@ -905,8 +911,12 @@ static void rds_ib_process_recv(struct rds_connection *conn,
 		ic->i_ibinc = ibinc;
 
 		hdr = &ibinc->ii_inc.i_hdr;
+		ibinc->ii_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] =
+				local_clock();
 		memcpy(hdr, ihdr, sizeof(*hdr));
 		ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
+		ibinc->ii_inc.i_rx_lat_trace[RDS_MSG_RX_START] =
+				local_clock();
 
 		rdsdebug("ic %p ibinc %p rem %u flag 0x%x\n", ic, ibinc,
 			 ic->i_recv_data_rem, hdr->h_flags);
@@ -980,8 +990,8 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
 	} else {
 		/* We expect errors as the qp is drained during shutdown */
 		if (rds_conn_up(conn) || rds_conn_connecting(conn))
-			rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
-					  &conn->c_faddr,
+			rds_ib_conn_error(conn, "recv completion on <%pI4,%pI4> had status %u (%s), disconnecting and reconnecting\n",
+					  &conn->c_laddr, &conn->c_faddr,
 					  wc->status,
 					  ib_wc_status_msg(wc->status));
 	}

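On the accounting above: i_cache_allocs tracks the per-connection receive
cache footprint in KiB, so with RDS_FRAG_SIZE at 4 KiB (RDS_FRAG_SHIFT = 12)
each fragment parked in the cache adds 4 and each fragment taken back out
subtracts 4, while the s_ib_recv_added_to_cache and
s_ib_recv_removed_from_cache stats count the same traffic in bytes.
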
+ 17 - 12
net/rds/ib_send.c

@@ -69,16 +69,6 @@ static void rds_ib_send_complete(struct rds_message *rm,
 	complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
-				   struct rm_data_op *op,
-				   int wc_status)
-{
-	if (op->op_nents)
-		ib_dma_unmap_sg(ic->i_cm_id->device,
-				op->op_sg, op->op_nents,
-				DMA_TO_DEVICE);
-}
-
 static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
 				   struct rm_rdma_op *op,
 				   int wc_status)
@@ -139,6 +129,21 @@ static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
 		rds_ib_stats_inc(s_ib_atomic_fadd);
 }
 
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+				   struct rm_data_op *op,
+				   int wc_status)
+{
+	struct rds_message *rm = container_of(op, struct rds_message, data);
+
+	if (op->op_nents)
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				DMA_TO_DEVICE);
+
+	if (rm->rdma.op_active && rm->data.op_notify)
+		rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status);
+}
+
 /*
  * Unmap the resources associated with a struct send_work.
  *
@@ -300,8 +305,8 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 
 	/* We expect errors as the qp is drained during shutdown */
 	if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
-		rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
-				  &conn->c_faddr, wc->status,
+		rds_ib_conn_error(conn, "send completion on <%pI4,%pI4> had status %u (%s), disconnecting and reconnecting\n",
+				  &conn->c_laddr, &conn->c_faddr, wc->status,
 				  ib_wc_status_msg(wc->status));
 	}
 }

+ 2 - 0
net/rds/ib_stats.c

@@ -55,6 +55,8 @@ static const char *const rds_ib_stat_names[] = {
 	"ib_rx_refill_from_cq",
 	"ib_rx_refill_from_thread",
 	"ib_rx_alloc_limit",
+	"ib_rx_total_frags",
+	"ib_rx_total_incs",
 	"ib_rx_credit_updates",
 	"ib_ack_sent",
 	"ib_ack_send_failure",

+ 20 - 2
net/rds/rdma.c

@@ -40,7 +40,6 @@
 /*
  * XXX
  *  - build with sparse
- *  - should we limit the size of a mr region?  let transport return failure?
  *  - should we detect duplicate keys on a socket?  hmm.
  *  - an rdma is an mlock, apply rlimit?
  */
@@ -200,6 +199,14 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 		goto out;
 	}
 
+	/* Restrict the size of an MR irrespective of the underlying transport.
+	 * To account for unaligned MR regions, subtract one from nr_pages.
+	 */
+	if ((nr_pages - 1) > (RDS_MAX_MSG_SIZE >> PAGE_SHIFT)) {
+		ret = -EMSGSIZE;
+		goto out;
+	}
+
 	rdsdebug("RDS: get_mr addr %llx len %llu nr_pages %u\n",
 		args->vec.addr, args->vec.bytes, nr_pages);
 
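With 4 KiB pages (PAGE_SHIFT = 12), RDS_MAX_MSG_SIZE >> PAGE_SHIFT is 256
pages, so the check rejects any mapping of more than 257 pages: 256 pages
cover the 1 MiB limit itself, and the one page subtracted from nr_pages allows
for a region that does not begin on a page boundary.
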
@@ -415,7 +422,8 @@ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force)
 	spin_lock_irqsave(&rs->rs_rdma_lock, flags);
 	mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
 	if (!mr) {
-		printk(KERN_ERR "rds: trying to unuse MR with unknown r_key %u!\n", r_key);
+		pr_debug("rds: trying to unuse MR with unknown r_key %u!\n",
+			 r_key);
 		spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
 		return;
 	}
@@ -626,6 +634,16 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 		}
 		op->op_notifier->n_user_token = args->user_token;
 		op->op_notifier->n_status = RDS_RDMA_SUCCESS;
+
+		/* Enable rdma notification on data operation for composite
+		 * rds messages and make sure notification is enabled only
+		 * for the data operation which follows it so that application
+		 * gets notified only after full message gets delivered.
+		 */
+		if (rm->data.op_sg) {
+			rm->rdma.op_notify = 0;
+			rm->data.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+		}
 	}
 
 	/* The cookie contains the R_Key of the remote memory region, and

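To make the new notification semantics concrete, here is a hedged userspace
sketch of a composite send (MR setup, addressing and error handling are
elided; the helper is hypothetical):

#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <linux/rds.h>

/* Illustrative helper: queue an RDMA operation plus an immediate data
 * payload as a single RDS message.  With this patch, RDS_RDMA_NOTIFY_ME
 * on such a composite message fires after the trailing data op completes,
 * not after the RDMA op alone, so the notification means the full message
 * went out.
 */
static int send_rdma_with_data(int fd, struct sockaddr_in *dst,
			       struct rds_rdma_args *args,
			       void *buf, size_t len)
{
	char ctl[CMSG_SPACE(sizeof(*args))];
	struct iovec iov = { .iov_base = buf, .iov_len = len };
	struct msghdr msg = {
		.msg_name = dst,
		.msg_namelen = sizeof(*dst),
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = ctl,
		.msg_controllen = sizeof(ctl),
	};
	struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);

	args->flags |= RDS_RDMA_NOTIFY_ME;

	cmsg->cmsg_level = SOL_RDS;
	cmsg->cmsg_type = RDS_CMSG_RDMA_ARGS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(*args));
	memcpy(CMSG_DATA(cmsg), args, sizeof(*args));

	return sendmsg(fd, &msg, 0);
}
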
+ 3 - 8
net/rds/rdma_transport.c

@@ -206,18 +206,13 @@ static int rds_rdma_init(void)
 {
 	int ret;
 
-	ret = rds_rdma_listen_init();
+	ret = rds_ib_init();
 	if (ret)
 		goto out;
 
-	ret = rds_ib_init();
+	ret = rds_rdma_listen_init();
 	if (ret)
-		goto err_ib_init;
-
-	goto out;
-
-err_ib_init:
-	rds_rdma_listen_stop();
+		rds_ib_exit();
 out:
 	return ret;
 }

+ 17 - 0
net/rds/rds.h

@@ -50,6 +50,9 @@ void rdsdebug(char *fmt, ...)
 #define RDS_FRAG_SHIFT	12
 #define RDS_FRAG_SIZE	((unsigned int)(1 << RDS_FRAG_SHIFT))
 
+/* Used to limit both RDMA and non-RDMA RDS message to 1MB */
+#define RDS_MAX_MSG_SIZE	((unsigned int)(1 << 20))
+
 #define RDS_CONG_MAP_BYTES	(65536 / 8)
 #define RDS_CONG_MAP_PAGES	(PAGE_ALIGN(RDS_CONG_MAP_BYTES) / PAGE_SIZE)
 #define RDS_CONG_MAP_PAGE_BITS	(PAGE_SIZE * 8)
@@ -250,6 +253,11 @@ struct rds_ext_header_rdma_dest {
 #define RDS_EXTHDR_GEN_NUM	6
 
 #define __RDS_EXTHDR_MAX	16 /* for now */
+#define RDS_RX_MAX_TRACES	(RDS_MSG_RX_DGRAM_TRACE_MAX + 1)
+#define	RDS_MSG_RX_HDR		0
+#define	RDS_MSG_RX_START	1
+#define	RDS_MSG_RX_END		2
+#define	RDS_MSG_RX_CMSG		3
 
 struct rds_incoming {
 	atomic_t		i_refcount;
@@ -262,6 +270,7 @@ struct rds_incoming {
 
 	rds_rdma_cookie_t	i_rdma_cookie;
 	struct timeval		i_rx_tstamp;
+	u64			i_rx_lat_trace[RDS_RX_MAX_TRACES];
 };
 
 struct rds_mr {
@@ -419,6 +428,7 @@ struct rds_message {
 		} rdma;
 		struct rm_data_op {
 			unsigned int		op_active:1;
+			unsigned int		op_notify:1;
 			unsigned int		op_nents;
 			unsigned int		op_count;
 			unsigned int		op_dmasg;
@@ -571,6 +581,10 @@ struct rds_sock {
 	unsigned char		rs_recverr,
 				rs_cong_monitor;
 	u32			rs_hash_initval;
+
+	/* Socket receive path trace points */
+	u8			rs_rx_traces;
+	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -630,6 +644,9 @@ struct rds_statistics {
 	uint64_t	s_cong_update_received;
 	uint64_t	s_cong_send_error;
 	uint64_t	s_cong_send_blocked;
+	uint64_t	s_recv_bytes_added_to_socket;
+	uint64_t	s_recv_bytes_removed_from_socket;
+
 };
 
 /* af_rds.c */

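Tying the uapi and internal pieces together: trace point j requested through
rs_rx_trace reports i_rx_lat_trace[j + 1] - i_rx_lat_trace[j], so
RDS_MSG_RX_HDR_TO_DGRAM_START (0) is the HDR-to-START gap,
RDS_MSG_RX_DGRAM_REASSEMBLE (1) is START to END, and
RDS_MSG_RX_DGRAM_DELIVERED (2) is END to CMSG, each in nanoseconds of
local_clock().
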
+ 33 - 3
net/rds/recv.c

@@ -43,6 +43,8 @@
 void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
 		  __be32 saddr)
 {
+	int i;
+
 	atomic_set(&inc->i_refcount, 1);
 	INIT_LIST_HEAD(&inc->i_item);
 	inc->i_conn = conn;
@@ -50,6 +52,9 @@ void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
 	inc->i_rdma_cookie = 0;
 	inc->i_rx_tstamp.tv_sec = 0;
 	inc->i_rx_tstamp.tv_usec = 0;
+
+	for (i = 0; i < RDS_RX_MAX_TRACES; i++)
+		inc->i_rx_lat_trace[i] = 0;
 }
 EXPORT_SYMBOL_GPL(rds_inc_init);
 
@@ -94,6 +99,10 @@ static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
 		return;
 
 	rs->rs_rcv_bytes += delta;
+	if (delta > 0)
+		rds_stats_add(s_recv_bytes_added_to_socket, delta);
+	else
+		rds_stats_add(s_recv_bytes_removed_from_socket, -delta);
 	now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);
 
 	rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d "
@@ -369,6 +378,7 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 		if (sock_flag(sk, SOCK_RCVTSTAMP))
 			do_gettimeofday(&inc->i_rx_tstamp);
 		rds_inc_addref(inc);
+		inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock();
 		list_add_tail(&inc->i_item, &rs->rs_recv_queue);
 		__rds_wake_sk_sleep(sk);
 	} else {
@@ -530,7 +540,7 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
 		ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
 				sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
 		if (ret)
-			return ret;
+			goto out;
 	}
 
 	if ((inc->i_rx_tstamp.tv_sec != 0) &&
@@ -539,10 +549,30 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
 			       sizeof(struct timeval),
 			       &inc->i_rx_tstamp);
 		if (ret)
-			return ret;
+			goto out;
 	}
 
-	return 0;
+	if (rs->rs_rx_traces) {
+		struct rds_cmsg_rx_trace t;
+		int i, j;
+
+		inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock();
+		t.rx_traces = rs->rs_rx_traces;
+		for (i = 0; i < rs->rs_rx_traces; i++) {
+			j = rs->rs_rx_trace[i];
+			t.rx_trace_pos[i] = j;
+			t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] -
+					  inc->i_rx_lat_trace[j];
+		}
+
+		ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY,
+			       sizeof(t), &t);
+		if (ret)
+			goto out;
+	}
+
+out:
+	return ret;
 }
 
 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,

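And the matching receive side from userspace, again as a hedged sketch (the
helper is hypothetical; it assumes recvmsg() was called with ancillary buffer
space on a socket that enabled tracing as shown earlier):

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <linux/rds.h>

/* Illustrative helper: walk the ancillary data recvmsg() filled in and
 * print any RDS_CMSG_RXPATH_LATENCY deltas the kernel attached.
 */
static void dump_rx_latency(struct msghdr *msg)
{
	struct cmsghdr *cmsg;

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
		struct rds_cmsg_rx_trace t;
		int i;

		if (cmsg->cmsg_level != SOL_RDS ||
		    cmsg->cmsg_type != RDS_CMSG_RXPATH_LATENCY)
			continue;

		memcpy(&t, CMSG_DATA(cmsg), sizeof(t));
		for (i = 0; i < t.rx_traces; i++)
			printf("trace point %u: %llu ns\n",
			       t.rx_trace_pos[i],
			       (unsigned long long)t.rx_trace[i]);
	}
}
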
+ 46 - 4
net/rds/send.c

@@ -476,12 +476,14 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
 	struct rm_rdma_op *ro;
 	struct rds_notifier *notifier;
 	unsigned long flags;
+	unsigned int notify = 0;
 
 	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
+	notify = rm->rdma.op_notify | rm->data.op_notify;
 	ro = &rm->rdma;
 	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
-	    ro->op_active && ro->op_notify && ro->op_notifier) {
+	    ro->op_active && notify && ro->op_notifier) {
 		notifier = ro->op_notifier;
 		rs = rm->m_rs;
 		sock_hold(rds_rs_to_sk(rs));
@@ -945,6 +947,11 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
 			if (!ret)
 				*allocated_mr = 1;
+			else if (ret == -ENODEV)
+				/* Accommodate the get_mr() case which can fail
+				 * if connection isn't established yet.
+				 */
+				ret = -EAGAIN;
 			break;
 		case RDS_CMSG_ATOMIC_CSWP:
 		case RDS_CMSG_ATOMIC_FADD:
@@ -987,6 +994,26 @@ static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
 	return hash;
 }
 
+static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
+{
+	struct rds_rdma_args *args;
+	struct cmsghdr *cmsg;
+
+	for_each_cmsghdr(cmsg, msg) {
+		if (!CMSG_OK(msg, cmsg))
+			return -EINVAL;
+
+		if (cmsg->cmsg_level != SOL_RDS)
+			continue;
+
+		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
+			args = CMSG_DATA(cmsg);
+			*rdma_bytes += args->remote_vec.bytes;
+		}
+	}
+	return 0;
+}
+
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 {
 	struct sock *sk = sock->sk;
@@ -1001,6 +1028,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	int nonblock = msg->msg_flags & MSG_DONTWAIT;
 	long timeo = sock_sndtimeo(sk, nonblock);
 	struct rds_conn_path *cpath;
+	size_t total_payload_len = payload_len, rdma_payload_len = 0;
 
 	/* Mirror Linux UDP mirror of BSD error message compatibility */
 	/* XXX: Perhaps MSG_MORE someday */
@@ -1033,6 +1061,16 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	}
 	release_sock(sk);
 
+	ret = rds_rdma_bytes(msg, &rdma_payload_len);
+	if (ret)
+		goto out;
+
+	total_payload_len += rdma_payload_len;
+	if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
+		ret = -EMSGSIZE;
+		goto out;
+	}
+
 	if (payload_len > rds_sk_sndbuf(rs)) {
 		ret = -EMSGSIZE;
 		goto out;
@@ -1082,8 +1120,12 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 
 	/* Parse any control messages the user may have included. */
 	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
-	if (ret)
+	if (ret) {
+		/* Trigger connection so that it's ready for the next retry */
+		if (ret == -EAGAIN)
+			rds_conn_connect_if_down(conn);
 		goto out;
+	}
 
 	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
 		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
@@ -1169,7 +1211,7 @@ out:
  * or
  *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
  */
-int
+static int
 rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	       __be16 dport, u8 h_flags)
 {
@@ -1238,7 +1280,7 @@ rds_send_pong(struct rds_conn_path *cp, __be16 dport)
 	return rds_send_probe(cp, 0, dport, 0);
 }
 
-void
+static void
 rds_send_ping(struct rds_connection *conn)
 {
 	unsigned long flags;

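A user-visible consequence of the -ENODEV to -EAGAIN mapping above, sketched
as a hypothetical retry wrapper (a real application would bound the retries):

#include <errno.h>
#include <sys/socket.h>

/* Illustrative wrapper: a sendmsg() carrying a GET_MR-style cmsg can now
 * fail with EAGAIN while the connection it just triggered is still coming
 * up, so the caller simply tries again.
 */
static ssize_t rds_sendmsg_retry(int fd, struct msghdr *msg)
{
	ssize_t ret;

	do {
		ret = sendmsg(fd, msg, 0);
	} while (ret < 0 && errno == EAGAIN);

	return ret;
}
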
+ 1 - 0
net/rds/tcp_listen.c

@@ -79,6 +79,7 @@ bail:
  * smaller ip address, we recycle conns in RDS_CONN_ERROR on the passive side
  * by moving them to CONNECTING in this function.
  */
+static
 struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
 {
 	int i;

+ 5 - 0
net/rds/tcp_recv.c

@@ -180,6 +180,9 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
 			rdsdebug("alloced tinc %p\n", tinc);
 			rds_inc_path_init(&tinc->ti_inc, cp,
 					  cp->cp_conn->c_faddr);
+			tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] =
+					local_clock();
+
 			/*
 			 * XXX * we might be able to use the __ variants when
 			 * we've already serialized at a higher level.
@@ -204,6 +207,8 @@ static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
 				/* could be 0 for a 0 len message */
 				tc->t_tinc_data_rem =
 					be32_to_cpu(tinc->ti_inc.i_hdr.h_len);
+				tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_START] =
+					local_clock();
 			}
 		}