Compare commits

...

1 Commits

Author SHA1 Message Date
Paul Sokolovsky eb306cebbb extmod/modlwip: Store a chain of incoming pbufs, instead of only one.
Storing a chain of pbuf was an original design of @pfalcon's lwIP socket
module. The problem with storing just one, like modlwip does is that
"peer closed connection" notification is completely asynchronous and out of
band. So, there may be following sequence of actions:

1. pbuf #1 arrives, and stored in a socket.
2. pbuf #2 arrives, and rejected, which causes lwIP to put it into a
queue to re-deliver later.
3. "Peer closed connection" is signaled, and socket is set at such status.
4. pbuf #1 is processed.
5. There's no stored pbufs in teh socket, and socket status is "peer closed
connection", so EOF is returned to a client.
6. pbuf #2 gets redelivered.

Apparently, there's no easy workaround for this, except to queue all
incoming pbufs in a socket. This may lead to increased memory pressure,
as number of pending packets would be regulated only by TCP/IP flow
control, whereas with previous setup lwIP had a global overlook of number
packets waiting for redelivery and could regulate them centrally.
2016-06-19 18:50:04 +03:00
1 changed files with 32 additions and 20 deletions

View File

@ -239,7 +239,7 @@ typedef struct _lwip_socket_obj_t {
byte peer[4];
mp_uint_t peer_port;
mp_uint_t timeout;
uint16_t leftover_count;
uint16_t recv_offset;
uint8_t domain;
uint8_t type;
@ -354,11 +354,17 @@ STATIC err_t _lwip_tcp_recv(void *arg, struct tcp_pcb *tcpb, struct pbuf *p, err
socket->state = STATE_PEER_CLOSED;
exec_user_callback(socket);
return ERR_OK;
} else if (socket->incoming.pbuf != NULL) {
// No room in the inn, let LWIP know it's still responsible for delivery later
return ERR_BUF;
}
socket->incoming.pbuf = p;
if (socket->incoming.pbuf == NULL) {
socket->incoming.pbuf = p;
} else {
#ifdef SOCKET_SINGLE_PBUF
return ERR_BUF;
#else
pbuf_cat(socket->incoming.pbuf, p);
#endif
}
exec_user_callback(socket);
@ -536,22 +542,28 @@ STATIC mp_uint_t lwip_tcp_receive(lwip_socket_obj_t *socket, byte *buf, mp_uint_
struct pbuf *p = socket->incoming.pbuf;
if (socket->leftover_count == 0) {
socket->leftover_count = p->tot_len;
mp_uint_t remaining = p->len - socket->recv_offset;
if (len > remaining) {
len = remaining;
}
u16_t result = pbuf_copy_partial(p, buf, ((socket->leftover_count >= len) ? len : socket->leftover_count), (p->tot_len - socket->leftover_count));
if (socket->leftover_count > len) {
// More left over...
socket->leftover_count -= len;
} else {
memcpy(buf, (byte*)p->payload + socket->recv_offset, len);
remaining -= len;
if (remaining == 0) {
socket->incoming.pbuf = p->next;
// If we don't ref here, free() will free the entire chain,
// if we ref, it does what we need: frees 1st buf, and decrements
// next buf's refcount back to 1.
pbuf_ref(p->next);
pbuf_free(p);
socket->incoming.pbuf = NULL;
socket->leftover_count = 0;
socket->recv_offset = 0;
} else {
socket->recv_offset += len;
}
tcp_recved(socket->pcb.tcp, len);
tcp_recved(socket->pcb.tcp, result);
return (mp_uint_t) result;
return len;
}
/*******************************************************************************/
@ -561,8 +573,8 @@ STATIC const mp_obj_type_t lwip_socket_type;
STATIC void lwip_socket_print(const mp_print_t *print, mp_obj_t self_in, mp_print_kind_t kind) {
lwip_socket_obj_t *self = self_in;
mp_printf(print, "<socket state=%d timeout=%d incoming=%p remaining=%d>", self->state, self->timeout,
self->incoming.pbuf, self->leftover_count);
mp_printf(print, "<socket state=%d timeout=%d incoming=%p off=%d>", self->state, self->timeout,
self->incoming.pbuf, self->recv_offset);
}
// FIXME: Only supports two arguments at present
@ -612,7 +624,7 @@ STATIC mp_obj_t lwip_socket_make_new(const mp_obj_type_t *type, mp_uint_t n_args
socket->incoming.pbuf = NULL;
socket->timeout = -1;
socket->state = STATE_NEW;
socket->leftover_count = 0;
socket->recv_offset = 0;
return socket;
}
@ -749,7 +761,7 @@ STATIC mp_obj_t lwip_socket_accept(mp_obj_t self_in) {
socket2->incoming.pbuf = NULL;
socket2->timeout = socket->timeout;
socket2->state = STATE_CONNECTED;
socket2->leftover_count = 0;
socket2->recv_offset = 0;
socket2->callback = MP_OBJ_NULL;
tcp_arg(socket2->pcb.tcp, (void*)socket2);
tcp_err(socket2->pcb.tcp, _lwip_tcp_error);