Browse Source

Merge branch 'rds-enable-mprds'

Sowmini Varadhan says:

====================
RDS: TCP: Enable mprds for rds-tcp

The third, and final, installment for mprds-tcp changes.

In Patch 3 of this set, if the transport support t_mp_capable,
we hash outgoing traffic across multiple paths.  Additionally, even if
the transport is MP capable, we may be peering with some node that does
not support mprds, or supports a different number of paths. This
necessitates RDS control plane changes so that both peers agree
on the number of paths to be used for the rds-tcp connection.
Patch 3 implements all these changes, which are documented in patch 5
of the series.

Patch 1 of this series is a bug fix for a race-condition
that has always existed, but is now more easily encountered with mprds.
Patch 2 is code refactoring. Patches 4 and 5 are Documentation updates.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 9 years ago
parent
commit
7a7d1d5733
13 changed files with 342 additions and 49 deletions
  1. 71 1
      Documentation/networking/rds.txt
  2. 6 0
      net/rds/bind.c
  3. 8 9
      net/rds/connection.c
  4. 1 0
      net/rds/message.c
  5. 23 2
      net/rds/rds.h
  6. 75 0
      net/rds/recv.c
  7. 67 4
      net/rds/send.c
  8. 12 19
      net/rds/tcp.c
  9. 1 0
      net/rds/tcp.h
  10. 6 1
      net/rds/tcp_connect.c
  11. 57 8
      net/rds/tcp_listen.c
  12. 13 5
      net/rds/tcp_send.c
  13. 2 0
      net/rds/threads.c

+ 71 - 1
Documentation/networking/rds.txt

@@ -85,7 +85,8 @@ Socket Interface
 
   bind(fd, &sockaddr_in, ...)
         This binds the socket to a local IP address and port, and a
-        transport.
+        transport, if one has not already been selected via the
+	SO_RDS_TRANSPORT socket option
 
   sendmsg(fd, ...)
         Sends a message to the indicated recipient. The kernel will
@@ -146,6 +147,20 @@ Socket Interface
         operation. In this case, it would use RDS_CANCEL_SENT_TO to
         nuke any pending messages.
 
+  setsockopt(fd, SOL_RDS, SO_RDS_TRANSPORT, (int *)&transport ..)
+  getsockopt(fd, SOL_RDS, SO_RDS_TRANSPORT, (int *)&transport ..)
+	Set or read an integer defining  the underlying
+	encapsulating transport to be used for RDS packets on the
+	socket. When setting the option, integer argument may be
+	one of RDS_TRANS_TCP or RDS_TRANS_IB. When retrieving the
+	value, RDS_TRANS_NONE will be returned on an unbound socket.
+	This socket option may only be set exactly once on the socket,
+	prior to binding it via the bind(2) system call. Attempts to
+	set SO_RDS_TRANSPORT on a socket for which the transport has
+	been previously attached explicitly (by SO_RDS_TRANSPORT) or
+	implicitly (via bind(2)) will return an error of EOPNOTSUPP.
+	An attempt to set SO_RDS_TRANSPPORT to RDS_TRANS_NONE will
+	always return EINVAL.
 
 RDMA for RDS
 ============
@@ -350,4 +365,59 @@ The recv path
     handle CMSGs
     return to application
 
