瀏覽代碼

Merge tag 'rxrpc-rewrite-20160917-2' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Tracepoint addition and improvement

Here is a set of patches that add some more tracepoints and improve a couple
of existing ones.  New additions include:

 (1) Connection refcount tracking.

 (2) Client connection state machine tracking.

 (3) Tx and Rx packet lifecycle.

 (4) ACK reception and transmission.

 (5) recvmsg processing.

Updates include:

 (1) Print the symbolic packet name in the Rx packet tracepoint.

 (2) Additional call refcount trace events.

 (3) Improvements to sk_buff tracking with AF_RXRPC.

In addition:

 (1) Config option to inject packet loss during both transmission and
     reception.

 (2) Removal of some printks.

This series needs to be applied on top of the previously posted fixes.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
David S. Miller 9 年之前
父節點
當前提交
e867e87ae8

+ 216 - 10
include/trace/events/rxrpc.h

@@ -16,6 +16,66 @@
 
 #include <linux/tracepoint.h>
 
+TRACE_EVENT(rxrpc_conn,
+	    TP_PROTO(struct rxrpc_connection *conn, enum rxrpc_conn_trace op,
+		     int usage, const void *where),
+
+	    TP_ARGS(conn, op, usage, where),
+
+	    TP_STRUCT__entry(
+		    __field(struct rxrpc_connection *,	conn		)
+		    __field(int,			op		)
+		    __field(int,			usage		)
+		    __field(const void *,		where		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->conn = conn;
+		    __entry->op = op;
+		    __entry->usage = usage;
+		    __entry->where = where;
+			   ),
+
+	    TP_printk("C=%p %s u=%d sp=%pSR",
+		      __entry->conn,
+		      rxrpc_conn_traces[__entry->op],
+		      __entry->usage,
+		      __entry->where)
+	    );
+
+TRACE_EVENT(rxrpc_client,
+	    TP_PROTO(struct rxrpc_connection *conn, int channel,
+		     enum rxrpc_client_trace op),
+
+	    TP_ARGS(conn, channel, op),
+
+	    TP_STRUCT__entry(
+		    __field(struct rxrpc_connection *,	conn		)
+		    __field(u32,			cid		)
+		    __field(int,			channel		)
+		    __field(int,			usage		)
+		    __field(enum rxrpc_client_trace,	op		)
+		    __field(enum rxrpc_conn_cache_state, cs		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->conn = conn;
+		    __entry->channel = channel;
+		    __entry->usage = atomic_read(&conn->usage);
+		    __entry->op = op;
+		    __entry->cid = conn->proto.cid;
+		    __entry->cs = conn->cache_state;
+			   ),
+
+	    TP_printk("C=%p h=%2d %s %s i=%08x u=%d",
+		      __entry->conn,
+		      __entry->channel,
+		      rxrpc_client_traces[__entry->op],
+		      rxrpc_conn_cache_states[__entry->cs],
+		      __entry->cid,
+		      __entry->usage)
+	    );
+
 TRACE_EVENT(rxrpc_call,
 	    TP_PROTO(struct rxrpc_call *call, enum rxrpc_call_trace op,
 		     int usage, const void *where, const void *aux),
@@ -47,14 +107,14 @@ TRACE_EVENT(rxrpc_call,
 	    );
 
 TRACE_EVENT(rxrpc_skb,
-	    TP_PROTO(struct sk_buff *skb, int op, int usage, int mod_count,
-		     const void *where),
+	    TP_PROTO(struct sk_buff *skb, enum rxrpc_skb_trace op,
+		     int usage, int mod_count, const void *where),
 
 	    TP_ARGS(skb, op, usage, mod_count, where),
 
 	    TP_STRUCT__entry(
 		    __field(struct sk_buff *,		skb		)
-		    __field(int,			op		)
+		    __field(enum rxrpc_skb_trace,	op		)
 		    __field(int,			usage		)
 		    __field(int,			mod_count	)
 		    __field(const void *,		where		)
@@ -70,11 +130,7 @@ TRACE_EVENT(rxrpc_skb,
 
 	    TP_printk("s=%p %s u=%d m=%d p=%pSR",
 		      __entry->skb,
-		      (__entry->op == 0 ? "NEW" :
-		       __entry->op == 1 ? "SEE" :
-		       __entry->op == 2 ? "GET" :
-		       __entry->op == 3 ? "FRE" :
-		       "PUR"),
+		      rxrpc_skb_traces[__entry->op],
 		      __entry->usage,
 		      __entry->mod_count,
 		      __entry->where)
@@ -93,11 +149,12 @@ TRACE_EVENT(rxrpc_rx_packet,
 		    memcpy(&__entry->hdr, &sp->hdr, sizeof(__entry->hdr));
 			   ),
 
-	    TP_printk("%08x:%08x:%08x:%04x %08x %08x %02x %02x",
+	    TP_printk("%08x:%08x:%08x:%04x %08x %08x %02x %02x %s",
 		      __entry->hdr.epoch, __entry->hdr.cid,
 		      __entry->hdr.callNumber, __entry->hdr.serviceId,
 		      __entry->hdr.serial, __entry->hdr.seq,
-		      __entry->hdr.type, __entry->hdr.flags)
+		      __entry->hdr.type, __entry->hdr.flags,
+		      __entry->hdr.type <= 15 ? rxrpc_pkts[__entry->hdr.type] : "?UNK")
 	    );
 
 TRACE_EVENT(rxrpc_rx_done,
@@ -147,6 +204,155 @@ TRACE_EVENT(rxrpc_abort,
 		      __entry->abort_code, __entry->error, __entry->why)
 	    );
 
+TRACE_EVENT(rxrpc_transmit,
+	    TP_PROTO(struct rxrpc_call *call, enum rxrpc_transmit_trace why),
+
+	    TP_ARGS(call, why),
+
+	    TP_STRUCT__entry(
+		    __field(struct rxrpc_call *,	call		)
+		    __field(enum rxrpc_transmit_trace,	why		)
+		    __field(rxrpc_seq_t,		tx_hard_ack	)
+		    __field(rxrpc_seq_t,		tx_top		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call = call;
+		    __entry->why = why;
+		    __entry->tx_hard_ack = call->tx_hard_ack;
+		    __entry->tx_top = call->tx_top;
+			   ),
+
+	    TP_printk("c=%p %s f=%08x n=%u",
+		      __entry->call,
+		      rxrpc_transmit_traces[__entry->why],
+		      __entry->tx_hard_ack + 1,
+		      __entry->tx_top - __entry->tx_hard_ack)
+	    );
+
+TRACE_EVENT(rxrpc_rx_ack,
+	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t first, u8 reason, u8 n_acks),
+
+	    TP_ARGS(call, first, reason, n_acks),
+
+	    TP_STRUCT__entry(
+		    __field(struct rxrpc_call *,	call		)
+		    __field(rxrpc_seq_t,		first		)
+		    __field(u8,				reason		)
+		    __field(u8,				n_acks		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call = call;
+		    __entry->first = first;
+		    __entry->reason = reason;
+		    __entry->n_acks = n_acks;
+			   ),
+
+	    TP_printk("c=%p %s f=%08x n=%u",
+		      __entry->call,
+		      rxrpc_acks(__entry->reason),
+		      __entry->first,
+		      __entry->n_acks)
+	    );
+
+TRACE_EVENT(rxrpc_tx_ack,
+	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t first,
+		     rxrpc_serial_t serial, u8 reason, u8 n_acks),
+
+	    TP_ARGS(call, first, serial, reason, n_acks),
+
+	    TP_STRUCT__entry(
+		    __field(struct rxrpc_call *,	call		)
+		    __field(rxrpc_seq_t,		first		)
+		    __field(rxrpc_serial_t,		serial		)
+		    __field(u8,				reason		)
+		    __field(u8,				n_acks		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call = call;
+		    __entry->first = first;
+		    __entry->serial = serial;
+		    __entry->reason = reason;
+		    __entry->n_acks = n_acks;
+			   ),
+
+	    TP_printk("c=%p %s f=%08x r=%08x n=%u",
+		      __entry->call,
+		      rxrpc_acks(__entry->reason),
+		      __entry->first,
+		      __entry->serial,
+		      __entry->n_acks)
+	    );
+
+TRACE_EVENT(rxrpc_receive,
+	    TP_PROTO(struct rxrpc_call *call, enum rxrpc_receive_trace why,
+		     rxrpc_serial_t serial, rxrpc_seq_t seq),
+
+	    TP_ARGS(call, why, serial, seq),
+
+	    TP_STRUCT__entry(
+		    __field(struct rxrpc_call *,	call		)
+		    __field(enum rxrpc_receive_trace,	why		)
+		    __field(rxrpc_serial_t,		serial		)
+		    __field(rxrpc_seq_t,		seq		)
+		    __field(rxrpc_seq_t,		hard_ack	)
+		    __field(rxrpc_seq_t,		top		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call = call;
+		    __entry->why = why;
+		    __entry->serial = serial;
+		    __entry->seq = seq;
+		    __entry->hard_ack = call->rx_hard_ack;
+		    __entry->top = call->rx_top;
+			   ),
+
+	    TP_printk("c=%p %s r=%08x q=%08x w=%08x-%08x",
+		      __entry->call,
+		      rxrpc_receive_traces[__entry->why],
+		      __entry->serial,
+		      __entry->seq,
+		      __entry->hard_ack,
+		      __entry->top)
+	    );
+
+TRACE_EVENT(rxrpc_recvmsg,
+	    TP_PROTO(struct rxrpc_call *call, enum rxrpc_recvmsg_trace why,
+		     rxrpc_seq_t seq, unsigned int offset, unsigned int len,
+		     int ret),
+
+	    TP_ARGS(call, why, seq, offset, len, ret),
+
+	    TP_STRUCT__entry(
+		    __field(struct rxrpc_call *,	call		)
+		    __field(enum rxrpc_recvmsg_trace,	why		)
+		    __field(rxrpc_seq_t,		seq		)
+		    __field(unsigned int,		offset		)
+		    __field(unsigned int,		len		)
+		    __field(int,			ret		)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call = call;
+		    __entry->why = why;
+		    __entry->seq = seq;
+		    __entry->offset = offset;
+		    __entry->len = len;
+		    __entry->ret = ret;
+			   ),
+
+	    TP_printk("c=%p %s q=%08x o=%u l=%u ret=%d",
+		      __entry->call,
+		      rxrpc_recvmsg_traces[__entry->why],
+		      __entry->seq,
+		      __entry->offset,
+		      __entry->len,
+		      __entry->ret)
+	    );
+
 #endif /* _TRACE_RXRPC_H */
 
 /* This part must be outside protection */

+ 7 - 0
net/rxrpc/Kconfig

@@ -26,6 +26,13 @@ config AF_RXRPC_IPV6
 	  Say Y here to allow AF_RXRPC to use IPV6 UDP as well as IPV4 UDP as
 	  its network transport.
 
+config AF_RXRPC_INJECT_LOSS
+	bool "Inject packet loss into RxRPC packet stream"
+	depends on AF_RXRPC
+	help
+	  Say Y here to inject packet loss by discarding some received and some
+	  transmitted packets.
+
 
 config AF_RXRPC_DEBUG
 	bool "RxRPC dynamic debugging"

+ 3 - 2
net/rxrpc/af_rxrpc.c

@@ -45,7 +45,7 @@ u32 rxrpc_epoch;
 atomic_t rxrpc_debug_id;
 
 /* count of skbs currently in use */
-atomic_t rxrpc_n_skbs;
+atomic_t rxrpc_n_tx_skbs, rxrpc_n_rx_skbs;
 
 struct workqueue_struct *rxrpc_workqueue;
 
@@ -867,7 +867,8 @@ static void __exit af_rxrpc_exit(void)
 	proto_unregister(&rxrpc_proto);
 	rxrpc_destroy_all_calls();
 	rxrpc_destroy_all_connections();
-	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
+	ASSERTCMP(atomic_read(&rxrpc_n_tx_skbs), ==, 0);
+	ASSERTCMP(atomic_read(&rxrpc_n_rx_skbs), ==, 0);
 	rxrpc_destroy_all_locals();
 
 	remove_proc_entry("rxrpc_conns", init_net.proc_net);

+ 123 - 36
net/rxrpc/ar-internal.h

@@ -314,6 +314,7 @@ enum rxrpc_conn_cache_state {
 	RXRPC_CONN_CLIENT_ACTIVE,	/* Conn is on active list, doing calls */
 	RXRPC_CONN_CLIENT_CULLED,	/* Conn is culled and delisted, doing calls */
 	RXRPC_CONN_CLIENT_IDLE,		/* Conn is on idle list, doing mostly nothing */
+	RXRPC_CONN__NR_CACHE_STATES
 };
 
 /*
@@ -519,6 +520,7 @@ struct rxrpc_call {
 	rxrpc_seq_t		rx_expect_next;	/* Expected next packet sequence number */
 	u8			rx_winsize;	/* Size of Rx window */
 	u8			tx_winsize;	/* Maximum size of Tx window */
+	bool			tx_phase;	/* T if transmission phase, F if receive phase */
 	u8			nr_jumbo_bad;	/* Number of jumbo dups/exceeds-windows */
 
 	/* receive-phase ACK management */
@@ -533,12 +535,73 @@ struct rxrpc_call {
 	rxrpc_serial_t		acks_latest;	/* serial number of latest ACK received */
 };
 
+enum rxrpc_skb_trace {
+	rxrpc_skb_rx_cleaned,
+	rxrpc_skb_rx_freed,
+	rxrpc_skb_rx_got,
+	rxrpc_skb_rx_lost,
+	rxrpc_skb_rx_received,
+	rxrpc_skb_rx_rotated,
+	rxrpc_skb_rx_purged,
+	rxrpc_skb_rx_seen,
+	rxrpc_skb_tx_cleaned,
+	rxrpc_skb_tx_freed,
+	rxrpc_skb_tx_got,
+	rxrpc_skb_tx_lost,
+	rxrpc_skb_tx_new,
+	rxrpc_skb_tx_rotated,
+	rxrpc_skb_tx_seen,
+	rxrpc_skb__nr_trace
+};
+
+extern const char rxrpc_skb_traces[rxrpc_skb__nr_trace][7];
+
+enum rxrpc_conn_trace {
+	rxrpc_conn_new_client,
+	rxrpc_conn_new_service,
+	rxrpc_conn_queued,
+	rxrpc_conn_seen,
+	rxrpc_conn_got,
+	rxrpc_conn_put_client,
+	rxrpc_conn_put_service,
+	rxrpc_conn__nr_trace
+};
+
+extern const char rxrpc_conn_traces[rxrpc_conn__nr_trace][4];
+
+enum rxrpc_client_trace {
+	rxrpc_client_activate_chans,
+	rxrpc_client_alloc,
+	rxrpc_client_chan_activate,
+	rxrpc_client_chan_disconnect,
+	rxrpc_client_chan_pass,
+	rxrpc_client_chan_unstarted,
+	rxrpc_client_cleanup,
+	rxrpc_client_count,
+	rxrpc_client_discard,
+	rxrpc_client_duplicate,
+	rxrpc_client_exposed,
+	rxrpc_client_replace,
+	rxrpc_client_to_active,
+	rxrpc_client_to_culled,
+	rxrpc_client_to_idle,
+	rxrpc_client_to_inactive,
+	rxrpc_client_to_waiting,
+	rxrpc_client_uncount,
+	rxrpc_client__nr_trace
+};
+
+extern const char rxrpc_client_traces[rxrpc_client__nr_trace][7];
+extern const char rxrpc_conn_cache_states[RXRPC_CONN__NR_CACHE_STATES][5];
+
 enum rxrpc_call_trace {
 	rxrpc_call_new_client,
 	rxrpc_call_new_service,
 	rxrpc_call_queued,
 	rxrpc_call_queued_ref,
 	rxrpc_call_seen,
+	rxrpc_call_connected,
+	rxrpc_call_release,
 	rxrpc_call_got,
 	rxrpc_call_got_userid,
 	rxrpc_call_got_kernel,
@@ -546,17 +609,62 @@ enum rxrpc_call_trace {
 	rxrpc_call_put_userid,
 	rxrpc_call_put_kernel,
 	rxrpc_call_put_noqueue,
+	rxrpc_call_error,
 	rxrpc_call__nr_trace
 };
 
 extern const char rxrpc_call_traces[rxrpc_call__nr_trace][4];
 
+enum rxrpc_transmit_trace {
+	rxrpc_transmit_wait,
+	rxrpc_transmit_queue,
+	rxrpc_transmit_queue_reqack,
+	rxrpc_transmit_queue_last,
+	rxrpc_transmit_rotate,
+	rxrpc_transmit_end,
+	rxrpc_transmit__nr_trace
+};
+
+extern const char rxrpc_transmit_traces[rxrpc_transmit__nr_trace][4];
+
+enum rxrpc_receive_trace {
+	rxrpc_receive_incoming,
+	rxrpc_receive_queue,
+	rxrpc_receive_queue_last,
+	rxrpc_receive_front,
+	rxrpc_receive_rotate,
+	rxrpc_receive_end,
+	rxrpc_receive__nr_trace
+};
+
+extern const char rxrpc_receive_traces[rxrpc_receive__nr_trace][4];
+
+enum rxrpc_recvmsg_trace {
+	rxrpc_recvmsg_enter,
+	rxrpc_recvmsg_wait,
+	rxrpc_recvmsg_dequeue,
+	rxrpc_recvmsg_hole,
+	rxrpc_recvmsg_next,
+	rxrpc_recvmsg_cont,
+	rxrpc_recvmsg_full,
+	rxrpc_recvmsg_data_return,
+	rxrpc_recvmsg_terminal,
+	rxrpc_recvmsg_to_be_accepted,
+	rxrpc_recvmsg_return,
+	rxrpc_recvmsg__nr_trace
+};
+
+extern const char rxrpc_recvmsg_traces[rxrpc_recvmsg__nr_trace][5];
+
+extern const char *const rxrpc_pkts[];
+extern const char *rxrpc_acks(u8 reason);
+
 #include <trace/events/rxrpc.h>
 
 /*
  * af_rxrpc.c
  */
-extern atomic_t rxrpc_n_skbs;
+extern atomic_t rxrpc_n_tx_skbs, rxrpc_n_rx_skbs;
 extern u32 rxrpc_epoch;
 extern atomic_t rxrpc_debug_id;
 extern struct workqueue_struct *rxrpc_workqueue;
@@ -728,7 +836,11 @@ struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
 void __rxrpc_disconnect_call(struct rxrpc_connection *, struct rxrpc_call *);
 void rxrpc_disconnect_call(struct rxrpc_call *);
 void rxrpc_kill_connection(struct rxrpc_connection *);
-void __rxrpc_put_connection(struct rxrpc_connection *);
+bool rxrpc_queue_conn(struct rxrpc_connection *);
+void rxrpc_see_connection(struct rxrpc_connection *);
+void rxrpc_get_connection(struct rxrpc_connection *);
+struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *);
+void rxrpc_put_service_conn(struct rxrpc_connection *);
 void __exit rxrpc_destroy_all_connections(void);
 
 static inline bool rxrpc_conn_is_client(const struct rxrpc_connection *conn)
@@ -741,38 +853,15 @@ static inline bool rxrpc_conn_is_service(const struct rxrpc_connection *conn)
 	return !rxrpc_conn_is_client(conn);
 }
 
-static inline void rxrpc_get_connection(struct rxrpc_connection *conn)
-{
-	atomic_inc(&conn->usage);
-}
-
-static inline
-struct rxrpc_connection *rxrpc_get_connection_maybe(struct rxrpc_connection *conn)
-{
-	return atomic_inc_not_zero(&conn->usage) ? conn : NULL;
-}
-
 static inline void rxrpc_put_connection(struct rxrpc_connection *conn)
 {
 	if (!conn)
 		return;
 
-	if (rxrpc_conn_is_client(conn)) {
-		if (atomic_dec_and_test(&conn->usage))
-			rxrpc_put_client_conn(conn);
-	} else {
-		if (atomic_dec_return(&conn->usage) == 1)
-			__rxrpc_put_connection(conn);
-	}
-}
-
-static inline bool rxrpc_queue_conn(struct rxrpc_connection *conn)
-{
-	if (!rxrpc_get_connection_maybe(conn))
-		return false;
-	if (!rxrpc_queue_work(&conn->processor))
-		rxrpc_put_connection(conn);
-	return true;
+	if (rxrpc_conn_is_client(conn))
+		rxrpc_put_client_conn(conn);
+	else
+		rxrpc_put_service_conn(conn);
 }
 
 /*
@@ -851,11 +940,8 @@ extern unsigned int rxrpc_rx_mtu;
 extern unsigned int rxrpc_rx_jumbo_max;
 extern unsigned int rxrpc_resend_timeout;
 
-extern const char *const rxrpc_pkts[];
 extern const s8 rxrpc_ack_priority[];
 
-extern const char *rxrpc_acks(u8 reason);
-
 /*
  * output.c
  */
@@ -936,10 +1022,11 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *, struct msghdr *, size_t);
  */
 void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
 void rxrpc_packet_destructor(struct sk_buff *);
-void rxrpc_new_skb(struct sk_buff *);
-void rxrpc_see_skb(struct sk_buff *);
-void rxrpc_get_skb(struct sk_buff *);
-void rxrpc_free_skb(struct sk_buff *);
+void rxrpc_new_skb(struct sk_buff *, enum rxrpc_skb_trace);
+void rxrpc_see_skb(struct sk_buff *, enum rxrpc_skb_trace);
+void rxrpc_get_skb(struct sk_buff *, enum rxrpc_skb_trace);
+void rxrpc_free_skb(struct sk_buff *, enum rxrpc_skb_trace);
+void rxrpc_lose_skb(struct sk_buff *, enum rxrpc_skb_trace);
 void rxrpc_purge_queue(struct sk_buff_head *);
 
 /*

+ 7 - 0
net/rxrpc/call_accept.c

@@ -85,6 +85,9 @@ static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx,
 		b->conn_backlog[head] = conn;
 		smp_store_release(&b->conn_backlog_head,
 				  (head + 1) & (size - 1));
+
+		trace_rxrpc_conn(conn, rxrpc_conn_new_service,
+				 atomic_read(&conn->usage), here);
 	}
 
 	/* Now it gets complicated, because calls get registered with the
@@ -290,6 +293,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
 		rxrpc_get_local(local);
 		conn->params.local = local;
 		conn->params.peer = peer;
+		rxrpc_see_connection(conn);
 		rxrpc_new_incoming_connection(conn, skb);
 	} else {
 		rxrpc_get_connection(conn);
@@ -363,6 +367,9 @@ found_service:
 		goto out;
 	}
 
+	trace_rxrpc_receive(call, rxrpc_receive_incoming,
+			    sp->hdr.serial, sp->hdr.seq);
+
 	/* Make the call live. */
 	rxrpc_incoming_call(rx, call, skb);
 	conn = call->conn;

+ 4 - 4
net/rxrpc/call_event.c

@@ -170,7 +170,7 @@ static void rxrpc_resend(struct rxrpc_call *call)
 			continue;
 
 		skb = call->rxtx_buffer[ix];
-		rxrpc_see_skb(skb);
+		rxrpc_see_skb(skb, rxrpc_skb_tx_seen);
 		sp = rxrpc_skb(skb);
 
 		if (annotation == RXRPC_TX_ANNO_UNACK) {
@@ -199,7 +199,7 @@ static void rxrpc_resend(struct rxrpc_call *call)
 			continue;
 
 		skb = call->rxtx_buffer[ix];
-		rxrpc_get_skb(skb);
+		rxrpc_get_skb(skb, rxrpc_skb_tx_got);
 		spin_unlock_bh(&call->lock);
 		sp = rxrpc_skb(skb);
 
@@ -211,7 +211,7 @@ static void rxrpc_resend(struct rxrpc_call *call)
 
 		if (rxrpc_send_data_packet(call->conn, skb) < 0) {
 			call->resend_at = now + 2;
-			rxrpc_free_skb(skb);
+			rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
 			return;
 		}
 
@@ -219,7 +219,7 @@ static void rxrpc_resend(struct rxrpc_call *call)
 			rxrpc_expose_client_call(call);
 		sp->resend_at = now + rxrpc_resend_timeout;
 
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
 		spin_lock_bh(&call->lock);
 
 		/* We need to clear the retransmit state, but there are two

+ 22 - 9
net/rxrpc/call_object.c

@@ -53,6 +53,8 @@ const char rxrpc_call_traces[rxrpc_call__nr_trace][4] = {
 	[rxrpc_call_new_service]	= "NWs",
 	[rxrpc_call_queued]		= "QUE",
 	[rxrpc_call_queued_ref]		= "QUR",
+	[rxrpc_call_connected]		= "CON",
+	[rxrpc_call_release]		= "RLS",
 	[rxrpc_call_seen]		= "SEE",
 	[rxrpc_call_got]		= "GOT",
 	[rxrpc_call_got_userid]		= "Gus",
@@ -61,6 +63,7 @@ const char rxrpc_call_traces[rxrpc_call__nr_trace][4] = {
 	[rxrpc_call_put_userid]		= "Pus",
 	[rxrpc_call_put_kernel]		= "Pke",
 	[rxrpc_call_put_noqueue]	= "PNQ",
+	[rxrpc_call_error]		= "*E*",
 };
 
 struct kmem_cache *rxrpc_call_jar;
@@ -179,6 +182,7 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct sockaddr_rxrpc *srx,
 		return ERR_PTR(-ENOMEM);
 	call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
 	call->service_id = srx->srx_service;
+	call->tx_phase = true;
 
 	_leave(" = %p", call);
 	return call;
@@ -222,8 +226,8 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 		return call;
 	}
 
-	trace_rxrpc_call(call, 0, atomic_read(&call->usage), here,
-			 (const void *)user_call_ID);
+	trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
+			 here, (const void *)user_call_ID);
 
 	/* Publish the call, even though it is incompletely set up as yet */
 	write_lock(&rx->call_lock);
@@ -263,6 +267,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	if (ret < 0)
 		goto error;
 
+	trace_rxrpc_call(call, rxrpc_call_connected, atomic_read(&call->usage),
+			 here, ERR_PTR(ret));
+
 	spin_lock_bh(&call->conn->params.peer->lock);
 	hlist_add_head(&call->error_link,
 		       &call->conn->params.peer->error_targets);
@@ -287,6 +294,8 @@ error_dup_user_ID:
 error:
 	__rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
 				    RX_CALL_DEAD, ret);
+	trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
+			 here, ERR_PTR(ret));
 	rxrpc_release_call(rx, call);
 	rxrpc_put_call(call, rxrpc_call_put);
 	_leave(" = %d", ret);
@@ -396,15 +405,17 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
  */
 void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
+	const void *here = __builtin_return_address(0);
 	struct rxrpc_connection *conn = call->conn;
 	bool put = false;
 	int i;
 
 	_enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));
 
-	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
+	trace_rxrpc_call(call, rxrpc_call_release, atomic_read(&call->usage),
+			 here, (const void *)call->flags);
 
-	rxrpc_see_call(call);
+	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
 	spin_lock_bh(&call->lock);
 	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
@@ -448,7 +459,9 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 		rxrpc_disconnect_call(call);
 
 	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
-		rxrpc_free_skb(call->rxtx_buffer[i]);
+		rxrpc_free_skb(call->rxtx_buffer[i],
+			       (call->tx_phase ? rxrpc_skb_tx_cleaned :
+				rxrpc_skb_rx_cleaned));
 		call->rxtx_buffer[i] = NULL;
 	}
 
@@ -469,8 +482,6 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
 				  struct rxrpc_call, accept_link);
 		list_del(&call->accept_link);
 		rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, ECONNRESET);
