Selaa lähdekoodia

Merge branch 'RDS-optimized-notification-for-zerocopy-completion'

Sowmini Varadhan says:

====================
RDS: optimized notification for zerocopy completion

Resending with acked-by additions: previous attempt does not show
up in Patchwork. This time with a new mail Message-Id.

RDS applications use predominantly request-response, transacation
based IPC, so that ingress and egress traffic are well-balanced,
and it is possible/desirable to reduce system-call overhead by
piggybacking the notifications for zerocopy completion response
with data.

Moreover, it has been pointed out that socket functions block
if sk_err is non-zero, thus if the RDS code does not plan/need
to use sk_error_queue path for completion notification, it
is preferable to remove the sk_errror_queue related paths in
RDS.

Both of these goals are implemented in this series.

v2: removed sk_error_queue support
v3: incorporated additional code review comments (details in each patch)
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 7 vuotta sitten
vanhempi
commit
f4155eff1f

+ 0 - 2
include/uapi/linux/errqueue.h

@@ -20,13 +20,11 @@ struct sock_extended_err {
 #define SO_EE_ORIGIN_ICMP6	3
 #define SO_EE_ORIGIN_ICMP6	3
 #define SO_EE_ORIGIN_TXSTATUS	4
 #define SO_EE_ORIGIN_TXSTATUS	4
 #define SO_EE_ORIGIN_ZEROCOPY	5
 #define SO_EE_ORIGIN_ZEROCOPY	5
-#define SO_EE_ORIGIN_ZCOOKIE	6
 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
 
 
 #define SO_EE_OFFENDER(ee)	((struct sockaddr*)((ee)+1))
 #define SO_EE_OFFENDER(ee)	((struct sockaddr*)((ee)+1))
 
 
 #define SO_EE_CODE_ZEROCOPY_COPIED	1
 #define SO_EE_CODE_ZEROCOPY_COPIED	1
-#define	SO_EE_ORIGIN_MAX_ZCOOKIES	8
 
 
 /**
 /**
  *	struct scm_timestamping - timestamps exposed through cmsg
  *	struct scm_timestamping - timestamps exposed through cmsg

+ 7 - 0
include/uapi/linux/rds.h

@@ -104,6 +104,7 @@
 #define RDS_CMSG_MASKED_ATOMIC_CSWP	9
 #define RDS_CMSG_MASKED_ATOMIC_CSWP	9
 #define RDS_CMSG_RXPATH_LATENCY		11
 #define RDS_CMSG_RXPATH_LATENCY		11
 #define	RDS_CMSG_ZCOPY_COOKIE		12
 #define	RDS_CMSG_ZCOPY_COOKIE		12
+#define	RDS_CMSG_ZCOPY_COMPLETION	13
 
 
 #define RDS_INFO_FIRST			10000
 #define RDS_INFO_FIRST			10000
 #define RDS_INFO_COUNTERS		10000
 #define RDS_INFO_COUNTERS		10000
@@ -317,6 +318,12 @@ struct rds_rdma_notify {
 #define RDS_RDMA_DROPPED	3
 #define RDS_RDMA_DROPPED	3
 #define RDS_RDMA_OTHER_ERROR	4
 #define RDS_RDMA_OTHER_ERROR	4
 
 
+#define	RDS_MAX_ZCOOKIES	8
+struct rds_zcopy_cookies {
+	__u32 num;
+	__u32 cookies[RDS_MAX_ZCOOKIES];
+};
+
 /*
 /*
  * Common set of flags for all RDMA related structs
  * Common set of flags for all RDMA related structs
  */
  */

+ 5 - 2
net/rds/af_rds.c

@@ -77,6 +77,7 @@ static int rds_release(struct socket *sock)
 	rds_send_drop_to(rs, NULL);
 	rds_send_drop_to(rs, NULL);
 	rds_rdma_drop_keys(rs);
 	rds_rdma_drop_keys(rs);
 	rds_notify_queue_get(rs, NULL);
 	rds_notify_queue_get(rs, NULL);
+	__skb_queue_purge(&rs->rs_zcookie_queue);
 
 
 	spin_lock_bh(&rds_sock_lock);
 	spin_lock_bh(&rds_sock_lock);
 	list_del_init(&rs->rs_item);
 	list_del_init(&rs->rs_item);
@@ -144,7 +145,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr,
  *  -	to signal that a previously congested destination may have become
  *  -	to signal that a previously congested destination may have become
  *	uncongested
  *	uncongested
  *  -	A notification has been queued to the socket (this can be a congestion
  *  -	A notification has been queued to the socket (this can be a congestion
- *	update, or a RDMA completion).
+ *	update, or a RDMA completion, or a MSG_ZEROCOPY completion).
  *
  *
  * EPOLLOUT is asserted if there is room on the send queue. This does not mean
  * EPOLLOUT is asserted if there is room on the send queue. This does not mean
  * however, that the next sendmsg() call will succeed. If the application tries
  * however, that the next sendmsg() call will succeed. If the application tries
@@ -178,7 +179,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
 		spin_unlock(&rs->rs_lock);
 		spin_unlock(&rs->rs_lock);
 	}
 	}
 	if (!list_empty(&rs->rs_recv_queue) ||
 	if (!list_empty(&rs->rs_recv_queue) ||
-	    !list_empty(&rs->rs_notify_queue))
+	    !list_empty(&rs->rs_notify_queue) ||
+	    !skb_queue_empty(&rs->rs_zcookie_queue))
 		mask |= (EPOLLIN | EPOLLRDNORM);
 		mask |= (EPOLLIN | EPOLLRDNORM);
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 		mask |= (EPOLLOUT | EPOLLWRNORM);
 		mask |= (EPOLLOUT | EPOLLWRNORM);
