Source code for viscid.calculator.calc

#!/usr/bin/env python
""" This has a mechanism to dispatch to different backends. You can change the
order of preference of the backend by changing default_backends.
TODO: this dispatch mechanism is very simple, maybe something a little more
flexable would be useful down the line? """

from __future__ import print_function
from itertools import count

import numpy as np

import viscid
from viscid import field
from viscid import logger
from viscid import verror
from viscid import seed
from viscid.compat import izip, OrderedDict

try:
    from viscid.calculator import cycalc
    has_cython = True
except ImportError:
    has_cython = False

try:
    from viscid.calculator import necalc
    has_numexpr = True
except ImportError as e:
    has_numexpr = False

__all__ = ['neg', 'scale', 'add', 'diff', 'mul', 'axpby', 'relative_diff',
           'abs_diff', 'abs_val', 'abs_max', 'abs_min', 'magnitude', 'dot',
           'cross', 'grad', 'convective_deriv', 'div', 'curl', 'normalize',
           'project', 'project_vector', 'project_along_line', 'resample_lines',
           'integrate_along_line', 'integrate_along_lines', 'jacobian_at_point',
           'jacobian_at_ind', 'jacobian_eig_at_point', 'jacobian_eig_at_ind',
           'div_at_point', 'curl_at_point', 'extend_boundaries',
           'extend_boundaries_ndarr']


class Operation(object):
    default_backends = ["numexpr", "cython", "numpy"]

    _imps = None  # implementations
    opname = None
    short_name = None

    def __init__(self, name, short_name, implementations=(), doc=""):
        self.opname = name
        self.short_name = short_name
        self._imps = OrderedDict()
        self.add_implementations(implementations)
        setattr(self, "__doc__", doc)

    @property
    def __name__(self):
        return self.opname

    def add_implementation(self, name, func):
        self._imps[name] = func

    def add_implementations(self, implementations):
        for name, func in implementations:
            self.add_implementation(name, func)

    def _get_imp(self, preferred, only=False):
        if not isinstance(preferred, (list, tuple)):
            if preferred is None:
                preferred = list(self._imps.keys())
            else:
                preferred = [preferred]

        for name in preferred:
            if name in self._imps:
                return self._imps[name]

        msg = "{0} :: {1}".format(self.opname, preferred)
        if only:
            raise verror.BackendNotFound(msg)
        logger.info("No preferred backends available: " + msg)

        for name in self.default_backends:
            if name in self._imps:
                return self._imps[name]

        if len(self._imps) == 0:
            raise verror.BackendNotFound("No backends available")
        return list(self._imps.values())[0]

    def __call__(self, *args, **kwargs):
        preferred = kwargs.pop("preferred", None)
        only = kwargs.pop("only", False)
        func = self._get_imp(preferred, only)
        return func(*args, **kwargs)


class UnaryOperation(Operation):
    def __call__(self, a, **kwargs):
        ret = super(UnaryOperation, self).__call__(a, **kwargs)
        ret.name = "{0} {1}".format(self.short_name, a.name)
        return ret

class BinaryOperation(Operation):
    def __call__(self, a, b=None, **kwargs):
        ret = super(BinaryOperation, self).__call__(a, b, **kwargs)
        try:
            ret.name = "{0} {1} {2}".format(a.name, self.short_name, b.name)
        except AttributeError:
            try:
                ret.name = "{0} {1}".format(self.short_name, b.name)
            except AttributeError:
                try:
                    ret.name = "{0} {1}".format(self.short_name, a.name)
                except AttributeError:
                    ret.name = self.short_name
        return ret

neg = UnaryOperation("neg", "-", doc="Callable, calculates -a")
scale = BinaryOperation("scale", "*=", doc="Callable, scales a")
add = BinaryOperation("add", "+", doc="Callable, calculates a + b")
diff = BinaryOperation("diff", "-", doc="Callable, calculates a - b")
mul = BinaryOperation("mul", "*", doc="Callable, calculates a * b")
axpby = Operation("axpby", "+", doc="Callable, calculates a * x + b * y")
relative_diff = BinaryOperation("relative diff", "%-",
                                doc="Callable, calculates abs(a - b) / a")
abs_diff = BinaryOperation("abs diff", "|-|", doc="Callable, calculates abs(a - b)")
abs_val = UnaryOperation("abs val", "absval", doc="Callable, calculates abs(a)")
abs_max = Operation("abs max", "absmax", doc="Callable, calculates max(abs(a))")
abs_min = Operation("abs min", "absmin", doc="Callable, calculates min(abs(a))")
magnitude = UnaryOperation("magnitude", "magnitude",
                           doc="Callable, calculates L2 Norm of a vectors in a vector field")
