Browse Source

Merge branch 'rds'

Sowmini Varadhan says:

====================
RDS: RDS-core fixes

This patch-series updates the RDS core and rds-tcp modules with
some bug fixes that were originally authored by  Andy Grover,
Zach Brown, and Chris Mason.

v2: Code review comment by Sergei Shtylov
V3: DaveM comments:
- dropped patches 3, 5 for "heuristic" changes in rds_send_xmit().
  Investigation into the root-cause of these IB-triggered changes
  produced the feedback: "I don't remember seeing "RDS: Stuck RM"
  message in last 1-1.5 years and checking with other folks. It may very
  well be some old workaround for stale connection for which long term
  fix is already made and this part of code not exercised anymore."

  Any such fixes, *if* they are needed, can/should be done in the
  IB specific RDS transport modules.

- similarly dropped the LL_SEND_FULL patch (patch 6 in v2 set)

v4: Documentation/networking/rds.txt contains incorrect references
    to "missing sysctl values for pf_rds and sol_rds in mainline".
    The sysctl values were never needed in mainline, thus fix the
    documentation.

v5: Clarify comment per http://www.spinics.net/lists/netdev/msg324220.html

v6: Re-added entire version history to cover letter.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 10 years ago
parent
commit
1ec1e23d1d
4 changed files with 38 additions and 8 deletions
  1. 4 5
      Documentation/networking/rds.txt
  2. 2 1
      net/rds/connection.c
  3. 1 0
      net/rds/rds.h
  4. 31 2
      net/rds/send.c

+ 4 - 5
Documentation/networking/rds.txt

@@ -62,11 +62,10 @@ Socket Interface
 ================
 ================
 
 
   AF_RDS, PF_RDS, SOL_RDS
   AF_RDS, PF_RDS, SOL_RDS
-        These constants haven't been assigned yet, because RDS isn't in
-        mainline yet. Currently, the kernel module assigns some constant
-        and publishes it to user space through two sysctl files
-                /proc/sys/net/rds/pf_rds
-                /proc/sys/net/rds/sol_rds
+	AF_RDS and PF_RDS are the domain type to be used with socket(2)
+	to create RDS sockets. SOL_RDS is the socket-level to be used
+	with setsockopt(2) and getsockopt(2) for RDS specific socket
+	options.
 
 
   fd = socket(PF_RDS, SOCK_SEQPACKET, 0);
   fd = socket(PF_RDS, SOCK_SEQPACKET, 0);
         This creates a new, unbound RDS socket.
         This creates a new, unbound RDS socket.

+ 2 - 1
net/rds/connection.c

@@ -130,7 +130,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	rcu_read_lock();
 	rcu_read_lock();
 	conn = rds_conn_lookup(head, laddr, faddr, trans);
 	conn = rds_conn_lookup(head, laddr, faddr, trans);
 	if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
 	if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
-	    !is_outgoing) {
+	    laddr == faddr && !is_outgoing) {
 		/* This is a looped back IB connection, and we're
 		/* This is a looped back IB connection, and we're
 		 * called by the code handling the incoming connect.
 		 * called by the code handling the incoming connect.
 		 * We need a second connection object into which we
 		 * We need a second connection object into which we
@@ -193,6 +193,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	}
 	}
 
 
 	atomic_set(&conn->c_state, RDS_CONN_DOWN);
 	atomic_set(&conn->c_state, RDS_CONN_DOWN);
+	conn->c_send_gen = 0;
 	conn->c_reconnect_jiffies = 0;
 	conn->c_reconnect_jiffies = 0;
 	INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
 	INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
 	INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
 	INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);

+ 1 - 0
net/rds/rds.h

@@ -110,6 +110,7 @@ struct rds_connection {
 	void			*c_transport_data;
 	void			*c_transport_data;
 
 
 	atomic_t		c_state;
 	atomic_t		c_state;
+	unsigned long		c_send_gen;
 	unsigned long		c_flags;
 	unsigned long		c_flags;
 	unsigned long		c_reconnect_jiffies;
 	unsigned long		c_reconnect_jiffies;
 	struct delayed_work	c_send_w;
 	struct delayed_work	c_send_w;

+ 31 - 2
net/rds/send.c

@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn)
 	struct scatterlist *sg;
 	struct scatterlist *sg;
 	int ret = 0;
 	int ret = 0;
 	LIST_HEAD(to_be_dropped);
 	LIST_HEAD(to_be_dropped);
+	int batch_count;
+	unsigned long send_gen = 0;
 
 
 restart:
 restart:
+	batch_count = 0;
 
 
 	/*
 	/*
 	 * sendmsg calls here after having queued its message on the send
 	 * sendmsg calls here after having queued its message on the send
@@ -156,6 +159,17 @@ restart:
 		goto out;
 		goto out;
 	}
 	}
 
 
+	/*
+	 * we record the send generation after doing the xmit acquire.
+	 * if someone else manages to jump in and do some work, we'll use
+	 * this to avoid a goto restart farther down.
+	 *
+	 * The acquire_in_xmit() check above ensures that only one
+	 * caller can increment c_send_gen at any time.
+	 */
+	conn->c_send_gen++;
+	send_gen = conn->c_send_gen;
+
 	/*
 	/*
 	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 	 * we do the opposite to avoid races.
 	 * we do the opposite to avoid races.
@@ -202,6 +216,16 @@ restart:
 		if (!rm) {
 		if (!rm) {
 			unsigned int len;
 			unsigned int len;
 
 
+			batch_count++;
+
+			/* we want to process as big a batch as we can, but
+			 * we also want to avoid softlockups.  If we've been
+			 * through a lot of messages, lets back off and see
+			 * if anyone else jumps in
+			 */
+			if (batch_count >= 1024)
+				goto over_batch;
+
 			spin_lock_irqsave(&conn->c_lock, flags);
 			spin_lock_irqsave(&conn->c_lock, flags);
 
 
 			if (!list_empty(&conn->c_send_queue)) {
 			if (!list_empty(&conn->c_send_queue)) {
@@ -357,9 +381,9 @@ restart:
 		}
 		}
 	}
 	}
 
 
+over_batch:
 	if (conn->c_trans->xmit_complete)
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 		conn->c_trans->xmit_complete(conn);
-
 	release_in_xmit(conn);
 	release_in_xmit(conn);
 
 
 	/* Nuke any messages we decided not to retransmit. */
 	/* Nuke any messages we decided not to retransmit. */
@@ -380,10 +404,15 @@ restart:
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
 	 * call us when more room is available, such as from the tx
 	 * completion handler.
 	 * completion handler.
+	 *
+	 * We have an extra generation check here so that if someone manages
+	 * to jump in after our release_in_xmit, we'll see that they have done
+	 * some work and we will skip our goto
 	 */
 	 */
 	if (ret == 0) {
 	if (ret == 0) {
 		smp_mb();
 		smp_mb();
-		if (!list_empty(&conn->c_send_queue)) {
+		if (!list_empty(&conn->c_send_queue) &&
+		    send_gen == conn->c_send_gen) {
 			rds_stats_inc(s_send_lock_queue_raced);
 			rds_stats_inc(s_send_lock_queue_raced);
 			goto restart;
 			goto restart;
 		}
 		}