@@ -513,6 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	INIT_LIST_HEAD(&rs->rs_recv_queue);
 	INIT_LIST_HEAD(&rs->rs_recv_queue);
 	INIT_LIST_HEAD(&rs->rs_notify_queue);
 	INIT_LIST_HEAD(&rs->rs_notify_queue);
 	INIT_LIST_HEAD(&rs->rs_cong_list);
 	INIT_LIST_HEAD(&rs->rs_cong_list);
+	skb_queue_head_init(&rs->rs_zcookie_queue);
 	spin_lock_init(&rs->rs_rdma_lock);
 	spin_lock_init(&rs->rs_rdma_lock);
 	rs->rs_rdma_keys = RB_ROOT;
 	rs->rs_rdma_keys = RB_ROOT;
 	rs->rs_rx_traces = 0;
 	rs->rs_rx_traces = 0;

+ 16 - 22
net/rds/message.c

@@ -58,32 +58,26 @@ EXPORT_SYMBOL_GPL(rds_message_addref);
 
 
 static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
 static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
 {
 {
-	struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
-	int ncookies;
-	u32 *ptr;
+	struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
+	int ncookies = ck->num;
 
 
-	if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
+	if (ncookies == RDS_MAX_ZCOOKIES)
 		return false;
 		return false;
-	ncookies = serr->ee.ee_data;
-	if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
-		return false;
-	ptr = skb_put(skb, sizeof(u32));
-	*ptr = cookie;
-	serr->ee.ee_data = ++ncookies;
+	ck->cookies[ncookies] = cookie;
+	ck->num =  ++ncookies;
 	return true;
 	return true;
 }
 }
 
 
 static void rds_rm_zerocopy_callback(struct rds_sock *rs,
 static void rds_rm_zerocopy_callback(struct rds_sock *rs,
 				     struct rds_znotifier *znotif)
 				     struct rds_znotifier *znotif)
 {
 {
-	struct sock *sk = rds_rs_to_sk(rs);
 	struct sk_buff *skb, *tail;
 	struct sk_buff *skb, *tail;
-	struct sock_exterr_skb *serr;
 	unsigned long flags;
 	unsigned long flags;
 	struct sk_buff_head *q;
 	struct sk_buff_head *q;
 	u32 cookie = znotif->z_cookie;
 	u32 cookie = znotif->z_cookie;
+	struct rds_zcopy_cookies *ck;
 
 
-	q = &sk->sk_error_queue;
+	q = &rs->rs_zcookie_queue;
 	spin_lock_irqsave(&q->lock, flags);
 	spin_lock_irqsave(&q->lock, flags);
 	tail = skb_peek_tail(q);
 	tail = skb_peek_tail(q);
 
 
@@ -91,22 +85,19 @@ static void rds_rm_zerocopy_callback(struct rds_sock *rs,
 		spin_unlock_irqrestore(&q->lock, flags);
 		spin_unlock_irqrestore(&q->lock, flags);
 		mm_unaccount_pinned_pages(&znotif->z_mmp);
 		mm_unaccount_pinned_pages(&znotif->z_mmp);
 		consume_skb(rds_skb_from_znotifier(znotif));
 		consume_skb(rds_skb_from_znotifier(znotif));
-		sk->sk_error_report(sk);
+		/* caller invokes rds_wake_sk_sleep() */
 		return;
 		return;
 	}
 	}
 
 
 	skb = rds_skb_from_znotifier(znotif);
 	skb = rds_skb_from_znotifier(znotif);
-	serr = SKB_EXT_ERR(skb);
-	memset(&serr->ee, 0, sizeof(serr->ee));
-	serr->ee.ee_errno = 0;
-	serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
-	serr->ee.ee_info = 0;
+	ck = (struct rds_zcopy_cookies *)skb->cb;
+	memset(ck, 0, sizeof(*ck));
 	WARN_ON(!skb_zcookie_add(skb, cookie));
 	WARN_ON(!skb_zcookie_add(skb, cookie));
 
 
 	__skb_queue_tail(q, skb);
 	__skb_queue_tail(q, skb);
 
 
 	spin_unlock_irqrestore(&q->lock, flags);
 	spin_unlock_irqrestore(&q->lock, flags);
-	sk->sk_error_report(sk);
+	/* caller invokes rds_wake_sk_sleep() */
 
 
 	mm_unaccount_pinned_pages(&znotif->z_mmp);
 	mm_unaccount_pinned_pages(&znotif->z_mmp);
 }
 }