dot = BinaryOperation("dot", "dot", doc="Callable, calculates a dot b")
cross = BinaryOperation("cross", "x", doc="Callable, calculates a cross b")
project = BinaryOperation("project", "dot mag",
                          doc="Callable, scalar projection of a onto b; a dot b / norm(b)")
normalize = UnaryOperation("normalize", "normalize",
                           doc="Callable, divide a vector field by its magnitude")
grad = UnaryOperation("grad", "grad", doc="Callable, gradient of a scalar field")
div = UnaryOperation("div", "div", doc="Callable, divergence of a vector field")
curl = UnaryOperation("curl", "curl", doc="Callable, curl of a vector field")

if has_numexpr:
    neg.add_implementation("numexpr", necalc.neg)
    scale.add_implementation("numexpr", necalc.scale)
    add.add_implementation("numexpr", necalc.add)
    diff.add_implementation("numexpr", necalc.diff)
    mul.add_implementation("numexpr", necalc.mul)
    axpby.add_implementation("numexpr", necalc.axpby)
    relative_diff.add_implementation("numexpr", necalc.relative_diff)
    abs_diff.add_implementation("numexpr", necalc.abs_diff)
    abs_val.add_implementation("numexpr", necalc.abs_val)
    abs_max.add_implementation("numexpr", necalc.abs_max)
    abs_min.add_implementation("numexpr", necalc.abs_min)
    magnitude.add_implementation("numexpr", necalc.magnitude)
    dot.add_implementation("numexpr", necalc.dot)
    cross.add_implementation("numexpr", necalc.cross)
    project.add_implementation("numexpr", necalc.project)
    normalize.add_implementation("numexpr", necalc.normalize)
    grad.add_implementation("numexpr", necalc.grad)
    div.add_implementation("numexpr", necalc.div)
    curl.add_implementation("numexpr", necalc.curl)

# numpy versions
neg.add_implementation("numpy", lambda a: -a)
scale.add_implementation("numpy", lambda a, b: np.asarray(a, dtype=b.dtype) * b)
add.add_implementation("numpy", lambda a, b: a + b)
diff.add_implementation("numpy", lambda a, b: a - b)
mul.add_implementation("numpy", lambda a, b: a * b)
axpby.add_implementation("numpy", lambda a, x, b, y: a * x + b * y)
relative_diff.add_implementation("numpy", lambda a, b: (a - b) / a)
abs_diff.add_implementation("numpy", lambda a, b: np.abs(a - b))
abs_val.add_implementation("numpy", np.abs)
abs_max.add_implementation("numpy", lambda a: np.max(np.abs(a)))
abs_min.add_implementation("numpy", lambda a: np.min(np.abs(a)))

def _magnitude_np(fld):
    return np.sqrt((np.sum(fld * fld, axis=fld.nr_comp)))
magnitude.add_implementation("numpy", _magnitude_np)

def _project_np(a, b):
    """ project a along b (a dot b / |b|) """
    return (np.sum(a * b, axis=b.nr_comp) /
            np.sqrt(np.sum(b * b, axis=b.nr_comp)))
project.add_implementation("numpy", _project_np)

def _normalize_np(a):
    """ normalize a vector field """
    shp0 = list(a.shape)
    shp0[a.nr_comp] = 1
    return a / np.linalg.norm(a, axis=a.nr_comp).reshape(shp0)
normalize.add_implementation("numpy", _normalize_np)

def _dot_np(fld_a, fld_b):
    """dot product of two vector fields"""
    if fld_a.nr_comp != fld_b.nr_comp:
        raise ValueError("field must have same layout (flat or interlaced)")
    return np.sum(fld_a * fld_b, axis=fld_a.nr_comp)
dot.add_implementation("numpy", _dot_np)

def _cross_np(fld_a, fld_b):
    """cross product of two vector fields"""
    ax, ay, az = fld_a.component_views()
    bx, by, bz = fld_b.component_views()
    prodx = ay * bz - az * by
    prody = -ax * bz + az * bx
    prodz = ax * by - ay * bx
    return fld_a.wrap([prodx, prody, prodz])
cross.add_implementation("numpy", _cross_np)

