#!/usr/bin/env python
#
# Author: Adrien CR Thob
# Copyright (C) 2022 Adrien CR Thob
#
# This file is part of the py-EnBiD-ananke project,
# <https://github.com/athob/py-EnBiD-ananke>, which is licensed
# under the GNU General Public License v2.0 (GPL-2.0).
#
# The full copyright notice, including terms governing use, modification,
# and redistribution, is contained in the files LICENSE and COPYRIGHT,
# which can be found at the root of the source code distribution tree:
# - LICENSE <https://github.com/athob/py-EnBiD-ananke/blob/main/LICENSE>
# - COPYRIGHT <https://github.com/athob/py-EnBiD-ananke/blob/main/COPYRIGHT>
#
"""
enbid_ananke
============
Provides a set of utilities to run the kernel density estimator EnBiD
(`Sharma & Steinmetz 2011 <http://ascl.net/1109.012>`).
How to use
----------
enbid_ananke comes with the function enbid, please refer to its documentation
for further help.
"""
from typing import Any, Optional, Union, Tuple, Dict
from numpy.typing import ArrayLike, NDArray
import pathlib
import warnings
import hashlib
import numpy as np
import pandas as pd
from sklearn import neighbors as nghb
from .__metadata__ import *
from ._constants import *
from ._templates import *
from ._defaults import *
from .utils import execute
__all__ = ['enbid']
def __make_path_of_name(name : Optional[Union[str, pathlib.Path]] = None) -> pathlib.Path:
"""
Generate the folders structure representing a given name as a path,
or generate a temporary one.
Call signature::
path = run_enbid(name=None)
Parameters
----------
name : string
Path representing a folders structure. Default to None.
Returns
----------
path : pathlib.Path
Path corresponding to given name, or to new temporary one.
"""
if name is None:
raise NotImplementedError("name is None") # TODO https://pypi.org/project/temppathlib/
else:
path = pathlib.Path(name)
path.mkdir(parents=True, exist_ok=True)
return path
[docs]
def write_for_enbid(points: ArrayLike, name : Optional[Union[str, pathlib.Path]] = None,
caching : bool = False) -> pathlib.Path:
"""
Writes the input files for EnBiD given the input particles 3D
coordinates.
Call signature::
path = write_for_enbid(points, name=None, caching=False)
Parameters
----------
points : array_like
Contains 3D coordinates of the input particles, must be of shape
(N,3) for any given N integer.
name : string
Name of folder where to place EnBiD input files. Default to None.
caching : bool
If True, check if EnBiD input file already exists and ignore
writing if it does. Default to False.
Returns
----------
path : pathlib.Path
Path of folder where EnBiD input files are located.
"""
path: pathlib.Path = __make_path_of_name(name)
enbid_inputfile: pathlib.Path = path / DEFAULT_FOR_PARAMFILE[TTAGS.fname]
enbid_inputhashfile: pathlib.Path = enbid_inputfile.with_suffix(f".{HASH_EXT}")
points: NDArray = np.asarray(points)
inputhash = bytes(hashlib.sha256(points).hexdigest(), HASH_ENCODING)
if ((enbid_inputhashfile.read_bytes() != inputhash # proceed if hashes don't match,
if (enbid_inputfile.exists() and # only if enbid_inputfile exists,
enbid_inputhashfile.exists()) # and enbid_inputhashfile exists,
else True) # otherwise proceed if both don't exist
if caching else True): # -> proceed anyway if caching is False
assert points.ndim == 2 and points.shape[-1] == 3, 'Array-like input must be of shape (X, 3)'
# depreciating that warning
# temp = np.max(np.abs(np.average(points, axis=0)/np.std(points, axis=0)))
# if temp>1: warnings.warn("Input points may be not centered, which may cause EnBiD to run into a SegmentationFault")
# center frame on most clustered structure using NN distances
NN = nghb.NearestNeighbors(n_neighbors=2)
NN.fit(points)
NN_distances: NDArray = NN.kneighbors(points)[0][:,1]
most_clustered_structure: NDArray = points[NN_distances < np.median(NN_distances)]
most_clustered_structure_center: NDArray = np.average(most_clustered_structure, axis=0)
#
coordinates: NDArray = points - most_clustered_structure_center
np.savetxt(enbid_inputfile, coordinates, delimiter=' ')
enbid_inputhashfile.write_bytes(inputhash)
return path
[docs]
def run_enbid(name: Optional[Union[str, pathlib.Path]] = None, ngb: int = DEFAULT_NGB,
verbose: bool = True, caching: bool = False, **kwargs: Dict[str, Any]) -> pathlib.Path:
"""
Run EnBiD using input files in name.
Call signature::
path = run_enbid(name=None, ngb=64, verbose=True,
caching=False, **kwargs)
Parameters
----------
name : string
Name of folder where EnBiD input files are located. Default to
None.
ngb : int
Number of neighbouring particles EnBiD should consider in the
smoothing for the density estimation. Default to {DEFAULT_NGB}.
verbose : bool
Verbose boolean flag to allow EnBiD to print what it's doing to
stdout. Default to True.
caching : bool
If True, check if EnBiD paramfile and usedvalues files already
exist, and ignore running if parameters from the previous run are
the same. Default to False.
spatial_scale : float
Scaling between position and velocity space where the scaling goes
as velocity = position/spatial_scale if spatial_scale is set
strictly positive, or velocity = position/std(position) if
spatial_scale is set to 0 (with std representing the standard
deviation for each coordinate). Default to 1 - TODO currently not
implemented.
part_bounday : int
Minimum number of particles which a node must contain to have a
boundary correction applied to its surfaces during tree generation.
Optimum choice should be whichever the higher between 7 or d+1
where d is the dimensionality of the space considered.
Default to 7.
node_splitting_criterion : int (0, 1)
Flag to allow for the node splitting to always split in priority
the dimension with lowest Shannon entropy. If set to 0, the
criteria splits each dimension alternately. Default to 1.
cubic_cells : int (0, 1)
Flag to allow the node splitting to use position or velocity
subspaces rather than individual dimensions when generating cells.
Only work for 3 & 6 dimensional spaces. Default to 0 - TODO
currently not implemented.
median_splitting_on : int (0, 1)
Flag to allow for cell splitting to happen at the mean of data
points when building the tree for faster estimates. Default to 0
- TODO currently not implemented.
type_of_smoothing : int (0, 1, 2, 3, 4, 5)
Type of smoothing used:
0) None
1) FiEstAS
2) Normal isotropic spherical kernel
3) Adaptive metric spherical kernel
4) Normal isotropic product form kernel
5) Adaptive metric product form kernel
Default to 3.
vol_corr : int (0, 1)
Flag to enable a correction that avoid underestimating density
when the smoothing box extends outside the boundary. Default to 1.
type_of_kernel : int (0, 1, 2, 3, 4, 5)
Type of the kernel profile used:
0) B-spline
1) Top hat
2) Bi-weight (1-x^2)^2
3) Epanechikov
4) Cloud in cell
5) Triangular shaped cloud
Default to 3.
kernel_bias_correction : int (0, 1)
Flag to enable corrections that displace central data points when
computing densities, and reduce bias caused by irregularly
distributed data. Default to 1.
anisotropy_kernel : int (0, 1)
Flag to enable the use of anisotropic kernels which can have both
shear and rotation. Kerels become then rotated ellipsoids in the
density computation. With it on, type_of_smoothing should be either
2 or 3. Default to 0.
anisotropy : float
Minimum allowable minor to major axis ratio of the kernel smoothing
lengths for computational management. Default to 0.
ngb_a : int
Number of neighbouring particles EnBiD should consider when
computing the anisotropic kernel. Default to ngb.
type_list_on : int (0, 1)
Flag to extend the number of particle types on which EnBiD can
run independent density estimations from the default 6 types of
GADGET formated data. Default to 0 - TODO currently not
implemented.
periodic_boundary_on : int (0, 1)
Flag to allow periodic boundary conditions. Default to 0 - TODO
currently not implemented.
Returns
----------
path : pathlib.Path
Path of folder where EnBiD output files are located.
"""
path: pathlib.Path = __make_path_of_name(name)
kwargs[TTAGS.des_num_ngb] = ngb
kwargs[TTAGS.des_num_ngb_a] = kwargs.pop('ngb_a', ngb)
paramfile_text: str = ENBID_PARAMFILE_TEMPLATE.substitute(DEFAULT_FOR_PARAMFILE, **kwargs)
paramfile: pathlib.Path = path / CONSTANTS.enbid_paramfile
usedvalfile: pathlib.Path = path / CONSTANTS.usedvalues
if ((paramfile.read_text() != paramfile_text # proceed if paramfile_text is not in paramfile,
if (paramfile.exists() and # only if paramfile exist,
usedvalfile.exists()) # and usedvalfile exists,
else True) # otherwise proceed if both don't exist
if caching else True): # -> proceed anyway if caching is False
paramfile.write_text(paramfile_text)
execute([CONSTANTS.enbid, CONSTANTS.enbid_paramfile], verbose=verbose, cwd=path)
return path
run_enbid.__doc__ = run_enbid.__doc__.format(DEFAULT_NGB=DEFAULT_NGB)
[docs]
def return_enbid(name: Optional[Union[str, pathlib.Path]] = None) -> NDArray:
"""
Read EnBiD output file and returns the associated kernel density
estimates after running the EnBiD estimator.
Call signature::
rho = return_enbid(name=None)
Parameters
----------
name : string
Name of folder where EnBiD saved its output files. Default to None.
Returns
----------
rho : array_like
Array representing the kernel density estimates output by EnBiD
"""
path: pathlib.Path = __make_path_of_name(name)
usedvals = pd.read_table(path / CONSTANTS.usedvalues, header=None, sep="\s+",
index_col=0).T.reset_index(drop=True).to_dict('records')[0]
rho: NDArray = np.loadtxt(path / f"{DEFAULT_FOR_PARAMFILE[TTAGS.fname]}{usedvals[SNAPSHOT_FILEBASE]}.{ENBID_OUT_EXT}")
return rho
[docs]
def enbid(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> NDArray:
"""
Returns kernel density estimates given a set of particle 3D coordinates.
Call signature::
rho = enbid(points, name=None, **kwargs)
Parameters
----------
points : array_like
Contains 3D coordinates of the input particles, must be of shape
(N,3) for any given N integer.
name : string
Name of folder where to save the input/output files for the EnBiD
estimator. Default to None.
caching : bool
Only to be used with a given folder under argument name. If True,
check if EnBiD had already been used to produce the kernel density
estimates. If it hasn't, compute the estimates, otherwise, load
the existing data that had previously been cached. Default to
False.
**kwargs : dict
Refer to function run_enbid documentation for additional keyword
arguments.
Returns
----------
rho : array_like
Array representing kernel density estimates for the input particles
"""
points = args[0]
name = kwargs.pop('name', None)
caching = False if name is None else kwargs.pop('caching', False)
return return_enbid(run_enbid(write_for_enbid(points, name=name, caching=caching), caching=caching, **kwargs))
if __name__ == '__main__':
raise NotImplementedError()