@@ -129,6 +120,7 @@ static void rds_message_purge(struct rds_message *rm)
 		if (rm->data.op_mmp_znotifier) {
 		if (rm->data.op_mmp_znotifier) {
 			zcopy = true;
 			zcopy = true;
 			rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
 			rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
+			rds_wake_sk_sleep(rs);
 			rm->data.op_mmp_znotifier = NULL;
 			rm->data.op_mmp_znotifier = NULL;
 		}
 		}
 		sock_put(rds_rs_to_sk(rs));
 		sock_put(rds_rs_to_sk(rs));
@@ -362,10 +354,12 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
 		int total_copied = 0;
 		int total_copied = 0;
 		struct sk_buff *skb;
 		struct sk_buff *skb;
 
 
-		skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
-				GFP_KERNEL);
+		skb = alloc_skb(0, GFP_KERNEL);
 		if (!skb)
 		if (!skb)
 			return -ENOMEM;
 			return -ENOMEM;
+		BUILD_BUG_ON(sizeof(skb->cb) <
+			     max_t(int, sizeof(struct rds_znotifier),
+				   sizeof(struct rds_zcopy_cookies)));
 		rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
 		rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
 		if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
 		if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
 					    length)) {
 					    length)) {

+ 2 - 0
net/rds/rds.h

@@ -603,6 +603,8 @@ struct rds_sock {
 	/* Socket receive path trace points*/
 	/* Socket receive path trace points*/
 	u8			rs_rx_traces;
 	u8			rs_rx_traces;
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+
+	struct sk_buff_head	rs_zcookie_queue;
 };
 };
 
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)

+ 30 - 1
net/rds/recv.c

@@ -577,6 +577,32 @@ out:
 	return ret;
 	return ret;
 }
 }
 
 
+static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
+{
+	struct sk_buff *skb;
+	struct sk_buff_head *q = &rs->rs_zcookie_queue;
+	struct rds_zcopy_cookies *done;
+
+	if (!msg->msg_control)
+		return false;
+
+	if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
+	    msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
+		return false;
+
+	skb = skb_dequeue(q);
+	if (!skb)
+		return false;
+	done = (struct rds_zcopy_cookies *)skb->cb;
+	if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
+		     done)) {
+		skb_queue_head(q, skb);
+		return false;
+	}
+	consume_skb(skb);
+	return true;
+}
+
 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 		int msg_flags)
 		int msg_flags)
 {
 {
@@ -611,7 +637,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 
 
 		if (!rds_next_incoming(rs, &inc)) {
 		if (!rds_next_incoming(rs, &inc)) {
 			if (nonblock) {
 			if (nonblock) {
-				ret = -EAGAIN;
+				bool reaped = rds_recvmsg_zcookie(rs, msg);
+
+				ret = reaped ?  0 : -EAGAIN;
 				break;
 				break;
 			}
 			}
 
 
@@ -660,6 +688,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 			ret = -EFAULT;
 			ret = -EFAULT;
 			goto out;
 			goto out;
 		}
 		}
+		rds_recvmsg_zcookie(rs, msg);
 
 
 		rds_stats_inc(s_recv_delivered);
 		rds_stats_inc(s_recv_delivered);
 
 

+ 51 - 69
tools/testing/selftests/net/msg_zerocopy.c

@@ -344,27 +344,53 @@ static int do_setup_tx(int domain, int type, int protocol)
 	return fd;
 	return fd;
 }
 }
 
 