def _grad_np(fld, bnd=True):
    """2nd order centeral diff, 1st order @ boundaries if bnd"""
    if bnd:
        fld = viscid.extend_boundaries(fld, order=0, crd_order=0)

    if fld.iscentered("Cell"):
        crdx, crdy, crdz = fld.get_crds_cc(shaped=True)
        # divcenter = "Cell"
        # divcrds = coordinate.NonuniformCartesianCrds(fld.crds.get_clist(np.s_[1:-1]))
        # divcrds = fld.crds.slice_keep(np.s_[1:-1, 1:-1, 1:-1])
    elif fld.iscentered("Node"):
        crdx, crdy, crdz = fld.get_crds_nc(shaped=True)
        # divcenter = "Node"
        # divcrds = coordinate.NonuniformCartesianCrds(fld.crds.get_clist(np.s_[1:-1]))
        # divcrds = fld.crds.slice_keep(np.s_[1:-1, 1:-1, 1:-1])
    else:
        raise NotImplementedError("Can only do cell and node centered gradients")

    v = fld.data
    g = viscid.zeros(fld['x=1:-1, y=1:-1, z=1:-1'].crds, nr_comps=3)

    xp, xm = crdx[2:,  :,  :], crdx[:-2, :  , :  ]  # pylint: disable=bad-whitespace
    yp, ym = crdy[ :, 2:,  :], crdy[:  , :-2, :  ]  # pylint: disable=bad-whitespace
    zp, zm = crdz[ :,  :, 2:], crdz[:  , :  , :-2]  # pylint: disable=bad-whitespace

    vxp, vxm = v[2:  , 1:-1, 1:-1], v[ :-2, 1:-1, 1:-1]  # pylint: disable=bad-whitespace
    vyp, vym = v[1:-1, 2:  , 1:-1], v[1:-1,  :-2, 1:-1]  # pylint: disable=bad-whitespace
    vzp, vzm = v[1:-1, 1:-1, 2:  ], v[1:-1, 1:-1,  :-2]  # pylint: disable=bad-whitespace

    g['x'].data[...] = (vxp - vxm) / (xp - xm)
    g['y'].data[...] = (vyp - vym) / (yp - ym)
    g['z'].data[...] = (vzp - vzm) / (zp - zm)
    return g
grad.add_implementation("numpy", _grad_np)

def _div_np(fld, bnd=True):
    """2nd order centeral diff, 1st order @ boundaries if bnd"""
    if fld.iscentered("Face"):
        # dispatch fc div immediately since that does its own pre-processing
        return viscid.div_fc(fld, bnd=bnd)

    if bnd:
        fld = viscid.extend_boundaries(fld, order=0, crd_order=0)

    vx, vy, vz = fld.component_views()

    if fld.iscentered("Cell"):
        crdx, crdy, crdz = fld.get_crds_cc(shaped=True)
        divcenter = "Cell"
        # divcrds = coordinate.NonuniformCartesianCrds(fld.crds.get_clist(np.s_[1:-1]))
        divcrds = fld.crds.slice_keep(np.s_[1:-1, 1:-1, 1:-1])
    elif fld.iscentered("Node"):
        crdx, crdy, crdz = fld.get_crds_nc(shaped=True)
        divcenter = "Node"
        # divcrds = coordinate.NonuniformCartesianCrds(fld.crds.get_clist(np.s_[1:-1]))
        divcrds = fld.crds.slice_keep(np.s_[1:-1, 1:-1, 1:-1])
    else:
        raise NotImplementedError("Can only do cell and node centered divs")

    xp, xm = crdx[2:,  :,  :], crdx[:-2, :  , :  ]  # pylint: disable=bad-whitespace
    yp, ym = crdy[ :, 2:,  :], crdy[:  , :-2, :  ]  # pylint: disable=bad-whitespace
    zp, zm = crdz[ :,  :, 2:], crdz[:  , :  , :-2]  # pylint: disable=bad-whitespace

    vxp, vxm = vx[2:  , 1:-1, 1:-1], vx[ :-2, 1:-1, 1:-1]  # pylint: disable=bad-whitespace
    vyp, vym = vy[1:-1, 2:  , 1:-1], vy[1:-1,  :-2, 1:-1]  # pylint: disable=bad-whitespace
    vzp, vzm = vz[1:-1, 1:-1, 2:  ], vz[1:-1, 1:-1,  :-2]  # pylint: disable=bad-whitespace

    div_arr = ((vxp - vxm) / (xp - xm) + (vyp - vym) / (yp - ym) +
               (vzp - vzm) / (zp - zm))
    return field.wrap_field(div_arr, divcrds, name="div " + fld.name,
                            center=divcenter, time=fld.time, parents=[fld])
div.add_implementation("numpy", _div_np)