+Multipath RDS (mprds)
+=====================
+  Mprds is multipathed-RDS, primarily intended for RDS-over-TCP
+  (though the concept can be extended to other transports). The classical
+  implementation of RDS-over-TCP is implemented by demultiplexing multiple
+  PF_RDS sockets between any 2 endpoints (where endpoint == [IP address,
+  port]) over a single TCP socket between the 2 IP addresses involved. This
+  has the limitation that it ends up funneling multiple RDS flows over a
+  single TCP flow, thus it is
+  (a) upper-bounded to the single-flow bandwidth,
+  (b) suffers from head-of-line blocking for all the RDS sockets.
+
+  Better throughput (for a fixed small packet size, MTU) can be achieved
+  by having multiple TCP/IP flows per rds/tcp connection, i.e., multipathed
+  RDS (mprds).  Each such TCP/IP flow constitutes a path for the rds/tcp
+  connection. RDS sockets will be attached to a path based on some hash
+  (e.g., of local address and RDS port number) and packets for that RDS
+  socket will be sent over the attached path using TCP to segment/reassemble
+  RDS datagrams on that path.
+
+  Multipathed RDS is implemented by splitting the struct rds_connection into
+  a common (to all paths) part, and a per-path struct rds_conn_path. All
+  I/O workqs and reconnect threads are driven from the rds_conn_path.
+  Transports such as TCP that are multipath capable may then set up a
+  TPC socket per rds_conn_path, and this is managed by the transport via
+  the transport privatee cp_transport_data pointer.
+
+  Transports announce themselves as multipath capable by setting the
+  t_mp_capable bit during registration with the rds core module. When the
+  transport is multipath-capable, rds_sendmsg() hashes outgoing traffic
+  across multiple paths. The outgoing hash is computed based on the
+  local address and port that the PF_RDS socket is bound to.
+
+  Additionally, even if the transport is MP capable, we may be
+  peering with some node that does not support mprds, or supports
+  a different number of paths. As a result, the peering nodes need
+  to agree on the number of paths to be used for the connection.
+  This is done by sending out a control packet exchange before the
+  first data packet. The control packet exchange must have completed
+  prior to outgoing hash completion in rds_sendmsg() when the transport
+  is mutlipath capable.
+
+  The control packet is an RDS ping packet (i.e., packet to rds dest
+  port 0) with the ping packet having a rds extension header option  of
+  type RDS_EXTHDR_NPATHS, length 2 bytes, and the value is the
+  number of paths supported by the sender. The "probe" ping packet will
+  get sent from some reserved port, RDS_FLAG_PROBE_PORT (in <linux/rds.h>)
+  The receiver of a ping from RDS_FLAG_PROBE_PORT will thus immediately
+  be able to compute the min(sender_paths, rcvr_paths). The pong
+  sent in response to a probe-ping should contain the rcvr's npaths
+  when the rcvr is mprds-capable.
+
+  If the rcvr is not mprds-capable, the exthdr in the ping will be
+  ignored.  In this case the pong will not have any exthdrs, so the sender
+  of the probe-ping can default to single-path mprds.
 

+ 6 - 0
net/rds/bind.c

@@ -81,6 +81,8 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 
 	if (*port != 0) {
 		rover = be16_to_cpu(*port);
+		if (rover == RDS_FLAG_PROBE_PORT)
+			return -EINVAL;
 		last = rover;
 	} else {
 		rover = max_t(u16, prandom_u32(), 2);
@@ -91,12 +93,16 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port)
 		if (rover == 0)
 			rover++;
 
+		if (rover == RDS_FLAG_PROBE_PORT)
+			continue;
 		key = ((u64)addr << 32) | cpu_to_be16(rover);
 		if (rhashtable_lookup_fast(&bind_hash_table, &key, ht_parms))
 			continue;
 
 		rs->rs_bound_key = key;
 		rs->rs_bound_addr = addr;
+		net_get_random_once(&rs->rs_hash_initval,
+				    sizeof(rs->rs_hash_initval));
 		rs->rs_bound_port = cpu_to_be16(rover);
 		rs->rs_bound_node.next = NULL;
 		rds_sock_addref(rs);

+ 8 - 9
net/rds/connection.c

@@ -155,7 +155,7 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 	struct hlist_head *head = rds_conn_bucket(laddr, faddr);
 	struct rds_transport *loop_trans;
 	unsigned long flags;
-	int ret;
+	int ret, i;
 
 	rcu_read_lock();
 	conn = rds_conn_lookup(net, head, laddr, faddr, trans);
@@ -211,6 +211,12 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 
 	conn->c_trans = trans;
 
