Selaa lähdekoodia

Merge branch 'RDS-zerocopy-code-enhancements'

Sowmini Varadhan says:

====================
RDS: zerocopy code enhancements

A couple of enhancements to the rds zerocopy code
- patch 1 refactors rds_message_copy_from_user to pull the zcopy logic
  into its own function
- patch 2 drops the usage of sk_buff to track MSG_ZEROCOPY cookies and
  uses a simple linked list (enhancement suggested by willemb during
  code review)
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 7 vuotta sitten
vanhempi
commit
3f6be0ebde
4 muutettua tiedostoa jossa 138 lisäystä ja 85 poistoa
  1. 3 3
      net/rds/af_rds.c
  2. 102 69
      net/rds/message.c
  3. 17 6
      net/rds/rds.h
  4. 16 7
      net/rds/recv.c

+ 3 - 3
net/rds/af_rds.c

@@ -77,7 +77,7 @@ static int rds_release(struct socket *sock)
 	rds_send_drop_to(rs, NULL);
 	rds_rdma_drop_keys(rs);
 	rds_notify_queue_get(rs, NULL);
-	__skb_queue_purge(&rs->rs_zcookie_queue);
+	rds_notify_msg_zcopy_purge(&rs->rs_zcookie_queue);
 
 	spin_lock_bh(&rds_sock_lock);
 	list_del_init(&rs->rs_item);
@@ -180,7 +180,7 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
 	}
 	if (!list_empty(&rs->rs_recv_queue) ||
 	    !list_empty(&rs->rs_notify_queue) ||
-	    !skb_queue_empty(&rs->rs_zcookie_queue))
+	    !list_empty(&rs->rs_zcookie_queue.zcookie_head))
 		mask |= (EPOLLIN | EPOLLRDNORM);
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 		mask |= (EPOLLOUT | EPOLLWRNORM);
@@ -515,7 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	INIT_LIST_HEAD(&rs->rs_recv_queue);
 	INIT_LIST_HEAD(&rs->rs_notify_queue);
 	INIT_LIST_HEAD(&rs->rs_cong_list);
-	skb_queue_head_init(&rs->rs_zcookie_queue);
+	rds_message_zcopy_queue_init(&rs->rs_zcookie_queue);
 	spin_lock_init(&rs->rs_rdma_lock);
 	rs->rs_rdma_keys = RB_ROOT;
 	rs->rs_rx_traces = 0;

+ 102 - 69
net/rds/message.c

@@ -48,7 +48,6 @@ static unsigned int	rds_exthdr_size[__RDS_EXTHDR_MAX] = {
 [RDS_EXTHDR_GEN_NUM]	= sizeof(u32),
 };
 
-
 void rds_message_addref(struct rds_message *rm)
 {
 	rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount));
@@ -56,9 +55,9 @@ void rds_message_addref(struct rds_message *rm)
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
-static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
+static inline bool rds_zcookie_add(struct rds_msg_zcopy_info *info, u32 cookie)
 {
-	struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
+	struct rds_zcopy_cookies *ck = &info->zcookies;
 	int ncookies = ck->num;
 
 	if (ncookies == RDS_MAX_ZCOOKIES)
@@ -68,38 +67,61 @@ static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
 	return true;
 }
 
