rbd: kill rbd_rq_fn() and all other related code

Now that the request function has been replaced by one using the new
request management data structures the old one can go away.
Deleting it makes rbd_dev_do_request() no longer needed, and
deleting that makes other functions unneeded, and so on.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
This commit is contained in:
Alex Elder 2012-11-30 17:53:04 -06:00 committed by Sage Weil
parent bf0d5f503d
commit 2250a71b59

View file

@ -621,18 +621,6 @@ static void rbd_put_client(struct rbd_client *rbdc)
kref_put(&rbdc->kref, rbd_client_release);
}
/*
* Destroy requests collection
*/
static void rbd_coll_release(struct kref *kref)
{
struct rbd_req_coll *coll =
container_of(kref, struct rbd_req_coll, kref);
dout("rbd_coll_release %p\n", coll);
kfree(coll);
}
static bool rbd_image_format_valid(u32 image_format)
{
return image_format == 1 || image_format == 2;
@ -876,28 +864,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
return length;
}
static int rbd_get_num_segments(struct rbd_image_header *header,
u64 ofs, u64 len)
{
u64 start_seg;
u64 end_seg;
u64 result;
if (!len)
return 0;
if (len - 1 > U64_MAX - ofs)
return -ERANGE;
start_seg = ofs >> header->obj_order;
end_seg = (ofs + len - 1) >> header->obj_order;
result = end_seg - start_seg + 1;
if (result > (u64) INT_MAX)
return -ERANGE;
return (int) result;
}
/*
* returns the size of an object in the image
*/
@ -1216,52 +1182,6 @@ static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
kfree(op);
}
static void rbd_coll_end_req_index(struct request *rq,
struct rbd_req_coll *coll,
int index,
s32 ret, u64 len)
{
struct request_queue *q;
int min, max, i;
dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
coll, index, (int)ret, (unsigned long long)len);
if (!rq)
return;
if (!coll) {
blk_end_request(rq, ret, len);
return;
}
q = rq->q;
spin_lock_irq(q->queue_lock);
coll->status[index].done = 1;
coll->status[index].rc = ret;
coll->status[index].bytes = len;
max = min = coll->num_done;
while (max < coll->total && coll->status[max].done)
max++;
for (i = min; i<max; i++) {
__blk_end_request(rq, (int)coll->status[i].rc,
coll->status[i].bytes);
coll->num_done++;
kref_put(&coll->kref, rbd_coll_release);
}
spin_unlock_irq(q->queue_lock);
}
static void rbd_coll_end_req(struct rbd_request *rbd_req,
s32 ret, u64 len)
{
rbd_coll_end_req_index(rbd_req->rq,
rbd_req->coll, rbd_req->coll_index,
ret, len);
}
/*
* Send ceph osd request
*/
@ -1361,46 +1281,6 @@ done_osd_req:
return ret;
}
/*
* Ceph osd op callback
*/
static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
{
struct rbd_request *rbd_req = osd_req->r_priv;
struct ceph_osd_reply_head *replyhead;
struct ceph_osd_op *op;
s32 rc;
u64 bytes;
int read_op;
/* parse reply */
replyhead = msg->front.iov_base;
WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
op = (void *)(replyhead + 1);
rc = (s32)le32_to_cpu(replyhead->result);
bytes = le64_to_cpu(op->extent.length);
read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
(unsigned long long) bytes, read_op, (int) rc);
if (rc == (s32)-ENOENT && read_op) {
zero_bio_chain(rbd_req->bio, 0);
rc = 0;
} else if (rc == 0 && read_op && bytes < rbd_req->len) {
zero_bio_chain(rbd_req->bio, bytes);
bytes = rbd_req->len;
}
rbd_coll_end_req(rbd_req, rc, bytes);
if (rbd_req->bio)
bio_chain_put(rbd_req->bio);
ceph_osdc_put_request(osd_req);
kfree(rbd_req);
}
static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
struct ceph_msg *msg)
{
@ -1448,70 +1328,6 @@ done:
return ret;
}
/*
* Do an asynchronous ceph osd operation
*/
static int rbd_do_op(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
u64 ofs, u64 len,
struct bio *bio,
struct rbd_req_coll *coll,
int coll_index)
{
const char *seg_name;
u64 seg_ofs;
u64 seg_len;
int ret;
struct ceph_osd_req_op *op;
int opcode;
int flags;
u64 snapid;
seg_name = rbd_segment_name(rbd_dev, ofs);
if (!seg_name)
return -ENOMEM;
seg_len = rbd_segment_length(rbd_dev, ofs, len);
seg_ofs = rbd_segment_offset(rbd_dev, ofs);
if (rq_data_dir(rq) == WRITE) {
opcode = CEPH_OSD_OP_WRITE;
flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
snapid = CEPH_NOSNAP;
} else {
opcode = CEPH_OSD_OP_READ;
flags = CEPH_OSD_FLAG_READ;
rbd_assert(!snapc);
snapid = rbd_dev->spec->snap_id;
}
ret = -ENOMEM;
op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
if (!op)
goto done;
/* we've taken care of segment sizes earlier when we
cloned the bios. We should never have a segment
truncated at this point */
rbd_assert(seg_len == len);
ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
seg_name, seg_ofs, seg_len,
bio,
NULL, 0,
flags,
op,
coll, coll_index,
rbd_req_cb, NULL);
if (ret < 0)
rbd_coll_end_req_index(rq, coll, coll_index,
(s32)ret, seg_len);
rbd_osd_req_op_destroy(op);
done:
kfree(seg_name);
return ret;
}
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
{
@ -1683,78 +1499,6 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
return ret;
}
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
struct rbd_req_coll *coll =
kzalloc(sizeof(struct rbd_req_coll) +
sizeof(struct rbd_req_status) * num_reqs,
GFP_ATOMIC);
if (!coll)
return NULL;
coll->total = num_reqs;
kref_init(&coll->kref);
return coll;
}
static int rbd_dev_do_request(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
u64 ofs, unsigned int size,
struct bio *bio_chain)
{
int num_segs;
struct rbd_req_coll *coll;
unsigned int bio_offset;
int cur_seg = 0;
dout("%s 0x%x bytes at 0x%llx\n",
rq_data_dir(rq) == WRITE ? "write" : "read",
size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
if (num_segs <= 0)
return num_segs;
coll = rbd_alloc_coll(num_segs);
if (!coll)
return -ENOMEM;
bio_offset = 0;
do {
u64 limit = rbd_segment_length(rbd_dev, ofs, size);
unsigned int clone_size;
struct bio *bio_clone;
BUG_ON(limit > (u64)UINT_MAX);
clone_size = (unsigned int)limit;
dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
kref_get(&coll->kref);
/* Pass a cloned bio chain via an osd request */
bio_clone = bio_chain_clone_range(&bio_chain,
&bio_offset, clone_size,
GFP_ATOMIC);
if (bio_clone)
(void)rbd_do_op(rq, rbd_dev, snapc,
ofs, clone_size,
bio_clone, coll, cur_seg);
else
rbd_coll_end_req_index(rq, coll, cur_seg,
(s32)-ENOMEM,
clone_size);
size -= clone_size;
ofs += clone_size;
cur_seg++;
} while (size > 0);
kref_put(&coll->kref, rbd_coll_release);
return 0;
}
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
struct ceph_osd_op *op)
{
@ -2235,68 +1979,6 @@ end_request:
}
}
/*
* block device queue callback
*/
static void rbd_rq_fn(struct request_queue *q)
{
struct rbd_device *rbd_dev = q->queuedata;
bool read_only = rbd_dev->mapping.read_only;
struct request *rq;
while ((rq = blk_fetch_request(q))) {
struct ceph_snap_context *snapc = NULL;
unsigned int size = 0;
int result;
dout("fetched request\n");
/* Filter out block requests we don't understand */
if ((rq->cmd_type != REQ_TYPE_FS)) {
__blk_end_request_all(rq, 0);
continue;
}
spin_unlock_irq(q->queue_lock);
/* Write requests need a reference to the snapshot context */
if (rq_data_dir(rq) == WRITE) {
result = -EROFS;
if (read_only) /* Can't write to a read-only device */
goto out_end_request;
/*
* Note that each osd request will take its
* own reference to the snapshot context
* supplied. The reference we take here
* just guarantees the one we provide stays
* valid.
*/
down_read(&rbd_dev->header_rwsem);
snapc = ceph_get_snap_context(rbd_dev->header.snapc);
up_read(&rbd_dev->header_rwsem);
rbd_assert(snapc != NULL);
} else if (!atomic_read(&rbd_dev->exists)) {
rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
dout("request for non-existent snapshot");
result = -ENXIO;
goto out_end_request;
}
size = blk_rq_bytes(rq);
result = rbd_dev_do_request(rq, rbd_dev, snapc,
blk_rq_pos(rq) * SECTOR_SIZE,
size, rq->bio);
out_end_request:
if (snapc)
ceph_put_snap_context(snapc);
spin_lock_irq(q->queue_lock);
if (!size || result < 0)
__blk_end_request_all(rq, result);
}
}
/*
* a queue callback. Makes sure that we don't create a bio that spans across
* multiple osd objects. One exception would be with a single page bios,
@ -2546,7 +2228,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
disk->fops = &rbd_bd_ops;
disk->private_data = rbd_dev;
(void) rbd_rq_fn; /* avoid a warning */
q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
if (!q)
goto out_disk;