-		rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
-		rxrpc_release_call(rx, call);
 		rxrpc_put_call(call, rxrpc_call_put);
 	}
 
@@ -544,9 +555,11 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
 
 	/* Clean up the Rx/Tx buffer */
 	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++)
-		rxrpc_free_skb(call->rxtx_buffer[i]);
+		rxrpc_free_skb(call->rxtx_buffer[i],
+			       (call->tx_phase ? rxrpc_skb_tx_cleaned :
+				rxrpc_skb_rx_cleaned));
 
-	rxrpc_free_skb(call->tx_pending);
+	rxrpc_free_skb(call->tx_pending, rxrpc_skb_tx_cleaned);
 
 	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
 }

+ 57 - 25
net/rxrpc/conn_client.c

@@ -105,6 +105,14 @@ static void rxrpc_discard_expired_client_conns(struct work_struct *);
 static DECLARE_DELAYED_WORK(rxrpc_client_conn_reap,
 			    rxrpc_discard_expired_client_conns);
 
+const char rxrpc_conn_cache_states[RXRPC_CONN__NR_CACHE_STATES][5] = {
+	[RXRPC_CONN_CLIENT_INACTIVE]	= "Inac",
+	[RXRPC_CONN_CLIENT_WAITING]	= "Wait",
+	[RXRPC_CONN_CLIENT_ACTIVE]	= "Actv",
+	[RXRPC_CONN_CLIENT_CULLED]	= "Cull",
+	[RXRPC_CONN_CLIENT_IDLE]	= "Idle",
+};
+
 /*
  * Get a connection ID and epoch for a client connection from the global pool.
  * The connection struct pointer is then recorded in the idr radix tree.  The
@@ -220,6 +228,9 @@ rxrpc_alloc_client_connection(struct rxrpc_conn_parameters *cp, gfp_t gfp)
 	rxrpc_get_local(conn->params.local);
 	key_get(conn->params.key);
 
+	trace_rxrpc_conn(conn, rxrpc_conn_new_client, atomic_read(&conn->usage),
+			 __builtin_return_address(0));
+	trace_rxrpc_client(conn, -1, rxrpc_client_alloc);
 	_leave(" = %p", conn);
 	return conn;
 
@@ -385,6 +396,7 @@ static int rxrpc_get_client_conn(struct rxrpc_call *call,
 			rb_replace_node(&conn->client_node,
 					&candidate->client_node,
 					&local->client_conns);
+			trace_rxrpc_client(conn, -1, rxrpc_client_replace);
 			goto candidate_published;
 		}
 	}
@@ -409,8 +421,11 @@ found_extant_conn:
 	_debug("found conn");
 	spin_unlock(&local->client_conns_lock);
 
-	rxrpc_put_connection(candidate);
-	candidate = NULL;
+	if (candidate) {
+		trace_rxrpc_client(candidate, -1, rxrpc_client_duplicate);
+		rxrpc_put_connection(candidate);
+		candidate = NULL;
+	}
 
 	spin_lock(&conn->channel_lock);
 	call->conn = conn;
@@ -433,6 +448,7 @@ error:
  */
 static void rxrpc_activate_conn(struct rxrpc_connection *conn)
 {
+	trace_rxrpc_client(conn, -1, rxrpc_client_to_active);
 	conn->cache_state = RXRPC_CONN_CLIENT_ACTIVE;
 	rxrpc_nr_active_client_conns++;
 	list_move_tail(&conn->cache_link, &rxrpc_active_client_conns);
@@ -462,8 +478,10 @@ static void rxrpc_animate_client_conn(struct rxrpc_connection *conn)
 	spin_lock(&rxrpc_client_conn_cache_lock);
 
 	nr_conns = rxrpc_nr_client_conns;
-	if (!test_and_set_bit(RXRPC_CONN_COUNTED, &conn->flags))
+	if (!test_and_set_bit(RXRPC_CONN_COUNTED, &conn->flags)) {
+		trace_rxrpc_client(conn, -1, rxrpc_client_count);
 		rxrpc_nr_client_conns = nr_conns + 1;
+	}
 
 	switch (conn->cache_state) {
 	case RXRPC_CONN_CLIENT_ACTIVE:
@@ -494,6 +512,7 @@ activate_conn:
 
 wait_for_capacity:
 	_debug("wait");
+	trace_rxrpc_client(conn, -1, rxrpc_client_to_waiting);
 	conn->cache_state = RXRPC_CONN_CLIENT_WAITING;
 	list_move_tail(&conn->cache_link, &rxrpc_waiting_client_conns);
 	goto out_unlock;
@@ -524,6 +543,8 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
 					     struct rxrpc_call, chan_wait_link);
 	u32 call_id = chan->call_counter + 1;
 
+	trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);
+
 	write_lock_bh(&call->state_lock);
 	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
 	write_unlock_bh(&call->state_lock);