+	init_waitqueue_head(&conn->c_hs_waitq);
+	for (i = 0; i < RDS_MPATH_WORKERS; i++) {
+		__rds_conn_path_init(conn, &conn->c_path[i],
+				     is_outgoing);
+		conn->c_path[i].cp_index = i;
+	}
 	ret = trans->conn_alloc(conn, gfp);
 	if (ret) {
 		kmem_cache_free(rds_conn_slab, conn);
@@ -263,14 +269,6 @@ static struct rds_connection *__rds_conn_create(struct net *net,
 			kmem_cache_free(rds_conn_slab, conn);
 			conn = found;
 		} else {
-			int i;
-
-			for (i = 0; i < RDS_MPATH_WORKERS; i++) {
-				__rds_conn_path_init(conn, &conn->c_path[i],
-						     is_outgoing);
-				conn->c_path[i].cp_index = i;
-			}
-
 			hlist_add_head_rcu(&conn->c_hash_node, head);
 			rds_cong_add_conn(conn);
 			rds_conn_count++;
@@ -668,6 +666,7 @@ EXPORT_SYMBOL_GPL(rds_conn_path_drop);
 
 void rds_conn_drop(struct rds_connection *conn)
 {
+	WARN_ON(conn->c_trans->t_mp_capable);
 	rds_conn_path_drop(&conn->c_path[0]);
 }
 EXPORT_SYMBOL_GPL(rds_conn_drop);

+ 1 - 0
net/rds/message.c

@@ -41,6 +41,7 @@ static unsigned int	rds_exthdr_size[__RDS_EXTHDR_MAX] = {
 [RDS_EXTHDR_VERSION]	= sizeof(struct rds_ext_header_version),
 [RDS_EXTHDR_RDMA]	= sizeof(struct rds_ext_header_rdma),
 [RDS_EXTHDR_RDMA_DEST]	= sizeof(struct rds_ext_header_rdma_dest),
+[RDS_EXTHDR_NPATHS]	= sizeof(u16),
 };
 
 

+ 23 - 2
net/rds/rds.h

@@ -85,7 +85,9 @@ enum {
 #define RDS_RECV_REFILL		3
 
 /* Max number of multipaths per RDS connection. Must be a power of 2 */
-#define	RDS_MPATH_WORKERS	1
+#define	RDS_MPATH_WORKERS	8
+#define	RDS_MPATH_HASH(rs, n) (jhash_1word((rs)->rs_bound_port, \
+			       (rs)->rs_hash_initval) & ((n) - 1))
 
 /* Per mpath connection state */
 struct rds_conn_path {
@@ -131,7 +133,8 @@ struct rds_connection {
 	__be32			c_laddr;
 	__be32			c_faddr;
 	unsigned int		c_loopback:1,
-				c_pad_to_32:31;
+				c_ping_triggered:1,
+				c_pad_to_32:30;
 	int			c_npaths;
 	struct rds_connection	*c_passive;
 	struct rds_transport	*c_trans;
@@ -147,6 +150,7 @@ struct rds_connection {
 	unsigned long		c_map_queued;
 
 	struct rds_conn_path	c_path[RDS_MPATH_WORKERS];
+	wait_queue_head_t	c_hs_waitq; /* handshake waitq */
 };
 
 static inline
@@ -166,6 +170,17 @@ void rds_conn_net_set(struct rds_connection *conn, struct net *net)
 #define RDS_FLAG_RETRANSMITTED	0x04
 #define RDS_MAX_ADV_CREDIT	255
 
+/* RDS_FLAG_PROBE_PORT is the reserved sport used for sending a ping
+ * probe to exchange control information before establishing a connection.
+ * Currently the control information that is exchanged is the number of
+ * supported paths. If the peer is a legacy (older kernel revision) peer,
+ * it would return a pong message without additional control information
+ * that would then alert the sender that the peer was an older rev.
+ */
+#define RDS_FLAG_PROBE_PORT	1
+#define	RDS_HS_PROBE(sport, dport) \
+		((sport == RDS_FLAG_PROBE_PORT && dport == 0) || \
+		 (sport == 0 && dport == RDS_FLAG_PROBE_PORT))
 /*
  * Maximum space available for extension headers.
  */
@@ -225,6 +240,11 @@ struct rds_ext_header_rdma_dest {
 	__be32			h_rdma_offset;
 };
 
+/* Extension header announcing number of paths.
+ * Implicit length = 2 bytes.
+ */
+#define RDS_EXTHDR_NPATHS	4
+
 #define __RDS_EXTHDR_MAX	16 /* for now */
 
 struct rds_incoming {
@@ -545,6 +565,7 @@ struct rds_sock {
 	/* Socket options - in case there will be more */
 	unsigned char		rs_recverr,
 				rs_cong_monitor;
+	u32			rs_hash_initval;
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)

+ 75 - 0
net/rds/recv.c

@@ -156,6 +156,67 @@ static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock
 	}
 }
 
+static void rds_recv_hs_exthdrs(struct rds_header *hdr,
+				struct rds_connection *conn)
+{
+	unsigned int pos = 0, type, len;
+	union {
+		struct rds_ext_header_version version;
+		u16 rds_npaths;
+	} buffer;
+
+	while (1) {
+		len = sizeof(buffer);
+		type = rds_message_next_extension(hdr, &pos, &buffer, &len);
+		if (type == RDS_EXTHDR_NONE)
+			break;
+		/* Process extension header here */
+		switch (type) {
+		case RDS_EXTHDR_NPATHS:
+			conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
+					       buffer.rds_npaths);
+			break;
+		default:
+			pr_warn_ratelimited("ignoring unknown exthdr type "
+					     "0x%x\n", type);
+		}
+	}
+	/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
+	conn->c_npaths = max_t(int, conn->c_npaths, 1);
+}
+
+/* rds_start_mprds() will synchronously start multiple paths when appropriate.
+ * The scheme is based on the following rules:
+ *
+ * 1. rds_sendmsg on first connect attempt sends the probe ping, with the
+ *    sender's npaths (s_npaths)
+ * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
+ *    sends back a probe-pong with r_npaths. After that, if rcvr is the
+ *    smaller ip addr, it starts rds_conn_path_connect_if_down on all
+ *    mprds_paths.
+ * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
+ *    If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
+ *    called after reception of the probe-pong on all mprds_paths.
+ *    Otherwise (sender of probe-ping is not the smaller ip addr): just call
+ *    rds_conn_path_connect_if_down on the hashed path. (see rule 4)
+ * 4. when cp_index > 0, rds_connect_worker must only trigger
+ *    a connection if laddr < faddr.
+ * 5. sender may end up queuing the packet on the cp. will get sent out later.
+ *    when connection is completed.
+ */
+static void rds_start_mprds(struct rds_connection *conn)
+{
+	int i;
+	struct rds_conn_path *cp;
+
+	if (conn->c_npaths > 1 && conn->c_laddr < conn->c_faddr) {
+		for (i = 1; i < conn->c_npaths; i++) {
+			cp = &conn->c_path[i];
+			rds_conn_path_connect_if_down(cp);
+		}
+	}
+}
+
 /*
  * The transport must make sure that this is serialized against other
  * rx and conn reset on this specific conn.
@@ -232,6 +293,20 @@ void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
 		}
 		rds_stats_inc(s_recv_ping);
 		rds_send_pong(cp, inc->i_hdr.h_sport);
+		/* if this is a handshake ping, start multipath if necessary */
+		if (RDS_HS_PROBE(inc->i_hdr.h_sport, inc->i_hdr.h_dport)) {
+			rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
+			rds_start_mprds(cp->cp_conn);
+		}
+		goto out;
+	}
+
+	if (inc->i_hdr.h_dport ==  RDS_FLAG_PROBE_PORT &&
+	    inc->i_hdr.h_sport == 0) {
+		rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
+		/* if this is a handshake pong, start multipath if necessary */
+		rds_start_mprds(cp->cp_conn);
+		wake_up(&cp->cp_conn->c_hs_waitq);
 		goto out;
 	}
 

