#!/usr/bin/env python
# FIXME: the timetype=list does not work for the amr sample, goto line 177
from __future__ import print_function
import os
import sys
# from xml.etree import ElementTree
import numpy as np
from viscid.compat import element_tree
from viscid import logger
from viscid.readers.vfile_bucket import ContainerFile
from viscid.readers.hdf5 import FileLazyHDF5
from viscid import amr_grid
from viscid import coordinate
# class XDMFDataItem(data_item.DataItem):
# def set_precision():
# nptype = np.dtype({'Float': 'float', 'Int': 'int', 'UInt': 'unit',
# 'Char': 'int', 'UChar': 'int'}[numbertype] + str(8*precision))
[docs]class FileXDMF(ContainerFile): # pylint: disable=abstract-method
""" on init, parse an xdmf file into datasets, grids, and fields """
_detector = r".*\.(xmf|xdmf)\s*$"
_xdmf_defaults = {
"Attribute": {
"Name": None,
"AttributeType": "Scalar", # Vector,Tensor,Tensor6,Matrix,GlobalID
"Center": "node" # cell,Grid,face,edge
},
"DataItem": {
"Name": None,
"ItemType": "Uniform", # Collection, tree,
# HyperSlab, coordinates, Funciton
"Dimensions": None, # KJI order
"NumberType": "Float", # Int, UInt, Char, UChar
"Precision": 4, # 1, 4, 8
"Format": "XML", # HDF, Binary
"Endian": "Native", # Big, Little
"Compression": "Raw", # Zlib, Bzip2
"Seek": 0
},
"Domain": {
"Name": None
},
"Geometry": {
"GeometryType": "XYZ" # XY, X_Y_Z, VxVyVz, Origin_DxDyDz
},
"Grid": {
"Name": None,
"GridType": "Uniform", # Collection,Tree,Subset
"CollectionType": "Spatial", # Temporal
"Section": "DataItem" # All
},
"Information": {
"Name": None,
"Value": None
},
"Xdmf": {
"Version": None
},
"Topology": {
"Name": None,
"TopologyType": None, # Polyvertex | Polyline | Polygon |
# Triangle | Quadrilateral | Tetrahedron |
# Pyramid| Wedge | Hexahedron | Edge_3 |
# Triagle_6 | Quadrilateral_8 |
# Tetrahedron_10 | Pyramid_13 | Wedge_15 |
# Hexahedron_20 | Mixed |
# 2DSMesh | 2DRectMesh | 2DCoRectMesh |
# 3DSMesh | 3DRectMesh | 3DCoRectMesh
"NodesPerElement": None,
"NumberOfElement": None,
"Dimensions": None,
"Order": None,
"BaseOffset": 0
},
"Time": {
"Type": "Single",
"Value": None
}
}
h5_root_dir = None
_last_amr_skeleton = None # experimental, should be moved
# tree = None
def __init__(self, fname, h5_root_dir=None, **kwargs):
"""XDMF File"""
if h5_root_dir is not None:
h5_root_dir = os.path.expandvars(h5_root_dir)
self.h5_root_dir = os.path.expanduser(h5_root_dir)
super(FileXDMF, self).__init__(fname, **kwargs)
def _parse(self):
grids = self._parse_file(self.fname, self)
for grid in grids:
self.add(grid)
if len(self.children) > 0:
self.activate(0)
def _parse_file(self, fname, parent_node):
# lxml has better xpath support, so it's preferred, but it stops
# if an xinclude doesn't exist, so for now use our custom extension
# of the default python xml lib
# if HAS_LXML:
# # sweet, you have it... use the better xml library
# tree = etree.parse(self.fname) # pylint: disable=E0602
# tree.xinclude() # TODO: gracefully ignore include problems
# root = tree.getroot()
grids = []
tree = element_tree.parse(fname)
element_tree.xinclude(tree, base_url=fname)
root = tree.getroot()
# search for all root grids, and parse them
domain_grids = root.findall("./Domain/Grid")
for dg in domain_grids:
grd = self._parse_grid(dg, parent_node)
grids.append(grd)
return grids
def _fill_attrs(self, el):
defs = self._xdmf_defaults[el.tag]
ret = {}
for opt, defval in defs.items():
if defval is None:
ret[opt] = el.get(opt)
else:
ret[opt] = type(defval)(el.get(opt, defval))
return ret
def _parse_grid(self, el, parent_node=None, time=None):
attrs = self._fill_attrs(el)
grd = None
crds = None
# parse topology, or cascade parent grid's topology
topology = el.find("./Topology")
topoattrs = None
if topology is not None:
topoattrs = self._fill_attrs(topology)
elif parent_node and parent_node.topology_info:
topoattrs = parent_node.topology_info
# parse geometry, or cascade parent grid's geometry
geometry = el.find("./Geometry")
geoattrs = None
if geometry is not None:
crds, geoattrs = self._parse_geometry(geometry, topoattrs)
elif parent_node and parent_node.geometry_info:
geoattrs = parent_node.geometry_info
crds = parent_node.crds # this can be None and that's ok
# parse time
if time is None:
t = el.find("./Time")
if t is not None:
pt, tattrs = self._parse_time(t)
if tattrs["Type"] == "Single":
time = pt
# cascade a parent grid's time
if time is None and parent_node and parent_node.time is not None:
time = parent_node.time
gt = attrs["GridType"]
if gt == "Collection":
times = None
ct = attrs["CollectionType"]
if ct == "Temporal":
grd = self._make_dataset(parent_node, dset_type="temporal",
name=attrs["Name"])
self._inject_info(el, grd)
ttag = el.find("./Time")
if ttag is not None:
times, tattrs = self._parse_time(ttag)
elif ct == "Spatial":
grd = self._make_dataset(parent_node, name=attrs["Name"])
self._inject_info(el, grd)
else:
logger.warning("Unknown collection type %s, ignoring grid", ct)
for i, subgrid in enumerate(el.findall("./Grid")):
t = times[i] if (times is not None and i < len(times)) else time
# print(subgrid, grd, t)
self._parse_grid(subgrid, parent_node=grd, time=time)
if len(grd.children) > 0:
grd.activate(0)
elif gt == "Uniform":
if not (topoattrs and geoattrs):
logger.warning("Xdmf Uniform grids must have "
"topology / geometry.")
else:
grd = self._make_grid(parent_node, name=attrs["Name"],
**self._grid_opts)
self._inject_info(el, grd)
for attribute in el.findall("./Attribute"):
fld = self._parse_attribute(grd, attribute, crds,
topoattrs, time)
if time:
fld.time = time
grd.add_field(fld)
elif gt == "Tree":
logger.warning("Xdmf Tree Grids not implemented, ignoring "
"this grid")
elif gt == "Subset":
logger.warning("Xdmf Subset Grids not implemented, ignoring "
"this grid")
else:
logger.warning("Unknown grid type %s, ignoring this grid", gt)
# fill attributes / data items
# if grid and gt == "Uniform":
# for a in el.findall("./Attribute"):
# fld = self._parse_attribute(a)
# grid.add_field(fld)
if grd:
if time is not None:
grd.time = time
if topoattrs is not None:
grd.topology_info = topoattrs
if geoattrs is not None:
grd.geometry_info = geoattrs
if crds is not None:
grd.set_crds(crds)
# EXPERIMENTAL AMR support, _last_amr_grid shouldn't be an attribute
# of self, since that will only remember the most recently generated
# amr grid, but that's ok for now
# if gt == "Uniform":
# print(">!", crds._TYPE, crds.xl_nc, grd.time)
# print(">!?", type(parent_node), parent_node.children._ordered,
# len(parent_node.children))
if gt == "Collection" and ct == "Spatial":
grd, is_amr = amr_grid.dataset_to_amr_grid(grd,
self._last_amr_skeleton)
if is_amr:
self._last_amr_skeleton = grd.skeleton
if parent_node is not None:
parent_node.add(grd)
return grd # can be None
def _parse_geometry(self, geo, topoattrs):
""" geo is the element tree item, returns Coordinate object and
xml attributes """
geoattrs = self._fill_attrs(geo)
# crds = None
crdlist = None
crdtype = None
crdkwargs = {}
topotype = topoattrs["TopologyType"]
# parse geometry into crds
geotype = geoattrs["GeometryType"]
if geotype.upper() == "XYZ":
data, attrs = self._parse_dataitem(geo.find("./DataItem"),
keep_flat=True)
# x = data[0::3]
# y = data[1::3]
# z = data[2::3]
# crdlist = (('z', z), ('y', y), ('x', x))
# quietly do nothing... we don't support unstructured grids
# or 3d spherical yet, and 2d spherical can be figured out
# if we assume the grid spans the whole sphere
crdlist = None
elif geotype.upper() == "XY":
data, attrs = self._parse_dataitem(geo.find("./DataItem"),
keep_flat=True)
# x = data[0::2]
# y = data[1::2]
# z = np.zeros(len(x))
# crdlist = (('z', z), ('y', y), ('x', x))
# quietly do nothing... we don't support unstructured grids
# or 3d spherical yet, and 2d spherical can be figured out
# if we assume the grid spans the whole sphere
crdlist = None
elif geotype.upper() == "X_Y_Z":
crdlookup = {'x': 0, 'y': 1, 'z': 2}
crdlist = [['x', None], ['y', None], ['z', None]]
# can't use ./DataItem[@Name='X'] so python2.6 works
dataitems = geo.findall("./DataItem")
for di in dataitems:
crd_name = di.attrib["Name"].lower()
data, attrs = self._parse_dataitem(di, keep_flat=True)
crdlist[crdlookup.pop(crd_name)][1] = data
if len(crdlookup) > 0:
raise RuntimeError("XDMF format error: Coords not specified "
"for {0} dimesions"
"".format(list(crdlookup.keys())))
crdkwargs["full_arrays"] = True
elif geotype.upper() == "VXVYVZ":
crdlookup = {'x': 0, 'y': 1, 'z': 2}
crdlist = [['x', None], ['y', None], ['z', None]]
# can't use ./DataItem[@Name='VX'] so python2.6 works
dataitems = geo.findall("./DataItem")
for di in dataitems:
crd_name = di.attrib["Name"].lstrip('V').lower()
data, attrs = self._parse_dataitem(di, keep_flat=True)
crdlist[crdlookup.pop(crd_name)][1] = data
if len(crdlookup) > 0:
raise RuntimeError("XDMF format error: Coords not specified "
"for {0} dimesions"
"".format(list(crdlookup.keys())))
crdkwargs["full_arrays"] = True
elif geotype.upper() == "ORIGIN_DXDYDZ":
# this is for grids with uniform spacing
dataitems = geo.findall("./DataItem")
data_o, _ = self._parse_dataitem(dataitems[0])
data_dx, _ = self._parse_dataitem(dataitems[1])
dtyp = data_o.dtype
nstr = None
if topoattrs["Dimensions"]:
nstr = topoattrs["Dimensions"]
elif topoattrs["NumberOfElements"]:
nstr = topoattrs["NumberOfElements"]
else:
raise ValueError("ORIGIN_DXDYDZ has no number of elements...")
n = [int(num) for num in nstr.split()]
# FIXME: OpenGGCM output uses ZYX ordering even though the xdmf
# website says it should be XYZ, BUT, the file opens correctly
# in Paraview with zyx, so... I guess i need to do this [::-1]
# nonsense here
data_o, data_dx, n = data_o[::-1], data_dx[::-1], n[::-1]
crdlist = [None] * 3
for i, crd in enumerate(['x', 'y', 'z']):
n_nc, n_cc = n[i], n[i] - 1
crd_arr = [data_o[i], data_o[i] + (n_cc * data_dx[i]), n_nc]
crdlist[i] = (crd, crd_arr)
crdkwargs["dtype"] = dtyp
crdkwargs["full_arrays"] = False
else:
logger.warning("Invalid GeometryType: %s", geotype)
if topotype in ['3DCoRectMesh', '2DCoRectMesh']:
crdtype = "uniform_cartesian"
elif topotype in ['3DRectMesh', '2DRectMesh']:
if crdkwargs.get("full_arrays", True):
crdtype = "nonuniform_cartesian"
else: # HACK, hopefully not used ever
crdtype = "uniform_cartesian"
elif topotype in ['2DSMesh']:
crdtype = "uniform_spherical" # HACK!
######## this doesn't quite work, but it's too heavy to be useful
######## anyway... if we assume a 2d spherical grid spans the
######## whole sphere, and radius doesnt matter, all we need are
######## the nr_phis / nr_thetas, so let's just do that
# # this asserts that attrs["Dimensions"] will have the xyz
# # dimensions
# # turn x, y, z -> phi, theta, r
# dims = [int(s) for
# s in reversed(topoattrs["Dimensions"].split(' '))]
# dims = [1] * (3 - len(dims)) + dims
# nr, ntheta, nphi = [d for d in dims]
# # dtype = crdlist[0][1].dtype
# # phi, theta, r = [np.empty((n,), dtype=dtype) for n in dims]
# x, y, z = (crdlist[i][1].reshape(dims) for i in range(3))
# nphitheta = nphi * ntheta
# r = np.sqrt(x[::nphitheta, 0, 0]**2 + y[::nphitheta, 0, 0]**2 +
# z[::nphitheta, 0, 0]**2)
# ir = nr // 2 # things get squirrly near the extrema
# theta = (180.0 / np.pi) * \
# (np.arccos(z[ir, :, ::nphi] / r[ir]).reshape(-1))
# itheta = ntheta // 2
# phi = (180.0 / np.pi) * \
# np.arctan2(y[ir, itheta, :], x[ir, itheta, :])
# print(dims, nr, ntheta, nphi)
# print("r:", r.shape, r)
# print("theta:", theta.shape, theta)
# print("phi:", phi.shape, phi)
# raise RuntimeError()
######## general names in spherical crds
# ntheta, nphi = [int(s) for s in topoattrs["Dimensions"].split(' ')]
# crdlist = [['theta', [0.0, 180.0, ntheta]],
# ['phi', [0.0, 360.0, nphi]]]
######## names on a map
ntheta, nphi = [int(s) for s in topoattrs["Dimensions"].split(' ')]
crdlist = [['phi', [0.0, 360.0, nphi]],
['theta', [0.0, 180.0, ntheta]]]
crdkwargs["full_arrays"] = False
crdkwargs["units"] = 'deg'
elif topotype in ['3DSMesh']:
raise NotImplementedError("3D spherical grids not yet supported")
else:
raise NotImplementedError("Unstructured grids not yet supported")
crds = coordinate.wrap_crds(crdtype, crdlist, **crdkwargs)
return crds, geoattrs
def _parse_attribute(self, parent_node, item, crds, topoattrs, time=0.0):
"""
Args:
parent_node (Dataset, Grid, or None): Hint what the parent will
be. Necessary if _make_field makes decisions based on the
info dict
"""
attrs = self._fill_attrs(item)
data, dataattrs = self._parse_dataitem(item.find("./DataItem"))
name = attrs["Name"]
center = attrs["Center"]
fldtype = attrs["AttributeType"]
fld = self._make_field(parent_node, fldtype, name, crds, data,
center=center, time=time, zyx_native=True)
self._inject_info(item, fld)
return fld
def _parse_dataitem(self, item, keep_flat=False):
""" returns the data as a numpy array, or HDF data item """
attrs = self._fill_attrs(item)
dimensions = attrs["Dimensions"]
if dimensions:
dimensions = [int(d) for d in dimensions.split(' ')]
numbertype = attrs["NumberType"]
precision = attrs["Precision"]
nptype = np.dtype({'Float': 'f', 'Int': 'i', 'UInt': 'u',
'Char': 'i', 'UChar': 'u'}[numbertype] + str(precision))
fmt = attrs["Format"]
if fmt == "XML":
arr = np.fromstring(item.text, sep=' ', dtype=nptype)
if dimensions and not keep_flat:
arr = arr.reshape(dimensions)
return arr, attrs
if fmt == "HDF":
fname, loc = item.text.strip().split(':')
# FIXME: startswith '/' is unix path name specific
if self.h5_root_dir is not None:
fname = os.path.join(self.h5_root_dir, fname)
elif not fname.startswith('/'):
fname = os.path.join(self.dirname, fname)
h5file = self._load_child_file(fname, index_handle=False,
file_type=FileLazyHDF5)
arr = h5file.get_data(loc)
return arr, attrs
if fmt == "Binary":
raise NotImplementedError("binary xdmf data not implemented")
logger.warning("Invalid DataItem Format.")
return (None, None)
def _parse_time(self, timetag):
""" returns the time(s) as float, or numpy array, time attributes"""
attrs = self._fill_attrs(timetag)
timetype = attrs["Type"]
if timetype == 'Single':
return float(timetag.get('Value')), attrs
elif timetype == 'List':
return self._parse_dataitem(timetag.find('.//DataItem'))[0], attrs
elif timetype == 'Range':
raise NotImplementedError("Time Range not yet implemented")
# dat, dattrs = self._parse_dataitem(timetag.find('.//DataItem'))
# TODO: this is not the most general, but I think it'll work
# as a first stab, plus I will probably not need Range ever
# tgridtag = timetag.find("ancestor::Grid[@GridType='Collection']"
# "[@CollectionType='Temporal'][1]"))
# n = len(tgridtag.find(.//Grid[@GridType='Collection']
# [CollectionType=['Spatial']]))
# return np.linspace(dat[0], dat[1], n)
# return np.arange(dat[0], dat[1])
elif timetype == 'HyperSlab':
dat, dattrs = self._parse_dataitem(timetag.find('.//DataItem'))
arr = np.array([dat[0] + i * dat[1] for i in range(int(dat[2]))])
return arr, attrs
else:
logger.warning("invalid TimeType.\n")
def _parse_information(self, information):
""" parse generic information tag """
attrs = self._fill_attrs(information)
name = attrs["Name"]
val = attrs["Value"]
if val is None:
_di = information.find(".//DataItem")
if _di:
val, _ = self._parse_dataitem(_di)
else:
val = information.text
return name, val
def _inject_info(self, el, target):
for _, information in enumerate(el.findall("./Information")):
_name, _val = self._parse_information(information)
target.set_info(_name, _val)
def _main():
import viscid
f = FileXDMF(os.path.join(viscid.sample_dir, 'local_0001.py_0.xdmf'))
sys.stderr.write("{0}\n".format(f))
return 0
if __name__ == '__main__':
sys.exit(_main())
##
## EOF
##