@@ -563,6 +584,8 @@ static void rxrpc_activate_channels(struct rxrpc_connection *conn)
 
 	_enter("%d", conn->debug_id);
 
+	trace_rxrpc_client(conn, -1, rxrpc_client_activate_chans);
+
 	if (conn->cache_state != RXRPC_CONN_CLIENT_ACTIVE ||
 	    conn->active_chans == RXRPC_ACTIVE_CHANS_MASK)
 		return;
@@ -657,10 +680,13 @@ int rxrpc_connect_call(struct rxrpc_call *call,
  * had a chance at re-use (the per-connection security negotiation is
  * expensive).
  */
-static void rxrpc_expose_client_conn(struct rxrpc_connection *conn)
+static void rxrpc_expose_client_conn(struct rxrpc_connection *conn,
+				     unsigned int channel)
 {
-	if (!test_and_set_bit(RXRPC_CONN_EXPOSED, &conn->flags))
+	if (!test_and_set_bit(RXRPC_CONN_EXPOSED, &conn->flags)) {
+		trace_rxrpc_client(conn, channel, rxrpc_client_exposed);
 		rxrpc_get_connection(conn);
+	}
 }
 
 /*
@@ -669,9 +695,9 @@ static void rxrpc_expose_client_conn(struct rxrpc_connection *conn)
  */
 void rxrpc_expose_client_call(struct rxrpc_call *call)
 {
+	unsigned int channel = call->cid & RXRPC_CHANNELMASK;
 	struct rxrpc_connection *conn = call->conn;
-	struct rxrpc_channel *chan =
-		&conn->channels[call->cid & RXRPC_CHANNELMASK];
+	struct rxrpc_channel *chan = &conn->channels[channel];
 
 	if (!test_and_set_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
 		/* Mark the call ID as being used.  If the callNumber counter
@@ -682,7 +708,7 @@ void rxrpc_expose_client_call(struct rxrpc_call *call)
 		chan->call_counter++;
 		if (chan->call_counter >= INT_MAX)
 			set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
-		rxrpc_expose_client_conn(conn);
+		rxrpc_expose_client_conn(conn, channel);
 	}
 }
 
@@ -695,6 +721,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
 	struct rxrpc_connection *conn = call->conn;
 	struct rxrpc_channel *chan = &conn->channels[channel];
 
+	trace_rxrpc_client(conn, channel, rxrpc_client_chan_disconnect);
 	call->conn = NULL;
 
 	spin_lock(&conn->channel_lock);
@@ -709,6 +736,8 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
 		ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
 		list_del_init(&call->chan_wait_link);
 
+		trace_rxrpc_client(conn, channel, rxrpc_client_chan_unstarted);
+
 		/* We must deactivate or idle the connection if it's now
 		 * waiting for nothing.
 		 */
@@ -739,7 +768,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
 	/* See if we can pass the channel directly to another call. */
 	if (conn->cache_state == RXRPC_CONN_CLIENT_ACTIVE &&
 	    !list_empty(&conn->waiting_calls)) {
-		_debug("pass chan");
+		trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass);
 		rxrpc_activate_one_channel(conn, channel);
 		goto out_2;
 	}
@@ -762,7 +791,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *call)
 			goto out;
 		}
 
