Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: fix ioctl magic
  ceph: Behave better when handling file lock replies.
  ceph: pass lock information by struct file_lock instead of as individual params.
  ceph: Handle file locks in replies from the MDS.
  ceph: avoid possible null deref in readdir after dir llseek
This commit is contained in:
Linus Torvalds 2010-12-14 11:02:15 -08:00
commit e97b71ded9
5 changed files with 111 additions and 61 deletions

View file

@ -114,8 +114,8 @@ static int __dcache_readdir(struct file *filp,
spin_lock(&dcache_lock); spin_lock(&dcache_lock);
/* start at beginning? */ /* start at beginning? */
if (filp->f_pos == 2 || (last && if (filp->f_pos == 2 || last == NULL ||
filp->f_pos < ceph_dentry(last)->offset)) { filp->f_pos < ceph_dentry(last)->offset) {
if (list_empty(&parent->d_subdirs)) if (list_empty(&parent->d_subdirs))
goto out_unlock; goto out_unlock;
p = parent->d_subdirs.prev; p = parent->d_subdirs.prev;

View file

@ -4,7 +4,7 @@
#include <linux/ioctl.h> #include <linux/ioctl.h>
#include <linux/types.h> #include <linux/types.h>
#define CEPH_IOCTL_MAGIC 0x98 #define CEPH_IOCTL_MAGIC 0x97
/* just use u64 to align sanely on all archs */ /* just use u64 to align sanely on all archs */
struct ceph_ioctl_layout { struct ceph_ioctl_layout {

View file

@ -11,40 +11,68 @@
* Implement fcntl and flock locking functions. * Implement fcntl and flock locking functions.
*/ */
static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
u64 pid, u64 pid_ns, int cmd, u8 wait, struct file_lock *fl)
int cmd, u64 start, u64 length, u8 wait)
{ {
struct inode *inode = file->f_dentry->d_inode; struct inode *inode = file->f_dentry->d_inode;
struct ceph_mds_client *mdsc = struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc; ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_mds_request *req; struct ceph_mds_request *req;
int err; int err;
u64 length = 0;
req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
if (IS_ERR(req)) if (IS_ERR(req))
return PTR_ERR(req); return PTR_ERR(req);
req->r_inode = igrab(inode); req->r_inode = igrab(inode);
/* mds requires start and length rather than start and end */
if (LLONG_MAX == fl->fl_end)
length = 0;
else
length = fl->fl_end - fl->fl_start + 1;
dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, " dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
"length: %llu, wait: %d, type`: %d", (int)lock_type, "length: %llu, wait: %d, type`: %d", (int)lock_type,
(int)operation, pid, start, length, wait, cmd); (int)operation, (u64)fl->fl_pid, fl->fl_start,
length, wait, fl->fl_type);
req->r_args.filelock_change.rule = lock_type; req->r_args.filelock_change.rule = lock_type;
req->r_args.filelock_change.type = cmd; req->r_args.filelock_change.type = cmd;
req->r_args.filelock_change.pid = cpu_to_le64(pid); req->r_args.filelock_change.pid = cpu_to_le64((u64)fl->fl_pid);
/* This should be adjusted, but I'm not sure if /* This should be adjusted, but I'm not sure if
namespaces actually get id numbers*/ namespaces actually get id numbers*/
req->r_args.filelock_change.pid_namespace = req->r_args.filelock_change.pid_namespace =
cpu_to_le64((u64)pid_ns); cpu_to_le64((u64)(unsigned long)fl->fl_nspid);
req->r_args.filelock_change.start = cpu_to_le64(start); req->r_args.filelock_change.start = cpu_to_le64(fl->fl_start);
req->r_args.filelock_change.length = cpu_to_le64(length); req->r_args.filelock_change.length = cpu_to_le64(length);
req->r_args.filelock_change.wait = wait; req->r_args.filelock_change.wait = wait;
err = ceph_mdsc_do_request(mdsc, inode, req); err = ceph_mdsc_do_request(mdsc, inode, req);
if ( operation == CEPH_MDS_OP_GETFILELOCK){
fl->fl_pid = le64_to_cpu(req->r_reply_info.filelock_reply->pid);
if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type)
fl->fl_type = F_RDLCK;
else if (CEPH_LOCK_EXCL == req->r_reply_info.filelock_reply->type)
fl->fl_type = F_WRLCK;
else
fl->fl_type = F_UNLCK;
fl->fl_start = le64_to_cpu(req->r_reply_info.filelock_reply->start);
length = le64_to_cpu(req->r_reply_info.filelock_reply->start) +
le64_to_cpu(req->r_reply_info.filelock_reply->length);
if (length >= 1)
fl->fl_end = length -1;
else
fl->fl_end = 0;
}
ceph_mdsc_put_request(req); ceph_mdsc_put_request(req);
dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, " dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
"length: %llu, wait: %d, type`: %d err code %d", (int)lock_type, "length: %llu, wait: %d, type`: %d, err code %d", (int)lock_type,
(int)operation, pid, start, length, wait, cmd, err); (int)operation, (u64)fl->fl_pid, fl->fl_start,
length, wait, fl->fl_type, err);
return err; return err;
} }
@ -54,7 +82,6 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
*/ */
int ceph_lock(struct file *file, int cmd, struct file_lock *fl) int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
{ {
u64 length;
u8 lock_cmd; u8 lock_cmd;
int err; int err;
u8 wait = 0; u8 wait = 0;
@ -76,29 +103,20 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
else else
lock_cmd = CEPH_LOCK_UNLOCK; lock_cmd = CEPH_LOCK_UNLOCK;
if (LLONG_MAX == fl->fl_end) err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, lock_cmd, wait, fl);
length = 0;
else
length = fl->fl_end - fl->fl_start + 1;
err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
(u64)fl->fl_pid,
(u64)(unsigned long)fl->fl_nspid,
lock_cmd, fl->fl_start,
length, wait);
if (!err) { if (!err) {
dout("mds locked, locking locally"); if ( op != CEPH_MDS_OP_GETFILELOCK ){
err = posix_lock_file(file, fl, NULL); dout("mds locked, locking locally");
if (err && (CEPH_MDS_OP_SETFILELOCK == op)) { err = posix_lock_file(file, fl, NULL);
/* undo! This should only happen if the kernel detects if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
* local deadlock. */ /* undo! This should only happen if the kernel detects
ceph_lock_message(CEPH_LOCK_FCNTL, op, file, * local deadlock. */
(u64)fl->fl_pid, ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
(u64)(unsigned long)fl->fl_nspid, CEPH_LOCK_UNLOCK, 0, fl);
CEPH_LOCK_UNLOCK, fl->fl_start, dout("got %d on posix_lock_file, undid lock", err);
length, 0); }
dout("got %d on posix_lock_file, undid lock", err);
} }
} else { } else {
dout("mds returned error code %d", err); dout("mds returned error code %d", err);
} }
@ -107,7 +125,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
int ceph_flock(struct file *file, int cmd, struct file_lock *fl) int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
{ {
u64 length;
u8 lock_cmd; u8 lock_cmd;
int err; int err;
u8 wait = 1; u8 wait = 1;
@ -127,26 +144,15 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
lock_cmd = CEPH_LOCK_EXCL; lock_cmd = CEPH_LOCK_EXCL;
else else
lock_cmd = CEPH_LOCK_UNLOCK; lock_cmd = CEPH_LOCK_UNLOCK;
/* mds requires start and length rather than start and end */
if (LLONG_MAX == fl->fl_end)
length = 0;
else
length = fl->fl_end - fl->fl_start + 1;
err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK, err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
file, (u64)fl->fl_pid, file, lock_cmd, wait, fl);
(u64)(unsigned long)fl->fl_nspid,
lock_cmd, fl->fl_start,
length, wait);
if (!err) { if (!err) {
err = flock_lock_file_wait(file, fl); err = flock_lock_file_wait(file, fl);
if (err) { if (err) {
ceph_lock_message(CEPH_LOCK_FLOCK, ceph_lock_message(CEPH_LOCK_FLOCK,
CEPH_MDS_OP_SETFILELOCK, CEPH_MDS_OP_SETFILELOCK,
file, (u64)fl->fl_pid, file, CEPH_LOCK_UNLOCK, 0, fl);
(u64)(unsigned long)fl->fl_nspid,
CEPH_LOCK_UNLOCK, fl->fl_start,
length, 0);
dout("got %d on flock_lock_file_wait, undid lock", err); dout("got %d on flock_lock_file_wait, undid lock", err);
} }
} else { } else {

View file

@ -201,6 +201,38 @@ out_bad:
return err; return err;
} }
/*
 * Parse the extra payload of an fcntl F_GETLK reply.
 *
 * The region [*p, end) must hold exactly one record of the type that
 * info->filelock_reply points to.  On success info->filelock_reply
 * points into the message buffer (nothing is copied) and *p has been
 * advanced to end.
 *
 * Returns 0 on success, or -EIO if the region is too short for the
 * record or has bytes left over after it.
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info)
{
	void *record_end = *p + sizeof(*info->filelock_reply);

	/* not enough room for a complete record */
	if (record_end > end)
		return -EIO;

	info->filelock_reply = *p;
	*p = record_end;

	/* the record has to consume the whole region */
	if (unlikely(*p != end))
		return -EIO;

	return 0;
}
/*
 * Parse the extra payload that follows the trace in an mds reply.
 *
 * The operation recorded in the reply head selects the parser:
 * GETFILELOCK replies go to parse_reply_info_filelock(), every other
 * operation goes to parse_reply_info_dir().  The chosen parser's
 * return value is passed through unchanged.
 *
 * NOTE(review): head->op is compared as stored in the reply head,
 * without a byte-order conversion -- confirm against the declaration
 * of struct ceph_mds_reply_head whether le32_to_cpu() is needed here.
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info)
{
	if (info->head->op != CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_dir(p, end, info);

	return parse_reply_info_filelock(p, end, info);
}
/* /*
* parse entire mds reply * parse entire mds reply
*/ */
@ -223,10 +255,10 @@ static int parse_reply_info(struct ceph_msg *msg,
goto out_bad; goto out_bad;
} }
/* dir content */ /* extra */
ceph_decode_32_safe(&p, end, len, bad); ceph_decode_32_safe(&p, end, len, bad);
if (len > 0) { if (len > 0) {
err = parse_reply_info_dir(&p, p+len, info); err = parse_reply_info_extra(&p, p+len, info);
if (err < 0) if (err < 0)
goto out_bad; goto out_bad;
} }
@ -2074,7 +2106,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_lock(&session->s_mutex); mutex_lock(&session->s_mutex);
if (err < 0) { if (err < 0) {
pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds); pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
ceph_msg_dump(msg); ceph_msg_dump(msg);
goto out_err; goto out_err;
} }
@ -2094,7 +2126,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
mutex_lock(&req->r_fill_mutex); mutex_lock(&req->r_fill_mutex);
err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
if (err == 0) { if (err == 0) {
if (result == 0 && rinfo->dir_nr) if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
rinfo->dir_nr)
ceph_readdir_prepopulate(req, req->r_session); ceph_readdir_prepopulate(req, req->r_session);
ceph_unreserve_caps(mdsc, &req->r_caps_reservation); ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
} }