+ 67 - 4
net/rds/send.c

@@ -963,6 +963,29 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 	return ret;
 }
 
+static void rds_send_ping(struct rds_connection *conn);
+
+static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
+{
+	int hash;
+
+	if (conn->c_npaths == 0)
+		hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
+	else
+		hash = RDS_MPATH_HASH(rs, conn->c_npaths);
+	if (conn->c_npaths == 0 && hash != 0) {
+		rds_send_ping(conn);
+
+		if (conn->c_npaths == 0) {
+			wait_event_interruptible(conn->c_hs_waitq,
+						 (conn->c_npaths != 0));
+		}
+		if (conn->c_npaths == 1)
+			hash = 0;
+	}
+	return hash;
+}
+
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 {
 	struct sock *sk = sock->sk;
@@ -1075,7 +1098,10 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		goto out;
 	}
 
-	cpath = &conn->c_path[0];
+	if (conn->c_trans->t_mp_capable)
+		cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
+	else
+		cpath = &conn->c_path[0];
 
 	rds_conn_path_connect_if_down(cpath);
 
@@ -1135,10 +1161,16 @@ out:
 }
 
 /*
- * Reply to a ping packet.
+ * send out a probe. Can be shared by rds_send_ping,
+ * rds_send_pong, rds_send_hb.
+ * rds_send_hb should use h_flags
+ *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
+ * or
+ *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
  */
 int