-		_debug("pass chan 2");
+		trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass);
 		rxrpc_activate_one_channel(conn, channel);
 		goto out;
 
@@ -794,7 +823,7 @@ idle_connection:
 	 * immediately or moved to the idle list for a short while.
 	 */
 	if (test_bit(RXRPC_CONN_EXPOSED, &conn->flags)) {
-		_debug("make idle");
+		trace_rxrpc_client(conn, channel, rxrpc_client_to_idle);
 		conn->idle_timestamp = jiffies;
 		conn->cache_state = RXRPC_CONN_CLIENT_IDLE;
 		list_move_tail(&conn->cache_link, &rxrpc_idle_client_conns);
@@ -804,7 +833,7 @@ idle_connection:
 					   &rxrpc_client_conn_reap,
 					   rxrpc_conn_idle_client_expiry);
 	} else {
-		_debug("make inactive");
+		trace_rxrpc_client(conn, channel, rxrpc_client_to_inactive);
 		conn->cache_state = RXRPC_CONN_CLIENT_INACTIVE;
 		list_del_init(&conn->cache_link);
 	}
@@ -821,6 +850,8 @@ rxrpc_put_one_client_conn(struct rxrpc_connection *conn)
 	struct rxrpc_local *local = conn->params.local;
 	unsigned int nr_conns;
 
