From eb306cebbb5ac3f69302e277338bf8d14e948ad1 Mon Sep 17 00:00:00 2001 From: Paul Sokolovsky Date: Sun, 19 Jun 2016 15:53:59 +0300 Subject: [PATCH] extmod/modlwip: Store a chain of incoming pbufs, instead of only one. Storing a chain of pbuf was an original design of @pfalcon's lwIP socket module. The problem with storing just one, like modlwip does is that "peer closed connection" notification is completely asynchronous and out of band. So, there may be following sequence of actions: 1. pbuf #1 arrives, and stored in a socket. 2. pbuf #2 arrives, and rejected, which causes lwIP to put it into a queue to re-deliver later. 3. "Peer closed connection" is signaled, and socket is set at such status. 4. pbuf #1 is processed. 5. There's no stored pbufs in teh socket, and socket status is "peer closed connection", so EOF is returned to a client. 6. pbuf #2 gets redelivered. Apparently, there's no easy workaround for this, except to queue all incoming pbufs in a socket. This may lead to increased memory pressure, as number of pending packets would be regulated only by TCP/IP flow control, whereas with previous setup lwIP had a global overlook of number packets waiting for redelivery and could regulate them centrally. --- extmod/modlwip.c | 52 +++++++++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/extmod/modlwip.c b/extmod/modlwip.c index dbee7c3d1..80df66264 100644 --- a/extmod/modlwip.c +++ b/extmod/modlwip.c @@ -239,7 +239,7 @@ typedef struct _lwip_socket_obj_t { byte peer[4]; mp_uint_t peer_port; mp_uint_t timeout; - uint16_t leftover_count; + uint16_t recv_offset; uint8_t domain; uint8_t type; @@ -354,11 +354,17 @@ STATIC err_t _lwip_tcp_recv(void *arg, struct tcp_pcb *tcpb, struct pbuf *p, err socket->state = STATE_PEER_CLOSED; exec_user_callback(socket); return ERR_OK; - } else if (socket->incoming.pbuf != NULL) { - // No room in the inn, let LWIP know it's still responsible for delivery later - return ERR_BUF; } - socket->incoming.pbuf = p; + + if (socket->incoming.pbuf == NULL) { + socket->incoming.pbuf = p; + } else { + #ifdef SOCKET_SINGLE_PBUF + return ERR_BUF; + #else + pbuf_cat(socket->incoming.pbuf, p); + #endif + } exec_user_callback(socket); @@ -536,22 +542,28 @@ STATIC mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_ struct pbuf *p = socket->incoming.pbuf; - if (socket->leftover_count == 0) { - socket->leftover_count = p->tot_len; + mp_uint_t remaining = p->len - socket->recv_offset; + if (len > remaining) { + len = remaining; } - u16_t result = pbuf_copy_partial(p, buf, ((socket->leftover_count >= len) ? len : socket->leftover_count), (p->tot_len - socket->leftover_count)); - if (socket->leftover_count > len) { - // More left over... - socket->leftover_count -= len; - } else { + memcpy(buf, (byte*)p->payload + socket->recv_offset, len); + + remaining -= len; + if (remaining == 0) { + socket->incoming.pbuf = p->next; + // If we don't ref here, free() will free the entire chain, + // if we ref, it does what we need: frees 1st buf, and decrements + // next buf's refcount back to 1. + pbuf_ref(p->next); pbuf_free(p); - socket->incoming.pbuf = NULL; - socket->leftover_count = 0; + socket->recv_offset = 0; + } else { + socket->recv_offset += len; } + tcp_recved(socket->pcb.tcp, len); - tcp_recved(socket->pcb.tcp, result); - return (mp_uint_t) result; + return len; } /*******************************************************************************/ @@ -561,8 +573,8 @@ STATIC const mp_obj_type_t lwip_socket_type; STATIC void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) { lwip_socket_obj_t *self = self_in; - mp_printf(print, "", self->state, self->timeout, - self->incoming.pbuf, self->leftover_count); + mp_printf(print, "", self->state, self->timeout, + self->incoming.pbuf, self->recv_offset); } // FIXME: Only supports two arguments at present @@ -612,7 +624,7 @@ STATIC mp_obj_t lwip_socket_make_new(const mp_obj_type_t *type, mp_uint_t n_args socket->incoming.pbuf = NULL; socket->timeout = -1; socket->state = STATE_NEW; - socket->leftover_count = 0; + socket->recv_offset = 0; return socket; } @@ -749,7 +761,7 @@ STATIC mp_obj_t lwip_socket_accept(mp_obj_t self_in) { socket2->incoming.pbuf = NULL; socket2->timeout = socket->timeout; socket2->state = STATE_CONNECTED; - socket2->leftover_count = 0; + socket2->recv_offset = 0; socket2->callback = MP_OBJ_NULL; tcp_arg(socket2->pcb.tcp, (void*)socket2); tcp_err(socket2->pcb.tcp, _lwip_tcp_error);