Params: use a multiple-reader / single-writer flock to improve concurrency (#2207)
* improve concurrency: multiple readers, single writer locks * remove lock in read_db_valuepull/2225/head
parent
254814cc79
commit
4fba3408c4
|
@ -117,14 +117,15 @@ def fsync_dir(path):
|
|||
|
||||
|
||||
class FileLock():
|
||||
def __init__(self, path, create):
|
||||
def __init__(self, path, create, lock_ex):
|
||||
self._path = path
|
||||
self._create = create
|
||||
self._fd = None
|
||||
self._lock_ex = lock_ex
|
||||
|
||||
def acquire(self):
|
||||
self._fd = os.open(self._path, os.O_CREAT if self._create else 0)
|
||||
fcntl.flock(self._fd, fcntl.LOCK_EX)
|
||||
fcntl.flock(self._fd, fcntl.LOCK_EX if self._lock_ex else fcntl.LOCK_SH)
|
||||
|
||||
def release(self):
|
||||
if self._fd is not None:
|
||||
|
@ -152,8 +153,8 @@ class DBAccessor():
|
|||
except KeyError:
|
||||
return None
|
||||
|
||||
def _get_lock(self, create):
|
||||
lock = FileLock(os.path.join(self._path, ".lock"), create)
|
||||
def _get_lock(self, create, lock_ex):
|
||||
lock = FileLock(os.path.join(self._path, ".lock"), create, lock_ex)
|
||||
lock.acquire()
|
||||
return lock
|
||||
|
||||
|
@ -185,7 +186,7 @@ class DBAccessor():
|
|||
class DBReader(DBAccessor):
|
||||
def __enter__(self):
|
||||
try:
|
||||
lock = self._get_lock(False)
|
||||
lock = self._get_lock(False, False)
|
||||
except OSError as e:
|
||||
# Do not create lock if it does not exist.
|
||||
if e.errno == errno.ENOENT:
|
||||
|
@ -223,7 +224,7 @@ class DBWriter(DBAccessor):
|
|||
|
||||
try:
|
||||
os.chmod(self._path, 0o777)
|
||||
self._lock = self._get_lock(True)
|
||||
self._lock = self._get_lock(True, True)
|
||||
self._vals = self._read_values_locked()
|
||||
except Exception:
|
||||
os.umask(self._prev_umask)
|
||||
|
@ -312,7 +313,7 @@ def write_db(params_path, key, value):
|
|||
value = value.encode('utf8')
|
||||
|
||||
prev_umask = os.umask(0)
|
||||
lock = FileLock(params_path + "/.lock", True)
|
||||
lock = FileLock(params_path + "/.lock", True, True)
|
||||
lock.acquire()
|
||||
|
||||
try:
|
||||
|
|
|
@ -268,45 +268,19 @@ cleanup:
|
|||
}
|
||||
|
||||
int read_db_value(const char* key, char** value, size_t* value_sz, bool persistent_param) {
|
||||
int lock_fd = -1;
|
||||
int result;
|
||||
char path[1024];
|
||||
const char* params_path = persistent_param ? persistent_params_path : default_params_path;
|
||||
|
||||
result = snprintf(path, sizeof(path), "%s/.lock", params_path);
|
||||
int result = snprintf(path, sizeof(path), "%s/d/%s", params_path, key);
|
||||
if (result < 0) {
|
||||
goto cleanup;
|
||||
}
|
||||
lock_fd = open(path, 0);
|
||||
|
||||
result = snprintf(path, sizeof(path), "%s/d/%s", params_path, key);
|
||||
if (result < 0) {
|
||||
goto cleanup;
|
||||
return result;
|
||||
}
|
||||
|
||||
// Take lock.
|
||||
result = flock(lock_fd, LOCK_EX);
|
||||
if (result < 0) {
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
// Read value.
|
||||
// TODO(mgraczyk): If there is a lot of contention, we can release the lock
|
||||
// after opening the file, before reading.
|
||||
*value = static_cast<char*>(read_file(path, value_sz));
|
||||
if (*value == NULL) {
|
||||
result = -22;
|
||||
goto cleanup;
|
||||
return -22;
|
||||
}
|
||||
|
||||
result = 0;
|
||||
|
||||
cleanup:
|
||||
// Release lock.
|
||||
if (lock_fd >= 0) {
|
||||
close(lock_fd);
|
||||
}
|
||||
return result;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void read_db_value_blocking(const char* key, char** value, size_t* value_sz, bool persistent_param) {
|
||||
|
@ -330,7 +304,7 @@ int read_db_all(std::map<std::string, std::string> *params, bool persistent_para
|
|||
int lock_fd = open(lock_path.c_str(), 0);
|
||||
if (lock_fd < 0) return -1;
|
||||
|
||||
err = flock(lock_fd, LOCK_EX);
|
||||
err = flock(lock_fd, LOCK_SH);
|
||||
if (err < 0) return err;
|
||||
|
||||
std::string key_path = util::string_format("%s/d", params_path);
|
||||
|
|
Loading…
Reference in New Issue