+	trace_rxrpc_client(conn, -1, rxrpc_client_cleanup);
+
 	if (test_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags)) {
 		spin_lock(&local->client_conns_lock);
 		if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS,
@@ -834,6 +865,7 @@ rxrpc_put_one_client_conn(struct rxrpc_connection *conn)
 	ASSERTCMP(conn->cache_state, ==, RXRPC_CONN_CLIENT_INACTIVE);
 
 	if (test_bit(RXRPC_CONN_COUNTED, &conn->flags)) {
+		trace_rxrpc_client(conn, -1, rxrpc_client_uncount);
 		spin_lock(&rxrpc_client_conn_cache_lock);
 		nr_conns = --rxrpc_nr_client_conns;
 
@@ -863,20 +895,18 @@ rxrpc_put_one_client_conn(struct rxrpc_connection *conn)
  */
 void rxrpc_put_client_conn(struct rxrpc_connection *conn)
 {
-	struct rxrpc_connection *next;
+	const void *here = __builtin_return_address(0);
+	int n;
 
 	do {
-		_enter("%p{u=%d,d=%d}",
-		       conn, atomic_read(&conn->usage), conn->debug_id);
-
-		next = rxrpc_put_one_client_conn(conn);
-
-		if (!next)
-			break;
-		conn = next;
-	} while (atomic_dec_and_test(&conn->usage));
-
-	_leave("");
+		n = atomic_dec_return(&conn->usage);
+		trace_rxrpc_conn(conn, rxrpc_conn_put_client, n, here);
+		if (n > 0)
+			return;
+		ASSERTCMP(n, >=, 0);
+
+		conn = rxrpc_put_one_client_conn(conn);
+	} while (conn);
 }
 
 /*
@@ -907,9 +937,11 @@ static void rxrpc_cull_active_client_conns(void)
 		ASSERTCMP(conn->cache_state, ==, RXRPC_CONN_CLIENT_ACTIVE);
 
 		if (list_empty(&conn->waiting_calls)) {
+			trace_rxrpc_client(conn, -1, rxrpc_client_to_culled);
 			conn->cache_state = RXRPC_CONN_CLIENT_CULLED;
 			list_del_init(&conn->cache_link);
 		} else {
+			trace_rxrpc_client(conn, -1, rxrpc_client_to_waiting);
 			conn->cache_state = RXRPC_CONN_CLIENT_WAITING;
 			list_move_tail(&conn->cache_link,
 				       &rxrpc_waiting_client_conns);
@@ -983,7 +1015,7 @@ next:
 			goto not_yet_expired;
 	}
 
-	_debug("discard conn %d", conn->debug_id);
+	trace_rxrpc_client(conn, -1, rxrpc_client_discard);
 	if (!test_and_clear_bit(RXRPC_CONN_EXPOSED, &conn->flags))
 		BUG();
 	conn->cache_state = RXRPC_CONN_CLIENT_INACTIVE;

+ 7 - 4
net/rxrpc/conn_event.c

@@ -98,6 +98,9 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
 		pkt.info.rwind		= htonl(rxrpc_rx_window_size);
 		pkt.info.jumbo_max	= htonl(rxrpc_rx_jumbo_max);
 		len += sizeof(pkt.ack) + sizeof(pkt.info);
+
+		trace_rxrpc_tx_ack(NULL, chan->last_seq, 0,
+				   RXRPC_ACK_DUPLICATE, 0);
 		break;
 	}
 
@@ -377,7 +380,7 @@ void rxrpc_process_connection(struct work_struct *work)
 	u32 abort_code = RX_PROTOCOL_ERROR;
 	int ret;
 
-	_enter("{%d}", conn->debug_id);
+	rxrpc_see_connection(conn);
 
 	if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
 		rxrpc_secure_connection(conn);
@@ -385,7 +388,7 @@ void rxrpc_process_connection(struct work_struct *work)
 	/* go through the conn-level event packets, releasing the ref on this
 	 * connection that each one has when we've finished with it */
 	while ((skb = skb_dequeue(&conn->rx_queue))) {
-		rxrpc_see_skb(skb);
+		rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 		ret = rxrpc_process_event(conn, skb, &abort_code);
 		switch (ret) {
 		case -EPROTO:
@@ -396,7 +399,7 @@ void rxrpc_process_connection(struct work_struct *work)
 			goto requeue_and_leave;
 		case -ECONNABORTED:
 		default:
-			rxrpc_free_skb(skb);
+			rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 			break;
 		}
 	}
@@ -413,7 +416,7 @@ requeue_and_leave:
 protocol_error:
 	if (rxrpc_abort_connection(conn, -ret, abort_code) < 0)
 		goto requeue_and_leave;
-	rxrpc_free_skb(skb);
+	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 	_leave(" [EPROTO]");
 	goto out;
 }

+ 69 - 3
net/rxrpc/conn_object.c

@@ -246,11 +246,77 @@ void rxrpc_kill_connection(struct rxrpc_connection *conn)
 }
 
 /*
- * release a virtual connection
+ * Queue a connection's work processor, getting a ref to pass to the work
+ * queue.
  */
