""" Athena binary file reader
At the moment, this reader assumes the conserved fields are in the
files. The Athena custom Grid has methods to do vx = M1 / d, etc.
"""
from __future__ import print_function
import os
import re
import numpy as np
from viscid import glob2
from viscid.readers import vfile
from viscid.readers.vfile_bucket import ContainerFile
from viscid.readers import athena
from viscid import coordinate
[docs]class AthenaBinFile(athena.AthenaFile, ContainerFile): # pylint: disable=abstract-method
"""An Athena binary file reader"""
_detector = r"^\s*(.*)\.([0-9]+)\.(bin)\s*$"
_def_fld_center = "Cell"
_collection = None
_fwrapper = None
float_type_name = None
var_type = None
_crds = None
def __init__(self, fname, crds=None, float_type_name=None, var_type=None,
**kwargs):
"""
Keyword Arguments:
float_type_name (str): should be 'f4' or 'f8' if you know
the data type of the file's data.
var_type (str): either 'cons' or 'prim'
"""
# there is no parent bucket, so we need to new one up for children
self.float_type_name = float_type_name
self.var_type = var_type
self._crds = crds
super(AthenaBinFile, self).__init__(fname, **kwargs)
[docs] @classmethod
def group_fnames(cls, fnames):
return athena.group_athena_files_common(cls._detector, fnames)
[docs] @classmethod
def collective_name_from_group(cls, fnames):
return athena.athena_collective_name_from_group(cls._detector,
fnames)
[docs] def get_file_wrapper(self, filename):
if self._fwrapper is None:
self._fwrapper = AthenaBinFileWrapper(filename,
float_type_name=self.float_type_name,
var_type=self.var_type)
else:
assert (self._fwrapper.filename == filename or
glob2(self._fwrapper.filename) == glob2(filename))
return self._fwrapper
[docs] def set_file_wrapper(self, wrapper):
raise NotImplementedError("This must be done at file init")
[docs] def load(self, fname):
if isinstance(fname, list):
self._collection = fname
else:
self._collection = [fname]
fname0 = self._collection[0]
fname1 = self.collective_name(fname)
basename = os.path.basename(fname0)
self.set_info('run', re.match(self._detector, basename).group(1))
super(AthenaBinFile, self).load(fname1)
def _parse(self):
if self._crds is None:
self._crds = self._make_crds(self._collection[0])
if len(self._collection) == 1:
# load a single file
_grid = self._parse_file(self.fname, self)
self.add(_grid)
self.activate(0)
else:
# load each file, and add it to teh bucket
data_temporal = self._make_dataset(self, dset_type="temporal",
name="AthenaTemporalCollection")
for fname in self._collection:
f = self._load_child_file(fname, index_handle=False,
file_type=type(self),
crds=self._crds,
float_type_name=self.float_type_name,
var_type=self.var_type)
data_temporal.add(f)
data_temporal.activate(0)
self.add(data_temporal)
self.activate(0)
def _parse_file(self, filename, parent_node):
# we do minimal file parsing here for performance. we just
# make data wrappers from the templates we got from the first
# file in the group, and package them up into grids
# find the time from the first field's meta data
_file_wrapper = self.get_file_wrapper(filename)
_file_wrapper.read_header()
time = _file_wrapper.time
_grid = self._make_grid(parent_node, name="<AthenaGrid>")
self.time = time
_grid.time = time
_grid.set_crds(self._crds)
# make a DataWrapper and a Field for each template that we
# have from the first file that we parsed, then add it to
# the _grid
data_wrapper = AthenaBinDataWrapper
for i, fld_name in enumerate(_file_wrapper.fld_names):
if self._def_fld_center.lower() == "cell":
shape = self._crds.shape_cc
else:
shape = self._crds.shape_nc
data = data_wrapper(_file_wrapper, fld_name,
shape[::-1], i)
fld = self._make_field(_grid, "Scalar", fld_name,
self._crds, data, time=time,
center=self._def_fld_center,
zyx_native=True)
_grid.add_field(fld)
return _grid
def _make_crds(self, filename):
fw = AthenaBinFileWrapper(filename, keep_crd_clist=True,
float_type_name=self.float_type_name,
var_type=self.var_type)
with fw as f:
crd_clist = f.crd_clist
new_clist = []
dxmin = np.inf
for c in crd_clist:
if len(c[1]) > 1:
dxmin = np.min([dxmin, np.min(c[1][1:] - c[1][:-1])])
for i, cli in enumerate(crd_clist):
cc = cli[1]
try:
hd = 0.5 * (cc[1:] - cc[:-1])
nc = np.hstack([cc[0] - hd[0],
cc[:-1] + hd,
cc[-1] + hd[-1]])
except IndexError:
dxminh = 0.5 * dxmin
nc = np.array([cc[0] - dxminh, cc[0] + dxminh])
new_clist.append([crd_clist[i][0], nc])
crds = coordinate.wrap_crds("nonuniform_cartesian", new_clist[::-1])
return crds
[docs]class AthenaBinFileWrapper(object):
"""A File-like object for interfacing with Athena binary files
Attributes:
float_type_name (str): default float data type, should be
'f4' or 'f8'; deufaults to double ('f8')
"""
# translation is is purely for convenience
float_type_name = "f8"
var_type = "cons"
_file = None
_loc_after_header = None
_endian = None
_float_dtype = None # = np.dtype(_endian + float_type_name)
filename = None
keep_crd_clist = None
fld_names = None
nvars = None
nscalars = None
shape = None
count = None
_file_meta = None
time = None
dt = None
crd_clist = None
def __init__(self, filename, keep_crd_clist=False, float_type_name=None,
var_type=None):
self.filename = filename
self.keep_crd_clist = keep_crd_clist
if float_type_name is not None:
self.float_type_name = float_type_name
if var_type is not None:
self.var_type = var_type
def __del__(self):
self.close()
@property
def float_dtype(self):
if self._float_dtype is None:
with self as _:
# just opening the file makes it read the meta data
pass
return self._float_dtype
@property
def field_names(self):
if self._fld_names_lookup is None:
with self as _:
# just opening the file makes it read the meta data
pass
[docs] def read_field(self, fld_id):
"""Read a field given a seekable location
Parameters:
fld_id(int): number of field in file
Returns:
tuple array
"""
if fld_id >= self.nvars:
raise IndexError("File {0} only has {1} fields, you asked for "
"fld number {2}".format(self.filename,
self.nvars, fld_id))
fld_size_bytes = self.count * self._float_dtype.itemsize
self._file.seek(self._loc_after_header + fld_id * fld_size_bytes)
data = np.fromfile(self._file, dtype=self._float_dtype,
count=self.count)
# return ndarray as native endian
return data.astype(self._float_dtype.name)
[docs] def open(self):
if self._file is None:
self._file = open(self.filename, 'rb')
try:
if self._endian is None:
self._read_file_header()
except IOError as e:
self.close()
raise e
@property
def isopen(self):
return self._file is not None
[docs] def close(self):
if self._file is not None:
f = self._file
self._file = None
f.close()
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, value, traceback):
self.close()
def _read_file_header(self):
"""load up the file's meta data"""
self._file.seek(0, 0)
coordsys = np.fromfile(self._file, dtype="<i", count=1)[0]
dims = np.fromfile(self._file, dtype="<i", count=5)
# if nvar makes sense, we were right, use little endian
if dims[3] < 1000:
self._endian = "<"
else:
self._endian = ">"
coordsys = coordsys.byteswap()
dims = dims.byteswap()
nx, ny, nz = dims[:3]
nvars, nscalars = dims[3:5] # pylint: disable=unused-variable
dtyp_int = np.dtype(self._endian + "i4") # 32bit int
self._float_dtype = np.dtype(self._endian + self.float_type_name)
# ignore self_gravity and particles flags for now
_, _ = np.fromfile(self._file, dtype=dtyp_int, count=2)
# ignore gamm1 and cs for now
_, _ = np.fromfile(self._file, dtype=self._float_dtype, count=2)
# determine field names from hints about how athena was run
# for BAROTROPIC EOS HDYRO: NVAR = 4 + NSCALARS
# for BAROTROPIC EOS MHD: NVAR = 7 + NSCALARS
# for ADIABATIC HDYRO: NVAR = 5 + NSCALARS
# for ADIABATIC MHD: NVAR = 8 + NSCALARS
if self.var_type == "cons":
_all_fld_names = ['d', 'M1', 'M2', 'M3', 'E', "B1", "B2", "B3"]
elif self.var_type == "prim":
_all_fld_names = ['d', 'V1', 'V2', 'V3', 'P', "B1c", "B2c", "B3c"]
else:
raise RuntimeError("var_type must be cons or prim")
nprim_flds = nvars - nscalars
fld_names = _all_fld_names[:4]
# adiabatic runs have an 'E' (energy)
if nprim_flds in (5, 8):
fld_names.append(_all_fld_names[4])
# MHD runs have magnetic fields
if nprim_flds in (7, 8):
fld_names.extend(_all_fld_names[5:8])
fld_names.extend((str(i) for i in range(nprim_flds, nvars)))
self.fld_names = fld_names
self.time, self.dt = np.fromfile(self._file, dtype=self._float_dtype,
count=2)
x = np.fromfile(self._file, dtype=self._float_dtype, count=nx)
y = np.fromfile(self._file, dtype=self._float_dtype, count=ny)
z = np.fromfile(self._file, dtype=self._float_dtype, count=nz)
if self.keep_crd_clist:
self.crd_clist = [('z', z.astype(self._float_dtype.name)),
('y', y.astype(self._float_dtype.name)),
('x', x.astype(self._float_dtype.name))]
self.nvars = nvars
self.nscalars = nscalars
self.shape = (nz, ny, nx)
self.count = np.prod(self.shape)
self._loc_after_header = self._file.tell()
return None
[docs]class AthenaBinDataWrapper(vfile.DataWrapper):
file_wrapper = None
filename = None
fld_name = None
expected_shape = None
fld_number = None
def __init__(self, file_wrapper, fld_name, expected_shape, fld_number):
"""Lazy wrapper for a field in a Fortbin file
Parameters:
expected_shape (tuple): shape of data in the file (zyx)
"""
super(AthenaBinDataWrapper, self).__init__()
self.file_wrapper = file_wrapper
self.filename = file_wrapper.filename
self.fld_name = fld_name
self.expected_shape = expected_shape
self.fld_number = fld_number
@property
def shape(self):
"""
Returns:
zyx shape since that's the shape __array__ returns
"""
return self.expected_shape
@property
def dtype(self):
return self.file_wrapper.float_dtype
def __array__(self, *args, **kwargs):
with self.file_wrapper as f:
arr = f.read_field(self.fld_number).reshape(self.expected_shape)
return arr.astype(self.dtype)
[docs] def read_direct(self, *args, **kwargs):
return self.__array__()
[docs] def len(self):
return self.shape[0]
def __getitem__(self, item):
return self.__array__().__getitem__(item)
##
## EOF
##