1
0
Fork 0
alistair23-linux/drivers/staging/lustre/lustre/obdecho/echo_client.c

3219 lines
76 KiB
C
Raw Normal View History

staging: add Lustre file system client support Lustre is the most deployed distributed file system in the HPC (High Performance Computing) world. The patch adds its client side support. The code is not very clean and needs to live in drivers/staging for some time for continuing cleanup work. See drivers/staging/lustre/TODO for details. The code is based on Lustre master commit faefbfc04 commit faefbfc0460bc00f2ee4c1c1c86aa1e39b9eea49 Author: Alex Zhuravlev <alexey.zhuravlev@intel.com> Date: Tue Apr 30 23:05:21 2013 +0400 LU-3244 utils: tunefs.lustre should preserve virgin label Plus a few under-review patches on Whamcloud gerrit: 3.8 kernel support: http://review.whamcloud.com/#change,5973 http://review.whamcloud.com/#change,5974 http://review.whamcloud.com/#change,5768 http://review.whamcloud.com/#change,5781 http://review.whamcloud.com/#change,5763 http://review.whamcloud.com/#change,5613 http://review.whamcloud.com/#change,5655 3.9 kernel support: http://review.whamcloud.com/#change,5898 http://review.whamcloud.com/#change,5899 Kconfig/Kbuild: http://review.whamcloud.com/#change,4646 http://review.whamcloud.com/#change,4644 libcfs cleanup: http://review.whamcloud.com/#change,2831 http://review.whamcloud.com/#change,4775 http://review.whamcloud.com/#change,4776 http://review.whamcloud.com/#change,4777 http://review.whamcloud.com/#change,4778 http://review.whamcloud.com/#change,4779 http://review.whamcloud.com/#change,4780 All starting/trailing whitespaces are removed, to match kernel coding style. Also ran scripts/cleanfile on all lustre source files. [maked the Kconfig depend on BROKEN as the recent procfs changes causes this to fail - gregkh] Signed-off-by: Peng Tao <tao.peng@emc.com> Signed-off-by: Andreas Dilger <andreas.dilger@intel.com> Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
2013-05-02 02:46:55 -06:00
/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 only,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License version 2 for more details (a copy is included
* in the LICENSE file that accompanied this code).
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
* http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*
* GPL HEADER END
*/
/*
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
* Copyright (c) 2011, 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_ECHO
#include <linux/libcfs/libcfs.h>
#include <obd.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_debug.h>
#include <lprocfs_status.h>
#include <cl_object.h>
#include <lustre_fid.h>
#include <lustre_acl.h>
#include <lustre_net.h>
#include <obd_lov.h>
#include "echo_internal.h"
/** \defgroup echo_client Echo Client
* @{
*/
struct echo_device {
struct cl_device ed_cl;
struct echo_client_obd *ed_ec;
struct cl_site ed_site_myself;
struct cl_site *ed_site;
struct lu_device *ed_next;
int ed_next_islov;
int ed_next_ismd;
struct lu_client_seq *ed_cl_seq;
};
struct echo_object {
struct cl_object eo_cl;
struct cl_object_header eo_hdr;
struct echo_device *eo_dev;
struct list_head eo_obj_chain;
struct lov_stripe_md *eo_lsm;
atomic_t eo_npages;
int eo_deleted;
};
struct echo_object_conf {
struct cl_object_conf eoc_cl;
struct lov_stripe_md **eoc_md;
};
struct echo_page {
struct cl_page_slice ep_cl;
struct mutex ep_lock;
struct page *ep_vmpage;
};
struct echo_lock {
struct cl_lock_slice el_cl;
struct list_head el_chain;
struct echo_object *el_object;
__u64 el_cookie;
atomic_t el_refcount;
};
struct echo_io {
struct cl_io_slice ei_cl;
};
#if 0
struct echo_req {
struct cl_req_slice er_cl;
};
#endif
static int echo_client_setup(const struct lu_env *env,
struct obd_device *obddev,
struct lustre_cfg *lcfg);
static int echo_client_cleanup(struct obd_device *obddev);
/** \defgroup echo_helpers Helper functions
* @{
*/
static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
{
return container_of0(dev, struct echo_device, ed_cl);
}
static inline struct cl_device *echo_dev2cl(struct echo_device *d)
{
return &d->ed_cl;
}
static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
{
return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
}
static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
{
return &eco->eo_cl;
}
static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
{
return container_of(o, struct echo_object, eo_cl);
}
static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
{
return container_of(s, struct echo_page, ep_cl);
}
static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
{
return container_of(s, struct echo_lock, el_cl);
}
static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
{
return ecl->el_cl.cls_lock;
}
static struct lu_context_key echo_thread_key;
static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
{
struct echo_thread_info *info;
info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
LASSERT(info != NULL);
return info;
}
static inline
struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
{
return container_of(c, struct echo_object_conf, eoc_cl);
}
/** @} echo_helpers */
static struct echo_object *cl_echo_object_find(struct echo_device *d,
struct lov_stripe_md **lsm);
static int cl_echo_object_put(struct echo_object *eco);
static int cl_echo_enqueue (struct echo_object *eco, obd_off start,
obd_off end, int mode, __u64 *cookie);
static int cl_echo_cancel (struct echo_device *d, __u64 cookie);
static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
struct page **pages, int npages, int async);
static struct echo_thread_info *echo_env_info(const struct lu_env *env);
struct echo_thread_info {
struct echo_object_conf eti_conf;
struct lustre_md eti_md;
struct cl_2queue eti_queue;
struct cl_io eti_io;
struct cl_lock_descr eti_descr;
struct lu_fid eti_fid;
struct lu_fid eti_fid2;
struct md_op_spec eti_spec;
struct lov_mds_md_v3 eti_lmm;
struct lov_user_md_v3 eti_lum;
struct md_attr eti_ma;
struct lu_name eti_lname;
/* per-thread values, can be re-used */
void *eti_big_lmm;
int eti_big_lmmsize;
char eti_name[20];
struct lu_buf eti_buf;
char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
};
/* No session used right now */
struct echo_session_info {
unsigned long dummy;
};
static struct kmem_cache *echo_lock_kmem;
static struct kmem_cache *echo_object_kmem;
static struct kmem_cache *echo_thread_kmem;
static struct kmem_cache *echo_session_kmem;
//static struct kmem_cache *echo_req_kmem;
static struct lu_kmem_descr echo_caches[] = {
{
.ckd_cache = &echo_lock_kmem,
.ckd_name = "echo_lock_kmem",
.ckd_size = sizeof (struct echo_lock)
},
{
.ckd_cache = &echo_object_kmem,
.ckd_name = "echo_object_kmem",
.ckd_size = sizeof (struct echo_object)
},
{
.ckd_cache = &echo_thread_kmem,
.ckd_name = "echo_thread_kmem",
.ckd_size = sizeof (struct echo_thread_info)
},
{
.ckd_cache = &echo_session_kmem,
.ckd_name = "echo_session_kmem",
.ckd_size = sizeof (struct echo_session_info)
},
#if 0
{
.ckd_cache = &echo_req_kmem,
.ckd_name = "echo_req_kmem",
.ckd_size = sizeof (struct echo_req)
},
#endif
{
.ckd_cache = NULL
}
};
/** \defgroup echo_page Page operations
*
* Echo page operations.
*
* @{
*/
static struct page *echo_page_vmpage(const struct lu_env *env,
const struct cl_page_slice *slice)
{
return cl2echo_page(slice)->ep_vmpage;
}
static int echo_page_own(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *io, int nonblock)
{
struct echo_page *ep = cl2echo_page(slice);
if (!nonblock)
mutex_lock(&ep->ep_lock);
else if (!mutex_trylock(&ep->ep_lock))
return -EAGAIN;
return 0;
}
static void echo_page_disown(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *io)
{
struct echo_page *ep = cl2echo_page(slice);
LASSERT(mutex_is_locked(&ep->ep_lock));
mutex_unlock(&ep->ep_lock);
}
static void echo_page_discard(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *unused)
{
cl_page_delete(env, slice->cpl_page);
}
static int echo_page_is_vmlocked(const struct lu_env *env,
const struct cl_page_slice *slice)
{
if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
return -EBUSY;
return -ENODATA;
}
static void echo_page_completion(const struct lu_env *env,
const struct cl_page_slice *slice,
int ioret)
{
LASSERT(slice->cpl_page->cp_sync_io != NULL);
}
static void echo_page_fini(const struct lu_env *env,
struct cl_page_slice *slice)
{
struct echo_page *ep = cl2echo_page(slice);
struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
struct page *vmpage = ep->ep_vmpage;
ENTRY;
atomic_dec(&eco->eo_npages);
page_cache_release(vmpage);
EXIT;
}
static int echo_page_prep(const struct lu_env *env,
const struct cl_page_slice *slice,
struct cl_io *unused)
{
return 0;
}
static int echo_page_print(const struct lu_env *env,
const struct cl_page_slice *slice,
void *cookie, lu_printer_t printer)
{
struct echo_page *ep = cl2echo_page(slice);
(*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
return 0;
}
static const struct cl_page_operations echo_page_ops = {
.cpo_own = echo_page_own,
.cpo_disown = echo_page_disown,
.cpo_discard = echo_page_discard,
.cpo_vmpage = echo_page_vmpage,
.cpo_fini = echo_page_fini,
.cpo_print = echo_page_print,
.cpo_is_vmlocked = echo_page_is_vmlocked,
.io = {
[CRT_READ] = {
.cpo_prep = echo_page_prep,
.cpo_completion = echo_page_completion,
},
[CRT_WRITE] = {
.cpo_prep = echo_page_prep,
.cpo_completion = echo_page_completion,
}
}
};
/** @} echo_page */
/** \defgroup echo_lock Locking
*
* echo lock operations
*
* @{
*/
static void echo_lock_fini(const struct lu_env *env,
struct cl_lock_slice *slice)
{
struct echo_lock *ecl = cl2echo_lock(slice);
LASSERT(list_empty(&ecl->el_chain));
OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
}
static void echo_lock_delete(const struct lu_env *env,
const struct cl_lock_slice *slice)
{
struct echo_lock *ecl = cl2echo_lock(slice);
LASSERT(list_empty(&ecl->el_chain));
}
static int echo_lock_fits_into(const struct lu_env *env,
const struct cl_lock_slice *slice,
const struct cl_lock_descr *need,
const struct cl_io *unused)
{
return 1;
}
static struct cl_lock_operations echo_lock_ops = {
.clo_fini = echo_lock_fini,
.clo_delete = echo_lock_delete,
.clo_fits_into = echo_lock_fits_into
};
/** @} echo_lock */
/** \defgroup echo_cl_ops cl_object operations
*
* operations for cl_object
*
* @{
*/
static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, struct page *vmpage)
{
struct echo_page *ep = cl_object_page_slice(obj, page);
struct echo_object *eco = cl2echo_obj(obj);
ENTRY;
ep->ep_vmpage = vmpage;
page_cache_get(vmpage);
mutex_init(&ep->ep_lock);
cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
atomic_inc(&eco->eo_npages);
RETURN(0);
}
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io)
{
return 0;
}
static int echo_lock_init(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
const struct cl_io *unused)
{
struct echo_lock *el;
ENTRY;
OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, __GFP_IO);
if (el != NULL) {
cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
el->el_object = cl2echo_obj(obj);
INIT_LIST_HEAD(&el->el_chain);
atomic_set(&el->el_refcount, 0);
}
RETURN(el == NULL ? -ENOMEM : 0);
}
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
const struct cl_object_conf *conf)
{
return 0;
}
static const struct cl_object_operations echo_cl_obj_ops = {
.coo_page_init = echo_page_init,
.coo_lock_init = echo_lock_init,
.coo_io_init = echo_io_init,
.coo_conf_set = echo_conf_set
};
/** @} echo_cl_ops */
/** \defgroup echo_lu_ops lu_object operations
*
* operations for echo lu object.
*
* @{
*/
static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
const struct lu_object_conf *conf)
{
struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev));
struct echo_client_obd *ec = ed->ed_ec;
struct echo_object *eco = cl2echo_obj(lu2cl(obj));
ENTRY;
if (ed->ed_next) {
struct lu_object *below;
struct lu_device *under;
under = ed->ed_next;
below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
under);
if (below == NULL)
RETURN(-ENOMEM);
lu_object_add(obj, below);
}
if (!ed->ed_next_ismd) {
const struct cl_object_conf *cconf = lu2cl_conf(conf);
struct echo_object_conf *econf = cl2echo_conf(cconf);
LASSERT(econf->eoc_md);
eco->eo_lsm = *econf->eoc_md;
/* clear the lsm pointer so that it won't get freed. */
*econf->eoc_md = NULL;
} else {
eco->eo_lsm = NULL;
}
eco->eo_dev = ed;
atomic_set(&eco->eo_npages, 0);
cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
spin_lock(&ec->ec_lock);
list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
spin_unlock(&ec->ec_lock);
RETURN(0);
}
/* taken from osc_unpackmd() */
static int echo_alloc_memmd(struct echo_device *ed,
struct lov_stripe_md **lsmp)
{
int lsm_size;
ENTRY;
/* If export is lov/osc then use their obd method */
if (ed->ed_next != NULL)
return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
/* OFD has no unpackmd method, do everything here */
lsm_size = lov_stripe_md_size(1);
LASSERT(*lsmp == NULL);
OBD_ALLOC(*lsmp, lsm_size);
if (*lsmp == NULL)
RETURN(-ENOMEM);
OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
if ((*lsmp)->lsm_oinfo[0] == NULL) {
OBD_FREE(*lsmp, lsm_size);
RETURN(-ENOMEM);
}
loi_init((*lsmp)->lsm_oinfo[0]);
(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
ostid_set_seq_echo(&(*lsmp)->lsm_oi);
RETURN(lsm_size);
}
static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
{
int lsm_size;
ENTRY;
/* If export is lov/osc then use their obd method */
if (ed->ed_next != NULL)
return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
/* OFD has no unpackmd method, do everything here */
lsm_size = lov_stripe_md_size(1);
LASSERT(*lsmp != NULL);
OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
OBD_FREE(*lsmp, lsm_size);
*lsmp = NULL;
RETURN(0);
}
static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
{
struct echo_object *eco = cl2echo_obj(lu2cl(obj));
struct echo_client_obd *ec = eco->eo_dev->ed_ec;
ENTRY;
LASSERT(atomic_read(&eco->eo_npages) == 0);
spin_lock(&ec->ec_lock);
list_del_init(&eco->eo_obj_chain);
spin_unlock(&ec->ec_lock);
lu_object_fini(obj);
lu_object_header_fini(obj->lo_header);
if (eco->eo_lsm)
echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
EXIT;
}
static int echo_object_print(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o)
{
struct echo_object *obj = cl2echo_obj(lu2cl(o));
return (*p)(env, cookie, "echoclient-object@%p", obj);
}
static const struct lu_object_operations echo_lu_obj_ops = {
.loo_object_init = echo_object_init,
.loo_object_delete = NULL,
.loo_object_release = NULL,
.loo_object_free = echo_object_free,
.loo_object_print = echo_object_print,
.loo_object_invariant = NULL
};
/** @} echo_lu_ops */
/** \defgroup echo_lu_dev_ops lu_device operations
*
* Operations for echo lu device.
*
* @{
*/
static struct lu_object *echo_object_alloc(const struct lu_env *env,
const struct lu_object_header *hdr,
struct lu_device *dev)
{
struct echo_object *eco;
struct lu_object *obj = NULL;
ENTRY;
/* we're the top dev. */
LASSERT(hdr == NULL);
OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, __GFP_IO);
if (eco != NULL) {
struct cl_object_header *hdr = &eco->eo_hdr;
obj = &echo_obj2cl(eco)->co_lu;
cl_object_header_init(hdr);
lu_object_init(obj, &hdr->coh_lu, dev);
lu_object_add_top(&hdr->coh_lu, obj);
eco->eo_cl.co_ops = &echo_cl_obj_ops;
obj->lo_ops = &echo_lu_obj_ops;
}
RETURN(obj);
}
static struct lu_device_operations echo_device_lu_ops = {
.ldo_object_alloc = echo_object_alloc,
};
/** @} echo_lu_dev_ops */
static struct cl_device_operations echo_device_cl_ops = {
};
/** \defgroup echo_init Setup and teardown
*
* Init and fini functions for echo client.
*
* @{
*/
static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
{
struct cl_site *site = &ed->ed_site_myself;
int rc;
/* initialize site */
rc = cl_site_init(site, &ed->ed_cl);
if (rc) {
CERROR("Cannot initilize site for echo client(%d)\n", rc);
return rc;
}
rc = lu_site_init_finish(&site->cs_lu);
if (rc)
return rc;
ed->ed_site = site;
return 0;
}
static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
{
if (ed->ed_site) {
if (!ed->ed_next_ismd)
cl_site_fini(ed->ed_site);
ed->ed_site = NULL;
}
}
static void *echo_thread_key_init(const struct lu_context *ctx,
struct lu_context_key *key)
{
struct echo_thread_info *info;
OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, __GFP_IO);
if (info == NULL)
info = ERR_PTR(-ENOMEM);
return info;
}
static void echo_thread_key_fini(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
{
struct echo_thread_info *info = data;
OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
}
static void echo_thread_key_exit(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
{
}
static struct lu_context_key echo_thread_key = {
.lct_tags = LCT_CL_THREAD,
.lct_init = echo_thread_key_init,
.lct_fini = echo_thread_key_fini,
.lct_exit = echo_thread_key_exit
};
static void *echo_session_key_init(const struct lu_context *ctx,
struct lu_context_key *key)
{
struct echo_session_info *session;
OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, __GFP_IO);
if (session == NULL)
session = ERR_PTR(-ENOMEM);
return session;
}
static void echo_session_key_fini(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
{
struct echo_session_info *session = data;
OBD_SLAB_FREE_PTR(session, echo_session_kmem);
}
static void echo_session_key_exit(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
{
}
static struct lu_context_key echo_session_key = {
.lct_tags = LCT_SESSION,
.lct_init = echo_session_key_init,
.lct_fini = echo_session_key_fini,
.lct_exit = echo_session_key_exit
};
LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
#define ECHO_SEQ_WIDTH 0xffffffff
static int echo_fid_init(struct echo_device *ed, char *obd_name,
struct seq_server_site *ss)
{
char *prefix;
int rc;
ENTRY;
OBD_ALLOC_PTR(ed->ed_cl_seq);
if (ed->ed_cl_seq == NULL)
RETURN(-ENOMEM);
OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
if (prefix == NULL)
GOTO(out_free_seq, rc = -ENOMEM);
snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", obd_name);
/* Init client side sequence-manager */
rc = seq_client_init(ed->ed_cl_seq, NULL,
LUSTRE_SEQ_METADATA,
prefix, ss->ss_server_seq);
ed->ed_cl_seq->lcs_width = ECHO_SEQ_WIDTH;
OBD_FREE(prefix, MAX_OBD_NAME + 5);
if (rc)
GOTO(out_free_seq, rc);
RETURN(0);
out_free_seq:
OBD_FREE_PTR(ed->ed_cl_seq);
ed->ed_cl_seq = NULL;
RETURN(rc);
}
static int echo_fid_fini(struct obd_device *obddev)
{
struct echo_device *ed = obd2echo_dev(obddev);
ENTRY;
if (ed->ed_cl_seq != NULL) {
seq_client_fini(ed->ed_cl_seq);
OBD_FREE_PTR(ed->ed_cl_seq);
ed->ed_cl_seq = NULL;
}
RETURN(0);
}
static struct lu_device *echo_device_alloc(const struct lu_env *env,
struct lu_device_type *t,
struct lustre_cfg *cfg)
{
struct lu_device *next;
struct echo_device *ed;
struct cl_device *cd;
struct obd_device *obd = NULL; /* to keep compiler happy */
struct obd_device *tgt;
const char *tgt_type_name;
int rc;
int cleanup = 0;
ENTRY;
OBD_ALLOC_PTR(ed);
if (ed == NULL)
GOTO(out, rc = -ENOMEM);
cleanup = 1;
cd = &ed->ed_cl;
rc = cl_device_init(cd, t);
if (rc)
GOTO(out, rc);
cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
cd->cd_ops = &echo_device_cl_ops;
cleanup = 2;
obd = class_name2obd(lustre_cfg_string(cfg, 0));
LASSERT(obd != NULL);
LASSERT(env != NULL);
tgt = class_name2obd(lustre_cfg_string(cfg, 1));
if (tgt == NULL) {
CERROR("Can not find tgt device %s\n",
lustre_cfg_string(cfg, 1));
GOTO(out, rc = -ENODEV);
}
next = tgt->obd_lu_dev;
if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
ed->ed_next_ismd = 1;
} else {
ed->ed_next_ismd = 0;
rc = echo_site_init(env, ed);
if (rc)
GOTO(out, rc);
}
cleanup = 3;
rc = echo_client_setup(env, obd, cfg);
if (rc)
GOTO(out, rc);
ed->ed_ec = &obd->u.echo_client;
cleanup = 4;
if (ed->ed_next_ismd) {
/* Suppose to connect to some Metadata layer */
struct lu_site *ls;
struct lu_device *ld;
int found = 0;
if (next == NULL) {
CERROR("%s is not lu device type!\n",
lustre_cfg_string(cfg, 1));
GOTO(out, rc = -EINVAL);
}
tgt_type_name = lustre_cfg_string(cfg, 2);
if (!tgt_type_name) {
CERROR("%s no type name for echo %s setup\n",
lustre_cfg_string(cfg, 1),
tgt->obd_type->typ_name);
GOTO(out, rc = -EINVAL);
}
ls = next->ld_site;
spin_lock(&ls->ls_ld_lock);
list_for_each_entry(ld, &ls->ls_ld_linkage, ld_linkage) {
if (strcmp(ld->ld_type->ldt_name, tgt_type_name) == 0) {
found = 1;
break;
}
}
spin_unlock(&ls->ls_ld_lock);
if (found == 0) {
CERROR("%s is not lu device type!\n",
lustre_cfg_string(cfg, 1));
GOTO(out, rc = -EINVAL);
}
next = ld;
/* For MD echo client, it will use the site in MDS stack */
ed->ed_site_myself.cs_lu = *ls;
ed->ed_site = &ed->ed_site_myself;
ed->ed_cl.cd_lu_dev.ld_site = &ed->ed_site_myself.cs_lu;
rc = echo_fid_init(ed, obd->obd_name, lu_site2seq(ls));
if (rc) {
CERROR("echo fid init error %d\n", rc);
GOTO(out, rc);
}
} else {
/* if echo client is to be stacked upon ost device, the next is
* NULL since ost is not a clio device so far */
if (next != NULL && !lu_device_is_cl(next))
next = NULL;
tgt_type_name = tgt->obd_type->typ_name;
if (next != NULL) {
LASSERT(next != NULL);
if (next->ld_site != NULL)
GOTO(out, rc = -EBUSY);
next->ld_site = &ed->ed_site->cs_lu;
rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
next->ld_type->ldt_name,
NULL);
if (rc)
GOTO(out, rc);
/* Tricky case, I have to determine the obd type since
* CLIO uses the different parameters to initialize
* objects for lov & osc. */
if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
ed->ed_next_islov = 1;
else
LASSERT(strcmp(tgt_type_name,
LUSTRE_OSC_NAME) == 0);
} else
LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
}
ed->ed_next = next;
RETURN(&cd->cd_lu_dev);
out:
switch(cleanup) {
case 4: {
int rc2;
rc2 = echo_client_cleanup(obd);
if (rc2)
CERROR("Cleanup obd device %s error(%d)\n",
obd->obd_name, rc2);
}
case 3:
echo_site_fini(env, ed);
case 2:
cl_device_fini(&ed->ed_cl);
case 1:
OBD_FREE_PTR(ed);
case 0:
default:
break;
}
return(ERR_PTR(rc));
}
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
const char *name, struct lu_device *next)
{
LBUG();
return 0;
}
static struct lu_device *echo_device_fini(const struct lu_env *env,
struct lu_device *d)
{
struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
struct lu_device *next = ed->ed_next;
while (next && !ed->ed_next_ismd)
next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
return NULL;
}
static void echo_lock_release(const struct lu_env *env,
struct echo_lock *ecl,
int still_used)
{
struct cl_lock *clk = echo_lock2cl(ecl);
cl_lock_get(clk);
cl_unuse(env, clk);
cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
if (!still_used) {
cl_lock_mutex_get(env, clk);
cl_lock_cancel(env, clk);
cl_lock_delete(env, clk);
cl_lock_mutex_put(env, clk);
}
cl_lock_put(env, clk);
}
static struct lu_device *echo_device_free(const struct lu_env *env,
struct lu_device *d)
{
struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
struct echo_client_obd *ec = ed->ed_ec;
struct echo_object *eco;
struct lu_device *next = ed->ed_next;
CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
ed, next);
lu_site_purge(env, &ed->ed_site->cs_lu, -1);
/* check if there are objects still alive.
* It shouldn't have any object because lu_site_purge would cleanup
* all of cached objects. Anyway, probably the echo device is being
* parallelly accessed.
*/
spin_lock(&ec->ec_lock);
list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
eco->eo_deleted = 1;
spin_unlock(&ec->ec_lock);
/* purge again */
lu_site_purge(env, &ed->ed_site->cs_lu, -1);
CDEBUG(D_INFO,
"Waiting for the reference of echo object to be dropped\n");
/* Wait for the last reference to be dropped. */
spin_lock(&ec->ec_lock);
while (!list_empty(&ec->ec_objects)) {
spin_unlock(&ec->ec_lock);
CERROR("echo_client still has objects at cleanup time, "
"wait for 1 second\n");
schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
cfs_time_seconds(1));
lu_site_purge(env, &ed->ed_site->cs_lu, -1);
spin_lock(&ec->ec_lock);
}
spin_unlock(&ec->ec_lock);
LASSERT(list_empty(&ec->ec_locks));
CDEBUG(D_INFO, "No object exists, exiting...\n");
echo_client_cleanup(d->ld_obd);
echo_fid_fini(d->ld_obd);
while (next && !ed->ed_next_ismd)
next = next->ld_type->ldt_ops->ldto_device_free(env, next);
LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
echo_site_fini(env, ed);
cl_device_fini(&ed->ed_cl);
OBD_FREE_PTR(ed);
return NULL;
}
static const struct lu_device_type_operations echo_device_type_ops = {
.ldto_init = echo_type_init,
.ldto_fini = echo_type_fini,
.ldto_start = echo_type_start,
.ldto_stop = echo_type_stop,
.ldto_device_alloc = echo_device_alloc,
.ldto_device_free = echo_device_free,
.ldto_device_init = echo_device_init,
.ldto_device_fini = echo_device_fini
};
static struct lu_device_type echo_device_type = {
.ldt_tags = LU_DEVICE_CL,
.ldt_name = LUSTRE_ECHO_CLIENT_NAME,
.ldt_ops = &echo_device_type_ops,
.ldt_ctx_tags = LCT_CL_THREAD | LCT_MD_THREAD | LCT_DT_THREAD,
};
/** @} echo_init */
/** \defgroup echo_exports Exported operations
*
* exporting functions to echo client
*
* @{
*/
/* Interfaces to echo client obd device */
static struct echo_object *cl_echo_object_find(struct echo_device *d,
struct lov_stripe_md **lsmp)
{
struct lu_env *env;
struct echo_thread_info *info;
struct echo_object_conf *conf;
struct lov_stripe_md *lsm;
struct echo_object *eco;
struct cl_object *obj;
struct lu_fid *fid;
int refcheck;
int rc;
ENTRY;
LASSERT(lsmp);
lsm = *lsmp;
LASSERT(lsm);
LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
POSTID(&lsm->lsm_oi));
staging: add Lustre file system client support Lustre is the most deployed distributed file system in the HPC (High Performance Computing) world. The patch adds its client side support. The code is not very clean and needs to live in drivers/staging for some time for continuing cleanup work. See drivers/staging/lustre/TODO for details. The code is based on Lustre master commit faefbfc04 commit faefbfc0460bc00f2ee4c1c1c86aa1e39b9eea49 Author: Alex Zhuravlev <alexey.zhuravlev@intel.com> Date: Tue Apr 30 23:05:21 2013 +0400 LU-3244 utils: tunefs.lustre should preserve virgin label Plus a few under-review patches on Whamcloud gerrit: 3.8 kernel support: http://review.whamcloud.com/#change,5973 http://review.whamcloud.com/#change,5974 http://review.whamcloud.com/#change,5768 http://review.whamcloud.com/#change,5781 http://review.whamcloud.com/#change,5763 http://review.whamcloud.com/#change,5613 http://review.whamcloud.com/#change,5655 3.9 kernel support: http://review.whamcloud.com/#change,5898 http://review.whamcloud.com/#change,5899 Kconfig/Kbuild: http://review.whamcloud.com/#change,4646 http://review.whamcloud.com/#change,4644 libcfs cleanup: http://review.whamcloud.com/#change,2831 http://review.whamcloud.com/#change,4775 http://review.whamcloud.com/#change,4776 http://review.whamcloud.com/#change,4777 http://review.whamcloud.com/#change,4778 http://review.whamcloud.com/#change,4779 http://review.whamcloud.com/#change,4780 All starting/trailing whitespaces are removed, to match kernel coding style. Also ran scripts/cleanfile on all lustre source files. [maked the Kconfig depend on BROKEN as the recent procfs changes causes this to fail - gregkh] Signed-off-by: Peng Tao <tao.peng@emc.com> Signed-off-by: Andreas Dilger <andreas.dilger@intel.com> Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
2013-05-02 02:46:55 -06:00
/* Never return an object if the obd is to be freed. */
if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
RETURN(ERR_PTR(-ENODEV));
env = cl_env_get(&refcheck);
if (IS_ERR(env))
RETURN((void *)env);
info = echo_env_info(env);
conf = &info->eti_conf;
if (d->ed_next) {
if (!d->ed_next_islov) {
struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
LASSERT(oinfo != NULL);
oinfo->loi_oi = lsm->lsm_oi;
conf->eoc_cl.u.coc_oinfo = oinfo;
} else {
struct lustre_md *md;
md = &info->eti_md;
memset(md, 0, sizeof *md);
md->lsm = lsm;
conf->eoc_cl.u.coc_md = md;
}
}
conf->eoc_md = lsmp;
fid = &info->eti_fid;
rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
if (rc != 0)
GOTO(out, eco = ERR_PTR(rc));
/* In the function below, .hs_keycmp resolves to
* lu_obj_hop_keycmp() */
/* coverity[overrun-buffer-val] */
obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
if (IS_ERR(obj))
GOTO(out, eco = (void*)obj);
eco = cl2echo_obj(obj);
if (eco->eo_deleted) {
cl_object_put(env, obj);
eco = ERR_PTR(-EAGAIN);
}
out:
cl_env_put(env, &refcheck);
RETURN(eco);
}
static int cl_echo_object_put(struct echo_object *eco)
{
struct lu_env *env;
struct cl_object *obj = echo_obj2cl(eco);
int refcheck;
ENTRY;
env = cl_env_get(&refcheck);
if (IS_ERR(env))
RETURN(PTR_ERR(env));
/* an external function to kill an object? */
if (eco->eo_deleted) {
struct lu_object_header *loh = obj->co_lu.lo_header;
LASSERT(&eco->eo_hdr == luh2coh(loh));
set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
}
cl_object_put(env, obj);
cl_env_put(env, &refcheck);
RETURN(0);
}
static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
obd_off start, obd_off end, int mode,
__u64 *cookie , __u32 enqflags)
{
struct cl_io *io;
struct cl_lock *lck;
struct cl_object *obj;
struct cl_lock_descr *descr;
struct echo_thread_info *info;
int rc = -ENOMEM;
ENTRY;
info = echo_env_info(env);
io = &info->eti_io;
descr = &info->eti_descr;
obj = echo_obj2cl(eco);
descr->cld_obj = obj;
descr->cld_start = cl_index(obj, start);
descr->cld_end = cl_index(obj, end);
descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
descr->cld_enq_flags = enqflags;
io->ci_obj = obj;
lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
if (lck) {
struct echo_client_obd *ec = eco->eo_dev->ed_ec;
struct echo_lock *el;
rc = cl_wait(env, lck);
if (rc == 0) {
el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
spin_lock(&ec->ec_lock);
if (list_empty(&el->el_chain)) {
list_add(&el->el_chain, &ec->ec_locks);
el->el_cookie = ++ec->ec_unique;
}
atomic_inc(&el->el_refcount);
*cookie = el->el_cookie;
spin_unlock(&ec->ec_lock);
} else {
cl_lock_release(env, lck, "ec enqueue", current);
}
}
RETURN(rc);
}
static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end,
int mode, __u64 *cookie)
{
struct echo_thread_info *info;
struct lu_env *env;
struct cl_io *io;
int refcheck;
int result;
ENTRY;
env = cl_env_get(&refcheck);
if (IS_ERR(env))
RETURN(PTR_ERR(env));
info = echo_env_info(env);
io = &info->eti_io;
io->ci_ignore_layout = 1;
result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
if (result < 0)
GOTO(out, result);
LASSERT(result == 0);
result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
cl_io_fini(env, io);
EXIT;
out:
cl_env_put(env, &refcheck);
return result;
}
static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
__u64 cookie)
{
struct echo_client_obd *ec = ed->ed_ec;
struct echo_lock *ecl = NULL;
struct list_head *el;
int found = 0, still_used = 0;
ENTRY;
LASSERT(ec != NULL);
spin_lock(&ec->ec_lock);
list_for_each (el, &ec->ec_locks) {
ecl = list_entry (el, struct echo_lock, el_chain);
CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
found = (ecl->el_cookie == cookie);
if (found) {
if (atomic_dec_and_test(&ecl->el_refcount))
list_del_init(&ecl->el_chain);
else
still_used = 1;
break;
}
}
spin_unlock(&ec->ec_lock);
if (!found)
RETURN(-ENOENT);
echo_lock_release(env, ecl, still_used);
RETURN(0);
}
static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
{
struct lu_env *env;
int refcheck;
int rc;
ENTRY;
env = cl_env_get(&refcheck);
if (IS_ERR(env))
RETURN(PTR_ERR(env));
rc = cl_echo_cancel0(env, ed, cookie);
cl_env_put(env, &refcheck);
RETURN(rc);
}
static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
enum cl_req_type unused, struct cl_2queue *queue)
{
struct cl_page *clp;
struct cl_page *temp;
int result = 0;
ENTRY;
cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
int rc;
rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
if (rc == 0)
continue;
result = result ?: rc;
}
RETURN(result);
}
static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
struct page **pages, int npages, int async)
{
struct lu_env *env;
struct echo_thread_info *info;
struct cl_object *obj = echo_obj2cl(eco);
struct echo_device *ed = eco->eo_dev;
struct cl_2queue *queue;
struct cl_io *io;
struct cl_page *clp;
struct lustre_handle lh = { 0 };
int page_size = cl_page_size(obj);
int refcheck;
int rc;
int i;
ENTRY;
LASSERT((offset & ~CFS_PAGE_MASK) == 0);
LASSERT(ed->ed_next != NULL);
env = cl_env_get(&refcheck);
if (IS_ERR(env))
RETURN(PTR_ERR(env));
info = echo_env_info(env);
io = &info->eti_io;
queue = &info->eti_queue;
cl_2queue_init(queue);
io->ci_ignore_layout = 1;
rc = cl_io_init(env, io, CIT_MISC, obj);
if (rc < 0)
GOTO(out, rc);
LASSERT(rc == 0);
rc = cl_echo_enqueue0(env, eco, offset,
offset + npages * PAGE_CACHE_SIZE - 1,
rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
CEF_NEVER);
if (rc < 0)
GOTO(error_lock, rc);
for (i = 0; i < npages; i++) {
LASSERT(pages[i]);
clp = cl_page_find(env, obj, cl_index(obj, offset),
pages[i], CPT_TRANSIENT);
if (IS_ERR(clp)) {
rc = PTR_ERR(clp);
break;
}
LASSERT(clp->cp_type == CPT_TRANSIENT);
rc = cl_page_own(env, io, clp);
if (rc) {
LASSERT(clp->cp_state == CPS_FREEING);
cl_page_put(env, clp);
break;
}
cl_2queue_add(queue, clp);
/* drop the reference count for cl_page_find, so that the page
* will be freed in cl_2queue_fini. */
cl_page_put(env, clp);
cl_page_clip(env, clp, 0, page_size);
offset += page_size;
}
if (rc == 0) {
enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
async = async && (typ == CRT_WRITE);
if (async)
rc = cl_echo_async_brw(env, io, typ, queue);
else
rc = cl_io_submit_sync(env, io, typ, queue, 0);
CDEBUG(D_INFO, "echo_client %s write returns %d\n",
async ? "async" : "sync", rc);
}
cl_echo_cancel0(env, ed, lh.cookie);
EXIT;
error_lock:
cl_2queue_discard(env, io, queue);
cl_2queue_disown(env, io, queue);
cl_2queue_fini(env, queue);
cl_io_fini(env, io);
out:
cl_env_put(env, &refcheck);
return rc;
}
/** @} echo_exports */
static obd_id last_object_id;
static int
echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
{
struct lov_stripe_md *ulsm = _ulsm;
int nob, i;
nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
if (nob > ulsm_nob)
return (-EINVAL);
if (copy_to_user (ulsm, lsm, sizeof(ulsm)))
return (-EFAULT);
for (i = 0; i < lsm->lsm_stripe_count; i++) {
if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
sizeof(lsm->lsm_oinfo[0])))
return (-EFAULT);
}
return 0;
}
static int
echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
void *ulsm, int ulsm_nob)
{
struct echo_client_obd *ec = ed->ed_ec;
int i;
if (ulsm_nob < sizeof (*lsm))
return (-EINVAL);
if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
return (-EFAULT);
if (lsm->lsm_stripe_count > ec->ec_nstripes ||
lsm->lsm_magic != LOV_MAGIC ||
(lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
return (-EINVAL);
for (i = 0; i < lsm->lsm_stripe_count; i++) {
if (copy_from_user(lsm->lsm_oinfo[i],
((struct lov_stripe_md *)ulsm)-> \
lsm_oinfo[i],
sizeof(lsm->lsm_oinfo[0])))
return (-EFAULT);
}
return (0);
}
static inline void echo_md_build_name(struct lu_name *lname, char *name,
__u64 id)
{
sprintf(name, LPU64, id);
lname->ln_name = name;
lname->ln_namelen = strlen(name);
}
/* similar to mdt_attr_get_complex */
static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o,
struct md_attr *ma)
{
struct echo_thread_info *info = echo_env_info(env);
int rc;
ENTRY;
LASSERT(ma->ma_lmm_size > 0);
rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV);
if (rc < 0)
RETURN(rc);
/* big_lmm may need to be grown */
if (info->eti_big_lmmsize < rc) {
int size = size_roundup_power2(rc);
if (info->eti_big_lmmsize > 0) {
/* free old buffer */
LASSERT(info->eti_big_lmm);
OBD_FREE_LARGE(info->eti_big_lmm,
info->eti_big_lmmsize);
info->eti_big_lmm = NULL;
info->eti_big_lmmsize = 0;
}
OBD_ALLOC_LARGE(info->eti_big_lmm, size);
if (info->eti_big_lmm == NULL)
RETURN(-ENOMEM);
info->eti_big_lmmsize = size;
}
LASSERT(info->eti_big_lmmsize >= rc);
info->eti_buf.lb_buf = info->eti_big_lmm;
info->eti_buf.lb_len = info->eti_big_lmmsize;
rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV);
if (rc < 0)
RETURN(rc);
ma->ma_valid |= MA_LOV;
ma->ma_lmm = info->eti_big_lmm;
ma->ma_lmm_size = rc;
RETURN(0);
}
int echo_attr_get_complex(const struct lu_env *env, struct md_object *next,
struct md_attr *ma)
{
struct echo_thread_info *info = echo_env_info(env);
struct lu_buf *buf = &info->eti_buf;
umode_t mode = lu_object_attr(&next->mo_lu);
int need = ma->ma_need;
int rc = 0, rc2;
ENTRY;
ma->ma_valid = 0;
if (need & MA_INODE) {
ma->ma_need = MA_INODE;
rc = mo_attr_get(env, next, ma);
if (rc)
GOTO(out, rc);
ma->ma_valid |= MA_INODE;
}
if (need & MA_LOV) {
if (S_ISREG(mode) || S_ISDIR(mode)) {
LASSERT(ma->ma_lmm_size > 0);
buf->lb_buf = ma->ma_lmm;
buf->lb_len = ma->ma_lmm_size;
rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV);
if (rc2 > 0) {
ma->ma_lmm_size = rc2;
ma->ma_valid |= MA_LOV;
} else if (rc2 == -ENODATA) {
/* no LOV EA */
ma->ma_lmm_size = 0;
} else if (rc2 == -ERANGE) {
rc2 = echo_big_lmm_get(env, next, ma);
if (rc2 < 0)
GOTO(out, rc = rc2);
} else {
GOTO(out, rc = rc2);
}
}
}
#ifdef CONFIG_FS_POSIX_ACL
if (need & MA_ACL_DEF && S_ISDIR(mode)) {
buf->lb_buf = ma->ma_acl;
buf->lb_len = ma->ma_acl_size;
rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
if (rc2 > 0) {
ma->ma_acl_size = rc2;
ma->ma_valid |= MA_ACL_DEF;
} else if (rc2 == -ENODATA) {
/* no ACLs */
ma->ma_acl_size = 0;
} else {
GOTO(out, rc = rc2);
}
}
#endif
out:
ma->ma_need = need;
CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
rc, ma->ma_valid, ma->ma_lmm);
RETURN(rc);
}
static int
echo_md_create_internal(const struct lu_env *env, struct echo_device *ed,
struct md_object *parent, struct lu_fid *fid,
struct lu_name *lname, struct md_op_spec *spec,
struct md_attr *ma)
{
struct lu_object *ec_child, *child;
struct lu_device *ld = ed->ed_next;
struct echo_thread_info *info = echo_env_info(env);
struct lu_fid *fid2 = &info->eti_fid2;
struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
int rc;
ENTRY;
rc = mdo_lookup(env, parent, lname, fid2, spec);
if (rc == 0)
return -EEXIST;
else if (rc != -ENOENT)
return rc;
ec_child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev,
fid, &conf);
if (IS_ERR(ec_child)) {
CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid),
PTR_ERR(ec_child));
RETURN(PTR_ERR(ec_child));
}
child = lu_object_locate(ec_child->lo_header, ld->ld_type);
if (child == NULL) {
CERROR("Can not locate the child "DFID"\n", PFID(fid));
GOTO(out_put, rc = -EINVAL);
}
CDEBUG(D_RPCTRACE, "Start creating object "DFID" %s %p\n",
PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
/*
* Do not perform lookup sanity check. We know that name does not exist.
*/
spec->sp_cr_lookup = 0;
rc = mdo_create(env, parent, lname, lu2md(child), spec, ma);
if (rc) {
CERROR("Can not create child "DFID": rc = %d\n", PFID(fid), rc);
GOTO(out_put, rc);
}
CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n",
PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc);
EXIT;
out_put:
lu_object_put(env, ec_child);
return rc;
}
static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld,
struct md_attr *ma)
{
struct echo_thread_info *info = echo_env_info(env);
if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
ma->ma_lmm = (void *)&info->eti_lmm;
ma->ma_lmm_size = sizeof(info->eti_lmm);
} else {
LASSERT(info->eti_big_lmmsize);
ma->ma_lmm = info->eti_big_lmm;
ma->ma_lmm_size = info->eti_big_lmmsize;
}
return 0;
}
static int echo_create_md_object(const struct lu_env *env,
struct echo_device *ed,
struct lu_object *ec_parent,
struct lu_fid *fid,
char *name, int namelen,
__u64 id, __u32 mode, int count,
int stripe_count, int stripe_offset)
{
struct lu_object *parent;
struct echo_thread_info *info = echo_env_info(env);
struct lu_name *lname = &info->eti_lname;
struct md_op_spec *spec = &info->eti_spec;
struct md_attr *ma = &info->eti_ma;
struct lu_device *ld = ed->ed_next;
int rc = 0;
int i;
ENTRY;
if (ec_parent == NULL)
return -1;
parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
if (parent == NULL)
RETURN(-ENXIO);
memset(ma, 0, sizeof(*ma));
memset(spec, 0, sizeof(*spec));
if (stripe_count != 0) {
spec->sp_cr_flags |= FMODE_WRITE;
echo_set_lmm_size(env, ld, ma);
if (stripe_count != -1) {
struct lov_user_md_v3 *lum = &info->eti_lum;
lum->lmm_magic = LOV_USER_MAGIC_V3;
lum->lmm_stripe_count = stripe_count;
lum->lmm_stripe_offset = stripe_offset;
lum->lmm_pattern = 0;
spec->u.sp_ea.eadata = lum;
spec->u.sp_ea.eadatalen = sizeof(*lum);
spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
}
}
ma->ma_attr.la_mode = mode;
ma->ma_attr.la_valid = LA_CTIME | LA_MODE;
ma->ma_attr.la_ctime = cfs_time_current_64();
if (name != NULL) {
lname->ln_name = name;
lname->ln_namelen = namelen;
/* If name is specified, only create one object by name */
rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
spec, ma);
RETURN(rc);
}
/* Create multiple object sequenced by id */
for (i = 0; i < count; i++) {
char *tmp_name = info->eti_name;
echo_md_build_name(lname, tmp_name, id);
rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname,
spec, ma);
if (rc) {
CERROR("Can not create child %s: rc = %d\n", tmp_name,
rc);
break;
}
id++;
fid->f_oid++;
}
RETURN(rc);
}
static struct lu_object *echo_md_lookup(const struct lu_env *env,
struct echo_device *ed,
struct md_object *parent,
struct lu_name *lname)
{
struct echo_thread_info *info = echo_env_info(env);
struct lu_fid *fid = &info->eti_fid;
struct lu_object *child;
int rc;
ENTRY;
CDEBUG(D_INFO, "lookup %s in parent "DFID" %p\n", lname->ln_name,
PFID(fid), parent);
rc = mdo_lookup(env, parent, lname, fid, NULL);
if (rc) {
CERROR("lookup %s: rc = %d\n", lname->ln_name, rc);
RETURN(ERR_PTR(rc));
}
/* In the function below, .hs_keycmp resolves to
* lu_obj_hop_keycmp() */
/* coverity[overrun-buffer-val] */
child = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
RETURN(child);
}
static int echo_setattr_object(const struct lu_env *env,
struct echo_device *ed,
struct lu_object *ec_parent,
__u64 id, int count)
{
struct lu_object *parent;
struct echo_thread_info *info = echo_env_info(env);
struct lu_name *lname = &info->eti_lname;
char *name = info->eti_name;
struct lu_device *ld = ed->ed_next;
struct lu_buf *buf = &info->eti_buf;
int rc = 0;
int i;
ENTRY;
if (ec_parent == NULL)
return -1;
parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
if (parent == NULL)
RETURN(-ENXIO);
for (i = 0; i < count; i++) {
struct lu_object *ec_child, *child;
echo_md_build_name(lname, name, id);
ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
if (IS_ERR(ec_child)) {
CERROR("Can't find child %s: rc = %ld\n",
lname->ln_name, PTR_ERR(ec_child));
RETURN(PTR_ERR(ec_child));
}
child = lu_object_locate(ec_child->lo_header, ld->ld_type);
if (child == NULL) {
CERROR("Can not locate the child %s\n", lname->ln_name);
lu_object_put(env, ec_child);
rc = -EINVAL;
break;
}
CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n",
PFID(lu_object_fid(child)));
buf->lb_buf = info->eti_xattr_buf;
buf->lb_len = sizeof(info->eti_xattr_buf);
sprintf(name, "%s.test1", XATTR_USER_PREFIX);
rc = mo_xattr_set(env, lu2md(child), buf, name,
LU_XATTR_CREATE);
if (rc < 0) {
CERROR("Can not setattr child "DFID": rc = %d\n",
PFID(lu_object_fid(child)), rc);
lu_object_put(env, ec_child);
break;
}
CDEBUG(D_RPCTRACE, "End setattr object "DFID"\n",
PFID(lu_object_fid(child)));
id++;
lu_object_put(env, ec_child);
}
RETURN(rc);
}
static int echo_getattr_object(const struct lu_env *env,
struct echo_device *ed,
struct lu_object *ec_parent,
__u64 id, int count)
{
struct lu_object *parent;
struct echo_thread_info *info = echo_env_info(env);
struct lu_name *lname = &info->eti_lname;
char *name = info->eti_name;
struct md_attr *ma = &info->eti_ma;
struct lu_device *ld = ed->ed_next;
int rc = 0;
int i;
ENTRY;
if (ec_parent == NULL)
return -1;
parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
if (parent == NULL)
RETURN(-ENXIO);
memset(ma, 0, sizeof(*ma));
ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF;
ma->ma_acl = info->eti_xattr_buf;
ma->ma_acl_size = sizeof(info->eti_xattr_buf);
for (i = 0; i < count; i++) {
struct lu_object *ec_child, *child;
ma->ma_valid = 0;
echo_md_build_name(lname, name, id);
echo_set_lmm_size(env, ld, ma);
ec_child = echo_md_lookup(env, ed, lu2md(parent), lname);
if (IS_ERR(ec_child)) {
CERROR("Can't find child %s: rc = %ld\n",
lname->ln_name, PTR_ERR(ec_child));
RETURN(PTR_ERR(ec_child));
}
child = lu_object_locate(ec_child->lo_header, ld->ld_type);
if (child == NULL) {
CERROR("Can not locate the child %s\n", lname->ln_name);
lu_object_put(env, ec_child);
RETURN(-EINVAL);
}
CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n",
PFID(lu_object_fid(child)));
rc = echo_attr_get_complex(env, lu2md(child), ma);
if (rc) {
CERROR("Can not getattr child "DFID": rc = %d\n",
PFID(lu_object_fid(child)), rc);
lu_object_put(env, ec_child);
break;
}
CDEBUG(D_RPCTRACE, "End getattr object "DFID"\n",
PFID(lu_object_fid(child)));
id++;
lu_object_put(env, ec_child);
}
RETURN(rc);
}
static int echo_lookup_object(const struct lu_env *env,
struct echo_device *ed,
struct lu_object *ec_parent,
__u64 id, int count)
{
struct lu_object *parent;
struct echo_thread_info *info = echo_env_info(env);
struct lu_name *lname = &info->eti_lname;
char *name = info->eti_name;
struct lu_fid *fid = &info->eti_fid;
struct lu_device *ld = ed->ed_next;
int rc = 0;
int i;
if (ec_parent == NULL)
return -1;
parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
if (parent == NULL)
return -ENXIO;
/*prepare the requests*/
for (i = 0; i < count; i++) {
echo_md_build_name(lname, name, id);
CDEBUG(D_RPCTRACE, "Start lookup object "DFID" %s %p\n",
PFID(lu_object_fid(parent)), lname->ln_name, parent);
rc = mdo_lookup(env, lu2md(parent), lname, fid, NULL);
if (rc) {
CERROR("Can not lookup child %s: rc = %d\n", name, rc);
break;
}
CDEBUG(D_RPCTRACE, "End lookup object "DFID" %s %p\n",
PFID(lu_object_fid(parent)), lname->ln_name, parent);
id++;
}
return rc;
}
static int echo_md_destroy_internal(const struct lu_env *env,
struct echo_device *ed,
struct md_object *parent,
struct lu_name *lname,
struct md_attr *ma)
{
struct lu_device *ld = ed->ed_next;
struct lu_object *ec_child;
struct lu_object *child;
int rc;
ENTRY;
ec_child = echo_md_lookup(env, ed, parent, lname);
if (IS_ERR(ec_child)) {
CERROR("Can't find child %s: rc = %ld\n", lname->ln_name,
PTR_ERR(ec_child));
RETURN(PTR_ERR(ec_child));
}
child = lu_object_locate(ec_child->lo_header, ld->ld_type);
if (child == NULL) {
CERROR("Can not locate the child %s\n", lname->ln_name);
GOTO(out_put, rc = -EINVAL);
}
CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
rc = mdo_unlink(env, parent, lu2md(child), lname, ma, 0);
if (rc) {
CERROR("Can not unlink child %s: rc = %d\n",
lname->ln_name, rc);
GOTO(out_put, rc);
}
CDEBUG(D_RPCTRACE, "End destroy object "DFID" %s %p\n",
PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
out_put:
lu_object_put(env, ec_child);
return rc;
}
static int echo_destroy_object(const struct lu_env *env,
struct echo_device *ed,
struct lu_object *ec_parent,
char *name, int namelen,
__u64 id, __u32 mode,
int count)
{
struct echo_thread_info *info = echo_env_info(env);
struct lu_name *lname = &info->eti_lname;
struct md_attr *ma = &info->eti_ma;
struct lu_device *ld = ed->ed_next;
struct lu_object *parent;
int rc = 0;
int i;
ENTRY;
parent = lu_object_locate(ec_parent->lo_header, ld->ld_type);
if (parent == NULL)
RETURN(-EINVAL);
memset(ma, 0, sizeof(*ma));
ma->ma_attr.la_mode = mode;
ma->ma_attr.la_valid = LA_CTIME;
ma->ma_attr.la_ctime = cfs_time_current_64();
ma->ma_need = MA_INODE;
ma->ma_valid = 0;
if (name != NULL) {
lname->ln_name = name;
lname->ln_namelen = namelen;
rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
ma);
RETURN(rc);
}
/*prepare the requests*/
for (i = 0; i < count; i++) {
char *tmp_name = info->eti_name;
ma->ma_valid = 0;
echo_md_build_name(lname, tmp_name, id);
rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname,
ma);
if (rc) {
CERROR("Can not unlink child %s: rc = %d\n", name, rc);
break;
}
id++;
}
RETURN(rc);
}
static struct lu_object *echo_resolve_path(const struct lu_env *env,
struct echo_device *ed, char *path,
int path_len)
{
struct lu_device *ld = ed->ed_next;
struct md_device *md = lu2md_dev(ld);
struct echo_thread_info *info = echo_env_info(env);
struct lu_fid *fid = &info->eti_fid;
struct lu_name *lname = &info->eti_lname;
struct lu_object *parent = NULL;
struct lu_object *child = NULL;
int rc = 0;
ENTRY;
/*Only support MDD layer right now*/
rc = md->md_ops->mdo_root_get(env, md, fid);
if (rc) {
CERROR("get root error: rc = %d\n", rc);
RETURN(ERR_PTR(rc));
}
/* In the function below, .hs_keycmp resolves to
* lu_obj_hop_keycmp() */
/* coverity[overrun-buffer-val] */
parent = lu_object_find_at(env, &ed->ed_cl.cd_lu_dev, fid, NULL);
if (IS_ERR(parent)) {
CERROR("Can not find the parent "DFID": rc = %ld\n",
PFID(fid), PTR_ERR(parent));
RETURN(parent);
}
while (1) {
struct lu_object *ld_parent;
char *e;
e = strsep(&path, "/");
if (e == NULL)
break;
if (e[0] == 0) {
if (!path || path[0] == '\0')
break;
continue;
}
lname->ln_name = e;
lname->ln_namelen = strlen(e);
ld_parent = lu_object_locate(parent->lo_header, ld->ld_type);
if (ld_parent == NULL) {
lu_object_put(env, parent);
rc = -EINVAL;
break;
}
child = echo_md_lookup(env, ed, lu2md(ld_parent), lname);
lu_object_put(env, parent);
if (IS_ERR(child)) {
rc = (int)PTR_ERR(child);
CERROR("lookup %s under parent "DFID": rc = %d\n",
lname->ln_name, PFID(lu_object_fid(ld_parent)),
rc);
break;
}
parent = child;
}
if (rc)
RETURN(ERR_PTR(rc));
RETURN(parent);
}
static void echo_ucred_init(struct lu_env *env)
{
struct lu_ucred *ucred = lu_ucred(env);
ucred->uc_valid = UCRED_INVALID;
ucred->uc_suppgids[0] = -1;
ucred->uc_suppgids[1] = -1;
ucred->uc_uid = ucred->uc_o_uid = current_uid();
ucred->uc_gid = ucred->uc_o_gid = current_gid();
ucred->uc_fsuid = ucred->uc_o_fsuid = current_fsuid();
ucred->uc_fsgid = ucred->uc_o_fsgid = current_fsgid();
ucred->uc_cap = cfs_curproc_cap_pack();
/* remove fs privilege for non-root user. */
if (ucred->uc_fsuid)
ucred->uc_cap &= ~CFS_CAP_FS_MASK;
ucred->uc_valid = UCRED_NEW;
}
static void echo_ucred_fini(struct lu_env *env)
{
struct lu_ucred *ucred = lu_ucred(env);
ucred->uc_valid = UCRED_INIT;
}
#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION)
static int echo_md_handler(struct echo_device *ed, int command,
char *path, int path_len, __u64 id, int count,
struct obd_ioctl_data *data)
{
struct echo_thread_info *info;
struct lu_device *ld = ed->ed_next;
struct lu_env *env;
int refcheck;
struct lu_object *parent;
char *name = NULL;
int namelen = data->ioc_plen2;
int rc = 0;
ENTRY;
if (ld == NULL) {
CERROR("MD echo client is not being initialized properly\n");
RETURN(-EINVAL);
}
if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) {
CERROR("Only support MDD layer right now!\n");
RETURN(-EINVAL);
}
env = cl_env_get(&refcheck);
if (IS_ERR(env))
RETURN(PTR_ERR(env));
rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
if (rc != 0)
GOTO(out_env, rc);
/* init big_lmm buffer */
info = echo_env_info(env);
LASSERT(info->eti_big_lmm == NULL);
OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE);
if (info->eti_big_lmm == NULL)
GOTO(out_env, rc = -ENOMEM);
info->eti_big_lmmsize = MIN_MD_SIZE;
parent = echo_resolve_path(env, ed, path, path_len);
if (IS_ERR(parent)) {
CERROR("Can not resolve the path %s: rc = %ld\n", path,
PTR_ERR(parent));
GOTO(out_free, rc = PTR_ERR(parent));
}
if (namelen > 0) {
OBD_ALLOC(name, namelen + 1);
if (name == NULL)
GOTO(out_put, rc = -ENOMEM);
if (copy_from_user(name, data->ioc_pbuf2, namelen))
GOTO(out_name, rc = -EFAULT);
}
echo_ucred_init(env);
switch (command) {
case ECHO_MD_CREATE:
case ECHO_MD_MKDIR: {
struct echo_thread_info *info = echo_env_info(env);
__u32 mode = data->ioc_obdo2.o_mode;
struct lu_fid *fid = &info->eti_fid;
int stripe_count = (int)data->ioc_obdo2.o_misc;
int stripe_index = (int)data->ioc_obdo2.o_stripe_idx;
rc = ostid_to_fid(fid, &data->ioc_obdo1.o_oi, 0);
if (rc != 0)
break;
/* In the function below, .hs_keycmp resolves to
* lu_obj_hop_keycmp() */
/* coverity[overrun-buffer-val] */
rc = echo_create_md_object(env, ed, parent, fid, name, namelen,
id, mode, count, stripe_count,
stripe_index);
break;
}
case ECHO_MD_DESTROY:
case ECHO_MD_RMDIR: {
__u32 mode = data->ioc_obdo2.o_mode;
rc = echo_destroy_object(env, ed, parent, name, namelen,
id, mode, count);
break;
}
case ECHO_MD_LOOKUP:
rc = echo_lookup_object(env, ed, parent, id, count);
break;
case ECHO_MD_GETATTR:
rc = echo_getattr_object(env, ed, parent, id, count);
break;
case ECHO_MD_SETATTR:
rc = echo_setattr_object(env, ed, parent, id, count);
break;
default:
CERROR("unknown command %d\n", command);
rc = -EINVAL;
break;
}
echo_ucred_fini(env);
out_name:
if (name != NULL)
OBD_FREE(name, namelen + 1);
out_put:
lu_object_put(env, parent);
out_free:
LASSERT(info->eti_big_lmm);
OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize);
info->eti_big_lmm = NULL;
info->eti_big_lmmsize = 0;
out_env:
cl_env_put(env, &refcheck);
return rc;
}
static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
int on_target, struct obdo *oa, void *ulsm,
int ulsm_nob, struct obd_trans_info *oti)
{
struct echo_object *eco;
struct echo_client_obd *ec = ed->ed_ec;
struct lov_stripe_md *lsm = NULL;
int rc;
int created = 0;
ENTRY;
if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
(on_target || /* set_stripe */
ec->ec_nstripes != 0)) { /* LOV */
CERROR ("No valid oid\n");
RETURN(-EINVAL);
}
rc = echo_alloc_memmd(ed, &lsm);
if (rc < 0) {
CERROR("Cannot allocate md: rc = %d\n", rc);
GOTO(failed, rc);
}
if (ulsm != NULL) {
int i, idx;
rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
if (rc != 0)
GOTO(failed, rc);
if (lsm->lsm_stripe_count == 0)
lsm->lsm_stripe_count = ec->ec_nstripes;
if (lsm->lsm_stripe_size == 0)
lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
idx = cfs_rand();
/* setup stripes: indices + default ids if required */
for (i = 0; i < lsm->lsm_stripe_count; i++) {
if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
lsm->lsm_oinfo[i]->loi_ost_idx =
(idx + i) % ec->ec_nstripes;
}
}
/* setup object ID here for !on_target and LOV hint */
if (oa->o_valid & OBD_MD_FLID) {
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
lsm->lsm_oi = oa->o_oi;
}
if (ostid_id(&lsm->lsm_oi) == 0)
ostid_set_id(&lsm->lsm_oi, ++last_object_id);
rc = 0;
if (on_target) {
/* Only echo objects are allowed to be created */
LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
(ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
if (rc != 0) {
CERROR("Cannot create objects: rc = %d\n", rc);
GOTO(failed, rc);
}
created = 1;
}
/* See what object ID we were given */
oa->o_oi = lsm->lsm_oi;
oa->o_valid |= OBD_MD_FLID;
eco = cl_echo_object_find(ed, &lsm);
if (IS_ERR(eco))
GOTO(failed, rc = PTR_ERR(eco));
cl_echo_object_put(eco);
CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
EXIT;
failed:
if (created && rc)
obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
if (lsm)
echo_free_memmd(ed, &lsm);
if (rc)
CERROR("create object failed with: rc = %d\n", rc);
return (rc);
}
static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
struct obdo *oa)
{
struct lov_stripe_md *lsm = NULL;
struct echo_object *eco;
int rc;
ENTRY;
if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
/* disallow use of object id 0 */
CERROR ("No valid oid\n");
RETURN(-EINVAL);
}
rc = echo_alloc_memmd(ed, &lsm);
if (rc < 0)
RETURN(rc);
lsm->lsm_oi = oa->o_oi;
if (!(oa->o_valid & OBD_MD_FLGROUP))
ostid_set_seq_echo(&lsm->lsm_oi);
rc = 0;
eco = cl_echo_object_find(ed, &lsm);
if (!IS_ERR(eco))
*ecop = eco;
else
rc = PTR_ERR(eco);
if (lsm)
echo_free_memmd(ed, &lsm);
RETURN(rc);
}
static void echo_put_object(struct echo_object *eco)
{
if (cl_echo_object_put(eco))
CERROR("echo client: drop an object failed");
}
static void
echo_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp)
{
unsigned long stripe_count;
unsigned long stripe_size;
unsigned long width;
unsigned long woffset;
int stripe_index;
obd_off offset;
if (lsm->lsm_stripe_count <= 1)
return;
offset = *offp;
stripe_size = lsm->lsm_stripe_size;
stripe_count = lsm->lsm_stripe_count;
/* width = # bytes in all stripes */
width = stripe_size * stripe_count;
/* woffset = offset within a width; offset = whole number of widths */
woffset = do_div (offset, width);
stripe_index = woffset / stripe_size;
*idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
*offp = offset * stripe_size + woffset % stripe_size;
}
static void
echo_client_page_debug_setup(struct lov_stripe_md *lsm,
struct page *page, int rw, obd_id id,
obd_off offset, obd_off count)
{
char *addr;
obd_off stripe_off;
obd_id stripe_id;
int delta;
/* no partial pages on the client */
LASSERT(count == PAGE_CACHE_SIZE);
addr = kmap(page);
for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
if (rw == OBD_BRW_WRITE) {
stripe_off = offset + delta;
stripe_id = id;
echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
} else {
stripe_off = 0xdeadbeef00c0ffeeULL;
stripe_id = 0xdeadbeef00c0ffeeULL;
}
block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
stripe_off, stripe_id);
}
kunmap(page);
}
static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
struct page *page, obd_id id,
obd_off offset, obd_off count)
{
obd_off stripe_off;
obd_id stripe_id;
char *addr;
int delta;
int rc;
int rc2;
/* no partial pages on the client */
LASSERT(count == PAGE_CACHE_SIZE);
addr = kmap(page);
for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
stripe_off = offset + delta;
stripe_id = id;
echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
rc2 = block_debug_check("test_brw",
addr + delta, OBD_ECHO_BLOCK_SIZE,
stripe_off, stripe_id);
if (rc2 != 0) {
CERROR ("Error in echo object "LPX64"\n", id);
rc = rc2;
}
}
kunmap(page);
return rc;
}
static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
struct echo_object *eco, obd_off offset,
obd_size count, int async,
struct obd_trans_info *oti)
{
struct lov_stripe_md *lsm = eco->eo_lsm;
obd_count npages;
struct brw_page *pga;
struct brw_page *pgp;
struct page **pages;
obd_off off;
int i;
int rc;
int verify;
int gfp_mask;
int brw_flags = 0;
ENTRY;
verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
(oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
(oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
LASSERT(lsm != NULL);
LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
if (count <= 0 ||
(count & (~CFS_PAGE_MASK)) != 0)
RETURN(-EINVAL);
/* XXX think again with misaligned I/O */
npages = count >> PAGE_CACHE_SHIFT;
if (rw == OBD_BRW_WRITE)
brw_flags = OBD_BRW_ASYNC;
OBD_ALLOC(pga, npages * sizeof(*pga));
if (pga == NULL)
RETURN(-ENOMEM);
OBD_ALLOC(pages, npages * sizeof(*pages));
if (pages == NULL) {
OBD_FREE(pga, npages * sizeof(*pga));
RETURN(-ENOMEM);
}
for (i = 0, pgp = pga, off = offset;
i < npages;
i++, pgp++, off += PAGE_CACHE_SIZE) {
LASSERT (pgp->pg == NULL); /* for cleanup */
rc = -ENOMEM;
OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
if (pgp->pg == NULL)
goto out;
pages[i] = pgp->pg;
pgp->count = PAGE_CACHE_SIZE;
pgp->off = off;
pgp->flag = brw_flags;
if (verify)
echo_client_page_debug_setup(lsm, pgp->pg, rw,
ostid_id(&oa->o_oi), off,
pgp->count);
}
/* brw mode can only be used at client */
LASSERT(ed->ed_next != NULL);
rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
out:
if (rc != 0 || rw != OBD_BRW_READ)
verify = 0;
for (i = 0, pgp = pga; i < npages; i++, pgp++) {
if (pgp->pg == NULL)
continue;
if (verify) {
int vrc;
vrc = echo_client_page_debug_check(lsm, pgp->pg,
ostid_id(&oa->o_oi),
pgp->off, pgp->count);
if (vrc != 0 && rc == 0)
rc = vrc;
}
OBD_PAGE_FREE(pgp->pg);
}
OBD_FREE(pga, npages * sizeof(*pga));
OBD_FREE(pages, npages * sizeof(*pages));
RETURN(rc);
}
static int echo_client_prep_commit(const struct lu_env *env,
struct obd_export *exp, int rw,
struct obdo *oa, struct echo_object *eco,
obd_off offset, obd_size count,
obd_size batch, struct obd_trans_info *oti,
int async)
{
struct lov_stripe_md *lsm = eco->eo_lsm;
struct obd_ioobj ioo;
struct niobuf_local *lnb;
struct niobuf_remote *rnb;
obd_off off;
obd_size npages, tot_pages;
int i, ret = 0, brw_flags = 0;
ENTRY;
if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
(lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
RETURN(-EINVAL);
npages = batch >> PAGE_CACHE_SHIFT;
tot_pages = count >> PAGE_CACHE_SHIFT;
OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
if (lnb == NULL || rnb == NULL)
GOTO(out, ret = -ENOMEM);
if (rw == OBD_BRW_WRITE && async)
brw_flags |= OBD_BRW_ASYNC;
obdo_to_ioobj(oa, &ioo);
off = offset;
for(; tot_pages; tot_pages -= npages) {
int lpages;
if (tot_pages < npages)
npages = tot_pages;
for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
rnb[i].offset = off;
rnb[i].len = PAGE_CACHE_SIZE;
rnb[i].flags = brw_flags;
}
ioo.ioo_bufcnt = npages;
oti->oti_transno = 0;
lpages = npages;
ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
lnb, oti, NULL);
if (ret != 0)
GOTO(out, ret);
LASSERT(lpages == npages);
for (i = 0; i < lpages; i++) {
struct page *page = lnb[i].page;
/* read past eof? */
if (page == NULL && lnb[i].rc == 0)
continue;
if (async)
lnb[i].flags |= OBD_BRW_ASYNC;
if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
(oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
(oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
continue;
if (rw == OBD_BRW_WRITE)
echo_client_page_debug_setup(lsm, page, rw,
ostid_id(&oa->o_oi),
rnb[i].offset,
rnb[i].len);
else
echo_client_page_debug_check(lsm, page,
ostid_id(&oa->o_oi),
rnb[i].offset,
rnb[i].len);
}
ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
rnb, npages, lnb, oti, ret);
if (ret != 0)
GOTO(out, ret);
/* Reset oti otherwise it would confuse ldiskfs. */
memset(oti, 0, sizeof(*oti));
}
out:
if (lnb)
OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
if (rnb)
OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
RETURN(ret);
}
static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
struct obd_export *exp,
struct obd_ioctl_data *data,
struct obd_trans_info *dummy_oti)
{
struct obd_device *obd = class_exp2obd(exp);
struct echo_device *ed = obd2echo_dev(obd);
struct echo_client_obd *ec = ed->ed_ec;
struct obdo *oa = &data->ioc_obdo1;
struct echo_object *eco;
int rc;
int async = 1;
long test_mode;
ENTRY;
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
rc = echo_get_object(&eco, ed, oa);
if (rc)
RETURN(rc);
oa->o_valid &= ~OBD_MD_FLHANDLE;
/* OFD/obdfilter works only via prep/commit */
test_mode = (long)data->ioc_pbuf1;
if (test_mode == 1)
async = 0;
if (ed->ed_next == NULL && test_mode != 3) {
test_mode = 3;
data->ioc_plen1 = data->ioc_count;
}
/* Truncate batch size to maximum */
if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
switch (test_mode) {
case 1:
/* fall through */
case 2:
rc = echo_client_kbrw(ed, rw, oa,
eco, data->ioc_offset,
data->ioc_count, async, dummy_oti);
break;
case 3:
rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
eco, data->ioc_offset,
data->ioc_count, data->ioc_plen1,
dummy_oti, async);
break;
default:
rc = -EINVAL;
}
echo_put_object(eco);
RETURN(rc);
}
static int
echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
int mode, obd_off offset, obd_size nob)
{
struct echo_device *ed = obd2echo_dev(exp->exp_obd);
struct lustre_handle *ulh = &oa->o_handle;
struct echo_object *eco;
obd_off end;
int rc;
ENTRY;
if (ed->ed_next == NULL)
RETURN(-EOPNOTSUPP);
if (!(mode == LCK_PR || mode == LCK_PW))
RETURN(-EINVAL);
if ((offset & (~CFS_PAGE_MASK)) != 0 ||
(nob & (~CFS_PAGE_MASK)) != 0)
RETURN(-EINVAL);
rc = echo_get_object (&eco, ed, oa);
if (rc != 0)
RETURN(rc);
end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
if (rc == 0) {
oa->o_valid |= OBD_MD_FLHANDLE;
CDEBUG(D_INFO, "Cookie is "LPX64"\n", ulh->cookie);
}
echo_put_object(eco);
RETURN(rc);
}
static int
echo_client_cancel(struct obd_export *exp, struct obdo *oa)
{
struct echo_device *ed = obd2echo_dev(exp->exp_obd);
__u64 cookie = oa->o_handle.cookie;
if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
return -EINVAL;
CDEBUG(D_INFO, "Cookie is "LPX64"\n", cookie);
return cl_echo_cancel(ed, cookie);
}
static int
echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
void *karg, void *uarg)
{
struct obd_device *obd = exp->exp_obd;
struct echo_device *ed = obd2echo_dev(obd);
struct echo_client_obd *ec = ed->ed_ec;
struct echo_object *eco;
struct obd_ioctl_data *data = karg;
struct obd_trans_info dummy_oti;
struct lu_env *env;
struct oti_req_ack_lock *ack_lock;
struct obdo *oa;
struct lu_fid fid;
int rw = OBD_BRW_READ;
int rc = 0;
int i;
ENTRY;
memset(&dummy_oti, 0, sizeof(dummy_oti));
oa = &data->ioc_obdo1;
if (!(oa->o_valid & OBD_MD_FLGROUP)) {
oa->o_valid |= OBD_MD_FLGROUP;
ostid_set_seq_echo(&oa->o_oi);
}
/* This FID is unpacked just for validation at this point */
rc = ostid_to_fid(&fid, &oa->o_oi, 0);
if (rc < 0)
RETURN(rc);
OBD_ALLOC_PTR(env);
if (env == NULL)
RETURN(-ENOMEM);
rc = lu_env_init(env, LCT_DT_THREAD);
if (rc)
GOTO(out, rc = -ENOMEM);
switch (cmd) {
case OBD_IOC_CREATE: /* may create echo object */
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
data->ioc_plen1, &dummy_oti);
GOTO(out, rc);
case OBD_IOC_ECHO_MD: {
int count;
int cmd;
char *dir = NULL;
int dirlen;
__u64 id;
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO(out, rc = -EPERM);
count = data->ioc_count;
cmd = data->ioc_command;
id = ostid_id(&data->ioc_obdo2.o_oi);
dirlen = data->ioc_plen1;
OBD_ALLOC(dir, dirlen + 1);
if (dir == NULL)
GOTO(out, rc = -ENOMEM);
if (copy_from_user(dir, data->ioc_pbuf1, dirlen)) {
OBD_FREE(dir, data->ioc_plen1 + 1);
GOTO(out, rc = -EFAULT);
}
rc = echo_md_handler(ed, cmd, dir, dirlen, id, count, data);
OBD_FREE(dir, dirlen + 1);
GOTO(out, rc);
}
case OBD_IOC_ECHO_ALLOC_SEQ: {
struct lu_env *cl_env;
int refcheck;
__u64 seq;
int max_count;
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO(out, rc = -EPERM);
cl_env = cl_env_get(&refcheck);
if (IS_ERR(cl_env))
GOTO(out, rc = PTR_ERR(cl_env));
rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
ECHO_MD_SES_TAG);
if (rc != 0) {
cl_env_put(cl_env, &refcheck);
GOTO(out, rc);
}
rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
cl_env_put(cl_env, &refcheck);
if (rc < 0) {
CERROR("%s: Can not alloc seq: rc = %d\n",
obd->obd_name, rc);
GOTO(out, rc);
}
if (copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
return -EFAULT;
max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
if (copy_to_user(data->ioc_pbuf2, &max_count,
data->ioc_plen2))
return -EFAULT;
GOTO(out, rc);
}
case OBD_IOC_DESTROY:
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
rc = echo_get_object(&eco, ed, oa);
if (rc == 0) {
rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
&dummy_oti, NULL, NULL);
if (rc == 0)
eco->eo_deleted = 1;
echo_put_object(eco);
}
GOTO(out, rc);
case OBD_IOC_GETATTR:
rc = echo_get_object(&eco, ed, oa);
if (rc == 0) {
struct obd_info oinfo = { { { 0 } } };
oinfo.oi_md = eco->eo_lsm;
oinfo.oi_oa = oa;
rc = obd_getattr(env, ec->ec_exp, &oinfo);
echo_put_object(eco);
}
GOTO(out, rc);
case OBD_IOC_SETATTR:
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
rc = echo_get_object(&eco, ed, oa);
if (rc == 0) {
struct obd_info oinfo = { { { 0 } } };
oinfo.oi_oa = oa;
oinfo.oi_md = eco->eo_lsm;
rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
echo_put_object(eco);
}
GOTO(out, rc);
case OBD_IOC_BRW_WRITE:
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
rw = OBD_BRW_WRITE;
/* fall through */
case OBD_IOC_BRW_READ:
rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
GOTO(out, rc);
case ECHO_IOC_GET_STRIPE:
rc = echo_get_object(&eco, ed, oa);
if (rc == 0) {
rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
data->ioc_plen1);
echo_put_object(eco);
}
GOTO(out, rc);
case ECHO_IOC_SET_STRIPE:
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
if (data->ioc_pbuf1 == NULL) { /* unset */
rc = echo_get_object(&eco, ed, oa);
if (rc == 0) {
eco->eo_deleted = 1;
echo_put_object(eco);
}
} else {
rc = echo_create_object(env, ed, 0, oa,
data->ioc_pbuf1,
data->ioc_plen1, &dummy_oti);
}
GOTO (out, rc);
case ECHO_IOC_ENQUEUE:
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
rc = echo_client_enqueue(exp, oa,
data->ioc_conn1, /* lock mode */
data->ioc_offset,
data->ioc_count);/*extent*/
GOTO (out, rc);
case ECHO_IOC_CANCEL:
rc = echo_client_cancel(exp, oa);
GOTO (out, rc);
default:
CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
GOTO (out, rc = -ENOTTY);
}
EXIT;
out:
lu_env_fini(env);
OBD_FREE_PTR(env);
/* XXX this should be in a helper also called by target_send_reply */
for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
i++, ack_lock++) {
if (!ack_lock->mode)
break;
ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
}
return rc;
}
static int echo_client_setup(const struct lu_env *env,
struct obd_device *obddev, struct lustre_cfg *lcfg)
{
struct echo_client_obd *ec = &obddev->u.echo_client;
struct obd_device *tgt;
struct obd_uuid echo_uuid = { "ECHO_UUID" };
struct obd_connect_data *ocd = NULL;
int rc;
ENTRY;
if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
CERROR("requires a TARGET OBD name\n");
RETURN(-EINVAL);
}
tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
CERROR("device not attached or not set up (%s)\n",
lustre_cfg_string(lcfg, 1));
RETURN(-EINVAL);
}
spin_lock_init(&ec->ec_lock);
INIT_LIST_HEAD (&ec->ec_objects);
INIT_LIST_HEAD (&ec->ec_locks);
ec->ec_unique = 0;
ec->ec_nstripes = 0;
if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
lu_context_tags_update(ECHO_MD_CTX_TAG);
lu_session_tags_update(ECHO_MD_SES_TAG);
RETURN(0);
}
OBD_ALLOC(ocd, sizeof(*ocd));
if (ocd == NULL) {
CERROR("Can't alloc ocd connecting to %s\n",
lustre_cfg_string(lcfg, 1));
return -ENOMEM;
}
ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
OBD_CONNECT_BRW_SIZE |
OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE;
ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
ocd->ocd_version = LUSTRE_VERSION_CODE;
ocd->ocd_group = FID_SEQ_ECHO;
rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
if (rc == 0) {
/* Turn off pinger because it connects to tgt obd directly. */
spin_lock(&tgt->obd_dev_lock);
list_del_init(&ec->ec_exp->exp_obd_chain_timed);
spin_unlock(&tgt->obd_dev_lock);
}
OBD_FREE(ocd, sizeof(*ocd));
if (rc != 0) {
CERROR("fail to connect to device %s\n",
lustre_cfg_string(lcfg, 1));
return (rc);
}
RETURN(rc);
}
static int echo_client_cleanup(struct obd_device *obddev)
{
struct echo_device *ed = obd2echo_dev(obddev);
struct echo_client_obd *ec = &obddev->u.echo_client;
int rc;
ENTRY;
/*Do nothing for Metadata echo client*/
if (ed == NULL )
RETURN(0);
if (ed->ed_next_ismd) {
lu_context_tags_clear(ECHO_MD_CTX_TAG);
lu_session_tags_clear(ECHO_MD_SES_TAG);
RETURN(0);
}
if (!list_empty(&obddev->obd_exports)) {
CERROR("still has clients!\n");
RETURN(-EBUSY);
}
LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
rc = obd_disconnect(ec->ec_exp);
if (rc != 0)
CERROR("fail to disconnect device: %d\n", rc);
RETURN(rc);
}
static int echo_client_connect(const struct lu_env *env,
struct obd_export **exp,
struct obd_device *src, struct obd_uuid *cluuid,
struct obd_connect_data *data, void *localdata)
{
int rc;
struct lustre_handle conn = { 0 };
ENTRY;
rc = class_connect(&conn, src, cluuid);
if (rc == 0) {
*exp = class_conn2export(&conn);
}
RETURN (rc);
}
static int echo_client_disconnect(struct obd_export *exp)
{
#if 0
struct obd_device *obd;
struct echo_client_obd *ec;
struct ec_lock *ecl;
#endif
int rc;
ENTRY;
if (exp == NULL)
GOTO(out, rc = -EINVAL);
#if 0
obd = exp->exp_obd;
ec = &obd->u.echo_client;
/* no more contention on export's lock list */
while (!list_empty (&exp->exp_ec_data.eced_locks)) {
ecl = list_entry (exp->exp_ec_data.eced_locks.next,
struct ec_lock, ecl_exp_chain);
list_del (&ecl->ecl_exp_chain);
rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
ecl->ecl_mode, &ecl->ecl_lock_handle);
CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
"(%d)\n", ecl->ecl_object->eco_id, rc);
echo_put_object (ecl->ecl_object);
OBD_FREE (ecl, sizeof (*ecl));
}
#endif
rc = class_disconnect(exp);
GOTO(out, rc);
out:
return rc;
}
static struct obd_ops echo_client_obd_ops = {
.o_owner = THIS_MODULE,
#if 0
.o_setup = echo_client_setup,
.o_cleanup = echo_client_cleanup,
#endif
.o_iocontrol = echo_client_iocontrol,
.o_connect = echo_client_connect,
.o_disconnect = echo_client_disconnect
};
int echo_client_init(void)
{
struct lprocfs_static_vars lvars = { 0 };
int rc;
lprocfs_echo_init_vars(&lvars);
rc = lu_kmem_init(echo_caches);
if (rc == 0) {
rc = class_register_type(&echo_client_obd_ops, NULL,
lvars.module_vars,
LUSTRE_ECHO_CLIENT_NAME,
&echo_device_type);
if (rc)
lu_kmem_fini(echo_caches);
}
return rc;
}
void echo_client_exit(void)
{
class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
lu_kmem_fini(echo_caches);
}
static int __init obdecho_init(void)
{
struct lprocfs_static_vars lvars;
int rc;
ENTRY;
LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
lprocfs_echo_init_vars(&lvars);
rc = echo_client_init();
RETURN(rc);
}
static void /*__exit*/ obdecho_exit(void)
{
echo_client_exit();
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
MODULE_LICENSE("GPL");
cfs_module(obdecho, LUSTRE_VERSION_STRING, obdecho_init, obdecho_exit);
/** @} echo_client */