-void __rxrpc_put_connection(struct rxrpc_connection *conn)
+bool rxrpc_queue_conn(struct rxrpc_connection *conn)
 {
-	rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
+	const void *here = __builtin_return_address(0);
+	int n = __atomic_add_unless(&conn->usage, 1, 0);
+	if (n == 0)
+		return false;
+	if (rxrpc_queue_work(&conn->processor))
+		trace_rxrpc_conn(conn, rxrpc_conn_queued, n + 1, here);
+	else
+		rxrpc_put_connection(conn);
+	return true;
+}
+
+/*
+ * Note the re-emergence of a connection.
+ */
+void rxrpc_see_connection(struct rxrpc_connection *conn)
+{
+	const void *here = __builtin_return_address(0);
+	if (conn) {
+		int n = atomic_read(&conn->usage);
+
+		trace_rxrpc_conn(conn, rxrpc_conn_seen, n, here);
+	}
+}
+
+/*
+ * Get a ref on a connection.
+ */
+void rxrpc_get_connection(struct rxrpc_connection *conn)
+{
+	const void *here = __builtin_return_address(0);
+	int n = atomic_inc_return(&conn->usage);
+
+	trace_rxrpc_conn(conn, rxrpc_conn_got, n, here);
+}
+
+/*
+ * Try to get a ref on a connection.
+ */
+struct rxrpc_connection *
+rxrpc_get_connection_maybe(struct rxrpc_connection *conn)
+{
+	const void *here = __builtin_return_address(0);
+
+	if (conn) {
+		int n = __atomic_add_unless(&conn->usage, 1, 0);
+		if (n > 0)
+			trace_rxrpc_conn(conn, rxrpc_conn_got, n + 1, here);
+		else
+			conn = NULL;
+	}
+	return conn;
+}
+
+/*
+ * Release a service connection
+ */
+void rxrpc_put_service_conn(struct rxrpc_connection *conn)
+{
+	const void *here = __builtin_return_address(0);
+	int n;
+
+	n = atomic_dec_return(&conn->usage);
+	trace_rxrpc_conn(conn, rxrpc_conn_put_service, n, here);
+	ASSERTCMP(n, >=, 0);
+	if (n == 0)
+		rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
 }
 
 /*

+ 4 - 0
net/rxrpc/conn_service.c

@@ -136,6 +136,10 @@ struct rxrpc_connection *rxrpc_prealloc_service_connection(gfp_t gfp)
 		list_add_tail(&conn->link, &rxrpc_connections);
 		list_add_tail(&conn->proc_link, &rxrpc_connection_proc_list);
 		write_unlock(&rxrpc_connection_lock);
+
+		trace_rxrpc_conn(conn, rxrpc_conn_new_service,
+				 atomic_read(&conn->usage),
+				 __builtin_return_address(0));
 	}
 
 	return conn;

+ 24 - 7
net/rxrpc/input.c

@@ -50,7 +50,7 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to)
 		call->tx_hard_ack++;
 		ix = call->tx_hard_ack & RXRPC_RXTX_BUFF_MASK;
 		skb = call->rxtx_buffer[ix];
-		rxrpc_see_skb(skb);
+		rxrpc_see_skb(skb, rxrpc_skb_tx_rotated);
 		call->rxtx_buffer[ix] = NULL;
 		call->rxtx_annotations[ix] = 0;
 		skb->next = list;
@@ -59,13 +59,14 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to)
 
 	spin_unlock(&call->lock);
 
+	trace_rxrpc_transmit(call, rxrpc_transmit_rotate);
 	wake_up(&call->waitq);
 
 	while (list) {
 		skb = list;
 		list = skb->next;
 		skb->next = NULL;
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
 	}
 }
 
@@ -98,6 +99,7 @@ static bool rxrpc_end_tx_phase(struct rxrpc_call *call, const char *abort_why)
 	default:
 		break;
 	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
+		call->tx_phase = false;
 		call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 		break;
 	case RXRPC_CALL_SERVER_AWAIT_ACK:
@@ -107,6 +109,7 @@ static bool rxrpc_end_tx_phase(struct rxrpc_call *call, const char *abort_why)
 	}
 
 	write_unlock(&call->state_lock);
+	trace_rxrpc_transmit(call, rxrpc_transmit_end);
 	_leave(" = ok");
 	return true;
 }
@@ -276,14 +279,18 @@ next_subpacket:
 	 * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
 	 * and also rxrpc_fill_out_ack().
 	 */
-	rxrpc_get_skb(skb);
+	rxrpc_get_skb(skb, rxrpc_skb_rx_got);
 	call->rxtx_annotations[ix] = annotation;
 	smp_wmb();
 	call->rxtx_buffer[ix] = skb;
 	if (after(seq, call->rx_top))
 		smp_store_release(&call->rx_top, seq);
-	if (flags & RXRPC_LAST_PACKET)
+	if (flags & RXRPC_LAST_PACKET) {
 		set_bit(RXRPC_CALL_RX_LAST, &call->flags);
+		trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
+	} else {
+		trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
+	}
 	queued = true;
 
 	if (after_eq(seq, call->rx_expect_next)) {
@@ -438,6 +445,8 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
 	hard_ack = first_soft_ack - 1;
 	nr_acks = buf.ack.nAcks;
 
+	trace_rxrpc_rx_ack(call, first_soft_ack, buf.ack.reason, nr_acks);
+
 	_proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 	       sp->hdr.serial,
 	       ntohs(buf.ack.maxSkew),
@@ -683,13 +692,13 @@ void rxrpc_data_ready(struct sock *udp_sk)
 		return;
 	}
 
-	rxrpc_new_skb(skb);
+	rxrpc_new_skb(skb, rxrpc_skb_rx_received);
 
 	_net("recv skb %p", skb);
 
 	/* we'll probably need to checksum it (didn't call sock_recvmsg) */
 	if (skb_checksum_complete(skb)) {
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 		__UDP_INC_STATS(&init_net, UDP_MIB_INERRORS, 0);
 		_leave(" [CSUM failed]");
 		return;
@@ -703,6 +712,14 @@ void rxrpc_data_ready(struct sock *udp_sk)
 	skb_orphan(skb);
 	sp = rxrpc_skb(skb);
 
+	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
+		static int lose;
+		if ((lose++ & 7) == 7) {
+			rxrpc_lose_skb(skb, rxrpc_skb_rx_lost);
+			return;
+		}
+	}
+
 	_net("Rx UDP packet from %08x:%04hu",
 	     ntohl(ip_hdr(skb)->saddr), ntohs(udp_hdr(skb)->source));
 
@@ -813,7 +830,7 @@ void rxrpc_data_ready(struct sock *udp_sk)
 discard_unlock:
 	rcu_read_unlock();
 discard:
-	rxrpc_free_skb(skb);
+	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 out:
 	trace_rxrpc_rx_done(0, 0);
 	return;

+ 2 - 2
net/rxrpc/local_event.c

@@ -90,7 +90,7 @@ void rxrpc_process_local_events(struct rxrpc_local *local)
 	if (skb) {
 		struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 
-		rxrpc_see_skb(skb);
+		rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 		_debug("{%d},{%u}", local->debug_id, sp->hdr.type);
 
 		switch (sp->hdr.type) {
@@ -107,7 +107,7 @@ void rxrpc_process_local_events(struct rxrpc_local *local)
 			break;
 		}
 
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 	}
 
 	_leave("");

+ 81 - 0
net/rxrpc/misc.c

@@ -101,3 +101,84 @@ const char *rxrpc_acks(u8 reason)
 		reason = ARRAY_SIZE(str) - 1;
 	return str[reason];
 }
+
+const char rxrpc_skb_traces[rxrpc_skb__nr_trace][7] = {
+	[rxrpc_skb_rx_cleaned]		= "Rx CLN",
+	[rxrpc_skb_rx_freed]		= "Rx FRE",
+	[rxrpc_skb_rx_got]		= "Rx GOT",
+	[rxrpc_skb_rx_lost]		= "Rx *L*",
+	[rxrpc_skb_rx_received]		= "Rx RCV",
+	[rxrpc_skb_rx_purged]		= "Rx PUR",
+	[rxrpc_skb_rx_rotated]		= "Rx ROT",
+	[rxrpc_skb_rx_seen]		= "Rx SEE",
+	[rxrpc_skb_tx_cleaned]		= "Tx CLN",
+	[rxrpc_skb_tx_freed]		= "Tx FRE",
+	[rxrpc_skb_tx_got]		= "Tx GOT",
+	[rxrpc_skb_tx_lost]		= "Tx *L*",
+	[rxrpc_skb_tx_new]		= "Tx NEW",
+	[rxrpc_skb_tx_rotated]		= "Tx ROT",
+	[rxrpc_skb_tx_seen]		= "Tx SEE",
+};
+
+const char rxrpc_conn_traces[rxrpc_conn__nr_trace][4] = {
+	[rxrpc_conn_new_client]		= "NWc",
+	[rxrpc_conn_new_service]	= "NWs",
+	[rxrpc_conn_queued]		= "QUE",
+	[rxrpc_conn_seen]		= "SEE",
+	[rxrpc_conn_got]		= "GOT",
+	[rxrpc_conn_put_client]		= "PTc",
+	[rxrpc_conn_put_service]	= "PTs",
+};
+
+const char rxrpc_client_traces[rxrpc_client__nr_trace][7] = {
+	[rxrpc_client_activate_chans]	= "Activa",
+	[rxrpc_client_alloc]		= "Alloc ",
+	[rxrpc_client_chan_activate]	= "ChActv",
+	[rxrpc_client_chan_disconnect]	= "ChDisc",
+	[rxrpc_client_chan_pass]	= "ChPass",
+	[rxrpc_client_chan_unstarted]	= "ChUnst",
+	[rxrpc_client_cleanup]		= "Clean ",
+	[rxrpc_client_count]		= "Count ",
+	[rxrpc_client_discard]		= "Discar",
+	[rxrpc_client_duplicate]	= "Duplic",
+	[rxrpc_client_exposed]		= "Expose",
+	[rxrpc_client_replace]		= "Replac",
+	[rxrpc_client_to_active]	= "->Actv",
+	[rxrpc_client_to_culled]	= "->Cull",
+	[rxrpc_client_to_idle]		= "->Idle",
+	[rxrpc_client_to_inactive]	= "->Inac",
+	[rxrpc_client_to_waiting]	= "->Wait",
+	[rxrpc_client_uncount]		= "Uncoun",
+};
+
+const char rxrpc_transmit_traces[rxrpc_transmit__nr_trace][4] = {
+	[rxrpc_transmit_wait]		= "WAI",
+	[rxrpc_transmit_queue]		= "QUE",
+	[rxrpc_transmit_queue_reqack]	= "QRA",
+	[rxrpc_transmit_queue_last]	= "QLS",
+	[rxrpc_transmit_rotate]		= "ROT",
+	[rxrpc_transmit_end]		= "END",
+};
+
+const char rxrpc_receive_traces[rxrpc_receive__nr_trace][4] = {
+	[rxrpc_receive_incoming]	= "INC",
+	[rxrpc_receive_queue]		= "QUE",
+	[rxrpc_receive_queue_last]	= "QLS",
+	[rxrpc_receive_front]		= "FRN",
+	[rxrpc_receive_rotate]		= "ROT",
+	[rxrpc_receive_end]		= "END",
+};
+
+const char rxrpc_recvmsg_traces[rxrpc_recvmsg__nr_trace][5] = {
+	[rxrpc_recvmsg_enter]		= "ENTR",
+	[rxrpc_recvmsg_wait]		= "WAIT",
+	[rxrpc_recvmsg_dequeue]		= "DEQU",
+	[rxrpc_recvmsg_hole]		= "HOLE",
+	[rxrpc_recvmsg_next]		= "NEXT",
+	[rxrpc_recvmsg_cont]		= "CONT",
+	[rxrpc_recvmsg_full]		= "FULL",
+	[rxrpc_recvmsg_data_return]	= "DATA",
+	[rxrpc_recvmsg_terminal]	= "TERM",
+	[rxrpc_recvmsg_to_be_accepted]	= "TBAC",
+	[rxrpc_recvmsg_return]		= "RETN",
+};

