py/stream: Refactor mp_stream_writeall() to be suitable for non-blocking streams.

stream-refactor
Paul Sokolovsky 2016-05-12 03:30:39 +03:00
parent 92a342a011
commit d3576f5957
4 changed files with 26 additions and 19 deletions

View File

@ -251,7 +251,7 @@ STATIC mp_uint_t _webrepl_read(mp_obj_t self_in, void *buf, mp_uint_t size, int
DEBUG_printf("webrepl: Writing %lu bytes to file\n", buf_sz);
int err;
mp_uint_t res = mp_stream_writeall(self->cur_file, filebuf, buf_sz, &err);
if(res == MP_STREAM_ERROR) {
if (err != 0) {
assert(0);
}

View File

@ -241,7 +241,7 @@ STATIC mp_uint_t websocket_write(mp_obj_t self_in, const void *buf, mp_uint_t si
}
mp_uint_t out_sz = mp_stream_writeall(self->sock, header, hdr_sz, errcode);
if (out_sz != MP_STREAM_ERROR) {
if (*errcode == 0) {
out_sz = mp_stream_writeall(self->sock, buf, size, errcode);
}
@ -250,6 +250,9 @@ STATIC mp_uint_t websocket_write(mp_obj_t self_in, const void *buf, mp_uint_t si
mp_call_method_n_kw(1, 0, dest);
}
if (*errcode != 0) {
return MP_STREAM_ERROR;
}
return out_sz;
}

View File

@ -79,7 +79,7 @@ STATIC mp_uint_t bufwriter_write(mp_obj_t self_in, const void *buf, mp_uint_t si
buf = (byte*)buf + rem;
size -= rem;
mp_uint_t out_sz = mp_stream_writeall(self->stream, self->buf, self->alloc, errcode);
if (out_sz == MP_STREAM_ERROR) {
if (*errcode != 0) {
return MP_STREAM_ERROR;
}
self->len = 0;
@ -95,7 +95,7 @@ STATIC mp_obj_t bufwriter_flush(mp_obj_t self_in) {
int err;
mp_uint_t out_sz = mp_stream_writeall(self->stream, self->buf, self->len, &err);
self->len = 0;
if (out_sz == MP_STREAM_ERROR) {
if (err != 0) {
nlr_raise(mp_obj_new_exception_arg1(&mp_type_OSError, MP_OBJ_NEW_SMALL_INT(err)));
}
}

View File

@ -49,6 +49,25 @@ STATIC mp_obj_t stream_readall(mp_obj_t self_in);
#define STREAM_CONTENT_TYPE(stream) (((stream)->is_text) ? &mp_type_str : &mp_type_bytes)
// Returns error condition in *errcode, if non-zero, return value is number of bytes written
// before error condition occured. If *errcode == 0, returns total bytes written (which will
// be equal to input size).
mp_uint_t mp_stream_writeall(mp_obj_t stream, const byte *buf, mp_uint_t size, int *errcode) {
mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream);
*errcode = 0;
mp_uint_t written = 0;
while (size > 0) {
mp_uint_t out_sz = s->type->stream_p->write(stream, buf, size, errcode);
if (out_sz == MP_STREAM_ERROR) {
return written;
}
buf += out_sz;
size -= out_sz;
written += out_sz;
}
return written;
}
const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags) {
mp_obj_base_t *o = (mp_obj_base_t*)MP_OBJ_TO_PTR(self_in);
const mp_stream_p_t *stream_p = o->type->stream_p;
@ -209,21 +228,6 @@ void mp_stream_write_adaptor(void *self, const char *buf, size_t len) {
mp_stream_write(MP_OBJ_FROM_PTR(self), buf, len);
}
// Works only with blocking streams
mp_uint_t mp_stream_writeall(mp_obj_t stream, const byte *buf, mp_uint_t size, int *errcode) {
mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream);
mp_uint_t org_size = size;
while (size > 0) {
mp_uint_t out_sz = s->type->stream_p->write(stream, buf, size, errcode);
if (out_sz == MP_STREAM_ERROR) {
return MP_STREAM_ERROR;
}
buf += out_sz;
size -= out_sz;
}
return org_size;
}
STATIC mp_obj_t stream_write_method(mp_obj_t self_in, mp_obj_t arg) {
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(arg, &bufinfo, MP_BUFFER_READ);