+struct rds_msg_zcopy_info *rds_info_from_znotifier(struct rds_znotifier *znotif)
+{
+	return container_of(znotif, struct rds_msg_zcopy_info, znotif);
+}
+
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *q)
+{
+	unsigned long flags;
+	LIST_HEAD(copy);
+	struct rds_msg_zcopy_info *info, *tmp;
+
+	spin_lock_irqsave(&q->lock, flags);
+	list_splice(&q->zcookie_head, &copy);
+	INIT_LIST_HEAD(&q->zcookie_head);
+	spin_unlock_irqrestore(&q->lock, flags);
+
+	list_for_each_entry_safe(info, tmp, &copy, rs_zcookie_next) {
+		list_del(&info->rs_zcookie_next);
+		kfree(info);
+	}
+}
+
 static void rds_rm_zerocopy_callback(struct rds_sock *rs,
 				     struct rds_znotifier *znotif)
 {
-	struct sk_buff *skb, *tail;
-	unsigned long flags;
-	struct sk_buff_head *q;
+	struct rds_msg_zcopy_info *info;
+	struct rds_msg_zcopy_queue *q;
 	u32 cookie = znotif->z_cookie;
 	struct rds_zcopy_cookies *ck;
+	struct list_head *head;
+	unsigned long flags;
 
+	mm_unaccount_pinned_pages(&znotif->z_mmp);
 	q = &rs->rs_zcookie_queue;
 	spin_lock_irqsave(&q->lock, flags);
-	tail = skb_peek_tail(q);
-
-	if (tail && skb_zcookie_add(tail, cookie)) {
-		spin_unlock_irqrestore(&q->lock, flags);
-		mm_unaccount_pinned_pages(&znotif->z_mmp);
-		consume_skb(rds_skb_from_znotifier(znotif));
-		/* caller invokes rds_wake_sk_sleep() */
-		return;
+	head = &q->zcookie_head;
+	if (!list_empty(head)) {
+		info = list_entry(head, struct rds_msg_zcopy_info,
+				  rs_zcookie_next);
+		if (info && rds_zcookie_add(info, cookie)) {
+			spin_unlock_irqrestore(&q->lock, flags);
+			kfree(rds_info_from_znotifier(znotif));
+			/* caller invokes rds_wake_sk_sleep() */
+			return;
+		}
 	}
 
-	skb = rds_skb_from_znotifier(znotif);
-	ck = (struct rds_zcopy_cookies *)skb->cb;
+	info = rds_info_from_znotifier(znotif);
+	ck = &info->zcookies;
 	memset(ck, 0, sizeof(*ck));
-	WARN_ON(!skb_zcookie_add(skb, cookie));
-
-	__skb_queue_tail(q, skb);
+	WARN_ON(!rds_zcookie_add(info, cookie));
+	list_add_tail(&q->zcookie_head, &info->rs_zcookie_next);
 
 	spin_unlock_irqrestore(&q->lock, flags);
 	/* caller invokes rds_wake_sk_sleep() */
-
-	mm_unaccount_pinned_pages(&znotif->z_mmp);
 }
 
 /*
@@ -333,14 +355,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
 	return rm;
 }
 
-int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
-			       bool zcopy)
+int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
 {
-	unsigned long to_copy, nbytes;
 	unsigned long sg_off;
 	struct scatterlist *sg;
 	int ret = 0;
 	int length = iov_iter_count(from);
+	int total_copied = 0;
+	struct rds_msg_zcopy_info *info;
 
 	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
 
@@ -350,54 +372,65 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
 	sg = rm->data.op_sg;
 	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
 
-	if (zcopy) {
-		int total_copied = 0;
-		struct sk_buff *skb;
-
-		skb = alloc_skb(0, GFP_KERNEL);
-		if (!skb)
-			return -ENOMEM;
-		BUILD_BUG_ON(sizeof(skb->cb) <
-			     max_t(int, sizeof(struct rds_znotifier),
-				   sizeof(struct rds_zcopy_cookies)));
-		rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
-		if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
-					    length)) {
-			ret = -ENOMEM;
+	info = kzalloc(sizeof(*info), GFP_KERNEL);
+	if (!info)
+		return -ENOMEM;
+	INIT_LIST_HEAD(&info->rs_zcookie_next);
+	rm->data.op_mmp_znotifier = &info->znotif;
+	if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
+				    length)) {
+		ret = -ENOMEM;
+		goto err;
+	}
+	while (iov_iter_count(from)) {
+		struct page *pages;
+		size_t start;
+		ssize_t copied;
+
+		copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
+					    1, &start);
+		if (copied < 0) {
+			struct mmpin *mmp;
+			int i;
+
+			for (i = 0; i < rm->data.op_nents; i++)
+				put_page(sg_page(&rm->data.op_sg[i]));
+			mmp = &rm->data.op_mmp_znotifier->z_mmp;
+			mm_unaccount_pinned_pages(mmp);
+			ret = -EFAULT;
 			goto err;
 		}
-		while (iov_iter_count(from)) {
-			struct page *pages;
-			size_t start;
-			ssize_t copied;
-
-			copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
-						    1, &start);
-			if (copied < 0) {
-				struct mmpin *mmp;
-				int i;
-
-				for (i = 0; i < rm->data.op_nents; i++)
-					put_page(sg_page(&rm->data.op_sg[i]));
-				mmp = &rm->data.op_mmp_znotifier->z_mmp;
-				mm_unaccount_pinned_pages(mmp);
-				ret = -EFAULT;
-				goto err;
-			}
-			total_copied += copied;
-			iov_iter_advance(from, copied);
-			length -= copied;
-			sg_set_page(sg, pages, copied, start);
-			rm->data.op_nents++;
-			sg++;
-		}
-		WARN_ON_ONCE(length != 0);
-		return ret;
+		total_copied += copied;
+		iov_iter_advance(from, copied);
+		length -= copied;
+		sg_set_page(sg, pages, copied, start);
+		rm->data.op_nents++;
+		sg++;
+	}
+	WARN_ON_ONCE(length != 0);
+	return ret;
 err:
-		consume_skb(skb);
-		rm->data.op_mmp_znotifier = NULL;
-		return ret;
-	} /* zcopy */
+	kfree(info);
+	rm->data.op_mmp_znotifier = NULL;
+	return ret;
+}
+
+int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
+			       bool zcopy)
+{
+	unsigned long to_copy, nbytes;
+	unsigned long sg_off;
+	struct scatterlist *sg;
+	int ret = 0;
+
+	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
+
+	/* now allocate and copy in the data payload.  */
+	sg = rm->data.op_sg;
+	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
+
+	if (zcopy)
+		return rds_message_zcopy_from_user(rm, from);
 
 	while (iov_iter_count(from)) {
 		if (!sg_page(sg)) {

+ 17 - 6
net/rds/rds.h

@@ -357,16 +357,27 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 #define RDS_MSG_FLUSH		8
 
 struct rds_znotifier {
-	struct list_head	z_list;
 	struct mmpin		z_mmp;
 	u32			z_cookie;
 };
 
-#define	RDS_ZCOPY_SKB(__skb)	((struct rds_znotifier *)&((__skb)->cb[0]))
+struct rds_msg_zcopy_info {
+	struct list_head rs_zcookie_next;
+	union {
+		struct rds_znotifier znotif;
+		struct rds_zcopy_cookies zcookies;
+	};
+};
 
-static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
+struct rds_msg_zcopy_queue {
+	struct list_head zcookie_head;
+	spinlock_t lock; /* protects zcookie_head queue */
+};
+
+static inline void rds_message_zcopy_queue_init(struct rds_msg_zcopy_queue *q)
 {
-	return container_of((void *)z, struct sk_buff, cb);
+	spin_lock_init(&q->lock);
+	INIT_LIST_HEAD(&q->zcookie_head);
 }
 
 struct rds_message {
@@ -603,8 +614,7 @@ struct rds_sock {
 	/* Socket receive path trace points*/
 	u8			rs_rx_traces;
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
-
-	struct sk_buff_head	rs_zcookie_queue;
+	struct rds_msg_zcopy_queue rs_zcookie_queue;
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -803,6 +813,7 @@ void rds_message_addref(struct rds_message *rm);
 void rds_message_put(struct rds_message *rm);
 void rds_message_wait(struct rds_message *rm);
 void rds_message_unmapped(struct rds_message *rm);
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *info);
 
 static inline void rds_message_make_checksum(struct rds_header *hdr)
 {

+ 16 - 7
net/rds/recv.c

@@ -579,9 +579,10 @@ out:
 
 static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
 {
-	struct sk_buff *skb;
-	struct sk_buff_head *q = &rs->rs_zcookie_queue;
+	struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
+	struct rds_msg_zcopy_info *info = NULL;
 	struct rds_zcopy_cookies *done;
+	unsigned long flags;
 
 	if (!msg->msg_control)
 		return false;
@@ -590,16 +591,24 @@ static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
 	    msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
 		return false;
 
-	skb = skb_dequeue(q);
-	if (!skb)
+	spin_lock_irqsave(&q->lock, flags);
+	if (!list_empty(&q->zcookie_head)) {
+		info = list_entry(q->zcookie_head.next,
+				  struct rds_msg_zcopy_info, rs_zcookie_next);
+		list_del(&info->rs_zcookie_next);
+	}
+	spin_unlock_irqrestore(&q->lock, flags);
+	if (!info)
 		return false;
-	done = (struct rds_zcopy_cookies *)skb->cb;
+	done = &info->zcookies;
 	if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
 		     done)) {
-		skb_queue_head(q, skb);
+		spin_lock_irqsave(&q->lock, flags);
+		list_add(&info->rs_zcookie_next, &q->zcookie_head);
+		spin_unlock_irqrestore(&q->lock, flags);
 		return false;
 	}
-	consume_skb(skb);
+	kfree(info);
 	return true;
 }