+ 17 - 3
net/rxrpc/output.c

@@ -38,12 +38,14 @@ struct rxrpc_pkt_buffer {
 static size_t rxrpc_fill_out_ack(struct rxrpc_call *call,
 				 struct rxrpc_pkt_buffer *pkt)
 {
+	rxrpc_serial_t serial;
 	rxrpc_seq_t hard_ack, top, seq;
 	int ix;
 	u32 mtu, jmax;
 	u8 *ackp = pkt->acks;
 
 	/* Barrier against rxrpc_input_data(). */
+	serial = call->ackr_serial;
 	hard_ack = READ_ONCE(call->rx_hard_ack);
 	top = smp_load_acquire(&call->rx_top);
 
@@ -51,7 +53,7 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_call *call,
 	pkt->ack.maxSkew	= htons(call->ackr_skew);
 	pkt->ack.firstPacket	= htonl(hard_ack + 1);
 	pkt->ack.previousPacket	= htonl(call->ackr_prev_seq);
-	pkt->ack.serial		= htonl(call->ackr_serial);
+	pkt->ack.serial		= htonl(serial);
 	pkt->ack.reason		= call->ackr_reason;
 	pkt->ack.nAcks		= top - hard_ack;
 
@@ -75,6 +77,9 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_call *call,
 	pkt->ackinfo.rwind	= htonl(call->rx_winsize);
 	pkt->ackinfo.jumbo_max	= htonl(jmax);
 
+	trace_rxrpc_tx_ack(call, hard_ack + 1, serial, call->ackr_reason,
+			   top - hard_ack);
+
 	*ackp++ = 0;
 	*ackp++ = 0;
 	*ackp++ = 0;
@@ -220,6 +225,15 @@ int rxrpc_send_data_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
 	msg.msg_controllen = 0;
 	msg.msg_flags = 0;
 
+	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
+		static int lose;
+		if ((lose++ & 7) == 7) {
+			rxrpc_lose_skb(skb, rxrpc_skb_tx_lost);
+			_leave(" = 0 [lose]");
+			return 0;
+		}
+	}
+
 	/* send the packet with the don't fragment bit set if we currently
 	 * think it's small enough */
 	if (skb->len - sizeof(struct rxrpc_wire_header) < conn->params.peer->maxdata) {
@@ -319,7 +333,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
 	whdr.type = RXRPC_PACKET_TYPE_ABORT;
 
 	while ((skb = skb_dequeue(&local->reject_queue))) {
-		rxrpc_see_skb(skb);
+		rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 		sp = rxrpc_skb(skb);
 
 		if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
@@ -338,7 +352,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
 			kernel_sendmsg(local->socket, &msg, iov, 2, size);
 		}
 
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 	}
 
 	_leave("");

+ 5 - 5
net/rxrpc/peer_event.c

@@ -155,11 +155,11 @@ void rxrpc_error_report(struct sock *sk)
 		_leave("UDP socket errqueue empty");
 		return;
 	}
-	rxrpc_new_skb(skb);
+	rxrpc_new_skb(skb, rxrpc_skb_rx_received);
 	serr = SKB_EXT_ERR(skb);
 	if (!skb->len && serr->ee.ee_origin == SO_EE_ORIGIN_TIMESTAMPING) {
 		_leave("UDP empty message");
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 		return;
 	}
 
@@ -169,7 +169,7 @@ void rxrpc_error_report(struct sock *sk)
 		peer = NULL;
 	if (!peer) {
 		rcu_read_unlock();
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 		_leave(" [no peer]");
 		return;
 	}
@@ -179,7 +179,7 @@ void rxrpc_error_report(struct sock *sk)
 	     serr->ee.ee_code == ICMP_FRAG_NEEDED)) {
 		rxrpc_adjust_mtu(peer, serr);
 		rcu_read_unlock();
-		rxrpc_free_skb(skb);
+		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 		rxrpc_put_peer(peer);
 		_leave(" [MTU update]");
 		return;
@@ -187,7 +187,7 @@ void rxrpc_error_report(struct sock *sk)
 
 	rxrpc_store_error(peer, serr);
 	rcu_read_unlock();
-	rxrpc_free_skb(skb);
+	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 
 	/* The ref we obtained is passed off to the work item */
 	rxrpc_queue_work(&peer->error_distributor);

+ 41 - 19
net/rxrpc/recvmsg.c

@@ -94,6 +94,8 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
 		break;
 	}
 
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
+			    call->rx_pkt_offset, call->rx_pkt_len, ret);
 	return ret;
 }
 
@@ -124,6 +126,7 @@ static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
 		write_unlock(&rx->call_lock);
 	}
 
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
 	return ret;
 }
 
@@ -134,6 +137,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call)
 {
 	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
 
+	trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
 	ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
 
 	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
@@ -151,6 +155,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call)
 		break;
 
 	case RXRPC_CALL_SERVER_RECV_REQUEST:
+		call->tx_phase = true;
 		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
 		break;
 	default:
@@ -167,6 +172,7 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
 {
 	struct rxrpc_skb_priv *sp;
 	struct sk_buff *skb;
+	rxrpc_serial_t serial;
 	rxrpc_seq_t hard_ack, top;
 	u8 flags;
 	int ix;
@@ -180,17 +186,22 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
 	hard_ack++;
 	ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
 	skb = call->rxtx_buffer[ix];
-	rxrpc_see_skb(skb);
+	rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
 	sp = rxrpc_skb(skb);
 	flags = sp->hdr.flags;
+	serial = sp->hdr.serial;
+	if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
+		serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
+
 	call->rxtx_buffer[ix] = NULL;
 	call->rxtx_annotations[ix] = 0;
 	/* Barrier against rxrpc_input_data(). */
 	smp_store_release(&call->rx_hard_ack, hard_ack);
 
-	rxrpc_free_skb(skb);
+	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 
 	_debug("%u,%u,%02x", hard_ack, top, flags);
+	trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
 	if (flags & RXRPC_LAST_PACKET)
 		rxrpc_end_rx_phase(call);
 }
@@ -286,8 +297,6 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 	unsigned int rx_pkt_offset, rx_pkt_len;
 	int ix, copy, ret = -EAGAIN, ret2;
 
-	_enter("");
-
 	rx_pkt_offset = call->rx_pkt_offset;
 	rx_pkt_len = call->rx_pkt_len;
 
@@ -303,12 +312,19 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 	for (seq = hard_ack + 1; before_eq(seq, top); seq++) {
 		ix = seq & RXRPC_RXTX_BUFF_MASK;
 		skb = call->rxtx_buffer[ix];
-		if (!skb)
+		if (!skb) {
+			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
+					    rx_pkt_offset, rx_pkt_len, 0);
 			break;
+		}
 		smp_rmb();
-		rxrpc_see_skb(skb);
+		rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 		sp = rxrpc_skb(skb);
 
+		if (!(flags & MSG_PEEK))
+			trace_rxrpc_receive(call, rxrpc_receive_front,
+					    sp->hdr.serial, seq);
+
 		if (msg)
 			sock_recv_timestamp(msg, sock->sk, skb);
 
@@ -316,13 +332,16 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 			ret2 = rxrpc_locate_data(call, skb,
 						 &call->rxtx_annotations[ix],
 						 &rx_pkt_offset, &rx_pkt_len);
+			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
+					    rx_pkt_offset, rx_pkt_len, ret2);
 			if (ret2 < 0) {
 				ret = ret2;
 				goto out;
 			}
