reuse filedescriptors, massively decreasing fragmentation

pull/76/head
bert hubert 2020-01-09 11:53:31 +01:00
parent e49c3ff6c2
commit 1cdd55a75e
1 changed files with 90 additions and 6 deletions

View File

@ -7,7 +7,7 @@
#include "fmt/printf.h"
#include <mutex>
#include "storage.hh"
#include <map>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
@ -27,13 +27,98 @@ struct FatalException : public std::runtime_error
FatalException(const std::string& str) : std::runtime_error(str){}
};
void writeToDisk(time_t s, uint64_t sourceid, std::string_view message)
int getfd(const char* path, int mode, int permission)
{
auto path = getPath(g_storagedir, s, sourceid, true);
int fd = open(path.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0600);
struct FileID {
string path;
int mode;
int permission;
bool operator<(const FileID& rhs) const
{
return std::tie(path, mode, permission) < std::tie(rhs.path, rhs.mode, rhs. permission);
}
};
struct FDID {
explicit FDID(int fd_)
{
fd = fd_;
lastUsed = time(0);
}
FDID(const FDID& rhs) = delete;
FDID& operator=(const FDID& rhs) = delete;
FDID(FDID&& rhs)
{
fd = rhs.fd;
lastUsed = rhs.lastUsed;
rhs.fd = -1;
}
~FDID()
{
if(fd >= 0) {
cout<<"Closing fd "<<fd<<endl;
close(fd);
}
}
int fd;
time_t lastUsed;
};
static std::mutex mut;
std::lock_guard<std::mutex> lock(mut);
static map<FileID, FDID> fds;
// do some cleanup
time_t now = time(0);
for(auto iter = fds.begin(); iter != fds.end(); ) {
if(now - iter->second.lastUsed > 60) {
cout<<"Found stale entry for "<<iter->first.path<<endl;
iter = fds.erase(iter);
}
else
++iter;
}
int toErase = fds.size() - 512;
if(toErase > 0) {
cout<<"Need to erase "<<toErase<<" entries"<<endl;
auto end = fds.begin();
std::advance(end, toErase);
fds.erase(fds.begin(), end);
}
FileID fid({path, mode, permission});
// cout<<"Request for "<<path<<endl;
auto iter = fds.find(fid);
if(iter != fds.end()) {
// cout<<"Found a live FD!"<<endl;
iter->second.lastUsed = time(0);
return iter->second.fd;
}
// cout<<"Did not find it, first doing some cleaning"<<endl;
int fd = open(path, mode, permission);
if(fd < 0) {
throw FatalException("Unable to open file for storage: "+string(strerror(errno)));
}
cout<<"Opened fd "<<fd<<" for path "<<path<<endl;
fds.emplace(fid, FDID(fd));
return fd;
}
void writeToDisk(time_t s, uint64_t sourceid, std::string_view message)
{
auto path = getPath(g_storagedir, s, sourceid, true);
int fd = getfd(path.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0600);
int res = write(fd, &message[0], message.size());
if(res < 0) {
close(fd);
@ -43,7 +128,6 @@ void writeToDisk(time_t s, uint64_t sourceid, std::string_view message)
close(fd);
throw FatalException("Partial write to storage");
}
close(fd);
}
void recvSession(int s, ComboAddress client)
@ -82,7 +166,7 @@ void recvSession(int s, ComboAddress client)
catch(std::exception& e) {
cout<<"Error in receiving thread: "<<e.what()<<endl;
}
cout<<"Thread exiting"<<endl;
cout<<"Thread for "<<client.toStringWithPort()<< " exiting"<<endl;
}
void recvListener(Socket&& s, ComboAddress local)