ceph: combine as many iovec as possible into one OSD request

Both ceph_sync_direct_write and ceph_sync_read iterate over iovec elements
one by one, sending one OSD request for each iovec. This is sub-optimal.
We can combine several iovecs into one page vector and send a single OSD
request for the whole page vector.

Signed-off-by: Zhu, Caifeng <zhucaifeng@unissoft-nj.com>
Signed-off-by: Yan, Zheng <zyan@redhat.com>
This commit is contained in:
Zhu, Caifeng 2015-10-08 15:26:15 +08:00 committed by Ilya Dryomov
parent 777d738a5e
commit b5b98989dc

View file

@ -34,6 +34,74 @@
* need to wait for MDS acknowledgement.
*/
/*
 * Calculate the length sum of direct io vectors that can
 * be combined into one page vector.
 *
 * Starting at the current segment of @it, following segments are
 * merged for as long as the tail of one segment and the base of the
 * next one are both page aligned.  The result is capped at the number
 * of bytes remaining in @it (it->count): the segment lengths on their
 * own may add up to more than the iterator is allowed to hand out,
 * e.g. after the iterator has been truncated.
 *
 * NOTE(review): this reads it->iov directly, so it assumes @it is
 * backed by an iovec array with at least one segment -- confirm
 * against the callers.
 *
 * Returns the number of bytes that can go into a single page vector.
 */
static size_t dio_get_pagev_size(const struct iov_iter *it)
{
	const struct iovec *iov = it->iov;
	const struct iovec *iovend = iov + it->nr_segs;
	size_t size;

	/* Bytes left in the current, possibly partly consumed, segment. */
	size = iov->iov_len - it->iov_offset;
	/*
	 * An iov can be page vectored when both the current tail
	 * and the next base are page aligned.
	 */
	while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) &&
	       (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) {
		size += iov->iov_len;
	}
	/* Never report more than the iterator can actually supply. */
	if (size > it->count)
		size = it->count;
	dout("dio_get_pagev_size len = %zu\n", size);
	return size;
}
/*
 * Allocate a page vector based on (@it, @nbytes).
 * The return value is the tuple describing a page vector,
 * that is (@pages, @page_align, @num_pages).
 *
 * @it itself is left untouched; a private copy is walked and advanced
 * while the pages are collected.
 *
 * @it:         iterator describing the buffer (NOTE(review): it->iov is
 *              dereferenced, so an iovec-backed iterator is assumed)
 * @nbytes:     number of bytes the page vector must cover
 * @page_align: out, offset of the first byte within the first page
 * @num_pages:  out, number of entries in the returned array
 *
 * Returns the page array on success.  On failure returns
 * ERR_PTR(-ENOMEM) if the array cannot be allocated, the error from
 * iov_iter_get_pages(), or ERR_PTR(-EFAULT) if the iterator stops
 * yielding pages before @nbytes are covered.  On the latter two paths
 * the entries filled so far and the array are handed to
 * ceph_put_page_vector().  The out parameters are written only on
 * success.
 */
static struct page **
dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes,
		    size_t *page_align, int *num_pages)
{
	struct iov_iter tmp_it = *it;
	size_t align;
	struct page **pages;
	int ret = 0, idx, npages;

	/* Offset of the first byte within its page. */
	align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
		(PAGE_SIZE - 1);
	npages = calc_pages_for(align, nbytes);
	/* Try kmalloc first and fall back to vmalloc if it fails. */
	pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL);
	if (!pages) {
		pages = vmalloc(sizeof(*pages) * npages);
		if (!pages)
			return ERR_PTR(-ENOMEM);
	}

	for (idx = 0; idx < npages; ) {
		size_t start;

		ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes,
					 npages - idx, &start);
		if (ret < 0)
			goto fail;
		if (ret == 0) {
			/*
			 * No progress: the iterator has nothing more to
			 * give although pages are still missing.  Bail
			 * out instead of spinning here forever.
			 */
			ret = -EFAULT;
			goto fail;
		}

		iov_iter_advance(&tmp_it, ret);
		nbytes -= ret;
		/* ret is a byte count; convert it to pages just filled. */
		idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE;
	}

	BUG_ON(nbytes != 0);
	*num_pages = npages;
	*page_align = align;
	dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align);
	return pages;
fail:
	/* Hand back only the idx entries that were actually filled. */
	ceph_put_page_vector(pages, idx, false);
	return ERR_PTR(ret);
}
/*
* Prepare an open request. Preallocate ceph_cap to avoid an
@ -458,11 +526,10 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
size_t start;
ssize_t n;
n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start);
if (n < 0)
return n;
num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
n = dio_get_pagev_size(i);
pages = dio_get_pages_alloc(i, n, &start, &num_pages);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = striped_read(inode, off, n,
pages, num_pages, checkeof,
@ -592,7 +659,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
CEPH_OSD_FLAG_WRITE;
while (iov_iter_count(from) > 0) {
u64 len = iov_iter_single_seg_count(from);
u64 len = dio_get_pagev_size(from);
size_t start;
ssize_t n;
@ -611,14 +678,14 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
n = iov_iter_get_pages_alloc(from, &pages, len, &start);
if (unlikely(n < 0)) {
ret = n;
n = len;
pages = dio_get_pages_alloc(from, len, &start, &num_pages);
if (IS_ERR(pages)) {
ceph_osdc_put_request(req);
ret = PTR_ERR(pages);
break;
}
num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
/*
* throw out any page cache pages in this range. this
* may block.