-rds_send_pong(struct rds_conn_path *cp, __be16 dport)
+rds_send_probe(struct rds_conn_path *cp, __be16 sport,
+	       __be16 dport, u8 h_flags)
 {
 	struct rds_message *rm;
 	unsigned long flags;
@@ -1166,9 +1198,18 @@ rds_send_pong(struct rds_conn_path *cp, __be16 dport)
 	rm->m_inc.i_conn = cp->cp_conn;
 	rm->m_inc.i_conn_path = cp;
 
-	rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
+	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
 				    cp->cp_next_tx_seq);
+	rm->m_inc.i_hdr.h_flags |= h_flags;
 	cp->cp_next_tx_seq++;
+
+	if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) {
+		u16 npaths = RDS_MPATH_WORKERS;
+
+		rds_message_add_extension(&rm->m_inc.i_hdr,
+					  RDS_EXTHDR_NPATHS, &npaths,
+					  sizeof(npaths));
+	}
 	spin_unlock_irqrestore(&cp->cp_lock, flags);
 
 	rds_stats_inc(s_send_queued);
@@ -1185,3 +1226,25 @@ out:
 		rds_message_put(rm);
 	return ret;
 }
+
+int
+rds_send_pong(struct rds_conn_path *cp, __be16 dport)
+{
+	return rds_send_probe(cp, 0, dport, 0);
+}
+
+void
+rds_send_ping(struct rds_connection *conn)
+{
+	unsigned long flags;
+	struct rds_conn_path *cp = &conn->c_path[0];
+
+	spin_lock_irqsave(&cp->cp_lock, flags);
+	if (conn->c_ping_triggered) {
+		spin_unlock_irqrestore(&cp->cp_lock, flags);
+		return;
+	}
+	conn->c_ping_triggered = 1;
+	spin_unlock_irqrestore(&cp->cp_lock, flags);
+	rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0);
+}

+ 12 - 19
net/rds/tcp.c

@@ -38,7 +38,6 @@
 #include <net/net_namespace.h>
 #include <net/netns/generic.h>
 
-#include "rds_single_path.h"
 #include "rds.h"
 #include "tcp.h"
 
@@ -168,35 +167,21 @@ void rds_tcp_reset_callbacks(struct socket *sock,
 	wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 	lock_sock(osock->sk);
 	/* reset receive side state for rds_tcp_data_recv() for osock  */
+	cancel_delayed_work_sync(&cp->cp_send_w);
+	cancel_delayed_work_sync(&cp->cp_recv_w);
 	if (tc->t_tinc) {
 		rds_inc_put(&tc->t_tinc->ti_inc);
 		tc->t_tinc = NULL;
 	}
 	tc->t_tinc_hdr_rem = sizeof(struct rds_header);
 	tc->t_tinc_data_rem = 0;
-	tc->t_sock = NULL;
-
-	write_lock_bh(&osock->sk->sk_callback_lock);
-
-	osock->sk->sk_user_data = NULL;
-	osock->sk->sk_data_ready = tc->t_orig_data_ready;
-	osock->sk->sk_write_space = tc->t_orig_write_space;
-	osock->sk->sk_state_change = tc->t_orig_state_change;
-	write_unlock_bh(&osock->sk->sk_callback_lock);
+	rds_tcp_restore_callbacks(osock, tc);
 	release_sock(osock->sk);
 	sock_release(osock);
 newsock:
 	rds_send_path_reset(cp);
 	lock_sock(sock->sk);
-	write_lock_bh(&sock->sk->sk_callback_lock);
-	tc->t_sock = sock;
-	tc->t_cpath = cp;
-	sock->sk->sk_user_data = cp;
-	sock->sk->sk_data_ready = rds_tcp_data_ready;
-	sock->sk->sk_write_space = rds_tcp_write_space;
-	sock->sk->sk_state_change = rds_tcp_state_change;
-
-	write_unlock_bh(&sock->sk->sk_callback_lock);
+	rds_tcp_set_callbacks(sock, cp);
 	release_sock(sock->sk);
 }
 
