Răsfoiți Sursa

Merge branch 'udp-receive-path-optimizations'

Eric Dumazet says:

====================
udp: receive path optimizations

This patch series provides about 100 % performance increase under flood.

v2: added Paolo feedback on udp_rmem_release() for tiny sk_rcvbuf
    added the last patch touching sk_rmem_alloc later
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 8 ani în urmă
părinte
comite
524a64c726
3 a modificat fișierele cu 77 adăugiri și 6 ștergeri
  1. 8 1
      include/linux/skbuff.h
  2. 3 0
      include/linux/udp.h
  3. 66 5
      net/ipv4/udp.c

+ 8 - 1
include/linux/skbuff.h

@@ -645,8 +645,15 @@ struct sk_buff {
 		struct rb_node	rbnode; /* used in netem & tcp stack */
 	};
 	struct sock		*sk;
-	struct net_device	*dev;
 
+	union {
+		struct net_device	*dev;
+		/* Some protocols might use this space to store information,
+		 * while device pointer would be NULL.
+		 * UDP receive path is one user.
+		 */
+		unsigned long		dev_scratch;
+	};
 	/*
 	 * This is the control buffer. It is free to use for every
 	 * layer. Please put your private variables there. If you

+ 3 - 0
include/linux/udp.h

@@ -79,6 +79,9 @@ struct udp_sock {
 	int			(*gro_complete)(struct sock *sk,
 						struct sk_buff *skb,
 						int nhoff);
+
+	/* This field is dirtied by udp_recvmsg() */
+	int		forward_deficit;
 };
 
 static inline struct udp_sock *udp_sk(const struct sock *sk)

+ 66 - 5
net/ipv4/udp.c

@@ -1177,28 +1177,71 @@ out:
 /* fully reclaim rmem/fwd memory allocated for skb */
 static void udp_rmem_release(struct sock *sk, int size, int partial)
 {
+	struct udp_sock *up = udp_sk(sk);
 	int amt;
 
-	atomic_sub(size, &sk->sk_rmem_alloc);
+	if (likely(partial)) {
+		up->forward_deficit += size;
+		size = up->forward_deficit;
+		if (size < (sk->sk_rcvbuf >> 2) &&
+		    !skb_queue_empty(&sk->sk_receive_queue))
+			return;
+	} else {
+		size += up->forward_deficit;
+	}
+	up->forward_deficit = 0;
+
 	sk->sk_forward_alloc += size;
 	amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
 	sk->sk_forward_alloc -= amt;
 
 	if (amt)
 		__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
+
+	atomic_sub(size, &sk->sk_rmem_alloc);
 }
 
-/* Note: called with sk_receive_queue.lock held */
+/* Note: called with sk_receive_queue.lock held.
+ * Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
+ * This avoids a cache line miss while receive_queue lock is held.
+ * Look at __udp_enqueue_schedule_skb() to find where this copy is done.
+ */
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
 {
-	udp_rmem_release(sk, skb->truesize, 1);
+	udp_rmem_release(sk, skb->dev_scratch, 1);
 }
 EXPORT_SYMBOL(udp_skb_destructor);
 
+/* Idea of busylocks is to let producers grab an extra spinlock
+ * to relieve pressure on the receive_queue spinlock shared by consumer.
+ * Under flood, this means that only one producer can be in line
+ * trying to acquire the receive_queue spinlock.
+ * These busylock can be allocated on a per cpu manner, instead of a
+ * per socket one (that would consume a cache line per socket)
+ */
+static int udp_busylocks_log __read_mostly;
+static spinlock_t *udp_busylocks __read_mostly;
+
+static spinlock_t *busylock_acquire(void *ptr)
+{
+	spinlock_t *busy;
+
+	busy = udp_busylocks + hash_ptr(ptr, udp_busylocks_log);
+	spin_lock(busy);
+	return busy;
+}
+
+static void busylock_release(spinlock_t *busy)
+{
+	if (busy)
+		spin_unlock(busy);
+}
+
 int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
 {
 	struct sk_buff_head *list = &sk->sk_receive_queue;
 	int rmem, delta, amt, err = -ENOMEM;
+	spinlock_t *busy = NULL;
 	int size;
 
 	/* try to avoid the costly atomic add/sub pair when the receive
@@ -1214,9 +1257,16 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
 	 * - Less cache line misses at copyout() time
 	 * - Less work at consume_skb() (less alien page frag freeing)
 	 */
-	if (rmem > (sk->sk_rcvbuf >> 1))
+	if (rmem > (sk->sk_rcvbuf >> 1)) {
 		skb_condense(skb);
+
+		busy = busylock_acquire(sk);
+	}
 	size = skb->truesize;
+	/* Copy skb->truesize into skb->dev_scratch to avoid a cache line miss
+	 * in udp_skb_destructor()
+	 */
+	skb->dev_scratch = size;
 
 	/* we drop only if the receive buf is full and the receive
 	 * queue contains some other skb
@@ -1243,7 +1293,6 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
 	/* no need to setup a destructor, we will explicitly release the
 	 * forward allocated memory on dequeue
 	 */
-	skb->dev = NULL;
 	sock_skb_set_dropcount(sk, skb);
 
 	__skb_queue_tail(list, skb);
@@ -1252,6 +1301,7 @@ int __udp_enqueue_schedule_skb(struct sock *sk, struct sk_buff *skb)
 	if (!sock_flag(sk, SOCK_DEAD))
 		sk->sk_data_ready(sk);
 
+	busylock_release(busy);
 	return 0;
 
 uncharge_drop:
@@ -1259,6 +1309,7 @@ uncharge_drop:
 
 drop:
 	atomic_inc(&sk->sk_drops);
+	busylock_release(busy);
 	return err;
 }
 EXPORT_SYMBOL_GPL(__udp_enqueue_schedule_skb);
@@ -2613,6 +2664,7 @@ EXPORT_SYMBOL(udp_flow_hashrnd);
 void __init udp_init(void)
 {
 	unsigned long limit;
+	unsigned int i;
 
 	udp_table_init(&udp_table, "UDP");
 	limit = nr_free_buffer_pages() / 8;
@@ -2623,4 +2675,13 @@ void __init udp_init(void)
 
 	sysctl_udp_rmem_min = SK_MEM_QUANTUM;
 	sysctl_udp_wmem_min = SK_MEM_QUANTUM;
+
+	/* 16 spinlocks per cpu */
+	udp_busylocks_log = ilog2(nr_cpu_ids) + 4;
+	udp_busylocks = kmalloc(sizeof(spinlock_t) << udp_busylocks_log,
+				GFP_KERNEL);
+	if (!udp_busylocks)
+		panic("UDP: failed to alloc udp_busylocks\n");
+	for (i = 0; i < (1U << udp_busylocks_log); i++)
+		spin_lock_init(udp_busylocks + i);
 }