Source code for viscid.calculator.mpause

#!/usr/bin/env python
"""Minimum Variance Analysis and boundary normal crd tools"""

from __future__ import print_function, division
import os

import numpy as np
import viscid


__all__ = ["paraboloid", "paraboloid_normal", "fit_paraboloid",
           "get_mp_info", "find_mp_edges"]

_dtf = 'f8'
_paraboloid_dt = np.dtype([('x0', _dtf), ('y0', _dtf), ('z0', _dtf),
                           ('ax', _dtf), ('ay', _dtf), ('az', _dtf)])


[docs]def paraboloid(y, z, x0, y0, z0, ax, ay, az): """Generic paraboloid function""" return ax * (((y - y0) / ay)**2 + ((z - z0) / az)**2) + x0
[docs]def paraboloid_normal(y, z, x0, y0, z0, ax, ay, az, normalize=True): # pylint: disable=unused-argument """Normal vector of a generic paraboloid""" dyF = 2.0 * (y - y0) / ay**2 dzF = 2.0 * (z - z0) / az**2 dxF = (-1.0 / ax) * np.ones_like(dyF) normal = np.array([dxF, dyF, dzF]) if normalize: normal = normal / np.linalg.norm(normal, axis=0) return normal
[docs]def fit_paraboloid(fld, p0=(9.0, 0.0, 0.0, 1.0, -1.0, -1.0), tolerance=0.0): """Fit paraboloid it GSE coordinates x ~ y**2 + z**2 Args: fld (:py:class:`viscid.field.ScalarField`): field of x values p0 (sequence): initial guess for parabaloid (x0, y0, z0, ax, ay, az), where (x0, y0, z0) is the nose location (should be subsolar for 0 dipole tilt), and the ay, az, and az coefficients determine the curvature Returns: numpy.recarray: record array of parameters with length 2; the 1st value is the fit value, and the 2nd is one sigma of the fit """ from scipy.optimize import curve_fit def paraboloid_yz(yz, x0, y0, z0, ax, ay, az): return paraboloid(yz[0], yz[1], x0, y0, z0, ax, ay, az) Y, Z = fld.meshgrid_flat(prune=True) popt, pcov = curve_fit(paraboloid_yz, np.vstack((Y, Z)), fld.data.reshape(-1), p0=p0) perr = np.sqrt(np.diag(pcov)) parab = np.recarray([2], dtype=_paraboloid_dt) # wow.. needing a nested loop to fill a recarray is stooopid, # am i missing something about recarrays? for i, vals in zip(range(len(parab)), [popt, perr]): for name, val in zip(parab.dtype.names, vals): parab[i][name] = val if tolerance: for n in parab.dtype.names: if n != "ax" and np.abs(parab[1][n] / parab[0][n]) > tolerance: viscid.logger.warning("paraboloid parameter {0} didn't converge to " "within {1:g}%\n{0} = {2:g} +/- {3:g}" "".format(n, 100 * tolerance, parab[0][n], parab[1][n])) return parab
[docs]def get_mp_info(pp, b, j, e, cache=True, cache_dir=None, slc="x=5.5j:11.0j, y=-4.0j:4.0j, z=-3.6j:3.6j", fit="mp_xloc", fit_p0=(9.0, 0.0, 0.0, 1.0, -1.0, -1.0)): """Get info about m-pause as flattened fields Notes: The first thing this function does is mask locations where the GSE-y current density < 1e-4. This masks out the bow shock and current free regions. This works for southward IMF, but it is not very general. Parameters: pp (ScalarcField): pressure b (VectorField): magnetic field j (VectorField): current density e (VectorField, None): electric field (same centering as b). If None, then the info that requires E will be filled with NaN cache (bool, str): Save to and load from cache, if "force", then don't load from cache if it exists, but do save a cache at the end cache_dir (str): Directory for cache, if None, same directory as that file to which the grid belongs slc (str): slice that gives a box that contains the m-pause fit (str): to which resulting field should the paraboloid be fit, defaults to mp_xloc, but pp_max_xloc might be useful in some circumstances fit_p0 (tuple): Initial guess vector for paraboloid fit Returns: dict: Unless otherwise noted, the entiries are 2D (y-z) fields - **mp_xloc** location of minimum abs(Bz), this works better than max of J^2 for FTEs - **mp_sheath_edge** location where Jy > 0.1 * Jy when coming in from the sheath side - **mp_sphere_edge** location where Jy > 0.1 * Jy when coming in from the sphere side - **mp_width** difference between m-sheath edge and msphere edge - **mp_shear** magnetic shear taken 6 grid points into the m-sheath / m-sphere - **pp_max** max pp - **pp_max_xloc** location of max pp - **epar_max** max e parallel - **epar_max_xloc** location of max e parallel - **paraboloid** numpy.recarray of paraboloid fit. The parameters are given in the 0th element, and the 1st element contains the 1-sigma values for the fit Raises: RuntimeError: if using MHD crds instead of GSE crds """ if not cache_dir: cache_dir = pp.find_info("_viscid_dirname", "./") run_name = pp.find_info("run", None) if cache and run_name: t = pp.time mp_fname = "{0}/{1}.mpause.{2:06.0f}".format(cache_dir, run_name, t) else: mp_fname = "" try: force = cache.strip().lower() == "force" except AttributeError: force = False try: if force or not mp_fname or not os.path.isfile(mp_fname + ".xdmf"): raise IOError() mp_info = {} with viscid.load_file(mp_fname + ".xdmf") as dat: fld_names = ["mp_xloc", "mp_sheath_edge", "mp_sphere_edge", "mp_width", "mp_shear", "pp_max", "pp_max_xloc", "epar_max", "epar_max_xloc"] for fld_name in fld_names: mp_info[fld_name] = dat[fld_name]["x=0"] except (IOError, KeyError): mp_info = {} crd_system = viscid.as_crd_system(b, None) if crd_system != 'gse': raise RuntimeError("get_mp_info can't work in MHD crds, " "switch to GSE please") if j.nr_patches == 1: pp_block = pp[slc] b_block = b[slc] j_block = j[slc] if e is None: e_block = np.nan * viscid.empty_like(j_block) else: e_block = e[slc] else: # interpolate an amr grid so we can proceed obnd = pp.get_slice_extent(slc) dx = np.min(pp.skeleton.L / pp.skeleton.n, axis=0) nx = np.ceil((obnd[1] - obnd[0]) / dx) vol = viscid.seed.Volume(obnd[0], obnd[1], nx, cache=True) pp_block = vol.wrap_field(viscid.interp_trilin(pp, vol), name="P").as_cell_centered() b_block = vol.wrap_field(viscid.interp_trilin(b, vol), name="B").as_cell_centered() j_block = vol.wrap_field(viscid.interp_trilin(j, vol), name="J").as_cell_centered() if e is None: e_block = np.nan * viscid.empty_like(j_block) else: e_block = vol.wrap_field(viscid.interp_trilin(e, vol), name="E").as_cell_centered() # jsq = viscid.dot(j_block, j_block) bsq = viscid.dot(b_block, b_block) # extract ndarrays and mask out bow shock / current free regions maskval = 1e-4 jy_mask = j_block['y'].data < maskval masked_bsq = 1.0 * bsq masked_bsq.data = np.ma.masked_where(jy_mask, bsq) xcc = j_block.get_crd_cc('x') nx = len(xcc) mp_xloc = np.argmin(masked_bsq, axis=0) # indices mp_xloc = mp_xloc.wrap(xcc[mp_xloc.data]) # location pp_max = np.max(pp_block, axis=0) pp_max_xloc = np.argmax(pp_block, axis=0) # indices pp_max_xloc = pp_max_xloc.wrap(xcc[pp_max_xloc.data]) # location epar = viscid.project(e_block, b_block) epar_max = np.max(epar, axis=0) epar_max_xloc = np.argmax(epar, axis=0) # indices epar_max_xloc = pp_max_xloc.wrap(xcc[epar_max_xloc.data]) # location _ret = find_mp_edges(j_block, 0.1, 0.1, maskval=maskval) sheath_edge, msphere_edge, mp_width, sheath_ind, sphere_ind = _ret # extract b and b**2 at sheath + 6 grid points and sphere - 6 grid pointns # clipping cases where things go outside the block. clipped ponints are # set to nan step = 6 # extract b if b_block.layout == "flat": comp_axis = 0 ic, _, iy, iz = np.ix_(*[np.arange(si) for si in b_block.shape]) ix = np.clip(sheath_ind + step, 0, nx - 1) b_sheath = b_block.data[ic, ix, iy, iz] ix = np.clip(sheath_ind - step, 0, nx - 1) b_sphere = b_block.data[ic, ix, iy, iz] elif b_block.layout == "interlaced": comp_axis = 3 _, iy, iz = np.ix_(*[np.arange(si) for si in b_block.shape[:-1]]) ix = np.clip(sheath_ind + step, 0, nx - 1) b_sheath = b_block.data[ix, iy, iz] ix = np.clip(sheath_ind - step, 0, nx - 1) b_sphere = b_block.data[ix, iy, iz] # extract b**2 bmag_sheath = np.sqrt(np.sum(b_sheath**2, axis=comp_axis)) bmag_sphere = np.sqrt(np.sum(b_sphere**2, axis=comp_axis)) costheta = (np.sum(b_sheath * b_sphere, axis=comp_axis) / (bmag_sphere * bmag_sheath)) costheta = np.where((sheath_ind + step < nx) & (sphere_ind - step >= 0), costheta, np.nan) mp_shear = mp_width.wrap((180.0 / np.pi) * np.arccos(costheta)) # don't bother with pretty name since it's not written to file # plane_crds = b_block.crds.slice_keep('x=0', cc=True) # fld_kwargs = dict(center="Cell", time=b.time) mp_width.name = "mp_width" mp_xloc.name = "mp_xloc" sheath_edge.name = "mp_sheath_edge" msphere_edge.name = "mp_sphere_edge" mp_shear.name = "mp_shear" pp_max.name = "pp_max" pp_max_xloc.name = "pp_max_xloc" epar_max.name = "epar_max" epar_max_xloc.name = "epar_max_xloc" mp_info = {} mp_info["mp_width"] = mp_width mp_info["mp_xloc"] = mp_xloc mp_info["mp_sheath_edge"] = sheath_edge mp_info["mp_sphere_edge"] = msphere_edge mp_info["mp_shear"] = mp_shear mp_info["pp_max"] = pp_max mp_info["pp_max_xloc"] = pp_max_xloc mp_info["epar_max"] = epar_max mp_info["epar_max_xloc"] = epar_max_xloc # cache new fields to disk if mp_fname: viscid.save_fields(mp_fname + ".h5", list(mp_info.values())) try: _paraboloid_params = fit_paraboloid(mp_info[fit], p0=fit_p0) mp_info["paraboloid"] = _paraboloid_params except ImportError as _exception: try: msg = _exception.message except AttributeError: msg = _exception.msg mp_info["paraboloid"] = viscid.DeferredImportError(msg) mp_info["mp_width"].pretty_name = "Magnetopause Width" mp_info["mp_xloc"].pretty_name = "Magnetopause $X_{gse}$ Location" mp_info["mp_sheath_edge"].pretty_name = "Magnetosheath Edge" mp_info["mp_sphere_edge"].pretty_name = "Magnetosphere Edge" mp_info["mp_shear"].pretty_name = "Magnetic Shear" mp_info["pp_max"].pretty_name = "Max Pressure" mp_info["pp_max_xloc"].pretty_name = "Max Pressure Location" mp_info["epar_max"].pretty_name = "Max E Parallel" mp_info["epar_max_xloc"].pretty_name = "Max E Parallel Location" return mp_info
[docs]def find_mp_edges(j_block, msphere_thresh=0.1, sheath_thresh=0.1, maskval=1e-4): """Find x location of msphere and msheath edges using current (J) Note: GSE coordinates only please Args: j_block (VectorField): Current density containing the whole magnetopause msphere_thresh (float): thereshold of current on the magnetosphere side as a fraction of the maximum current density, i.e., 0.1 is 10% of the max sheath_thresh (float): thereshold of current on the magnetosheath side as a fraction of the maximum current density, i.e., 0.1 is 10% of the max maskval (float, None): if not None, then mask out J values less than maskval; useful for masking out bowshock, and current free regions Returns: tuple: sheath and sphere fields / values - **sheath_edge**: float or 2D ScalarField of x values - **msphere_edge**: float or 2D ScalarField of x values - **mp_width**: **sheath_edge** - **msphere_edge** - **sheath_ind**: index of sheath_edge x location - **sphere_ind**: index of msphere_edge x location """ if maskval is not None: jy_mask = j_block['y'].data < maskval else: jy_mask = np.zeros_like(j_block['y'].data, dtype='bool') xcc = j_block.get_crd_cc('x') nx = len(xcc) masked_jy = 1.0 * j_block['y'] masked_jy.data = np.ma.masked_where(jy_mask, j_block['y']) jy_absmax = np.amax(np.abs(masked_jy), axis=0, keepdims=True) msphere_mask = (masked_jy > msphere_thresh * jy_absmax) sheath_mask = (masked_jy > sheath_thresh * jy_absmax) jy_absmax = None sphere_ind = np.argmax(msphere_mask, axis=0) if isinstance(sphere_ind, viscid.field.Field): msphere_edge = np.where(sphere_ind > 0, xcc[sphere_ind.data], np.nan) msphere_edge = sphere_ind.wrap(msphere_edge) else: msphere_edge = np.where(sphere_ind > 0, xcc[sphere_ind], np.nan) # reverse it to go from the other direction sheath_ind = nx - 1 - np.argmax(sheath_mask['x=::-1'], axis=0) if isinstance(sheath_ind, viscid.field.Field): sheath_edge = np.where(sheath_ind < (nx - 1), xcc[sheath_ind.data], np.nan) sheath_edge = sheath_ind.wrap(sheath_edge) else: sheath_edge = np.where(sheath_ind < (nx - 1), xcc[sheath_ind], np.nan) # in MHD crds, it my be sufficient to swap msp and msh at this point mp_width = sheath_edge - msphere_edge return sheath_edge, msphere_edge, mp_width, sheath_ind, sphere_ind
def _main(): f = viscid.load_file("$WORK/xi_fte_001/*.3d.[4050f].xdmf") mp = get_mp_info(f['pp'], f['b'], f['j'], f['e_cc'], fit='mp_xloc', slc="x=6.5j:10.5j, y=-4j:4j, z=-4.8j:3j", cache=False) y, z = mp['pp_max_xloc'].meshgrid_flat(prune=True) x = mp['pp_max_xloc'].data.reshape(-1) Y, Z = mp['pp_max_xloc'].meshgrid(prune=True) x2 = paraboloid(Y, Z, *mp['paraboloid'][0]) skip = 117 n = paraboloid_normal(Y, Z, *mp['paraboloid'][0]).reshape(3, -1)[:, ::skip] minvar_y = Y.reshape(-1)[::skip] minvar_z = Z.reshape(-1)[::skip] minvar_n = np.zeros([3, len(minvar_y)]) for i in range(minvar_n.shape[0]): p0 = [0.0, minvar_y[i], minvar_z[i]] p0[0] = mp['pp_max_xloc']['y={0[0]}f, z={0[1]}f'.format(p0)] minvar_n[:, i] = viscid.find_minvar_lmn_around(f['b'], p0, l=2.0, n=64)[2, :] # 2d plots, normals don't look normal in the matplotlib projection if False: # pylint: disable=using-constant-test from viscid.plot import vpyplot as vlt from matplotlib import pyplot as plt normals = paraboloid_normal(Y, Z, *mp['paraboloid'][0]) p0 = np.array([x2, Y, Z]).reshape(3, -1) p1 = p0 + normals.reshape(3, -1) vlt.scatter_3d(np.vstack([x, y, z])[:, ::skip], equal=True) for i in range(0, p0.shape[1], skip): plt.gca().plot([p0[0, i], p1[0, i]], [p0[1, i], p1[1, i]], [p0[2, i], p1[2, i]], color='c') # z2 = _ellipsiod(X, Y, *popt) plt.gca().plot_surface(Y, Z, x2, color='r') vlt.show() # mayavi 3d plots, normals look better here if True: # pylint: disable=using-constant-test from viscid.plot import vlab vlab.points3d(x[::skip], y[::skip], z[::skip], scale_factor=0.25, color=(0.0, 0.0, 1.0)) mp_width = mp['mp_width']['x=0'] mp_sheath_edge = mp['mp_sheath_edge']['x=0'] mp_sphere_edge = mp_sheath_edge - mp_width vlab.mesh(x2, Y, Z, scalars=mp_width.data) vlab.mesh(mp_sheath_edge.data, Y, Z, opacity=0.75, color=(0.75, ) * 3) vlab.mesh(mp_sphere_edge.data, Y, Z, opacity=0.75, color=(0.75, ) * 3) n = paraboloid_normal(Y, Z, *mp['paraboloid'][0]).reshape(3, -1)[:, ::skip] vlab.quiver3d(x2.reshape(-1)[::skip], Y.reshape(-1)[::skip], Z.reshape(-1)[::skip], n[0], n[1], n[2], color=(1, 0, 0)) vlab.quiver3d(x2.reshape(-1)[::skip], Y.reshape(-1)[::skip], Z.reshape(-1)[::skip], minvar_n[0], minvar_n[1], minvar_n[2], color=(0, 0, 1)) vlab.show() if __name__ == "__main__": import sys # pylint: disable=wrong-import-position,wrong-import-order sys.exit(_main()) ## ## EOF ##