+		} else {
+			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
+					    rx_pkt_offset, rx_pkt_len, 0);
 		}
-		_debug("recvmsg %x DATA #%u { %d, %d }",
-		       sp->hdr.callNumber, seq, rx_pkt_offset, rx_pkt_len);
 
 		/* We have to handle short, empty and used-up DATA packets. */
 		remain = len - *_offset;
@@ -338,15 +357,14 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 			}
 
 			/* handle piecemeal consumption of data packets */
-			_debug("copied %d @%zu", copy, *_offset);
-
 			rx_pkt_offset += copy;
 			rx_pkt_len -= copy;
 			*_offset += copy;
 		}
 
 		if (rx_pkt_len > 0) {
-			_debug("buffer full");
+			trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
+					    rx_pkt_offset, rx_pkt_len, 0);
 			ASSERTCMP(*_offset, ==, len);
 			ret = 0;
 			break;
@@ -372,7 +390,8 @@ out:
 		call->rx_pkt_len = rx_pkt_len;
 	}
 done:
-	_leave(" = %d [%u/%u]", ret, seq, top);
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
+			    rx_pkt_offset, rx_pkt_len, ret);
 	return ret;
 }
 
@@ -393,7 +412,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 
 	DEFINE_WAIT(wait);
 
-	_enter(",,,%zu,%d", len, flags);
+	trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
 
 	if (flags & (MSG_OOB | MSG_TRUNC))
 		return -EOPNOTSUPP;
@@ -413,8 +432,10 @@ try_again:
 
 	if (list_empty(&rx->recvmsg_q)) {
 		ret = -EWOULDBLOCK;
-		if (timeo == 0)
+		if (timeo == 0) {
+			call = NULL;
 			goto error_no_call;
+		}
 
 		release_sock(&rx->sk);
 
@@ -428,6 +449,8 @@ try_again:
 		if (list_empty(&rx->recvmsg_q)) {
 			if (signal_pending(current))
 				goto wait_interrupted;
+			trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
+					    0, 0, 0, 0);
 			timeo = schedule_timeout(timeo);
 		}
 		finish_wait(sk_sleep(&rx->sk), &wait);
@@ -446,7 +469,7 @@ try_again:
 		rxrpc_get_call(call, rxrpc_call_got);
 	write_unlock_bh(&rx->recvmsg_lock);
 
-	_debug("recvmsg call %p", call);
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
 
 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 		BUG();
@@ -516,16 +539,15 @@ error:
 	rxrpc_put_call(call, rxrpc_call_put);
 error_no_call:
 	release_sock(&rx->sk);
-	_leave(" = %d", ret);
+	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 	return ret;
 
 wait_interrupted:
 	ret = sock_intr_errno(timeo);
 wait_error:
 	finish_wait(sk_sleep(&rx->sk), &wait);
-	release_sock(&rx->sk);
-	_leave(" = %d [wait]", ret);
-	return ret;
+	call = NULL;
+	goto error_no_call;
 }
 
 /**

+ 13 - 6
net/rxrpc/sendmsg.c

@@ -56,6 +56,7 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
 			break;
 		}
 
+		trace_rxrpc_transmit(call, rxrpc_transmit_wait);
 		release_sock(&rx->sk);
 		*timeo = schedule_timeout(*timeo);
 		lock_sock(&rx->sk);
@@ -99,13 +100,19 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
 	ASSERTCMP(seq, ==, call->tx_top + 1);
 
 	ix = seq & RXRPC_RXTX_BUFF_MASK;
-	rxrpc_get_skb(skb);
+	rxrpc_get_skb(skb, rxrpc_skb_tx_got);
 	call->rxtx_annotations[ix] = RXRPC_TX_ANNO_UNACK;
 	smp_wmb();
 	call->rxtx_buffer[ix] = skb;
 	call->tx_top = seq;
-	if (last)
+	if (last) {
 		set_bit(RXRPC_CALL_TX_LAST, &call->flags);
+		trace_rxrpc_transmit(call, rxrpc_transmit_queue_last);
+	} else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
+		trace_rxrpc_transmit(call, rxrpc_transmit_queue_reqack);
+	} else {
+		trace_rxrpc_transmit(call, rxrpc_transmit_queue);
+	}
 
 	if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
 		_debug("________awaiting reply/ACK__________");
@@ -139,7 +146,7 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
 		rxrpc_instant_resend(call, ix);
 	}
 
-	rxrpc_free_skb(skb);
+	rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
 	_leave("");
 }
 
@@ -194,7 +201,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 
 	skb = call->tx_pending;
 	call->tx_pending = NULL;
-	rxrpc_see_skb(skb);
+	rxrpc_see_skb(skb, rxrpc_skb_tx_seen);
 
 	copied = 0;
 	do {
@@ -235,7 +242,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 			if (!skb)
 				goto maybe_error;
 
-			rxrpc_new_skb(skb);
+			rxrpc_new_skb(skb, rxrpc_skb_tx_new);
 
 			_debug("ALLOC SEND %p", skb);
 
@@ -345,7 +352,7 @@ out:
 	return ret;
 
 call_terminated:
-	rxrpc_free_skb(skb);
+	rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
 	_leave(" = %d", -call->error);
 	return -call->error;
 

+ 38 - 15
net/rxrpc/skbuff.c

@@ -18,54 +18,76 @@
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
+#define select_skb_count(op) (op >= rxrpc_skb_tx_cleaned ? &rxrpc_n_tx_skbs : &rxrpc_n_rx_skbs)
+
 /*
- * Note the existence of a new-to-us socket buffer (allocated or dequeued).
+ * Note the allocation or reception of a socket buffer.
  */
-void rxrpc_new_skb(struct sk_buff *skb)
+void rxrpc_new_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
 {
 	const void *here = __builtin_return_address(0);
-	int n = atomic_inc_return(&rxrpc_n_skbs);
-	trace_rxrpc_skb(skb, 0, atomic_read(&skb->users), n, here);
+	int n = atomic_inc_return(select_skb_count(op));
+	trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
 }
 
 /*
  * Note the re-emergence of a socket buffer from a queue or buffer.
  */
-void rxrpc_see_skb(struct sk_buff *skb)
+void rxrpc_see_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
 {
 	const void *here = __builtin_return_address(0);
 	if (skb) {
-		int n = atomic_read(&rxrpc_n_skbs);
-		trace_rxrpc_skb(skb, 1, atomic_read(&skb->users), n, here);
+		int n = atomic_read(select_skb_count(op));
+		trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
 	}
 }
 
 /*
  * Note the addition of a ref on a socket buffer.
  */
-void rxrpc_get_skb(struct sk_buff *skb)
+void rxrpc_get_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
 {
 	const void *here = __builtin_return_address(0);
-	int n = atomic_inc_return(&rxrpc_n_skbs);
-	trace_rxrpc_skb(skb, 2, atomic_read(&skb->users), n, here);
+	int n = atomic_inc_return(select_skb_count(op));
+	trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
 	skb_get(skb);
 }
 
 /*
  * Note the destruction of a socket buffer.
  */
-void rxrpc_free_skb(struct sk_buff *skb)
+void rxrpc_free_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
 {
 	const void *here = __builtin_return_address(0);
 	if (skb) {
 		int n;
 		CHECK_SLAB_OKAY(&skb->users);
-		n = atomic_dec_return(&rxrpc_n_skbs);
-		trace_rxrpc_skb(skb, 3, atomic_read(&skb->users), n, here);
+		n = atomic_dec_return(select_skb_count(op));
+		trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
 		kfree_skb(skb);
 	}
 }
 
+/*
+ * Note the injected loss of a socket buffer.
+ */
+void rxrpc_lose_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
+{
+	const void *here = __builtin_return_address(0);
+	if (skb) {
+		int n;
+		CHECK_SLAB_OKAY(&skb->users);
+		if (op == rxrpc_skb_tx_lost) {
+			n = atomic_read(select_skb_count(op));
+			trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
+		} else {
+			n = atomic_dec_return(select_skb_count(op));
+			trace_rxrpc_skb(skb, op, atomic_read(&skb->users), n, here);
+			kfree_skb(skb);
+		}
+	}
+}
+
 /*
  * Clear a queue of socket buffers.
  */
@@ -74,8 +96,9 @@ void rxrpc_purge_queue(struct sk_buff_head *list)
 	const void *here = __builtin_return_address(0);
 	struct sk_buff *skb;
 	while ((skb = skb_dequeue((list))) != NULL) {
-		int n = atomic_dec_return(&rxrpc_n_skbs);
-		trace_rxrpc_skb(skb, 4, atomic_read(&skb->users), n, here);
+		int n = atomic_dec_return(select_skb_count(rxrpc_skb_rx_purged));
+		trace_rxrpc_skb(skb, rxrpc_skb_rx_purged,
+				atomic_read(&skb->users), n, here);
 		kfree_skb(skb);
 	}
 }