Source code for viscid.readers.athena

""" athena reader common stuff """

from __future__ import print_function
import re
import os
from operator import itemgetter

import numpy as np
try:
    import numexpr
    _has_numexpr = True
except ImportError:
    _has_numexpr = False

from viscid import field
from viscid import grid
from viscid.calculator import plasma

[docs]def group_athena_files_common(detector, fnames): infolst = [] for name in fnames: m = re.match(detector, name) grps = m.groups() d = dict(runname=grps[0], time=int(grps[1]), fname=m.string) infolst.append(d) # careful with sorting, only consecutive files will be grouped infolst.sort(key=itemgetter("time")) infolst.sort(key=itemgetter("runname")) info_groups = [] info_group = [infolst[0]] for info in infolst[1:]: last = info_group[-1] if info['runname'] == last['runname']: info_group.append(info) else: info_groups.append(info_group) info_group = [info] info_groups.append(info_group) # turn info_groups into groups of just file names groups = [] for info_group in info_groups: groups.append([info['fname'] for info in info_group]) return groups
[docs]def athena_collective_name_from_group(detector, fnames): fname0 = fnames[0] basename = os.path.basename(fname0) m = re.match(detector, basename) run = m.group(1) ext = m.group(3) new_basename = "{0}.STAR.{1}".format(run, ext) return os.path.join(os.path.dirname(fname0), new_basename)
[docs]class AthenaGrid(grid.Grid): def _get_rr(self): return self['d'] def _get_p(self): # TODO: calc pressure from conserved fields return self['P'] def _get_pp(self): return self['p'] def _get_bx(self): try: f = self['B1c'] except KeyError: f = self['B1'] f.name = "bx" f.pretty_name = "$B_x$" return f def _get_by(self): try: f = self['B2c'] except KeyError: f = self['B2'] f.name = "by" f.pretty_name = "$B_y$" return f def _get_bz(self): try: f = self['B3c'] except KeyError: f = self['B3'] f.name = "bz" f.pretty_name = "$B_z$" return f def _get_vx(self): try: f = self['V1'] except KeyError: f = self['M1'] / self['d'] f.name = "vx" f.pretty_name = "$V_x$" return f def _get_vy(self): try: f = self['V2'] except KeyError: f = self['M2'] / self['d'] f.name = "vy" f.pretty_name = "$V_y$" return f def _get_vz(self): try: f = self['V3'] except KeyError: f = self['M3'] / self['d'] f.name = "bz" f.pretty_name = "$V_z$" return f def _get_mx(self): try: f = self['M1'] except KeyError: f = self['V1'] * self['d'] f.name = "mx" f.pretty_name = "$M_x$" return f def _get_my(self): try: f = self['M2'] except KeyError: f = self['V2'] * self['d'] f.name = "my" f.pretty_name = "$M_y$" return f def _get_mz(self): try: f = self['M3'] except KeyError: f = self['V3'] * self['d'] f.name = "mz" f.pretty_name = "$M_z$" return f def _get_jx(self): f = self['j'].component_fields()[0] f.pretty_name = "$J_x$" return f def _get_jy(self): f = self['j'].component_fields()[1] f.pretty_name = "$J_y$" return f def _get_jz(self): f = self['j'].component_fields()[2] f.pretty_name = "$J_z$" return f def _get_b(self): bx, by, bz = self['bx'], self['by'], self['bz'] opts = dict(_force_layout=self.force_vector_layout) return field.scalar_fields_to_vector([bx, by, bz], name="B", **opts) def _get_v(self): vx, vy, vz = self['vx'], self['vy'], self['vz'] opts = dict(_force_layout=self.force_vector_layout) return field.scalar_fields_to_vector([vx, vy, vz], name="V", **opts) def _get_j(self): # b = self['b'] # j = calc.curl(b) # j.name = 'j' # j.pretty_name = "J" # return j # TODO: make sure curl works with 2.5-D fields and 1-D fields raise NotImplementedError("Haven't hooked up the curl yet to get J") @staticmethod def _calc_mag(vx, vy, vz): if _has_numexpr: vmag = numexpr.evaluate("sqrt(vx**2 + vy**2 + vz**2)") return vx.wrap(vmag, fldtype="Scalar") else: vmag = np.sqrt(vx**2 + vy**2 + vz**2) return vmag def _get_bmag(self): bx, by, bz = self['bx'], self['by'], self['bz'] bmag = self._calc_mag(bx, by, bz) bmag.name = "|B|" return bmag def _get_speed(self): vx, vy, vz = self['vx'], self['vy'], self['vz'] vmag = self._calc_mag(vx, vy, vz) vmag.name = "|V|" return vmag def _get_jmag(self): jx, jy, jz = self['j'].component_fields() jmag = self._calc_mag(jx, jy, jz) jmag.name = "|J|" return jmag def _get_beta(self): return plasma.calc_beta(self['pp'], self['b']) def _get_psi(self): return plasma.calc_psi(self['b'])
[docs]class AthenaFile(object): """Mixin some Athena convenience stuff""" _grid_type = AthenaGrid
## ## EOF ##