diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h
index da9e8c3a59..8c5508c40e 100644
--- a/lib/isc/netmgr/netmgr-int.h
+++ b/lib/isc/netmgr/netmgr-int.h
@@ -62,9 +62,10 @@
 #endif
 
 /*
- * The TCP receive buffer can fit one maximum sized DNS message plus its size,
- * the receive buffer here affects TCP, DoT and DoH.
+ * The TCP send and receive buffers can each fit one maximum-sized DNS message
+ * plus its length prefix; the receive buffer here affects TCP, DoT and DoH.
  */
+#define ISC_NETMGR_TCP_SENDBUF_SIZE (sizeof(uint16_t) + UINT16_MAX)
 #define ISC_NETMGR_TCP_RECVBUF_SIZE (sizeof(uint16_t) + UINT16_MAX)
 
 /* Pick the larger buffer */
@@ -377,9 +378,10 @@ struct isc__nm_uvreq {
 	int magic;
 	isc_nmsocket_t *sock;
 	isc_nmhandle_t *handle;
-	char tcplen[2];	       /* The TCP DNS message length */
-	uv_buf_t uvbuf;	       /* translated isc_region_t, to be
-				* sent or received */
+	char tcplen[2]; /* The TCP DNS message length */
+	uv_buf_t uvbuf; /* translated isc_region_t, to be
+			 * sent or received */
+	isc_region_t userbuf;
 	isc_sockaddr_t local;  /* local address */
 	isc_sockaddr_t peer;   /* peer address */
 	isc__nm_cb_t cb;       /* callback */
@@ -999,7 +1001,6 @@ struct isc_nmsocket {
 			TLS_STATE_ERROR,
 			TLS_STATE_CLOSING
 		} state;
-		isc_region_t senddata;
 		ISC_LIST(isc__nm_uvreq_t) sendreqs;
 		bool cycle;
 		isc_result_t pending_error;
@@ -1064,6 +1065,12 @@ struct isc_nmsocket {
 	 */
 	uint64_t write_timeout;
 
+	/*
+	 * Reading over TCP has been throttled because the peer is not
+	 * reading back the data we are sending to it.
+	 */
+	bool reading_throttled;
+
 	/*% outer socket is for 'wrapped' sockets - e.g. tcpdns in tcp */
 	isc_nmsocket_t *outer;
 
@@ -2266,6 +2273,14 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer);
 void
 isc__nmsocket_writetimeout_cb(void *data, isc_result_t eresult);
 
+/*%
+ * Maximum number of simultaneous handles in flight supported for a single
+ * connected TCPDNS or TLSDNS socket. This value was chosen arbitrarily and
+ * may be changed in the future.
+ */
+#define STREAM_CLIENTS_PER_CONN 23
+
 #define UV_RUNTIME_CHECK(func, ret)                                      \
 	if (ret != 0) {                                                  \
 		FATAL_ERROR("%s failed: %s\n", #func, uv_strerror(ret)); \
diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c
index 44ef28e360..9bdd84fe00 100644
--- a/lib/isc/netmgr/netmgr.c
+++ b/lib/isc/netmgr/netmgr.c
@@ -49,8 +49,15 @@
  * How many isc_nmhandles and isc_nm_uvreqs will we be
  * caching for reuse in a socket.
  */
-#define ISC_NM_HANDLES_STACK_SIZE 600
-#define ISC_NM_REQS_STACK_SIZE	  600
+#define ISC_NM_HANDLES_STACK_SIZE 16
+#define ISC_NM_REQS_STACK_SIZE	  16
+
+/*%
+ * The same, but for UDP sockets, which tend to need larger values because
+ * they process many requests per socket.
+ */
+#define ISC_NM_HANDLES_STACK_SIZE_UDP 64
+#define ISC_NM_REQS_STACK_SIZE_UDP    64
 
 /*%
  * Shortcut index arrays to get access to statistics counters.
@@ -1508,16 +1515,25 @@ void
 isc___nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
 		    isc_sockaddr_t *iface FLARG) {
 	uint16_t family;
+	size_t inactive_handles_stack_size = ISC_NM_HANDLES_STACK_SIZE;
+	size_t inactive_reqs_stack_size = ISC_NM_REQS_STACK_SIZE;
 
 	REQUIRE(sock != NULL);
 	REQUIRE(mgr != NULL);
 
-	*sock = (isc_nmsocket_t){ .type = type,
-				  .fd = -1,
-				  .inactivehandles = isc_astack_new(
-					  mgr->mctx, ISC_NM_HANDLES_STACK_SIZE),
-				  .inactivereqs = isc_astack_new(
-					  mgr->mctx, ISC_NM_REQS_STACK_SIZE) };
+	if (type == isc_nm_udpsocket) {
+		inactive_handles_stack_size = ISC_NM_HANDLES_STACK_SIZE_UDP;
+		inactive_reqs_stack_size = ISC_NM_REQS_STACK_SIZE_UDP;
+	}
+
+	*sock = (isc_nmsocket_t){
+		.type = type,
+		.fd = -1,
+		.inactivehandles = isc_astack_new(mgr->mctx,
+						  inactive_handles_stack_size),
+		.inactivereqs = isc_astack_new(mgr->mctx,
+					       inactive_reqs_stack_size)
+	};
 
 	ISC_LIST_INIT(sock->tls.sendreqs);
 
@@ -2086,6 +2102,7 @@ isc__nmsocket_writetimeout_cb(void *data, isc_result_t eresult) {
 
 	sock = req->sock;
 
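+	/*
+	 * Restart reading (it may have been throttled) before resetting
+	 * the socket.
+	 */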
+	isc__nm_start_reading(sock);
 	isc__nmsocket_reset(sock);
 }
 
@@ -2095,7 +2112,6 @@ isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
 
 	REQUIRE(VALID_NMSOCK(sock));
 	REQUIRE(sock->tid == isc_nm_tid());
-	REQUIRE(atomic_load(&sock->reading));
 
 	if (atomic_load(&sock->client)) {
 		uv_timer_stop(timer);
@@ -2342,8 +2358,10 @@ processbuffer(isc_nmsocket_t *sock) {
  * timers. If we do have a full message, reset the timer.
  *
  * Stop reading if this is a client socket, or if the server socket
- * has been set to sequential mode. In this case we'll be called again
- * later by isc__nm_resume_processing().
+ * has been set to sequential mode, or the number of queries we are
+ * processing simultaneously has reached the clients-per-connection
+ * limit. In these cases we'll be called again later by
+ * isc__nm_resume_processing().
  */
 isc_result_t
 isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
@@ -2351,14 +2369,41 @@ isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
 		int_fast32_t ah = atomic_load(&sock->ah);
 		isc_result_t result = processbuffer(sock);
 		switch (result) {
-		case ISC_R_NOMORE:
+		case ISC_R_NOMORE: {
 			/*
 			 * Don't reset the timer until we have a
 			 * full DNS message.
 			 */
-			result = isc__nm_start_reading(sock);
-			if (result != ISC_R_SUCCESS) {
-				return (result);
+
+			/*
+			 * Restart reading if there is less data in the send
+			 * queue than the send buffer size; this means that
+			 * the TCP client has started reading some data again.
+			 * Resuming as soon as we drop below the limit, rather
+			 * than waiting until all data has been flushed, allows
+			 * faster recovery (in case there was congestion and
+			 * now there isn't).
+			 */
+			size_t write_queue_size =
+				uv_stream_get_write_queue_size(
+					&sock->uv_handle.stream);
+			if (write_queue_size < ISC_NETMGR_TCP_SENDBUF_SIZE) {
+				if (sock->reading_throttled) {
+					isc_log_write(isc_lctx,
+						      ISC_LOGCATEGORY_GENERAL,
+						      ISC_LOGMODULE_NETMGR,
+						      ISC_LOG_DEBUG(3),
+						      "resuming TCP "
+						      "connection, the other "
+						      "side is reading the "
+						      "data again (%zu)",
+						      write_queue_size);
+					sock->reading_throttled = false;
+				}
+				result = isc__nm_start_reading(sock);
+				if (result != ISC_R_SUCCESS) {
+					return (result);
+				}
 			}
 			/*
 			 * Start the timer only if there are no externally used
@@ -2370,6 +2415,7 @@ isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
 				isc__nmsocket_timer_start(sock);
 			}
 			goto done;
+		}
 		case ISC_R_CANCELED:
 			isc__nmsocket_timer_stop(sock);
 			isc__nm_stop_reading(sock);
@@ -2383,7 +2429,8 @@ isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
 			isc__nmsocket_timer_stop(sock);
 
 			if (atomic_load(&sock->client) ||
-			    atomic_load(&sock->sequential))
+			    atomic_load(&sock->sequential) ||
+			    atomic_load(&sock->ah) >= STREAM_CLIENTS_PER_CONN)
 			{
 				isc__nm_stop_reading(sock);
 				goto done;
diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c
index 16b53cc579..37d44bd9c8 100644
--- a/lib/isc/netmgr/tcp.c
+++ b/lib/isc/netmgr/tcp.c
@@ -766,7 +766,7 @@ isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
 	isc__netievent_tcpstartread_t *ievent =
 		(isc__netievent_tcpstartread_t *)ev0;
 	isc_nmsocket_t *sock = ievent->sock;
-	isc_result_t result;
+	isc_result_t result = ISC_R_SUCCESS;
 
 	REQUIRE(VALID_NMSOCK(sock));
 	REQUIRE(sock->tid == isc_nm_tid());
@@ -774,7 +774,7 @@ isc__nm_async_tcpstartread(isc__networker_t *worker, isc__netievent_t *ev0) {
 
 	if (isc__nmsocket_closing(sock)) {
 		result = ISC_R_CANCELED;
-	} else {
+	} else if (!sock->reading_throttled) {
 		result = isc__nm_start_reading(sock);
 	}
 
@@ -905,6 +905,32 @@ isc__nm_tcp_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
 
 	/* The readcb could have paused the reading */
 	if (atomic_load(&sock->reading)) {
+		if (!sock->client) {
+			/*
+			 * Stop reading if we have accumulated enough bytes in
+			 * the send queue; this means that the TCP client is
+			 * not reading back the data we are sending to it, so
+			 * there is no reason to keep processing more incoming
+			 * DNS messages while the responses are not being
+			 * consumed.
+			 */
+			size_t write_queue_size =
+				uv_stream_get_write_queue_size(
+					&sock->uv_handle.stream);
+
+			if (write_queue_size >= ISC_NETMGR_TCP_SENDBUF_SIZE) {
+				isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
+					      ISC_LOGMODULE_NETMGR,
+					      ISC_LOG_DEBUG(3),
+					      "throttling TCP connection, "
+					      "the other side is "
+					      "not reading the data (%zu)",
+					      write_queue_size);
+				sock->reading_throttled = true;
+				isc__nm_stop_reading(sock);
+			}
+		}
+
 		/* The timer will be updated */
 		isc__nmsocket_timer_restart(sock);
 	}
@@ -1095,6 +1121,34 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, const isc_region_t *region,
 	return;
 }
 
+static void
+tcp_maybe_restart_reading(isc_nmsocket_t *sock) {
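+	/*
+	 * Restart reading only on a throttled server-side socket whose
+	 * libuv handle is not currently active, i.e. reading really has
+	 * been stopped.
+	 */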
+	if (!sock->client && sock->reading_throttled &&
+	    !uv_is_active(&sock->uv_handle.handle))
+	{
+		/*
+		 * Restart reading if there is less data in the send queue
+		 * than the send buffer size; this means that the TCP client
+		 * has started reading some data again.  Resuming as soon as
+		 * we drop below the limit, rather than waiting until all data
+		 * has been flushed, allows faster recovery (in case there was
+		 * congestion and now there isn't).
+		 */
+		size_t write_queue_size =
+			uv_stream_get_write_queue_size(&sock->uv_handle.stream);
+		if (write_queue_size < ISC_NETMGR_TCP_SENDBUF_SIZE) {
+			isc_log_write(
+				isc_lctx, ISC_LOGCATEGORY_GENERAL,
+				ISC_LOGMODULE_NETMGR, ISC_LOG_DEBUG(3),
+				"resuming TCP connection, the other side "
+				"is reading the data again (%zu)",
+				write_queue_size);
+			sock->reading_throttled = false;
+			isc__nm_start_reading(sock);
+		}
+	}
+}
+
 static void
 tcp_send_cb(uv_write_t *req, int status) {
 	isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
@@ -1112,10 +1166,23 @@ tcp_send_cb(uv_write_t *req, int status) {
 		isc__nm_incstats(sock, STATID_SENDFAIL);
 		isc__nm_failed_send_cb(sock, uvreq,
 				       isc__nm_uverr2result(status));
+
+		if (!sock->client &&
+		    (atomic_load(&sock->reading) || sock->reading_throttled))
+		{
+			/*
+			 * Since we are resuming reading, the socket is no
+			 * longer considered throttled.
+			 */
+			sock->reading_throttled = false;
+			isc__nm_start_reading(sock);
+			isc__nmsocket_reset(sock);
+		}
 		return;
 	}
 
 	isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false);
+	tcp_maybe_restart_reading(sock);
 }
 
 /*
diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c
index 1864a45aba..7ef7cb1ed6 100644
--- a/lib/isc/netmgr/tcpdns.c
+++ b/lib/isc/netmgr/tcpdns.c
@@ -734,7 +734,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
 	isc__netievent_tcpdnsread_t *ievent =
 		(isc__netievent_tcpdnsread_t *)ev0;
 	isc_nmsocket_t *sock = ievent->sock;
-	isc_result_t result;
+	isc_result_t result = ISC_R_SUCCESS;
 
 	UNUSED(worker);
 
@@ -743,7 +743,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) {
 
 	if (isc__nmsocket_closing(sock)) {
 		result = ISC_R_CANCELED;
-	} else {
+	} else if (!sock->reading_throttled) {
 		result = isc__nm_process_sock_buffer(sock);
 	}
 
@@ -913,6 +913,28 @@ isc__nm_tcpdns_read_cb(uv_stream_t *stream, ssize_t nread,
 	result = isc__nm_process_sock_buffer(sock);
 	if (result != ISC_R_SUCCESS) {
 		isc__nm_failed_read_cb(sock, result, true);
+	} else if (!sock->client) {
+		/*
+		 * Stop reading if we have accumulated enough bytes in
+		 * the send queue; this means that the TCP client is
+		 * not reading back the data we are sending to it, so
+		 * there is no reason to keep processing more incoming
+		 * DNS messages while the responses are not being
+		 * consumed.
+		 */
+		size_t write_queue_size =
+			uv_stream_get_write_queue_size(&sock->uv_handle.stream);
+
+		if (write_queue_size >= ISC_NETMGR_TCP_SENDBUF_SIZE) {
+			isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
+				      ISC_LOGMODULE_NETMGR, ISC_LOG_DEBUG(3),
+				      "throttling TCP connection, "
+				      "the other side is "
+				      "not reading the data (%zu)",
+				      write_queue_size);
+			sock->reading_throttled = true;
+			isc__nm_stop_reading(sock);
+		}
 	}
 free:
 	if (nread < 0) {
@@ -1135,6 +1157,19 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
 	return;
 }
 
+static void
+tcpdns_maybe_restart_reading(isc_nmsocket_t *sock) {
+	if (!sock->client && sock->reading_throttled &&
+	    !uv_is_active(&sock->uv_handle.handle))
+	{
+		isc_result_t result = isc__nm_process_sock_buffer(sock);
+		if (result != ISC_R_SUCCESS) {
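+			/*
+			 * Presumably the failed-read path expects the socket
+			 * to be in the reading state, so mark it as reading
+			 * before invoking the callback.
+			 */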
+			atomic_store(&sock->reading, true);
+			isc__nm_failed_read_cb(sock, result, false);
+		}
+	}
+}
+
 static void
 tcpdns_send_cb(uv_write_t *req, int status) {
 	isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
@@ -1152,10 +1187,23 @@ tcpdns_send_cb(uv_write_t *req, int status) {
 		isc__nm_incstats(sock, STATID_SENDFAIL);
 		isc__nm_failed_send_cb(sock, uvreq,
 				       isc__nm_uverr2result(status));
+
+		if (!sock->client &&
+		    (atomic_load(&sock->reading) || sock->reading_throttled))
+		{
+			/*
+			 * Since we are resuming reading, the socket is no
+			 * longer considered throttled.
+			 */
+			sock->reading_throttled = false;
+			isc__nm_start_reading(sock);
+			isc__nmsocket_reset(sock);
+		}
 		return;
 	}
 
 	isc__nm_sendcb(sock, uvreq, ISC_R_SUCCESS, false);
+	tcpdns_maybe_restart_reading(sock);
 }
 
 /*
@@ -1221,6 +1269,13 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
 		goto fail;
 	}
 
+	isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
+		      ISC_LOG_DEBUG(3),
+		      "throttling TCP connection, the other side is not "
+		      "reading the data, switching to uv_write()");
+	sock->reading_throttled = true;
+	isc__nm_stop_reading(sock);
+
 	r = uv_write(&uvreq->uv_req.write, &sock->uv_handle.stream, bufs, nbufs,
 		     tcpdns_send_cb);
 	if (r < 0) {
diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c
index 7a005db9b9..b9b3971ac3 100644
--- a/lib/isc/netmgr/tlsdns.c
+++ b/lib/isc/netmgr/tlsdns.c
@@ -88,6 +88,9 @@ tlsdns_set_tls_shutdown(isc_tls_t *tls) {
 	(void)SSL_set_shutdown(tls, SSL_SENT_SHUTDOWN);
 }
 
+static void
+tlsdns_maybe_restart_reading(isc_nmsocket_t *sock);
+
 static bool
 peer_verification_has_failed(isc_nmsocket_t *sock) {
 	if (sock->tls.tls != NULL && sock->tls.state == TLS_STATE_HANDSHAKE &&
@@ -1084,6 +1087,19 @@ tls_cycle_input(isc_nmsocket_t *sock) {
 		size_t len;
 
 		for (;;) {
+			/*
+			 * There is a similar branch in
+			 * isc__nm_process_sock_buffer() which is sufficient to
+			 * stop excessive processing in TCP. However, as we wrap
+			 * this call in a loop, we need to have it here in order
+			 * to limit the number of loop iterations (and,
+			 * consequently, the number of messages processed).
+			 */
+			if (atomic_load(&sock->ah) >= STREAM_CLIENTS_PER_CONN) {
+				isc__nm_stop_reading(sock);
+				break;
+			}
+
 			(void)SSL_peek(sock->tls.tls, &(char){ '\0' }, 0);
 
 			int pending = SSL_pending(sock->tls.tls);
@@ -1261,17 +1277,17 @@ call_pending_send_callbacks(isc_nmsocket_t *sock, const isc_result_t result) {
 }
 
 static void
-free_senddata(isc_nmsocket_t *sock, const isc_result_t result) {
+free_senddata(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
+	      const isc_result_t result) {
 	REQUIRE(VALID_NMSOCK(sock));
-	REQUIRE(sock->tls.senddata.base != NULL);
-	REQUIRE(sock->tls.senddata.length > 0);
+	REQUIRE(req != NULL && req->userbuf.base != NULL &&
+		req->userbuf.length > 0);
 
-	isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base,
-		    sock->tls.senddata.length);
-	sock->tls.senddata.base = NULL;
-	sock->tls.senddata.length = 0;
+	isc_mem_put(sock->mgr->mctx, req->userbuf.base, req->userbuf.length);
 
 	call_pending_send_callbacks(sock, result);
+
+	isc__nm_uvreq_put(&req, sock);
 }
 
 static void
@@ -1284,11 +1300,19 @@ tls_write_cb(uv_write_t *req, int status) {
 	isc_nm_timer_stop(uvreq->timer);
 	isc_nm_timer_detach(&uvreq->timer);
 
-	free_senddata(sock, result);
-
-	isc__nm_uvreq_put(&uvreq, sock);
+	free_senddata(sock, uvreq, result);
 
 	if (status != 0) {
+		if (!sock->client &&
+		    (atomic_load(&sock->reading) || sock->reading_throttled))
+		{
+			/*
+			 * Since we are resuming reading, the socket is no
+			 * longer considered throttled.
+			 */
+			sock->reading_throttled = false;
+			isc__nm_start_reading(sock);
+		}
 		tls_error(sock, result);
 		return;
 	}
@@ -1298,6 +1322,8 @@ tls_write_cb(uv_write_t *req, int status) {
 		tls_error(sock, result);
 		return;
 	}
+
+	tlsdns_maybe_restart_reading(sock);
 }
 
 static isc_result_t
@@ -1311,23 +1337,18 @@ tls_cycle_output(isc_nmsocket_t *sock) {
 		int rv;
 		int r;
 
-		if (sock->tls.senddata.base != NULL ||
-		    sock->tls.senddata.length > 0)
-		{
-			break;
-		}
-
 		if (pending > (int)ISC_NETMGR_TCP_RECVBUF_SIZE) {
 			pending = (int)ISC_NETMGR_TCP_RECVBUF_SIZE;
 		}
 
-		sock->tls.senddata.base = isc_mem_get(sock->mgr->mctx, pending);
-		sock->tls.senddata.length = pending;
-
 		/* It's a bit misnomer here, but it does the right thing */
 		req = isc__nm_get_read_req(sock, NULL);
-		req->uvbuf.base = (char *)sock->tls.senddata.base;
-		req->uvbuf.len = sock->tls.senddata.length;
+
+		req->userbuf.base = isc_mem_get(sock->mgr->mctx, pending);
+		req->userbuf.length = (size_t)pending;
+
+		req->uvbuf.base = (char *)req->userbuf.base;
+		req->uvbuf.len = (size_t)req->userbuf.length;
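+		/*
+		 * userbuf keeps the whole allocation (it is freed later in
+		 * free_senddata()), while uvbuf tracks the not-yet-sent
+		 * portion and is advanced on partial writes below.
+		 */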
 
 		rv = BIO_read_ex(sock->tls.app_rbio, req->uvbuf.base,
 				 req->uvbuf.len, &bytes);
@@ -1339,32 +1360,36 @@ tls_cycle_output(isc_nmsocket_t *sock) {
 
 		if (r == pending) {
 			/* Wrote everything, restart */
-			isc__nm_uvreq_put(&req, sock);
-			free_senddata(sock, ISC_R_SUCCESS);
+			free_senddata(sock, req, ISC_R_SUCCESS);
 			continue;
 		}
 
 		if (r > 0) {
 			/* Partial write, send rest asynchronously */
-			memmove(req->uvbuf.base, req->uvbuf.base + r,
-				req->uvbuf.len - r);
-			req->uvbuf.len = req->uvbuf.len - r;
+			req->uvbuf.base += r;
+			req->uvbuf.len -= r;
 		} else if (r == UV_ENOSYS || r == UV_EAGAIN) {
 			/* uv_try_write is not supported, send
 			 * asynchronously */
 		} else {
 			result = isc__nm_uverr2result(r);
-			isc__nm_uvreq_put(&req, sock);
-			free_senddata(sock, result);
+			free_senddata(sock, req, result);
 			break;
 		}
 
+		isc_log_write(
+			isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
+			ISC_LOG_DEBUG(3),
+			"throttling TCP connection, the other side is not "
+			"reading the data, switching to uv_write()");
+		sock->reading_throttled = true;
+		isc__nm_stop_reading(sock);
+
 		r = uv_write(&req->uv_req.write, &sock->uv_handle.stream,
 			     &req->uvbuf, 1, tls_write_cb);
 		if (r < 0) {
 			result = isc__nm_uverr2result(r);
-			isc__nm_uvreq_put(&req, sock);
-			free_senddata(sock, result);
+			free_senddata(sock, req, result);
 			break;
 		}
 
@@ -1533,6 +1558,28 @@ isc__nm_tlsdns_read_cb(uv_stream_t *stream, ssize_t nread,
 	result = tls_cycle(sock);
 	if (result != ISC_R_SUCCESS) {
 		isc__nm_failed_read_cb(sock, result, true);
+	} else if (!sock->client) {
+		/*
+		 * Stop reading if we have accumulated enough bytes in
+		 * the send queue; this means that the TCP client is
+		 * not reading back the data we are sending to it, so
+		 * there is no reason to keep processing more incoming
+		 * DNS messages while the responses are not being
+		 * consumed.
+		 */
+		size_t write_queue_size =
+			uv_stream_get_write_queue_size(&sock->uv_handle.stream);
+
+		if (write_queue_size >= ISC_NETMGR_TCP_SENDBUF_SIZE) {
+			isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
+				      ISC_LOGMODULE_NETMGR, ISC_LOG_DEBUG(3),
+				      "throttling TCP connection, "
+				      "the other side is "
+				      "not reading the data (%zu)",
+				      write_queue_size);
+			sock->reading_throttled = true;
+			isc__nm_stop_reading(sock);
+		}
 	}
 free:
 	async_tlsdns_cycle(sock);
@@ -1776,6 +1823,19 @@ isc__nm_tlsdns_send(isc_nmhandle_t *handle, isc_region_t *region,
 	return;
 }
 
+static void
+tlsdns_maybe_restart_reading(isc_nmsocket_t *sock) {
+	if (!sock->client && sock->reading_throttled &&
+	    !uv_is_active(&sock->uv_handle.handle))
+	{
+		isc_result_t result = isc__nm_process_sock_buffer(sock);
+		if (result != ISC_R_SUCCESS) {
+			atomic_store(&sock->reading, true);
+			isc__nm_failed_read_cb(sock, result, false);
+		}
+	}
+}
+
 /*
  * Handle 'tcpsend' async event - send a packet on the socket
  */
diff --git a/lib/ns/client.c b/lib/ns/client.c
index a62343bc7b..8981222840 100644
--- a/lib/ns/client.c
+++ b/lib/ns/client.c
@@ -101,6 +101,9 @@
 #define COOKIE_SIZE 24U /* 8 + 4 + 4 + 8 */
 #define ECS_SIZE    20U /* 2 + 1 + 1 + [0..16] */
 
+#define TCPBUFFERS_FILLCOUNT 1U
+#define TCPBUFFERS_FREEMAX   8U
+
 #define WANTNSID(x)	(((x)->attributes & NS_CLIENTATTR_WANTNSID) != 0)
 #define WANTEXPIRE(x)	(((x)->attributes & NS_CLIENTATTR_WANTEXPIRE) != 0)
 #define WANTPAD(x)	(((x)->attributes & NS_CLIENTATTR_WANTPAD) != 0)
@@ -330,12 +333,36 @@ client_senddone(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
 				      NS_LOGMODULE_CLIENT, ISC_LOG_DEBUG(3),
 				      "send failed: %s",
 				      isc_result_totext(result));
+			isc_nm_bad_request(handle);
 		}
 	}
 
 	isc_nmhandle_detach(&handle);
 }
 
+static void
+client_setup_tcp_buffer(ns_client_t *client) {
+	REQUIRE(client->tcpbuf == NULL);
+
+	client->tcpbuf = client->manager->tcp_buffer;
+	client->tcpbuf_size = NS_CLIENT_TCP_BUFFER_SIZE;
+}
+
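+/*
+ * Release the client's TCP buffer: a dynamically allocated buffer is
+ * returned to the memory context, while the manager's shared buffer is
+ * simply unlinked from the client.
+ */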
+static void
+client_put_tcp_buffer(ns_client_t *client) {
+	if (client->tcpbuf == NULL) {
+		return;
+	}
+
+	if (client->tcpbuf != client->manager->tcp_buffer) {
+		isc_mem_put(client->manager->mctx, client->tcpbuf,
+			    client->tcpbuf_size);
+	}
+
+	client->tcpbuf = NULL;
+	client->tcpbuf_size = 0;
+}
+
 static void
 client_allocsendbuf(ns_client_t *client, isc_buffer_t *buffer,
 		    unsigned char **datap) {
@@ -345,12 +372,9 @@ client_allocsendbuf(ns_client_t *client, isc_buffer_t *buffer,
 	REQUIRE(datap != NULL);
 
 	if (TCP_CLIENT(client)) {
-		INSIST(client->tcpbuf == NULL);
-		client->tcpbuf = isc_mem_get(client->manager->send_mctx,
-					     NS_CLIENT_TCP_BUFFER_SIZE);
-		client->tcpbuf_size = NS_CLIENT_TCP_BUFFER_SIZE;
+		client_setup_tcp_buffer(client);
 		data = client->tcpbuf;
-		isc_buffer_init(buffer, data, NS_CLIENT_TCP_BUFFER_SIZE);
+		isc_buffer_init(buffer, data, client->tcpbuf_size);
 	} else {
 		data = client->sendbuf;
 		if ((client->attributes & NS_CLIENTATTR_HAVECOOKIE) == 0) {
@@ -383,11 +407,49 @@ client_sendpkg(ns_client_t *client, isc_buffer_t *buffer) {
 
 	if (isc_buffer_base(buffer) == client->tcpbuf) {
 		size_t used = isc_buffer_usedlength(buffer);
-		client->tcpbuf = isc_mem_reget(client->manager->send_mctx,
-					       client->tcpbuf,
-					       client->tcpbuf_size, used);
-		client->tcpbuf_size = used;
-		r.base = client->tcpbuf;
+		INSIST(client->tcpbuf_size == NS_CLIENT_TCP_BUFFER_SIZE);
+
+		/*
+		 * Copy the data into a smaller buffer before sending,
+		 * and keep the original big TCP send buffer for reuse
+		 * by other clients.
+		 */
+		if (used > NS_CLIENT_SEND_BUFFER_SIZE) {
+			/*
+			 * We can save space by allocating a new buffer of
+			 * exactly the needed size and freeing the big buffer.
+			 */
+			unsigned char *new_tcpbuf =
+				isc_mem_get(client->manager->mctx, used);
+			memmove(new_tcpbuf, buffer->base, used);
+
+			/*
+			 * Release the big buffer so that we can replace the
+			 * pointer and the size with the new ones.
+			 */
+			client_put_tcp_buffer(client);
+
+			/*
+			 * Keep the new buffer's information so it can be freed.
+			 */
+			client->tcpbuf = new_tcpbuf;
+			client->tcpbuf_size = used;
+
+			r.base = new_tcpbuf;
+		} else {
+			/*
+			 * The data fits in the available space in
+			 * 'sendbuf', so there is no need for a new buffer.
+			 */
+			memmove(client->sendbuf, buffer->base, used);
+
+			/*
+			 * Release the big buffer; we don't need a dynamic one.
+			 */
+			client_put_tcp_buffer(client);
+
+			r.base = client->sendbuf;
+		}
 		r.length = used;
 	} else {
 		isc_buffer_usedregion(buffer, &r);
@@ -461,8 +523,7 @@ ns_client_sendraw(ns_client_t *client, dns_message_t *message) {
 	return;
 done:
 	if (client->tcpbuf != NULL) {
-		isc_mem_put(client->manager->send_mctx, client->tcpbuf,
-			    client->tcpbuf_size);
+		client_put_tcp_buffer(client);
 	}
 
 	ns_client_drop(client, result);
@@ -746,8 +807,7 @@ renderend:
 
 cleanup:
 	if (client->tcpbuf != NULL) {
-		isc_mem_put(client->manager->send_mctx, client->tcpbuf,
-			    client->tcpbuf_size);
+		client_put_tcp_buffer(client);
 	}
 
 	if (cleanup_cctx) {
@@ -1629,8 +1689,7 @@ ns__client_reset_cb(void *client0) {
 
 	ns_client_endrequest(client);
 	if (client->tcpbuf != NULL) {
-		isc_mem_put(client->manager->send_mctx, client->tcpbuf,
-			    client->tcpbuf_size);
+		client_put_tcp_buffer(client);
 	}
 
 	if (client->keytag != NULL) {
@@ -1661,8 +1720,6 @@ ns__client_put_cb(void *client0) {
 	client->magic = 0;
 	client->shuttingdown = true;
 
-	isc_mem_put(client->manager->send_mctx, client->sendbuf,
-		    NS_CLIENT_SEND_BUFFER_SIZE);
 	if (client->opt != NULL) {
 		INSIST(dns_rdataset_isassociated(client->opt));
 		dns_rdataset_disassociate(client->opt);
@@ -2339,8 +2396,6 @@ ns__client_setup(ns_client_t *client, ns_clientmgr_t *mgr, bool new) {
 		dns_message_create(client->mctx, DNS_MESSAGE_INTENTPARSE,
 				   &client->message);
 
-		client->sendbuf = isc_mem_get(client->manager->send_mctx,
-					      NS_CLIENT_SEND_BUFFER_SIZE);
 		/*
 		 * Set magic earlier than usual because ns_query_init()
 		 * and the functions it calls will require it.
@@ -2357,7 +2412,6 @@ ns__client_setup(ns_client_t *client, ns_clientmgr_t *mgr, bool new) {
 		ns_clientmgr_t *oldmgr = client->manager;
 		ns_server_t *sctx = client->sctx;
 		isc_task_t *task = client->task;
-		unsigned char *sendbuf = client->sendbuf;
 		dns_message_t *message = client->message;
 		isc_mem_t *oldmctx = client->mctx;
 		ns_query_t query = client->query;
@@ -2372,7 +2426,6 @@ ns__client_setup(ns_client_t *client, ns_clientmgr_t *mgr, bool new) {
 					 .manager = oldmgr,
 					 .sctx = sctx,
 					 .task = task,
-					 .sendbuf = sendbuf,
 					 .message = message,
 					 .query = query,
 					 .tid = tid };
@@ -2397,8 +2450,6 @@ ns__client_setup(ns_client_t *client, ns_clientmgr_t *mgr, bool new) {
 	return (ISC_R_SUCCESS);
 
 cleanup:
-	isc_mem_put(client->manager->send_mctx, client->sendbuf,
-		    NS_CLIENT_SEND_BUFFER_SIZE);
 	dns_message_detach(&client->message);
 	isc_task_detach(&client->task);
 	ns_clientmgr_detach(&client->manager);
@@ -2461,8 +2512,6 @@ clientmgr_destroy(ns_clientmgr_t *manager) {
 	isc_task_detach(&manager->task);
 	ns_server_detach(&manager->sctx);
 
-	isc_mem_detach(&manager->send_mctx);
-
 	isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
 }
 
@@ -2499,61 +2548,6 @@ ns_clientmgr_create(ns_server_t *sctx, isc_taskmgr_t *taskmgr,
 
 	ISC_LIST_INIT(manager->recursing);
 
-	/*
-	 * We create specialised per-worker memory context specifically
-	 * dedicated and tuned for allocating send buffers as it is a very
-	 * common operation. Not doing so may result in excessive memory
-	 * use in certain workloads.
-	 *
-	 * Please see this thread for more details:
-	 *
-	 * https://github.com/jemalloc/jemalloc/issues/2483
-	 *
-	 * In particular, this information from the jemalloc developers is
-	 * of the most interest:
-	 *
-	 * https://github.com/jemalloc/jemalloc/issues/2483#issuecomment-1639019699
-	 * https://github.com/jemalloc/jemalloc/issues/2483#issuecomment-1698173849
-	 *
-	 * In essence, we use the following memory management strategy:
-	 *
-	 * 1. We use a per-worker memory arena for send buffers memory
-	 * allocation to reduce lock contention (In reality, we create a
-	 * per-client manager arena, but we have one client manager per
-	 * worker).
-	 *
-	 * 2. The automatically created arenas settings remain unchanged
-	 * and may be controlled by users (e.g. by setting the
-	 * "MALLOC_CONF" variable).
-	 *
-	 * 3. We attune the arenas to not use dirty pages cache as the
-	 * cache would have a poor reuse rate, and that is known to
-	 * significantly contribute to excessive memory use.
-	 *
-	 * 4. There is no strict need for the dirty cache, as there is a
-	 * per arena bin for each allocation size, so because we initially
-	 * allocate strictly 64K per send buffer (enough for a DNS
-	 * message), allocations would get directed to one bin (an "object
-	 * pool" or a "slab") maintained within an arena. That is, there
-	 * is an object pool already, specifically to optimise for the
-	 * case of frequent allocations of objects of the given size. The
-	 * object pool should suffice our needs, as we will end up
-	 * recycling the objects from there without the need to back it by
-	 * an additional layer of dirty pages cache. The dirty pages cache
-	 * would have worked better in the case when there are more
-	 * allocation bins involved due to a higher reuse rate (the case
-	 * of a more "generic" memory management).
-	 */
-	isc_mem_create_arena(&manager->send_mctx);
-	isc_mem_setname(manager->send_mctx, "sendbufs");
-	(void)isc_mem_arena_set_dirty_decay_ms(manager->send_mctx, 0);
-	/*
-	 * Disable muzzy pages cache too, as versions < 5.2.0 have it
-	 * enabled by default. The muzzy pages cache goes right below the
-	 * dirty pages cache and backs it.
-	 */
-	(void)isc_mem_arena_set_muzzy_decay_ms(manager->send_mctx, 0);
-
 	manager->magic = MANAGER_MAGIC;
 
 	MTRACE("create");
diff --git a/lib/ns/include/ns/client.h b/lib/ns/include/ns/client.h
index 7a7196f763..ea2d83e079 100644
--- a/lib/ns/include/ns/client.h
+++ b/lib/ns/include/ns/client.h
@@ -144,7 +144,6 @@ struct ns_clientmgr {
 	unsigned int magic;
 
 	isc_mem_t      *mctx;
-	isc_mem_t      *send_mctx;
 	ns_server_t    *sctx;
 	isc_taskmgr_t  *taskmgr;
 	isc_timermgr_t *timermgr;
@@ -159,6 +158,8 @@ struct ns_clientmgr {
 	/* Lock covers the recursing list */
 	isc_mutex_t   reclock;
 	client_list_t recursing; /*%< Recursing clients */
+
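+	/*%
+	 * A shared TCP send buffer: clients borrow it while rendering a
+	 * response and copy the result into a smaller buffer before
+	 * sending (see client_sendpkg()), so that it can be reused.
+	 */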
+	uint8_t tcp_buffer[NS_CLIENT_TCP_BUFFER_SIZE];
 };
 
 /*% nameserver client structure */
@@ -187,7 +188,6 @@ struct ns_client {
 	unsigned char  *tcpbuf;
 	size_t		tcpbuf_size;
 	dns_message_t  *message;
-	unsigned char  *sendbuf;
 	dns_rdataset_t *opt;
 	dns_ednsopt_t  *ede;
 	uint16_t	udpsize;
@@ -240,6 +240,8 @@ struct ns_client {
 	 * bits will be used as the rcode in the response message.
 	 */
 	int32_t rcode_override;
+
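+	/*%
+	 * Per-client send buffer, embedded in the structure rather than
+	 * allocated separately; also used for TCP responses that fit
+	 * within NS_CLIENT_SEND_BUFFER_SIZE (see client_sendpkg()).
+	 */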
+	uint8_t sendbuf[NS_CLIENT_SEND_BUFFER_SIZE];
 };
 
 #define NS_CLIENT_MAGIC	   ISC_MAGIC('N', 'S', 'C', 'c')
