1
0
Fork 0

orangefs: support very large directories

This works by maintaining a linked list of pages which the directory
has been read into rather than one giant fixed-size buffer.

This replaces code which limits the total directory size to the total
amount that could be returned in one server request.  Since filenames
are usually considerably shorter than the maximum, the old code could
usually handle several server requests before running out of space.

Signed-off-by: Martin Brandenburg <martin@omnibond.com>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
zero-colors
Martin Brandenburg 2017-04-25 15:38:01 -04:00 committed by Mike Marshall
parent 72f66b8329
commit 480e3e532e
1 changed files with 196 additions and 99 deletions

View File

@ -6,6 +6,22 @@
#include "orangefs-kernel.h"
#include "orangefs-bufmap.h"
struct orangefs_dir_part {
struct orangefs_dir_part *next;
size_t len;
};
struct orangefs_dir {
__u64 token;
struct orangefs_dir_part *part;
loff_t end;
int error;
};
#define PART_SHIFT (24)
#define PART_SIZE (1<<24)
#define PART_MASK (~(PART_SIZE - 1))
/*
* There can be up to 512 directory entries. Each entry is encoded as
* follows:
@ -15,42 +31,39 @@
* padding to 8 bytes
* 16 bytes: khandle
* padding to 8 bytes
*/
#define MAX_DIRECTORY ((4 + 257 + 3 + 16)*512)
struct orangefs_dir {
__u64 token;
void *directory;
size_t len;
int error;
};
/*
* The userspace component sends several directory entries of the
* following format. The first four bytes are the string length not
* including a trailing zero byte. This is followed by the string and a
* trailing zero padded to the next four byte boundry. This is followed
* by the sixteen byte khandle padded to the next eight byte boundry.
*
* The trailer_buf starts with a struct orangefs_readdir_response_s
* which must be skipped to get to the directory data.
*
* The data which is received from the userspace daemon is termed a
* part and is stored in a linked list in case more than one part is
* needed for a large directory.
*
* The position pointer (ctx->pos) encodes the part and offset on which
* to begin reading at. Bits above PART_SHIFT encode the part and bits
* below PART_SHIFT encode the offset. Parts are stored in a linked
* list which grows as data is received from the server. The overhead
* associated with managing the list is presumed to be small compared to
* the overhead of communicating with the server.
*
* As data is received from the server, it is placed at the end of the
* part list. Data is parsed from the current position as it is needed.
* When data is determined to be corrupt, it is either because the
* userspace component has sent back corrupt data or because the file
* pointer has been moved to an invalid location. Since the two cannot
* be differentiated, return EIO.
*
* Part zero is synthesized to contains `.' and `..'. Part one is the
* first part of the part list.
*/
static int orangefs_dir_more(struct orangefs_inode_s *oi,
struct orangefs_dir *od, struct dentry *dentry)
static int do_readdir(struct orangefs_inode_s *oi,
struct orangefs_dir *od, struct dentry *dentry,
struct orangefs_kernel_op_s *op)
{
const size_t offset =
sizeof(struct orangefs_readdir_response_s);
struct orangefs_readdir_response_s *resp;
struct orangefs_kernel_op_s *op;
int bufi, r;
op = op_alloc(ORANGEFS_VFS_OP_READDIR);
if (!op) {
od->error = -ENOMEM;
return -ENOMEM;
}
/*
* Despite the badly named field, readdir does not use shared
* memory. However, there are a limited number of readdir
@ -66,7 +79,6 @@ static int orangefs_dir_more(struct orangefs_inode_s *oi,
again:
bufi = orangefs_readdir_index_get();
if (bufi < 0) {
op_release(op);
od->error = bufi;
return bufi;
}
@ -84,7 +96,6 @@ again:
goto again;
} else if (r == -EIO) {
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = r;
return r;
}
@ -92,82 +103,166 @@ again:
if (r < 0) {
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = r;
return r;
} else if (op->downcall.status) {
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = op->downcall.status;
return op->downcall.status;
}
/*
* The maximum size is size per entry times the 512 entries plus
* the header. This is well under the limit.
*/
if (op->downcall.trailer_size > PART_SIZE) {
vfree(op->downcall.trailer_buf);
od->error = -EIO;
return -EIO;
}
resp = (struct orangefs_readdir_response_s *)
op->downcall.trailer_buf;
od->token = resp->token;
return 0;
}
if (od->len + op->downcall.trailer_size - offset <=
MAX_DIRECTORY) {
memcpy(od->directory + od->len,
op->downcall.trailer_buf + offset,
op->downcall.trailer_size - offset);
od->len += op->downcall.trailer_size - offset;
} else {
/* This limit was chosen based on protocol limits. */
gossip_err("orangefs_dir_more: userspace sent too much data\n");
vfree(op->downcall.trailer_buf);
op_release(op);
od->error = -EIO;
return -EIO;
static int parse_readdir(struct orangefs_dir *od,
struct orangefs_kernel_op_s *op)
{
struct orangefs_dir_part *part, *new;
size_t count;
count = 1;
part = od->part;
while (part && part->next) {
part = part->next;
count++;
}
vfree(op->downcall.trailer_buf);
op_release(op);
new = (void *)op->downcall.trailer_buf;
new->next = NULL;
new->len = op->downcall.trailer_size -
sizeof(struct orangefs_readdir_response_s);
if (!od->part)
od->part = new;
else
part->next = new;
count++;
od->end = count << PART_SHIFT;
return 0;
}
static int orangefs_dir_more(struct orangefs_inode_s *oi,
struct orangefs_dir *od, struct dentry *dentry)
{
struct orangefs_kernel_op_s *op;
int r;
op = op_alloc(ORANGEFS_VFS_OP_READDIR);
if (!op) {
od->error = -ENOMEM;
return -ENOMEM;
}
r = do_readdir(oi, od, dentry, op);
if (r) {
od->error = r;
goto out;
}
r = parse_readdir(od, op);
if (r) {
od->error = r;
goto out;
}
od->error = 0;
out:
op_release(op);
return od->error;
}
static int fill_from_part(struct orangefs_dir_part *part,
struct dir_context *ctx)
{
const int offset = sizeof(struct orangefs_readdir_response_s);
struct orangefs_khandle *khandle;
__u32 *len, padlen;
loff_t i;
char *s;
i = ctx->pos & ~PART_MASK;
/* The file offset from userspace is too large. */
if (i > part->len)
return -EIO;
while (i < part->len) {
if (part->len < i + sizeof *len)
return -EIO;
len = (void *)part + offset + i;
/*
* len is the size of the string itself. padlen is the
* total size of the encoded string.
*/
padlen = (sizeof *len + *len + 1) +
(8 - (sizeof *len + *len + 1)%8)%8;
if (part->len < i + padlen + sizeof *khandle)
return -EIO;
s = (void *)part + offset + i + sizeof *len;
if (s[*len] != 0)
return -EIO;
khandle = (void *)part + offset + i + padlen;
if (!dir_emit(ctx, s, *len,
orangefs_khandle_to_ino(khandle),
DT_UNKNOWN))
return 0;
i += padlen + sizeof *khandle;
i = i + (8 - i%8)%8;
BUG_ON(i > part->len);
ctx->pos = (ctx->pos & PART_MASK) | i;
}
return 1;
}
static int orangefs_dir_fill(struct orangefs_inode_s *oi,
struct orangefs_dir *od, struct dentry *dentry,
struct dir_context *ctx)
{
struct orangefs_khandle *khandle;
__u32 *len, padlen;
loff_t i;
char *s;
i = ctx->pos - 2;
while (i < od->len) {
if (od->len < i + sizeof *len)
goto eio;
len = od->directory + i;
/*
* len is the size of the string itself. padlen is the
* total size of the encoded string.
*/
padlen = (sizeof *len + *len + 1) +
(4 - (sizeof *len + *len + 1)%8)%8;
if (od->len < i + padlen + sizeof *khandle)
goto eio;
s = od->directory + i + sizeof *len;
if (s[*len] != 0)
goto eio;
khandle = od->directory + i + padlen;
struct orangefs_dir_part *part;
size_t count;
if (!dir_emit(ctx, s, *len,
orangefs_khandle_to_ino(khandle), DT_UNKNOWN))
return 0;
i += padlen + sizeof *khandle;
i = i + (8 - i%8)%8;
ctx->pos = i + 2;
count = ((ctx->pos & PART_MASK) >> PART_SHIFT) - 1;
part = od->part;
while (part->next && count) {
count--;
part = part->next;
}
/* This means the userspace file offset is invalid. */
if (count) {
od->error = -EIO;
return -EIO;
}
while (part && part->len) {
int r;
r = fill_from_part(part, ctx);
if (r < 0) {
od->error = r;
return r;
} else if (r == 0) {
/* Userspace buffer is full. */
break;
} else {
/*
* The part ran out of data. Move to the next
* part. */
ctx->pos = (ctx->pos & PART_MASK) +
(1 << PART_SHIFT);
part = part->next;
}
}
BUG_ON(i > od->len);
return 0;
eio:
/*
* Here either data from userspace is corrupt or the application
* has sought to an invalid location.
*/
od->error = -EIO;
return -EIO;
}
static int orangefs_dir_iterate(struct file *file,
@ -193,28 +288,33 @@ static int orangefs_dir_iterate(struct file *file,
if (ctx->pos == 1) {
if (!dir_emit_dotdot(file, ctx))
return 0;
ctx->pos++;
ctx->pos = 1 << PART_SHIFT;
}
/*
* The seek position is in the first synthesized part but is not
* valid.
*/
if ((ctx->pos & PART_MASK) == 0)
return -EIO;
r = 0;
/*
* Must read more if the user has sought past what has been read
* so far. Stop a user who has sought past the end.
*/
while (od->token != ORANGEFS_READDIR_END && ctx->pos - 2 >
od->len) {
while (od->token != ORANGEFS_READDIR_END &&
ctx->pos > od->end) {
r = orangefs_dir_more(oi, od, dentry);
if (r)
return r;
}
if (od->token == ORANGEFS_READDIR_END && ctx->pos - 2 >
od->len) {
if (od->token == ORANGEFS_READDIR_END && ctx->pos > od->end)
return -EIO;
}
/* Then try to fill if there's any left in the buffer. */
if (ctx->pos - 2 < od->len) {
if (ctx->pos < od->end) {
r = orangefs_dir_fill(oi, od, dentry, ctx);
if (r)
return r;
@ -240,16 +340,8 @@ static int orangefs_dir_open(struct inode *inode, struct file *file)
return -ENOMEM;
od = file->private_data;
od->token = ORANGEFS_READDIR_START;
/*
* XXX: It seems wasteful to allocate such a large buffer for
* each request. Most will be much smaller.
*/
od->directory = alloc_pages_exact(MAX_DIRECTORY, GFP_KERNEL);
if (!od->directory) {
kfree(file->private_data);
return -ENOMEM;
}
od->len = 0;
od->part = NULL;
od->end = 1 << PART_SHIFT;
od->error = 0;
return 0;
}
@ -257,8 +349,13 @@ static int orangefs_dir_open(struct inode *inode, struct file *file)
static int orangefs_dir_release(struct inode *inode, struct file *file)
{
struct orangefs_dir *od = file->private_data;
struct orangefs_dir_part *part = od->part;
orangefs_flush_inode(inode);
free_pages_exact(od->directory, MAX_DIRECTORY);
while (part) {
struct orangefs_dir_part *next = part->next;
vfree(part);
part = next;
}
kfree(od);
return 0;
}