autobahn add same serverside rxflow cache to client

Server side has had immediate RX flow control for quite a while.

But client side made do with RX continuing until what had been received was exhausted.

For what Autobahn tests, that's not enough.

This patch gives clientside RX flow control the same immediate effect as the server
side enjoys, re-using the same code.

Signed-off-by: Andy Green <andy.green@linaro.org>
diff --git a/lib/client.c b/lib/client.c
index d712e02..51e30f0 100644
--- a/lib/client.c
+++ b/lib/client.c
@@ -25,21 +25,41 @@
 {
 	unsigned int n;
 
+	lwsl_debug("%s: len %u\n", __func__, len);
+
 	switch (wsi->mode) {
 	case LWSCM_WSCL_WAITING_PROXY_REPLY:
 	case LWSCM_WSCL_ISSUE_HANDSHAKE:
 	case LWSCM_WSCL_WAITING_SERVER_REPLY:
 	case LWSCM_WSCL_WAITING_EXTENSION_CONNECT:
 	case LWSCM_WS_CLIENT:
-		for (n = 0; n < len; n++)
+		for (n = 0; n < len; n++) {
+			/*
+			 * we were accepting input but now we stopped doing so
+			 */
+			if (!(wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) {
+				lwsl_debug("%s: caching %d\n", __func__, len - n);
+				lws_rxflow_cache(wsi, *buf, 0, len - n);
+				return 0;
+			}
+
+			/* account for what we're using in rxflow buffer */
+			if (wsi->rxflow_buffer)
+				wsi->rxflow_pos++;
+
 			if (lws_client_rx_sm(wsi, *(*buf)++)) {
 				lwsl_debug("client_rx_sm failed\n");
 				return 1;
 			}
+		}
+		lwsl_debug("%s: finished with %d\n", __func__, len);
 		return 0;
 	default:
 		break;
 	}
+
+	lwsl_debug("%s: did nothing\n", __func__);
+
 	return 0;
 }
 
diff --git a/lib/handshake.c b/lib/handshake.c
index bf9437f..00b543a 100644
--- a/lib/handshake.c
+++ b/lib/handshake.c
@@ -108,12 +108,15 @@
 			/* Handshake indicates this session is done. */
 			goto bail;
 
-		/* It's possible that we've exhausted our data already, but
-		 * lws_handshake_server doesn't update len for us.
+		/*
+		 * It's possible that we've exhausted our data already, or
+		 * rx flow control has stopped us dealing with this early,
+		 * but lws_handshake_server doesn't update len for us.
 		 * Figure out how much was read, so that we can proceed
 		 * appropriately:
 		 */
 		len -= (buf - last_char);
+		lwsl_debug("%s: thinks we have used %d\n", __func__, len);
 
 		if (!wsi->hdr_parsing_completed)
 			/* More header content on the way */
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index 380f227..82f317a 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -974,3 +974,38 @@
 
 	wsi->u.ws.close_in_ping_buffer_len = p - start;
 }
+
+LWS_EXTERN int
+_lws_rx_flow_control(struct lws *wsi)
+{
+	/* there is no pending change */
+	if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
+		return 0;
+
+	/* stuff is still buffered, not ready to really accept new input */
+	if (wsi->rxflow_buffer) {
+		/* get ourselves called back to deal with stashed buffer */
+		lws_callback_on_writable(wsi);
+		return 0;
+	}
+
+	/* pending is cleared, we can change rxflow state */
+
+	wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
+
+	lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
+			      wsi->rxflow_change_to & LWS_RXFLOW_ALLOW);
+
+	/* adjust the pollfd for this wsi */
+
+	if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) {
+		if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
+			lwsl_info("%s: fail\n", __func__);
+			return -1;
+		}
+	} else
+		if (lws_change_pollfd(wsi, LWS_POLLIN, 0))
+			return -1;
+
+	return 0;
+}
diff --git a/lib/output.c b/lib/output.c
index 5af0e26..2f91352 100644
--- a/lib/output.c
+++ b/lib/output.c
@@ -24,8 +24,13 @@
 static int
 lws_0405_frame_mask_generate(struct lws *wsi)
 {
-	int n;
-
+#if 0
+	wsi->u.ws.mask_nonce[0] = 0;
+	wsi->u.ws.mask_nonce[1] = 0;
+	wsi->u.ws.mask_nonce[2] = 0;
+	wsi->u.ws.mask_nonce[3] = 0;
+#else
+		int n;
 	/* fetch the per-frame nonce */
 
 	n = lws_get_random(lws_get_context(wsi), wsi->u.ws.mask_nonce, 4);
@@ -34,7 +39,7 @@
 			    SYSTEM_RANDOM_FILEPATH, n);
 		return 1;
 	}
-
+#endif
 	/* start masking from first byte of masking key buffer */
 	wsi->u.ws.frame_mask_index = 0;
 
@@ -260,8 +265,10 @@
 
 	/* if we are continuing a frame that already had its header done */
 
