add the zstd wrapper

pull/114/head
bert hubert 2020-04-26 16:19:14 +02:00
parent e96ceb7ddd
commit 35e9195af8
2 changed files with 242 additions and 0 deletions

183
zstdwrap.cc 100644
View File

@ -0,0 +1,183 @@
#include "zstdwrap.hh"
#include <zstd.h>
#include <iostream>
#include <string.h>
#include "navmon.hh"
using std::cout;
using std::endl;
using std::cerr;
ZStdCompressor::ZStdCompressor(const std::function<void(const char*, uint32_t)>& emit, int compressionLevel): d_emit(emit)
{
d_z=ZSTD_createCStream();
ZSTD_initCStream(d_z, compressionLevel);
d_outcapacity=ZSTD_CStreamOutSize();
d_out.dst = malloc(d_outcapacity); // ????
d_out.pos=0;
d_out.size=d_outcapacity;
}
int ZStdCompressor::maxCompressionLevel()
{
return ZSTD_maxCLevel();
}
ZStdCompressor::~ZStdCompressor()
{
for(;;) {
auto res = ZSTD_endStream(d_z, &d_out);
d_outputBytes += d_out.pos;
try {
d_emit((const char*)d_out.dst, d_out.pos);
}
catch(...){}
// cout<<"res: "<<res<<endl;
if(!res)
break;
d_out.pos = 0;
if(ZSTD_isError(res)) {
cerr<<"Error in ZSTD_endStream"<<endl;
break;
}
}
ZSTD_freeCStream(d_z);
free(d_out.dst); // ????
}
uint32_t ZStdCompressor::outputBufferBytes()
{
return d_out.pos;
}
uint32_t ZStdCompressor::outputBufferCapacity()
{
return d_out.size;
}
void ZStdCompressor::flushToEmit()
{
d_outputBytes += d_out.pos;
d_emit((char*)d_out.dst, d_out.pos);
d_out.pos=0;
}
void ZStdCompressor::flush()
{
ZSTD_flushStream(d_z, &d_out);
flushToEmit();
}
void ZStdCompressor::give(const char* data, uint32_t bytes)
{
d_inputBytes += bytes;
ZSTD_inBuffer in;
in.src=data;
in.pos=0;
in.size=bytes;
for(;;) {
// cout<<"before out: "<<d_out.pos<<endl;
ZSTD_compressStream(d_z, &d_out, &in);
// cout<<"after out: "<<d_out.pos<<", in.pos="<<in.pos<<", in.size="<<in.size<<endl;
if(in.pos == in.size)
break;
// if we are here, zstd did not consume everything, so we must make room
flushToEmit();
}
}
ZStdReader::ZStdReader(int fd)
{
d_sourcefd = fd;
int pfd[2];
if(pipe(pfd) < 0)
unixDie("Creating pipe for zstd reader");
d_readpipe=pfd[0]; // for the customer
d_writepipe=pfd[1]; // where we stuff data
d_thread = std::thread(std::bind(&ZStdReader::worker, this));
}
static size_t writen(int fd, const void *buf, size_t count)
{
const char *ptr = (char*)buf;
const char *eptr = ptr + count;
while(ptr != eptr) {
ssize_t res = ::write(fd, ptr, eptr - ptr);
if(res < 0) {
if (errno == EAGAIN)
throw std::runtime_error("used writen2 on non-blocking socket, got EAGAIN");
else if (errno == EPIPE) {
// other end closed, we are pleased to exit
return 0;
}
else
unixDie("failed in writen2");
}
else if (res == 0)
throw std::runtime_error("could not write all bytes, got eof in writen2");
ptr += (size_t) res;
}
return count;
}
void ZStdReader::worker()
{
auto z = ZSTD_createDStream(); // think about automatic cleanup somehow
ZSTD_initDStream(z);
ZSTD_outBuffer output;
ZSTD_inBuffer input;
auto inputcapacity=128;
std::vector<char> src(inputcapacity);
input.src = &src[0];
auto outputcapacity = ZSTD_DStreamOutSize();
std::vector<char> dst(outputcapacity);
output.dst = &dst[0];
for(;;) {
input.pos=0;
input.size=read(d_sourcefd, (char*)input.src, inputcapacity);
if(input.size <= 0) {
cerr<<"Got EOF on input fd "<<d_sourcefd<<", terminating thread"<<endl;
break;
}
while(input.pos != input.size) {
output.pos=0;
output.size=outputcapacity;
ZSTD_decompressStream(z, &output, &input);
int res;
res = writen(d_writepipe, output.dst, output.pos);
if(!res) // we are history
break;
if(res < 0) {
cerr<<"Error in zstd thread: "<<strerror(errno)<<endl;
break;
}
}
}
close(d_writepipe);
ZSTD_freeDStream(z);
}
ZStdReader::~ZStdReader()
{
cerr<<"ZStdReader destructor called"<<endl;
int rc = close(d_readpipe);
cerr<<"Close rc = "<<rc<<endl;
cerr<<"Waiting on join"<<endl;
d_thread.join();
cerr<<"Done waiting on join"<<endl;
}

59
zstdwrap.hh 100644
View File

@ -0,0 +1,59 @@
#pragma once
#include <cstdint>
#include <zstd.h> // can't easily be moved to zstdwrap.cc, trust me
#include <functional>
#include <thread>
#include <atomic>
// users submit (give()) data to this class
// the class has an internal buffer to which compressed data gets written
// If that buffer is full, we call emit() to empty it
// the emit() function must make sure that everything in the buffer gets sent!
class ZStdCompressor
{
public:
explicit ZStdCompressor(const std::function<void(const char*, uint32_t)>& emit, int compressionLevel);
ZStdCompressor(const ZStdCompressor& rhs) = delete;
~ZStdCompressor();
void give(const char* data, uint32_t bytes);
static int maxCompressionLevel();
uint64_t d_inputBytes{0}, d_outputBytes{0};
uint32_t outputBufferBytes(); // Number of bytes in output buffer
uint32_t outputBufferCapacity(); // output buffer capacity
void flush();
private:
void flushToEmit();
ZSTD_CCtx *d_z{nullptr};
ZSTD_outBuffer d_out;
uint32_t d_outcapacity;
std::function<void(const char*, uint32_t)> d_emit;
};
/* this class is tremendously devious
you pass it a filedescriptor from which it reads zstd compressed data
You can then read the uncompressed data on the filedescriptor you
get from getFD()
*/
class ZStdReader
{
public:
ZStdReader(int fd); // we don't close this for you
ZStdReader(const ZStdReader& rhs) = delete;
~ZStdReader();
int getFD()
{
return d_readpipe;
}
private:
std::thread d_thread;
int d_sourcefd; // this is where we read compressed data
int d_writepipe; // which we then stuff into this pipe
int d_readpipe; // and it comes out here for the client
void worker();
};