def _curl_np(fld, bnd=True):
    """2nd order centeral diff, 1st order @ boundaries if bnd"""
    if bnd:
        fld = viscid.extend_boundaries(fld, order=0, crd_order=0)

    vx, vy, vz = fld.component_views()

    if fld.iscentered("Cell"):
        crdx, crdy, crdz = fld.get_crds_cc(shaped=True)
        curlcenter = "cell"
        # curlcrds = coordinate.NonuniformCartesianCrds(fld.crds.get_clist(np.s_[1:-1]))
        curlcrds = fld.crds.slice_keep(np.s_[1:-1, 1:-1, 1:-1])
    elif fld.iscentered("Node"):
        crdx, crdy, crdz = fld.get_crds_nc(shaped=True)
        curlcenter = "node"
        # curlcrds = coordinate.NonuniformCartesianCrds(fld.crds.get_clist(np.s_[1:-1]))
        curlcrds = fld.crds.slice_keep(np.s_[1:-1, 1:-1, 1:-1])
    else:
        raise NotImplementedError("Can only do cell and node centered divs")

    xp, xm = crdx[2:,  :,  :], crdx[:-2, :  , :  ]  # pylint: disable=bad-whitespace
    yp, ym = crdy[ :, 2:,  :], crdy[:  , :-2, :  ]  # pylint: disable=bad-whitespace
    zp, zm = crdz[ :,  :, 2:], crdz[:  , :  , :-2]  # pylint: disable=bad-whitespace

    vxpy, vxmy = vx[1:-1, 2:  , 1:-1], vx[1:-1,  :-2, 1:-1]  # pylint: disable=bad-whitespace
    vxpz, vxmz = vx[1:-1, 1:-1, 2:  ], vx[1:-1, 1:-1,  :-2]  # pylint: disable=bad-whitespace

    vypx, vymx = vy[2:  , 1:-1, 1:-1], vy[ :-2, 1:-1, 1:-1]  # pylint: disable=bad-whitespace
    vypz, vymz = vy[1:-1, 1:-1, 2:  ], vy[1:-1, 1:-1,  :-2]  # pylint: disable=bad-whitespace

    vzpx, vzmx = vz[2:  , 1:-1, 1:-1], vz[ :-2, 1:-1, 1:-1]  # pylint: disable=bad-whitespace
    vzpy, vzmy = vz[1:-1, 2:  , 1:-1], vz[1:-1,  :-2, 1:-1]  # pylint: disable=bad-whitespace

    curl_x = (vzpy - vzmy) / (yp - ym) - (vypz - vymz) / (zp - zm)
    curl_y = -(vzpx - vzmx) / (xp - xm) + (vxpz - vxmz) / (zp - zm)
    curl_z = (vypx - vymx) / (xp - xm) - (vxpy - vxmy) / (yp - ym)

    return field.wrap_field([curl_x, curl_y, curl_z], curlcrds,
                            name="curl " + fld.name, fldtype="Vector",
                            center=curlcenter, time=fld.time,
                            parents=[fld])
curl.add_implementation("numpy", _curl_np)