-static int do_process_zerocopy_cookies(struct sock_extended_err *serr,
-				       uint32_t *ckbuf, size_t nbytes)
+static uint32_t do_process_zerocopy_cookies(struct rds_zcopy_cookies *ck)
 {
 {
-	int ncookies, i;
+	int i;
 
 
-	if (serr->ee_errno != 0)
-		error(1, 0, "serr: wrong error code: %u", serr->ee_errno);
-	ncookies = serr->ee_data;
-	if (ncookies > SO_EE_ORIGIN_MAX_ZCOOKIES)
+	if (ck->num > RDS_MAX_ZCOOKIES)
 		error(1, 0, "Returned %d cookies, max expected %d\n",
 		error(1, 0, "Returned %d cookies, max expected %d\n",
-		      ncookies, SO_EE_ORIGIN_MAX_ZCOOKIES);
-	if (nbytes != ncookies * sizeof(uint32_t))
-		error(1, 0, "Expected %d cookies, got %ld\n",
-		      ncookies, nbytes/sizeof(uint32_t));
-	for (i = 0; i < ncookies; i++)
+		      ck->num, RDS_MAX_ZCOOKIES);
+	for (i = 0; i < ck->num; i++)
 		if (cfg_verbose >= 2)
 		if (cfg_verbose >= 2)
-			fprintf(stderr, "%d\n", ckbuf[i]);
-	return ncookies;
+			fprintf(stderr, "%d\n", ck->cookies[i]);
+	return ck->num;
 }
 }
 
 
-static bool do_recv_completion(int fd)
+static bool do_recvmsg_completion(int fd)
+{
+	char cmsgbuf[CMSG_SPACE(sizeof(struct rds_zcopy_cookies))];
+	struct rds_zcopy_cookies *ck;
+	struct cmsghdr *cmsg;
+	struct msghdr msg;
+	bool ret = false;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.msg_control = cmsgbuf;
+	msg.msg_controllen = sizeof(cmsgbuf);
+
+	if (recvmsg(fd, &msg, MSG_DONTWAIT))
+		return ret;
+
+	if (msg.msg_flags & MSG_CTRUNC)
+		error(1, errno, "recvmsg notification: truncated");
+
+	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+		if (cmsg->cmsg_level == SOL_RDS &&
+		    cmsg->cmsg_type == RDS_CMSG_ZCOPY_COMPLETION) {
+
+			ck = (struct rds_zcopy_cookies *)CMSG_DATA(cmsg);
+			completions += do_process_zerocopy_cookies(ck);
+			ret = true;
+			break;
+		}
+		error(0, 0, "ignoring cmsg at level %d type %d\n",
+			    cmsg->cmsg_level, cmsg->cmsg_type);
+	}
+	return ret;
+}
+
+static bool do_recv_completion(int fd, int domain)
 {
 {
 	struct sock_extended_err *serr;
 	struct sock_extended_err *serr;
 	struct msghdr msg = {};
 	struct msghdr msg = {};
@@ -372,17 +398,13 @@ static bool do_recv_completion(int fd)
 	uint32_t hi, lo, range;
 	uint32_t hi, lo, range;
 	int ret, zerocopy;
 	int ret, zerocopy;
 	char control[100];
 	char control[100];
-	uint32_t ckbuf[SO_EE_ORIGIN_MAX_ZCOOKIES];
-	struct iovec iov;
+
+	if (domain == PF_RDS)
+		return do_recvmsg_completion(fd);
 
 
 	msg.msg_control = control;
 	msg.msg_control = control;
 	msg.msg_controllen = sizeof(control);
 	msg.msg_controllen = sizeof(control);
 
 
-	iov.iov_base = ckbuf;
-	iov.iov_len = (SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(ckbuf[0]));
-	msg.msg_iov = &iov;
-	msg.msg_iovlen = 1;
-
 	ret = recvmsg(fd, &msg, MSG_ERRQUEUE);
 	ret = recvmsg(fd, &msg, MSG_ERRQUEUE);
 	if (ret == -1 && errno == EAGAIN)
 	if (ret == -1 && errno == EAGAIN)
 		return false;
 		return false;
@@ -402,10 +424,6 @@ static bool do_recv_completion(int fd)
 
 
 	serr = (void *) CMSG_DATA(cm);
 	serr = (void *) CMSG_DATA(cm);
 
 
-	if (serr->ee_origin == SO_EE_ORIGIN_ZCOOKIE) {
-		completions += do_process_zerocopy_cookies(serr, ckbuf, ret);
-		return true;
-	}
 	if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY)
 	if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY)
 		error(1, 0, "serr: wrong origin: %u", serr->ee_origin);
 		error(1, 0, "serr: wrong origin: %u", serr->ee_origin);
 	if (serr->ee_errno != 0)
 	if (serr->ee_errno != 0)