-	if (wsi->u.ws.inside_frame)
+	if (wsi->u.ws.inside_frame) {
+		lwsl_debug("INSIDE FRAME\n");
 		goto do_more_inside_frame;
+	}
 
 	wsi->u.ws.clean_buffer = 1;
 
diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h
index 07fb2a1..b91417f 100644
--- a/lib/private-libwebsockets.h
+++ b/lib/private-libwebsockets.h
@@ -1060,9 +1060,6 @@
 LWS_EXTERN int
 lws_issue_raw_ext_access(struct lws *wsi, unsigned char *buf, size_t len);
 
-LWS_EXTERN int
-_lws_rx_flow_control(struct lws *wsi);
-
 LWS_EXTERN void
 lws_union_transition(struct lws *wsi, enum connection_mode mode);
 
@@ -1242,19 +1239,20 @@
 #define lws_context_init_client_ssl(_a, _b) (0)
 #define lws_handshake_client(_a, _b, _c) (0)
 #endif
+
+LWS_EXTERN int
+_lws_rx_flow_control(struct lws *wsi);
+
 #ifndef LWS_NO_SERVER
 LWS_EXTERN int
 lws_server_socket_service(struct lws_context *context, struct lws *wsi,
 			  struct lws_pollfd *pollfd);
 LWS_EXTERN int
-_lws_rx_flow_control(struct lws *wsi);
-LWS_EXTERN int
 lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len);
 LWS_EXTERN int
 _lws_server_listen_accept_flow_control(struct lws_context *context, int on);
 #else
 #define lws_server_socket_service(_a, _b, _c) (0)
-#define _lws_rx_flow_control(_a) (0)
 #define lws_handshake_server(_a, _b, _c) (0)
 #define _lws_server_listen_accept_flow_control(a, b) (0)
 #endif
diff --git a/lib/server.c b/lib/server.c
index fa99f13..c60405e 100644
--- a/lib/server.c
+++ b/lib/server.c
@@ -147,41 +147,6 @@
 }
 
 int
-_lws_rx_flow_control(struct lws *wsi)
-{
-	/* there is no pending change */
-	if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
-		return 0;
-
-	/* stuff is still buffered, not ready to really accept new input */
-	if (wsi->rxflow_buffer) {
-		/* get ourselves called back to deal with stashed buffer */
-		lws_callback_on_writable(wsi);
-		return 0;
-	}
-
-	/* pending is cleared, we can change rxflow state */
-
-	wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
-
-	lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
-			      wsi->rxflow_change_to & LWS_RXFLOW_ALLOW);
-
-	/* adjust the pollfd for this wsi */
-
-	if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) {
-		if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
-			lwsl_info("%s: fail\n", __func__);
-			return -1;
-		}
-	} else
-		if (lws_change_pollfd(wsi, LWS_POLLIN, 0))
-			return -1;
-
-	return 0;
-}
-
-int
 _lws_server_listen_accept_flow_control(struct lws_context *context, int on)
 {
 	struct lws *wsi = context->wsi_listening;
@@ -733,7 +698,7 @@
 
 		/* any incoming data ready? */
 
-		if (!(pollfd->revents & LWS_POLLIN))
+		if (!(pollfd->revents & pollfd->events && LWS_POLLIN))
 			goto try_pollout;
 
 		len = lws_ssl_capable_read(wsi, context->serv_buf,
diff --git a/lib/service.c b/lib/service.c
index 46b924c..8d5c2d5 100644
--- a/lib/service.c
+++ b/lib/service.c
@@ -461,7 +461,7 @@
 	/* handle session socket closed */
 
 	if ((!(pollfd->revents & LWS_POLLIN)) &&
-			(pollfd->revents & LWS_POLLHUP)) {
+	    (pollfd->revents & LWS_POLLHUP)) {
 
 		lwsl_debug("Session Socket %p (fd=%d) dead\n",
 						       (void *)wsi, pollfd->fd);
@@ -511,7 +511,7 @@
 		}
 
 		if (wsi->rxflow_buffer &&
-			      (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) {
+		    (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) {
 			lwsl_info("draining rxflow\n");
 			/* well, drain it */
 			eff_buf.token = (char *)wsi->rxflow_buffer +
@@ -522,11 +522,11 @@
 		}
 
 		/* any incoming data ready? */
-
-		if (!(pollfd->revents & LWS_POLLIN))
+		/* notice if rx flow going off raced poll(), rx flow wins */
+		if (wsi->rxflow_buffer ||
+		    !(pollfd->revents & pollfd->events & LWS_POLLIN))
 			break;
 read:
-
 		eff_buf.token_len = lws_ssl_capable_read(wsi,
 					context->serv_buf,
 					pending ? pending :
@@ -572,6 +572,13 @@
 			/* service incoming data */
 
 			if (eff_buf.token_len) {
+				/*
+				 * if draining from rxflow buffer, not
+				 * critical to track what was used since at the
+				 * use it bumps wsi->rxflow_pos.  If we come
+				 * around again it will pick up from where it
+				 * left off.
+				 */
 				n = lws_read(wsi, (unsigned char *)eff_buf.token,
 					     eff_buf.token_len);
 				if (n < 0) {