@@ -372,6 +357,7 @@ struct rds_transport rds_tcp_transport = {
 	.t_name			= "tcp",
 	.t_type			= RDS_TRANS_TCP,
 	.t_prefer_loopback	= 1,
+	.t_mp_capable		= 1,
 };
 
 static int rds_tcp_netid;
@@ -551,6 +537,13 @@ static void rds_tcp_kill_sock(struct net *net)
 	}
 }
 
+void *rds_tcp_listen_sock_def_readable(struct net *net)
+{
+	struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
+
+	return rtn->rds_tcp_listen_sock->sk->sk_user_data;
+}
+
 static int rds_tcp_dev_event(struct notifier_block *this,
 			     unsigned long event, void *ptr)
 {

+ 1 - 0
net/rds/tcp.h

@@ -70,6 +70,7 @@ void rds_tcp_listen_stop(struct socket *);
 void rds_tcp_listen_data_ready(struct sock *sk);
 int rds_tcp_accept_one(struct socket *sock);
 int rds_tcp_keepalive(struct socket *sock);
+void *rds_tcp_listen_sock_def_readable(struct net *net);
 
 /* tcp_recv.c */
 int rds_tcp_recv_init(void);

+ 6 - 1
net/rds/tcp_connect.c

@@ -34,7 +34,6 @@
 #include <linux/in.h>
 #include <net/tcp.h>
 
-#include "rds_single_path.h"
 #include "rds.h"
 #include "tcp.h"
 
@@ -82,6 +81,12 @@ int rds_tcp_conn_path_connect(struct rds_conn_path *cp)
 	struct rds_connection *conn = cp->cp_conn;
 	struct rds_tcp_connection *tc = cp->cp_transport_data;
 
+	/* for multipath rds,we only trigger the connection after
+	 * the handshake probe has determined the number of paths.
+	 */
+	if (cp->cp_index > 0 && cp->cp_conn->c_npaths < 2)
+		return -EAGAIN;
+
 	mutex_lock(&tc->t_conn_path_lock);
 
 	if (rds_conn_path_up(cp)) {

+ 57 - 8
net/rds/tcp_listen.c

@@ -35,7 +35,6 @@
 #include <linux/in.h>
 #include <net/tcp.h>
 
-#include "rds_single_path.h"
 #include "rds.h"
 #include "tcp.h"
 
@@ -71,6 +70,52 @@ bail:
 	return ret;
 }
 
+/* rds_tcp_accept_one_path(): if accepting on cp_index > 0, make sure the
+ * client's ipaddr < server's ipaddr. Otherwise, close the accepted
+ * socket and force a reconneect from smaller -> larger ip addr. The reason
+ * we special case cp_index 0 is to allow the rds probe ping itself to itself
+ * get through efficiently.
+ * Since reconnects are only initiated from the node with the numerically
+ * smaller ip address, we recycle conns in RDS_CONN_ERROR on the passive side
+ * by moving them to CONNECTING in this function.
+ */
+struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
+{
+	int i;
+	bool peer_is_smaller = (conn->c_faddr < conn->c_laddr);
+	int npaths = conn->c_npaths;
+
+	if (npaths <= 1) {
+		struct rds_conn_path *cp = &conn->c_path[0];
+		int ret;
+
+		ret = rds_conn_path_transition(cp, RDS_CONN_DOWN,
+					       RDS_CONN_CONNECTING);
+		if (!ret)
+			rds_conn_path_transition(cp, RDS_CONN_ERROR,
+						 RDS_CONN_CONNECTING);
+		return cp->cp_transport_data;
+	}
+
+	/* for mprds, paths with cp_index > 0 MUST be initiated by the peer
+	 * with the smaller address.
+	 */
+	if (!peer_is_smaller)
+		return NULL;
+
+	for (i = 1; i < npaths; i++) {
+		struct rds_conn_path *cp = &conn->c_path[i];
+
+		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
+					     RDS_CONN_CONNECTING) ||
+		    rds_conn_path_transition(cp, RDS_CONN_ERROR,
+					     RDS_CONN_CONNECTING)) {
+			return cp->cp_transport_data;
+		}
+	}
+	return NULL;
+}
+
 int rds_tcp_accept_one(struct socket *sock)
 {
 	struct socket *new_sock = NULL;
@@ -120,12 +165,14 @@ int rds_tcp_accept_one(struct socket *sock)
 	 * If the client reboots, this conn will need to be cleaned up.
 	 * rds_tcp_state_change() will do that cleanup
 	 */
-	rs_tcp = (struct rds_tcp_connection *)conn->c_transport_data;
-	cp = &conn->c_path[0];
-	rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
+	rs_tcp = rds_tcp_accept_one_path(conn);
+	if (!rs_tcp)
+		goto rst_nsk;
 	mutex_lock(&rs_tcp->t_conn_path_lock);
-	conn_state = rds_conn_state(conn);
-	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_UP)
+	cp = rs_tcp->t_cpath;
+	conn_state = rds_conn_path_state(cp);
+	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_UP &&
+	    conn_state != RDS_CONN_ERROR)
 		goto rst_nsk;
 	if (rs_tcp->t_sock) {
 		/* Need to resolve a duelling SYN between peers.
@@ -135,11 +182,11 @@ int rds_tcp_accept_one(struct socket *sock)
 		 * c_transport_data.
 		 */
 		if (ntohl(inet->inet_saddr) < ntohl(inet->inet_daddr) ||
-		    !conn->c_path[0].cp_outgoing) {
+		    !cp->cp_outgoing) {
 			goto rst_nsk;
 		} else {
 			rds_tcp_reset_callbacks(new_sock, cp);
-			conn->c_path[0].cp_outgoing = 0;
+			cp->cp_outgoing = 0;
 			/* rds_connect_path_complete() marks RDS_CONN_UP */
 			rds_connect_path_complete(cp, RDS_CONN_RESETTING);
 		}
@@ -183,6 +230,8 @@ void rds_tcp_listen_data_ready(struct sock *sk)
 	 */
 	if (sk->sk_state == TCP_LISTEN)
 		rds_tcp_accept_work(sk);
+	else
+		ready = rds_tcp_listen_sock_def_readable(sock_net(sk));
 
 out:
 	read_unlock_bh(&sk->sk_callback_lock);

+ 13 - 5
net/rds/tcp_send.c

@@ -81,7 +81,8 @@ static int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len)
 int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 		 unsigned int hdr_off, unsigned int sg, unsigned int off)
 {
-	struct rds_tcp_connection *tc = conn->c_transport_data;
+	struct rds_conn_path *cp = rm->m_inc.i_conn_path;
+	struct rds_tcp_connection *tc = cp->cp_transport_data;
 	int done = 0;
 	int ret = 0;
 	int more;
@@ -150,10 +151,17 @@ out:
 			rds_tcp_stats_inc(s_tcp_sndbuf_full);
 			ret = 0;
 		} else {
-			printk(KERN_WARNING "RDS/tcp: send to %pI4 "
-			       "returned %d, disconnecting and reconnecting\n",
-			       &conn->c_faddr, ret);
-			rds_conn_drop(conn);
+			/* No need to disconnect/reconnect if path_drop
+			 * has already been triggered, because, e.g., of
+			 * an incoming RST.
+			 */
+			if (rds_conn_path_up(cp)) {
+				pr_warn("RDS/tcp: send to %pI4 on cp [%d]"
+					"returned %d, "
+					"disconnecting and reconnecting\n",
+					&conn->c_faddr, cp->cp_index, ret);
+				rds_conn_path_drop(cp);
+			}
 		}
 	}
 	if (done == 0)

+ 2 - 0
net/rds/threads.c

@@ -156,6 +156,8 @@ void rds_connect_worker(struct work_struct *work)
 	struct rds_connection *conn = cp->cp_conn;
 	int ret;
 
+	if (cp->cp_index > 1 && cp->cp_conn->c_laddr > cp->cp_conn->c_faddr)
+		return;
 	clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
 	ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
 	if (ret) {