[docs]def convective_deriv(a, b=None, bnd=True): r"""Compute (a \dot \nabla) b for vector fields a and b""" # [(B \dot \nabla) B]_j = B_i \partial_i B_j # FIXME: this is a lot of temporary arrays if bnd: if b is None: b = viscid.extend_boundaries(a, order=0, crd_order=0) else: b = viscid.extend_boundaries(b, order=0, crd_order=0) else: if b is None: b = a a = a['x=1:-1, y=1:-1, z=1:-1'] if b.nr_comps > 1: diBj = [[None, None, None], [None, None, None], [None, None, None]] for j, jcmp in enumerate('xyz'): g = grad(b[jcmp], bnd=False) for i, icmp in enumerate('xyz'): diBj[i][j] = g[icmp] dest = viscid.zeros(a.crds, nr_comps=3) for i, icmp in enumerate('xyz'): for j, jcmp in enumerate('xyz'): dest[jcmp][...] += a[icmp] * diBj[i][j] else: dest = dot(a, grad(b, bnd=False)) return dest
# native versions NOTE: magnitude_native is really only for benchmarking def _magnitude_native(fld): vx, vy, vz = fld.component_views() mag = np.empty_like(vx) for i in range(mag.shape[0]): for j in range(mag.shape[1]): for k in range(mag.shape[2]): mag[i, j, k] = np.sqrt(vx[i, j, k]**2 + vy[i, j, k]**2 + vz[i, j, k]**2) return vx.wrap(mag, context={"name": "{0} magnitude".format(fld.name)}) magnitude.add_implementation("native", _magnitude_native)
[docs]def project_vector(a, b): """Calculates the vector (a dot b_hat) * b_hat""" bnorm = normalize(b) # print(">>", type(bnorm), bnorm.shape, bnorm.nr_comps, # "MIN", np.min(np.linalg.norm(bnorm, axis=b.nr_comp)), # "MAX", np.max(np.linalg.norm(bnorm, axis=b.nr_comp))) return project(a, bnorm) * bnorm
[docs]def resample_lines(lines, factor=1, kind="linear"): """Resample a bunch of lines Args: lines (sequence): a list of 3xN ndarrays that are lines factor (float): how to resample. 2 will give a point at the midpoint of all points, 3 will give two points, etc. 0.5 will leave half the number of points, etc. This should be > 0.0. kind (str): How to interpolate; must be a value recognized by :py:func:`scipy.interpolate.interp1d` Raises: ValueError: List of resampled 3xN ndarrays """ # FIXME: this is wicked slow if factor <= 0: raise ValueError("factor should be > 0") elif factor == 1: return lines for i, line in enumerate(lines): nr_sdims, nold = line.shape nnew = np.ceil(factor * (nold - 1)) + 1 newline = np.empty((nr_sdims, nnew), dtype=line.dtype) sold = np.linspace(0, 1, nold) snew = np.linspace(0, 1, nnew) try: # raise ImportError from scipy.interpolate import interp1d for j in range(nr_sdims): newline[j, :] = interp1d(sold, line[j, :], kind=kind)(snew) except ImportError: if kind != 'linear': viscid.logger.error("Scipy is required to do anything " "other than linear interpolation") raise for j in range(nr_sdims): newline[j, :] = np.interp(snew, sold, line[j, :]) lines[i] = newline return lines
[docs]def project_along_line(line, fld, interp_kind='trilin'): """Project a Vector Field Parallel to a streamline Args: line (ndarray): 3xN of points along the fld (VectorField): Field to interpolate and project onto the line interp_kind (str): which interpolation to use, const or trilin """ fld_on_verts = viscid.interp(fld, line, kind=interp_kind) # FIXME: here in lies a bug, which axis am I actually summing over? # maybe that's not the bug, but there must be something wrong # here as indicated by kris_scripts/work/iono_xi_mismatch/analytic_pot.py dsvec = line[:, 2:] - line[:, :-2] dsvec = np.concatenate([dsvec[:, 0:1], dsvec, dsvec[:, -2:-1]], axis=1) dsvec = dsvec / np.linalg.norm(dsvec, axis=0) return np.sum(fld_on_verts * dsvec.T, axis=1)
[docs]def integrate_along_line(line, fld, reduction="dot", mask_func=None, interp_kind='trilin'): """Integrate the value of fld along a line Args: line (ndarray): 3xN ndarray fld (Field): Field to interpolate / integrate reduction (str): If fld is a vector field, what quantity to integrate. Can be "dot" to dot the vectors with ds along the line, or "mag" to integrate the magnitude. interp_kind (str): which interpolation to use, const or trilin Returns: a scalar with the same dtype as fld """ return integrate_along_lines([line], fld, reduction=reduction, mask_func=mask_func, interp_kind=interp_kind)[0]
[docs]def integrate_along_lines(lines, fld, reduction="dot", mask_func=None, interp_kind='trilin'): """Integrate the value of fld along a list of lines Args: lines (list): list of 3xN ndarrays, N needs not be the same for all lines fld (Field): Field to interpolate / integrate reduction (str): If fld is a vector field, what quantity to integrate. Can be "dot" to dot the vectors with ds along the line, or "mag" to integrate the magnitude. interp_kind (str): which interpolation to use, const or trilin Returns: ndarray with shape (len(lines), ) """ arr = np.zeros((len(lines),), dtype=fld.dtype) cum_n = np.cumsum([0] + [line.shape[1] for line in lines]) all_verts = np.concatenate(lines, axis=1) fld_on_verts = viscid.interp(fld, all_verts, kind=interp_kind).data for i, start, stop in izip(count(), cum_n[:-1], cum_n[1:]): ds = np.linalg.norm(lines[i][:, 1:] - lines[i][:, :-1], axis=0) if len(fld_on_verts.shape) > 1: reduction = reduction.strip().lower() if reduction == "dot": dsvec = lines[i][:, 1:] - lines[i][:, :-1] dsvec = dsvec / np.linalg.norm(dsvec, axis=0) values = 0.5 * (fld_on_verts[start:stop - 1, :] + fld_on_verts[start + 1:stop, :]) values = values * dsvec.T if mask_func is not None: values = np.ma.masked_where(mask_func(values), values) values = np.sum(values, axis=1) elif reduction in ["mag", "magnitude", "norm"]: mag = np.linalg.norm(fld_on_verts[start:stop], axis=1) values = 0.5 * (mag[start:stop - 1] + mag[start + 1:stop]) else: raise ValueError("Unknown reduction: {0}".format(reduction)) else: values = 0.5 * (fld_on_verts[start:stop - 1] + fld_on_verts[start + 1:stop]) arr[i] = np.sum(values * ds) return arr
def local_vector_points(B, x, y, z, dx=None, dy=None, dz=None): """Get B at 6 points surrounding X X = [x, y, z] with spacing [+/-dx, +/-dy, +/-dz] Args: B (VectorField): B field x (float, ndarray, list): x (single value) y (float, ndarray, list): y (single value) z (float, ndarray, list): z (single value) dx (float, optional): dx, one grid cell if None dy (float, optional): dy, one grid cell if None dz (float, optional): dz, one grid cell if None Returns: (bs, pts, dcrd) * bs (ndarary): shape (6, 3) where 0-3 -> Bx,By,Bz and 0-6 -> X-dx, X+dx, X-dy, X+dy, X-dz, X+dz * pts (ndarray): shape (6, 3); the location of the points of the bs, but this time, 0-3 -> x,y,z * dcrd (list): [dx, dy, dz] """ assert has_cython # if a problem, you need to build Viscid assert B.iscentered("Cell") x, y, z = [np.array(c).reshape(1, 1) for c in [x, y, z]] crds = B.get_crds("xyz") inds = [0] * len(crds) dcrd = [0] * len(crds) # This makes points in xyz order pts = np.tile([x, y, z], 6).reshape(3, -1).T for i, crd, loc, d in zip(count(), crds, [x, y, z], [dx, dy, dz]): inds[i] = cycalc.closest_ind(crd, loc) if d is None: dcrd[i] = crd[inds[i] + 1] - crd[inds[i]] else: dcrd[i] = d pts[2 * i + 1, i] += dcrd[i] pts[2 * i + 0, i] -= dcrd[i] bs = cycalc.interp_trilin(B, seed.Point(pts)) # viscid.interact(banner="in local_vector_points") return bs, pts, dcrd
[docs]def jacobian_at_point(B, x, y, z, dx=None, dy=None, dz=None): """Get the Jacobian at a point If dx|dy|dz == None, then their set to the grid spacing at the point of interest Returns: The jacobian as a 3x3 ndarray. The result is in xyz order, in other words:: [ [d_x Bx, d_y Bx, d_z Bx], [d_x By, d_y By, d_z By], [d_x Bz, d_y Bz, d_z Bz] ] """ bs, _, dcrd = local_vector_points(B, x, y, z, dx, dy, dz) gradb = np.empty((3, 3), dtype=B.dtype) # bs is in xyz spatial order, but components are in xyz order # dcrd is in xyz order # gradb has xyz order for everything for i in range(3): gradb[i, 0] = (bs[1, i] - bs[0, i]) / (2.0 * dcrd[0]) # d_x Bi gradb[i, 1] = (bs[3, i] - bs[2, i]) / (2.0 * dcrd[1]) # d_y Bi gradb[i, 2] = (bs[5, i] - bs[4, i]) / (2.0 * dcrd[2]) # d_z Bi return gradb
[docs]def jacobian_at_ind(B, ix, iy, iz): """Get the Jacobian at index Returns: The jacobian as a 3x3 ndarray. The result is in xyz order, in other words:: [ [d_x Bx, d_y Bx, d_z Bx], [d_x By, d_y By, d_z By], [d_x Bz, d_y Bz, d_z Bz] ] """ bx, by, bz = B.component_views() x, y, z = B.get_crds("xyz") gradb = np.empty((3, 3), dtype=B.dtype) for i, bi in enumerate([bx, by, bz]): # d_x Bi, d_y Bi, d_z Bi gradb[i, 0] = (bi[ix + 1, iy, iz] - bi[ix - 1, iy, iz]) / (x[ix + 1] - x[ix - 1]) gradb[i, 1] = (bi[ix, iy + 1, iz] - bi[ix, iy - 1, iz]) / (y[iy + 1] - y[iy - 1]) gradb[i, 2] = (bi[ix, iy, iz + 1] - bi[ix, iy, iz - 1]) / (z[iz + 1] - z[iz - 1]) return gradb
[docs]def jacobian_eig_at_point(B, x, y, z, dx=None, dy=None, dz=None): """Get the eigen vals/vecs of the jacobian Returns: evals, evecs (3x3 ndarray) The evec[:, i] corresponds to evals[i]. Eigen vectors are returned in xyz order, aka evec[:, 0] is [x, y, z] for the 0th eigen vector """ gradb = jacobian_at_point(B, x, y, z, dx, dy, dz) evals, evecs = np.linalg.eig(gradb) return evals, evecs
[docs]def jacobian_eig_at_ind(B, ix, iy, iz): """Get the eigen vals/vecs of the jacobian Returns: evals, evecs (3x3 ndarray) The evec[:, i] corresponds to evals[i]. Eigen vectors are returned in xyz order, aka evec[:, 0] is [x, y, z] for the 0th eigen vector """ gradb = jacobian_at_ind(B, ix, iy, iz) evals, evecs = np.linalg.eig(gradb) return evals, evecs
[docs]def div_at_point(A, x, y, z, dx=None, dy=None, dz=None): """Returns divergence at a point""" As, _, dcrd = local_vector_points(A, x, y, z, dx, dy, dz) d = 0.0 for i in range(3): d += (As[2 * i + 1, i] - As[2 * i + 0, i]) / (2.0 * dcrd[i]) return d
[docs]def curl_at_point(A, x, y, z, dx=None, dy=None, dz=None): """Returns curl at point as ndarray with shape (3,) xyz""" As, _, dcrd = local_vector_points(A, x, y, z, dx, dy, dz) c = np.zeros(3, dtype=A.dtype) # this is confusing: In As, first index is xyz, 2nd index is xyz # xi+dxi comp xi-dxi comp / (2 * dxi) c[0] = ((As[2 * 1 + 1, 2] - As[2 * 1 + 0, 2]) / (2.0 * dcrd[1]) - (As[2 * 0 + 1, 1] - As[2 * 0 + 0, 1]) / (2.0 * dcrd[2])) c[1] = ((As[2 * 0 + 1, 0] - As[2 * 0 + 0, 0]) / (2.0 * dcrd[2]) - (As[2 * 2 + 1, 2] - As[2 * 2 + 0, 2]) / (2.0 * dcrd[0])) c[2] = ((As[2 * 2 + 1, 1] - As[2 * 2 + 0, 1]) / (2.0 * dcrd[0]) - (As[2 * 1 + 1, 0] - As[2 * 1 + 0, 0]) / (2.0 * dcrd[1])) return c
[docs]def extend_boundaries_ndarr(arr, nl=1, nh=1, axes='all', nr_comp=None, order=1, invarient_dx=0.0): """Extend and pad boundaries of ndarray (leaves new corners @ 0.0) Args: arr (ndarray): Array to extend nl (int): extend this many cells in lower direction nh (int): extend this many cells in upper direction axes (list): list of ints that correspond to axes that should be extended. If nr_comp != None, then axes > nr_comp will be shifted by 1. nr_comp (int, None): index of shape that corresponds to vector component dimension order (int): 0 for repeating boundary values or 1 for linear extrapolation invarient_dx (float): if len(arr) == 1 and order == 1, then the array is extended with constant dx, using 0 here is synonymous with 0th order. Returns: ndarray: A new extended / padded ndarray """ if nr_comp is not None: axes = [ax if ax < nr_comp else ax + 1 for ax in axes] shape = list(arr.shape) target_shape = [s + nl + nh if i in axes else s for i, s in enumerate(shape)] s0 = [] for i, s in enumerate(shape): if i in axes: _nl = nl if nl else None _nh = -nh if nh else None s0.append(slice(_nl, _nh)) else: s0.append(slice(None)) v = np.zeros(target_shape, dtype=arr.dtype) v[tuple(s0)] = arr[...] for _, axi in enumerate(axes): ni = shape[axi] + nl dest_slc = [slice(None)] * len(v.shape) src_slc = [slice(None)] * len(v.shape) src_slcR = [slice(None)] * len(v.shape) if arr.shape[axi] < 2 and invarient_dx: for j in range(nl): dest_slc[axi] = slice(nl - 1 - j, nl - 1 - j + 1) src_slc[axi] = slice(nl - j, nl - j + 1) v[tuple(dest_slc)] = v[tuple(src_slc)] - invarient_dx for j in range(nh): dest_slc[axi] = slice(ni + j, ni + j + 1) src_slc[axi] = slice(ni - 1 + j, ni - 1 + j + 1) v[tuple(dest_slc)] = v[tuple(src_slc)] + invarient_dx elif order == 0: if nl: dest_slc[axi] = slice(None, nl) src_slc[axi] = slice(nl, nl + 1) v[tuple(dest_slc)] = v[tuple(src_slc)] if nh: dest_slc[axi] = slice(ni, None) src_slc[axi] = slice(ni - 1, ni) v[tuple(dest_slc)] = v[tuple(src_slc)] elif arr.shape[axi] < 2: for j in range(nl): dest_slc[axi] = slice(nl - 1 - j, nl - 1 - j + 1) src_slc[axi] = slice(nl - j, nl - j + 1) v[tuple(dest_slc)] = v[tuple(src_slc)] - invarient_dx for j in range(nh): dest_slc[axi] = slice(ni + j, ni + j + 1) src_slc[axi] = slice(ni - 1 + j, ni - 1 + j + 1) v[tuple(dest_slc)] = v[tuple(src_slc)] + invarient_dx elif order == 1: for j in range(nl): dest_slc[axi] = slice(nl - j - 1, nl - j) src_slc[axi] = slice(nl - j, nl - j + 1) src_slcR[axi] = slice(nl - j + 1, nl - j + 2) v[tuple(dest_slc)] = 2 * v[tuple(src_slc)] - v[tuple(src_slcR)] for j in range(nh): dest_slc[axi] = slice(ni + j, ni + j + 1) src_slc[axi] = slice(ni + j - 1, ni + j) src_slcR[axi] = slice(ni + j -2, ni + j - 1) v[tuple(dest_slc)] = 2 * v[tuple(src_slc)] - v[tuple(src_slcR)] # src_slc[i] = slice(-1, None) # dest_slc[i] = slice(-nh, None) # v[dest_slc] = arr[src_slc] else: raise ValueError("extend_boundaries can only be 0th or 1st order") return v
[docs]def extend_boundaries(fld, nl=1, nh=1, axes='all', nr_comp=None, order=1, crd_order=1, crds=None, invarient_dx=0.0, crd_invarient_dx=1.0): """Extend and pad boundaries of field (leaves new corners @ 0.0) Args: fld (Field): Field to extend nl (int): extend this many cells in lower direction nh (int): extend this many cells in upper direction axes (str, list): something like 'xyz', ['x', 'theta'], etc. nr_comp (int, None): index of shape that corresponds to vector component dimension order (int): extrapolation order for data; 0 for repeating boundary values or 1 for linear extrapolation crd_order (int): extrapolation order for crds; 0 for repeating boundary values or 1 for linear extrapolation crds (Coordinates): Use these coordinates, no extrapolate them invarient_dx (float): if fld has a single cell in a given dimension, and order == 1, the field is extended with constant dx, using 0 here is synonymous with 0th order invarient_dx (float): if fld has a single cell in a given dimension, and crd_order == 1, the coordinates in that dimension are extended with constant dx, using 0 here is synonymous with crd_order = 0. Returns: ndarray: A new extended / padded ndarray Warning: When using crd_order=0 (0 order hold), coordinates will be extended by repeating boundary values, so min_dx will be 0. Keep this in mind if dividing by dx. """ arr_axes = fld.crds.axes axis_lookup = dict() for i, ax in enumerate(arr_axes): axis_lookup[i] = i axis_lookup[ax] = i if axes == 'all': axes = list([i for i, s in enumerate(fld.sshape)]) if not isinstance(axes, (list, tuple, viscid.string_types)): axes = list(axes) axes = [axis_lookup[ax] for ax in axes] try: nr_comp = fld.nr_comp except TypeError: nr_comp = None new_dat = extend_boundaries_ndarr(fld.data, nl=nl, nh=nh, axes=axes, nr_comp=nr_comp, order=order, invarient_dx=invarient_dx) if crds is not None: new_crds = crds else: new_clist = [] crds_nc = fld.crds.get_crds_nc() crds_cc = fld.crds.get_crds_cc() for ax, nc, cc in zip(fld.crds.axes, crds_nc, crds_cc): if fld.crds.axes.index(ax) in axes: new_nc = extend_boundaries_ndarr(nc, nl=nl, nh=nh, axes=[0], order=crd_order, invarient_dx=crd_invarient_dx) new_cc = extend_boundaries_ndarr(cc, nl=nl, nh=nh, axes=[0], order=crd_order, invarient_dx=crd_invarient_dx) else: new_nc = nc new_cc = cc new_clist.append((ax, new_nc, new_cc)) crdtype = fld.crds.crdtype if 'nonuniform' not in crdtype: crdtype = crdtype.replace('uniform', 'nonuniform') new_crds = viscid.wrap_crds(crdtype, new_clist) return fld.wrap(new_dat, context=dict(crds=new_crds))
## ## EOF ##