diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 72f8e1311392..0176241eaea7 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2738,15 +2738,13 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, _got = 0; ret = try_get_cap_refs(ci, need, want, endoff, false, &_got); - if (ret == -EAGAIN) { + if (ret == -EAGAIN) continue; - } else if (!ret) { - int err; - + if (!ret) { DEFINE_WAIT_FUNC(wait, woken_wake_function); add_wait_queue(&ci->i_cap_wq, &wait); - while (!(err = try_get_cap_refs(ci, need, want, endoff, + while (!(ret = try_get_cap_refs(ci, need, want, endoff, true, &_got))) { if (signal_pending(current)) { ret = -ERESTARTSYS; @@ -2756,14 +2754,16 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, } remove_wait_queue(&ci->i_cap_wq, &wait); - if (err == -EAGAIN) + if (ret == -EAGAIN) continue; } - if (ret == -ESTALE) { - /* session was killed, try renew caps */ - ret = ceph_renew_caps(&ci->vfs_inode); - if (ret == 0) - continue; + if (ret < 0) { + if (ret == -ESTALE) { + /* session was killed, try renew caps */ + ret = ceph_renew_caps(&ci->vfs_inode); + if (ret == 0) + continue; + } return ret; } @@ -2992,8 +2992,10 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, } if (complete_capsnap) wake_up_all(&ci->i_cap_wq); - while (put-- > 0) - iput(inode); + while (put-- > 0) { + /* avoid calling iput_final() in osd dispatch threads */ + ceph_async_iput(inode); + } } /* @@ -3964,8 +3966,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, done: mutex_unlock(&session->s_mutex); done_unlocked: - iput(inode); ceph_put_string(extra_info.pool_ns); + /* avoid calling iput_final() in mds dispatch threads */ + ceph_async_iput(inode); return; flush_cap_releases: @@ -4011,7 +4014,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) if (inode) { dout("check_delayed_caps on %p\n", inode); ceph_check_caps(ci, flags, NULL); - iput(inode); + /* avoid calling iput_final() in tick thread */ + ceph_async_iput(inode); } } spin_unlock(&mdsc->cap_delay_lock); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 305daf043eb0..183c37c0a8fc 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -791,7 +791,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) if (aio_work) { INIT_WORK(&aio_work->work, ceph_aio_retry_work); aio_work->req = req; - queue_work(ceph_inode_to_client(inode)->wb_wq, + queue_work(ceph_inode_to_client(inode)->inode_wq, &aio_work->work); return; } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index f85355bf49c4..761451f36e2d 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -33,9 +33,7 @@ static const struct inode_operations ceph_symlink_iops; -static void ceph_invalidate_work(struct work_struct *work); -static void ceph_writeback_work(struct work_struct *work); -static void ceph_vmtruncate_work(struct work_struct *work); +static void ceph_inode_work(struct work_struct *work); /* * find or create an inode, given the ceph ino number @@ -509,10 +507,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_snap_realm_item); INIT_LIST_HEAD(&ci->i_snap_flush_item); - INIT_WORK(&ci->i_wb_work, ceph_writeback_work); - INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work); - - INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work); + INIT_WORK(&ci->i_work, ceph_inode_work); + ci->i_work_mask = 0; ceph_fscache_inode_init(ci); @@ -1480,7 +1476,8 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, pr_err("fill_inode badness on %p got %d\n", in, rc); err = rc; } - iput(in); + /* avoid calling iput_final() in mds dispatch threads */ + ceph_async_iput(in); } return err; @@ -1678,8 +1675,11 @@ retry_lookup: &req->r_caps_reservation); if (ret < 0) { pr_err("fill_inode badness on %p\n", in); - if (d_really_is_negative(dn)) - iput(in); + if (d_really_is_negative(dn)) { + /* avoid calling iput_final() in mds + * dispatch threads */ + ceph_async_iput(in); + } d_drop(dn); err = ret; goto next_item; @@ -1689,7 +1689,7 @@ retry_lookup: if (ceph_security_xattr_deadlock(in)) { dout(" skip splicing dn %p to inode %p" " (security xattr deadlock)\n", dn, in); - iput(in); + ceph_async_iput(in); skipped++; goto next_item; } @@ -1740,57 +1740,87 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size) return ret; } +/* + * Put reference to inode, but avoid calling iput_final() in current thread. + * iput_final() may wait for reahahead pages. The wait can cause deadlock in + * some contexts. + */ +void ceph_async_iput(struct inode *inode) +{ + if (!inode) + return; + for (;;) { + if (atomic_add_unless(&inode->i_count, -1, 1)) + break; + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ceph_inode(inode)->i_work)) + break; + /* queue work failed, i_count must be at least 2 */ + } +} + /* * Write back inode data in a worker thread. (This can't be done * in the message handler context.) */ void ceph_queue_writeback(struct inode *inode) { + struct ceph_inode_info *ci = ceph_inode(inode); + set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask); + ihold(inode); - if (queue_work(ceph_inode_to_client(inode)->wb_wq, - &ceph_inode(inode)->i_wb_work)) { + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ci->i_work)) { dout("ceph_queue_writeback %p\n", inode); } else { - dout("ceph_queue_writeback %p failed\n", inode); + dout("ceph_queue_writeback %p already queued, mask=%lx\n", + inode, ci->i_work_mask); iput(inode); } } -static void ceph_writeback_work(struct work_struct *work) -{ - struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, - i_wb_work); - struct inode *inode = &ci->vfs_inode; - - dout("writeback %p\n", inode); - filemap_fdatawrite(&inode->i_data); - iput(inode); -} - /* * queue an async invalidation */ void ceph_queue_invalidate(struct inode *inode) { + struct ceph_inode_info *ci = ceph_inode(inode); + set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask); + ihold(inode); - if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq, - &ceph_inode(inode)->i_pg_inv_work)) { + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ceph_inode(inode)->i_work)) { dout("ceph_queue_invalidate %p\n", inode); } else { - dout("ceph_queue_invalidate %p failed\n", inode); + dout("ceph_queue_invalidate %p already queued, mask=%lx\n", + inode, ci->i_work_mask); iput(inode); } } /* - * Invalidate inode pages in a worker thread. (This can't be done - * in the message handler context.) + * Queue an async vmtruncate. If we fail to queue work, we will handle + * the truncation the next time we call __ceph_do_pending_vmtruncate. */ -static void ceph_invalidate_work(struct work_struct *work) +void ceph_queue_vmtruncate(struct inode *inode) { - struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, - i_pg_inv_work); - struct inode *inode = &ci->vfs_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask); + + ihold(inode); + if (queue_work(ceph_inode_to_client(inode)->inode_wq, + &ci->i_work)) { + dout("ceph_queue_vmtruncate %p\n", inode); + } else { + dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n", + inode, ci->i_work_mask); + iput(inode); + } +} + +static void ceph_do_invalidate_pages(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); u32 orig_gen; int check = 0; @@ -1842,44 +1872,6 @@ static void ceph_invalidate_work(struct work_struct *work) out: if (check) ceph_check_caps(ci, 0, NULL); - iput(inode); -} - - -/* - * called by trunc_wq; - * - * We also truncate in a separate thread as well. - */ -static void ceph_vmtruncate_work(struct work_struct *work) -{ - struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, - i_vmtruncate_work); - struct inode *inode = &ci->vfs_inode; - - dout("vmtruncate_work %p\n", inode); - __ceph_do_pending_vmtruncate(inode); - iput(inode); -} - -/* - * Queue an async vmtruncate. If we fail to queue work, we will handle - * the truncation the next time we call __ceph_do_pending_vmtruncate. - */ -void ceph_queue_vmtruncate(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - - ihold(inode); - - if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq, - &ci->i_vmtruncate_work)) { - dout("ceph_queue_vmtruncate %p\n", inode); - } else { - dout("ceph_queue_vmtruncate %p failed, pending=%d\n", - inode, ci->i_truncate_pending); - iput(inode); - } } /* @@ -1943,6 +1935,25 @@ retry: wake_up_all(&ci->i_cap_wq); } +static void ceph_inode_work(struct work_struct *work) +{ + struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, + i_work); + struct inode *inode = &ci->vfs_inode; + + if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) { + dout("writeback %p\n", inode); + filemap_fdatawrite(&inode->i_data); + } + if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask)) + ceph_do_invalidate_pages(inode); + + if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask)) + __ceph_do_pending_vmtruncate(inode); + + iput(inode); +} + /* * symlinks */ diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 959b1bf7c327..6af2d0d4a87a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -690,11 +690,12 @@ void ceph_mdsc_release_request(struct kref *kref) ceph_msg_put(req->r_reply); if (req->r_inode) { ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); - iput(req->r_inode); + /* avoid calling iput_final() in mds dispatch threads */ + ceph_async_iput(req->r_inode); } if (req->r_parent) ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); - iput(req->r_target_inode); + ceph_async_iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) @@ -708,7 +709,7 @@ void ceph_mdsc_release_request(struct kref *kref) */ ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); - iput(req->r_old_dentry_dir); + ceph_async_iput(req->r_old_dentry_dir); } kfree(req->r_path1); kfree(req->r_path2); @@ -818,7 +819,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc, } if (req->r_unsafe_dir) { - iput(req->r_unsafe_dir); + /* avoid calling iput_final() in mds dispatch threads */ + ceph_async_iput(req->r_unsafe_dir); req->r_unsafe_dir = NULL; } @@ -983,7 +985,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); if (!cap) { spin_unlock(&ci->i_ceph_lock); - iput(inode); + ceph_async_iput(inode); goto random; } mds = cap->session->s_mds; @@ -992,7 +994,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc, cap == ci->i_auth_cap ? "auth " : "", cap); spin_unlock(&ci->i_ceph_lock); out: - iput(inode); + /* avoid calling iput_final() while holding mdsc->mutex or + * in mds dispatch threads */ + ceph_async_iput(inode); return mds; random: @@ -1302,7 +1306,9 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, spin_unlock(&session->s_cap_lock); if (last_inode) { - iput(last_inode); + /* avoid calling iput_final() while holding + * s_mutex or in mds dispatch threads */ + ceph_async_iput(last_inode); last_inode = NULL; } if (old_cap) { @@ -1335,7 +1341,7 @@ out: session->s_cap_iterator = NULL; spin_unlock(&session->s_cap_lock); - iput(last_inode); + ceph_async_iput(last_inode); if (old_cap) ceph_put_cap(session->s_mdsc, old_cap); @@ -1471,7 +1477,8 @@ static void remove_session_caps(struct ceph_mds_session *session) spin_unlock(&session->s_cap_lock); inode = ceph_find_inode(sb, vino); - iput(inode); + /* avoid calling iput_final() while holding s_mutex */ + ceph_async_iput(inode); spin_lock(&session->s_cap_lock); } @@ -3912,8 +3919,9 @@ release: ceph_con_send(&session->s_con, msg); out: - iput(inode); mutex_unlock(&session->s_mutex); + /* avoid calling iput_final() in mds dispatch threads */ + ceph_async_iput(inode); return; bad: diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c index c4522212872c..d629fc857450 100644 --- a/fs/ceph/quota.c +++ b/fs/ceph/quota.c @@ -74,7 +74,8 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc, le64_to_cpu(h->max_files)); spin_unlock(&ci->i_ceph_lock); - iput(inode); + /* avoid calling iput_final() in dispatch thread */ + ceph_async_iput(inode); } static struct ceph_quotarealm_inode * @@ -235,7 +236,8 @@ restart: ci = ceph_inode(in); has_quota = __ceph_has_any_quota(ci); - iput(in); + /* avoid calling iput_final() while holding mdsc->snap_rwsem */ + ceph_async_iput(in); next = realm->parent; if (has_quota || !next) @@ -372,7 +374,8 @@ restart: pr_warn("Invalid quota check op (%d)\n", op); exceeded = true; /* Just break the loop */ } - iput(in); + /* avoid calling iput_final() while holding mdsc->snap_rwsem */ + ceph_async_iput(in); next = realm->parent; if (exceeded || !next) diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index b26e12cd8ec3..72c6c022f02b 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -648,13 +648,15 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) if (!inode) continue; spin_unlock(&realm->inodes_with_caps_lock); - iput(lastinode); + /* avoid calling iput_final() while holding + * mdsc->snap_rwsem or in mds dispatch threads */ + ceph_async_iput(lastinode); lastinode = inode; ceph_queue_cap_snap(ci); spin_lock(&realm->inodes_with_caps_lock); } spin_unlock(&realm->inodes_with_caps_lock); - iput(lastinode); + ceph_async_iput(lastinode); dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino); } @@ -806,7 +808,9 @@ static void flush_snaps(struct ceph_mds_client *mdsc) ihold(inode); spin_unlock(&mdsc->snap_flush_lock); ceph_flush_snaps(ci, &session); - iput(inode); + /* avoid calling iput_final() while holding + * session->s_mutex or in mds dispatch threads */ + ceph_async_iput(inode); spin_lock(&mdsc->snap_flush_lock); } spin_unlock(&mdsc->snap_flush_lock); @@ -950,12 +954,14 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ceph_get_snap_realm(mdsc, realm); ceph_put_snap_realm(mdsc, oldrealm); - iput(inode); + /* avoid calling iput_final() while holding + * mdsc->snap_rwsem or mds in dispatch threads */ + ceph_async_iput(inode); continue; skip_inode: spin_unlock(&ci->i_ceph_lock); - iput(inode); + ceph_async_iput(inode); } /* we may have taken some of the old realm's children. */ diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 01be7c1bc4c6..d57fa60dcd43 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -672,18 +672,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, * The number of concurrent works can be high but they don't need * to be processed in parallel, limit concurrency. */ - fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1); - if (!fsc->wb_wq) + fsc->inode_wq = alloc_workqueue("ceph-inode", WQ_UNBOUND, 0); + if (!fsc->inode_wq) goto fail_client; - fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1); - if (!fsc->pg_inv_wq) - goto fail_wb_wq; - fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1); - if (!fsc->trunc_wq) - goto fail_pg_inv_wq; fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1); if (!fsc->cap_wq) - goto fail_trunc_wq; + goto fail_inode_wq; /* set up mempools */ err = -ENOMEM; @@ -697,12 +691,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, fail_cap_wq: destroy_workqueue(fsc->cap_wq); -fail_trunc_wq: - destroy_workqueue(fsc->trunc_wq); -fail_pg_inv_wq: - destroy_workqueue(fsc->pg_inv_wq); -fail_wb_wq: - destroy_workqueue(fsc->wb_wq); +fail_inode_wq: + destroy_workqueue(fsc->inode_wq); fail_client: ceph_destroy_client(fsc->client); fail: @@ -715,9 +705,7 @@ fail: static void flush_fs_workqueues(struct ceph_fs_client *fsc) { - flush_workqueue(fsc->wb_wq); - flush_workqueue(fsc->pg_inv_wq); - flush_workqueue(fsc->trunc_wq); + flush_workqueue(fsc->inode_wq); flush_workqueue(fsc->cap_wq); } @@ -725,9 +713,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) { dout("destroy_fs_client %p\n", fsc); - destroy_workqueue(fsc->wb_wq); - destroy_workqueue(fsc->pg_inv_wq); - destroy_workqueue(fsc->trunc_wq); + destroy_workqueue(fsc->inode_wq); destroy_workqueue(fsc->cap_wq); mempool_destroy(fsc->wb_pagevec_pool); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 6edab9a750f8..5f27e1f7f2d6 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -109,9 +109,7 @@ struct ceph_fs_client { mempool_t *wb_pagevec_pool; atomic_long_t writeback_count; - struct workqueue_struct *wb_wq; - struct workqueue_struct *pg_inv_wq; - struct workqueue_struct *trunc_wq; + struct workqueue_struct *inode_wq; struct workqueue_struct *cap_wq; #ifdef CONFIG_DEBUG_FS @@ -387,10 +385,8 @@ struct ceph_inode_info { struct list_head i_snap_realm_item; struct list_head i_snap_flush_item; - struct work_struct i_wb_work; /* writeback work */ - struct work_struct i_pg_inv_work; /* page invalidation work */ - - struct work_struct i_vmtruncate_work; + struct work_struct i_work; + unsigned long i_work_mask; #ifdef CONFIG_CEPH_FSCACHE struct fscache_cookie *fscache; @@ -512,6 +508,13 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, #define CEPH_I_ERROR_FILELOCK (1 << 12) /* have seen file lock errors */ +/* + * Masks of ceph inode work. + */ +#define CEPH_I_WORK_WRITEBACK 0 /* writeback */ +#define CEPH_I_WORK_INVALIDATE_PAGES 1 /* invalidate pages */ +#define CEPH_I_WORK_VMTRUNCATE 2 /* vmtruncate */ + /* * We set the ERROR_WRITE bit when we start seeing write errors on an inode * and then clear it when they start succeeding. Note that we do a lockless @@ -896,9 +899,9 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask); extern bool ceph_inode_set_size(struct inode *inode, loff_t size); extern void __ceph_do_pending_vmtruncate(struct inode *inode); extern void ceph_queue_vmtruncate(struct inode *inode); - extern void ceph_queue_invalidate(struct inode *inode); extern void ceph_queue_writeback(struct inode *inode); +extern void ceph_async_iput(struct inode *inode); extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page, int mask, bool force);