View file

@ -42,26 +42,37 @@ struct ceph_mds_reply_info_in {
}; };
/* /*
* parsed info about an mds reply, including information about the * parsed info about an mds reply, including information about
* target inode and/or its parent directory and dentry, and directory * either: 1) the target inode and/or its parent directory and dentry,
* contents (for readdir results). * and directory contents (for readdir results), or
* 2) the file range lock info (for fcntl F_GETLK results).
*/ */
struct ceph_mds_reply_info_parsed { struct ceph_mds_reply_info_parsed {
struct ceph_mds_reply_head *head; struct ceph_mds_reply_head *head;
/* trace */
struct ceph_mds_reply_info_in diri, targeti; struct ceph_mds_reply_info_in diri, targeti;
struct ceph_mds_reply_dirfrag *dirfrag; struct ceph_mds_reply_dirfrag *dirfrag;
char *dname; char *dname;
u32 dname_len; u32 dname_len;
struct ceph_mds_reply_lease *dlease; struct ceph_mds_reply_lease *dlease;
struct ceph_mds_reply_dirfrag *dir_dir; /* extra */
int dir_nr; union {
char **dir_dname; /* for fcntl F_GETLK results */
u32 *dir_dname_len; struct ceph_filelock *filelock_reply;
struct ceph_mds_reply_lease **dir_dlease;
struct ceph_mds_reply_info_in *dir_in; /* for readdir results */
u8 dir_complete, dir_end; struct {
struct ceph_mds_reply_dirfrag *dir_dir;
int dir_nr;
char **dir_dname;
u32 *dir_dname_len;
struct ceph_mds_reply_lease **dir_dlease;
struct ceph_mds_reply_info_in *dir_in;
u8 dir_complete, dir_end;
};
};
/* encoded blob describing snapshot contexts for certain /* encoded blob describing snapshot contexts for certain
operations (e.g., open) */ operations (e.g., open) */