@@ -440,20 +458,20 @@ static bool do_recv_completion(int fd)
 }
 }
 
 
 /* Read all outstanding messages on the errqueue */
 /* Read all outstanding messages on the errqueue */
-static void do_recv_completions(int fd)
+static void do_recv_completions(int fd, int domain)
 {
 {
-	while (do_recv_completion(fd)) {}
+	while (do_recv_completion(fd, domain)) {}
 }
 }
 
 
 /* Wait for all remaining completions on the errqueue */
 /* Wait for all remaining completions on the errqueue */
-static void do_recv_remaining_completions(int fd)
+static void do_recv_remaining_completions(int fd, int domain)
 {
 {
 	int64_t tstop = gettimeofday_ms() + cfg_waittime_ms;
 	int64_t tstop = gettimeofday_ms() + cfg_waittime_ms;
 
 
 	while (completions < expected_completions &&
 	while (completions < expected_completions &&
 	       gettimeofday_ms() < tstop) {
 	       gettimeofday_ms() < tstop) {
-		if (do_poll(fd, POLLERR))
-			do_recv_completions(fd);
+		if (do_poll(fd, domain == PF_RDS ? POLLIN : POLLERR))
+			do_recv_completions(fd, domain);
 	}
 	}
 
 
 	if (completions < expected_completions)
 	if (completions < expected_completions)
@@ -534,13 +552,13 @@ static void do_tx(int domain, int type, int protocol)
 
 
 		while (!do_poll(fd, POLLOUT)) {
 		while (!do_poll(fd, POLLOUT)) {
 			if (cfg_zerocopy)
 			if (cfg_zerocopy)
-				do_recv_completions(fd);
+				do_recv_completions(fd, domain);
 		}
 		}
 
 
 	} while (gettimeofday_ms() < tstop);
 	} while (gettimeofday_ms() < tstop);
 
 
 	if (cfg_zerocopy)
 	if (cfg_zerocopy)
-		do_recv_remaining_completions(fd);
+		do_recv_remaining_completions(fd, domain);
 
 
 	if (close(fd))
 	if (close(fd))
 		error(1, errno, "close");
 		error(1, errno, "close");
@@ -631,40 +649,6 @@ static void do_flush_datagram(int fd, int type)
 	bytes += cfg_payload_len;
 	bytes += cfg_payload_len;
 }
 }
 
 
-
-static void do_recvmsg(int fd)
-{
-	int ret, off = 0;
-	char *buf;
-	struct iovec iov;
-	struct msghdr msg;
-	struct sockaddr_storage din;
-
-	buf = calloc(cfg_payload_len, sizeof(char));
-	iov.iov_base = buf;
-	iov.iov_len = cfg_payload_len;
-
-	memset(&msg, 0, sizeof(msg));
-	msg.msg_name = &din;
-	msg.msg_namelen = sizeof(din);
-	msg.msg_iov = &iov;
-	msg.msg_iovlen = 1;
-
-	ret = recvmsg(fd, &msg, MSG_TRUNC);
-
-	if (ret == -1)
-		error(1, errno, "recv");
-	if (ret != cfg_payload_len)
-		error(1, 0, "recv: ret=%u != %u", ret, cfg_payload_len);
-
-	if (memcmp(buf + off, payload, ret))
-		error(1, 0, "recv: data mismatch");
-
-	free(buf);
-	packets++;
-	bytes += cfg_payload_len;
-}
-
 static void do_rx(int domain, int type, int protocol)
 static void do_rx(int domain, int type, int protocol)
 {
 {
 	uint64_t tstop;
 	uint64_t tstop;
@@ -676,8 +660,6 @@ static void do_rx(int domain, int type, int protocol)
 	do {
 	do {
 		if (type == SOCK_STREAM)
 		if (type == SOCK_STREAM)
 			do_flush_tcp(fd);
 			do_flush_tcp(fd);
-		else if (domain == PF_RDS)
-			do_recvmsg(fd);
 		else
 		else
 			do_flush_datagram(fd, type);
 			do_flush_datagram(fd, type);