ColumnStoreWriter/Reader support for dictionary of arrays in single file

albatross
Greg Hogan 2020-04-13 19:04:52 -07:00
parent 585a6fe2d8
commit efbb4aac2e
1 changed files with 67 additions and 13 deletions

View File

@ -6,7 +6,7 @@ from contextlib import closing
from common.file_helpers import mkdirs_exists_ok
class ColumnStoreReader():
def __init__(self, path, mmap=False, allow_pickle=False, direct_io=False):
def __init__(self, path, mmap=False, allow_pickle=False, direct_io=False, prefix="", np_data=None):
if not (path and os.path.isdir(path)):
raise ValueError("Not a column store: {}".format(path))
@ -15,6 +15,9 @@ class ColumnStoreReader():
self._mmap = mmap
self._allow_pickle = allow_pickle
self._direct_io = direct_io
self._is_dict = 'columnstore' in self._keys
self._prefix = prefix
self._np_data = np_data
@property
def path(self):
@ -30,6 +33,11 @@ class ColumnStoreReader():
return None
def keys(self):
if self._is_dict:
if not self._np_data:
self._np_data = self._load(os.path.join(self._path, 'columnstore'))
return self._np_data.keys()
return list(self._keys)
def iteritems(self):
@ -49,25 +57,42 @@ class ColumnStoreReader():
else:
return None
def _load(self, path):
if self._mmap:
# note that direct i/o does nothing for mmap since file read/write interface is not used
return np.load(path, mmap_mode='r', allow_pickle=self._allow_pickle, fix_imports=False)
if self._direct_io:
opener = lambda path, flags: os.open(path, os.O_RDONLY | os.O_DIRECT)
with open(path, 'rb', buffering=0, opener=opener) as f:
return np.load(f, allow_pickle=self._allow_pickle, fix_imports=False)
return np.load(path, allow_pickle=self._allow_pickle, fix_imports=False)
def __getitem__(self, key):
try:
path = os.path.join(self._path, key)
if self._is_dict:
path = os.path.join(self._path, 'columnstore')
else:
path = os.path.join(self._path, key)
# TODO(mgraczyk): This implementation will need to change for zip.
if os.path.isdir(path):
if not self._is_dict and os.path.isdir(path):
return ColumnStoreReader(path)
else:
if self._mmap:
# note that direct i/o does nothing for mmap since file read/write interface is not used
ret = np.load(path, mmap_mode='r', allow_pickle=self._allow_pickle, fix_imports=False)
if self._is_dict and self._np_data:
ret = self._np_data
else:
if self._direct_io:
opener = lambda path, flags: os.open(path, os.O_RDONLY | os.O_DIRECT)
with open(path, 'rb', buffering=0, opener=opener) as f:
ret = np.load(f, allow_pickle=self._allow_pickle, fix_imports=False)
else:
ret = np.load(path, allow_pickle=self._allow_pickle, fix_imports=False)
ret = self._load(path)
if type(ret) == np.lib.npyio.NpzFile:
if self._is_dict:
if self._prefix+key in ret.keys():
return ret[self._prefix+key]
if any(k.startswith(self._prefix+key+"/") for k in ret.keys()):
return ColumnStoreReader(self._path, mmap=self._mmap, allow_pickle=self._allow_pickle, direct_io=self._direct_io, prefix=self._prefix+key+"/", np_data=ret)
raise KeyError(self._prefix+key)
# if it's saved as compressed, it has arr_0 only in the file. deref this
return ret['arr_0']
else:
@ -128,6 +153,27 @@ class ColumnStoreWriter():
# TODO(mgraczyk): This implementation will need to change if we add zip or compression.
return ColumnStoreWriter(os.path.join(self._path, group_name))
def add_dict(self, data, dtype=None, compression=False, overwrite=False):
# default name exists to have backward compatibility with equivalent directory structure
npy_path = os.path.join(self._path, "columnstore")
mkdirs_exists_ok(os.path.dirname(npy_path))
flat_dict = dict()
_flatten_dict(flat_dict, "", data)
for k, v in flat_dict.items():
flat_dict[k] = np.array(v, copy=False, dtype=dtype)
if overwrite:
f = open(npy_path, "wb")
else:
f = os.fdopen(os.open(npy_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL), "wb")
with closing(f) as f:
if compression:
np.savez_compressed(f, **flat_dict)
else:
np.savez(f, **flat_dict)
def close(self):
pass
@ -135,6 +181,15 @@ class ColumnStoreWriter():
def __exit__(self, type, value, traceback): self.close()
def _flatten_dict(flat, ns, d):
for k, v in d.items():
p = (ns + "/" if len(ns) else "") + k
if isinstance(v, collections.Mapping):
_flatten_dict(flat, p, v)
else:
flat[p] = v
def _save_dict_as_column_store(values, writer, compression):
for k, v in values.items():
if isinstance(v, collections.Mapping):
@ -147,4 +202,3 @@ def save_dict_as_column_store(values, output_path, compression=False):
with ColumnStoreWriter(output_path) as writer:
_save_dict_as_column_store(values, writer, compression)