Source code for enbid_ananke

#!/usr/bin/env python
#
# Author: Adrien CR Thob
# Copyright (C) 2022  Adrien CR Thob
#
# This file is part of the py-EnBiD-ananke project,
# <https://github.com/athob/py-EnBiD-ananke>,  which is licensed
# under the GNU General Public License v2.0 (GPL-2.0).
# 
# The full copyright notice, including terms governing use, modification,
# and redistribution, is contained in the files LICENSE and COPYRIGHT,
# which can be found at the root of the source code distribution tree:
# - LICENSE <https://github.com/athob/py-EnBiD-ananke/blob/main/LICENSE>
# - COPYRIGHT <https://github.com/athob/py-EnBiD-ananke/blob/main/COPYRIGHT>
#
"""
enbid_ananke
============

Provides a set of utilities to run the kernel density estimator EnBiD
(`Sharma & Steinmetz 2011 <http://ascl.net/1109.012>`).

How to use
----------

enbid_ananke comes with the function enbid, please refer to its documentation
for further help.
"""
from typing import Any, Optional, Union, Tuple, Dict
from numpy.typing import ArrayLike, NDArray
import pathlib
import warnings
import hashlib
import struct
import numpy as np
import pandas as pd
from sklearn import neighbors as nghb

from .__metadata__ import *
from ._constants import *
from ._templates import *
from ._defaults import *
from .utils import execute

__all__ = ['enbid']


def __make_path_of_name(name : Optional[Union[str, pathlib.Path]] = None) -> pathlib.Path:
    """
        Generate the folders structure representing a given name as a path,
        or generate a temporary one.

        Call signature::

            path = __make_path_of_name(name=None)
        
        Parameters
        ----------
        name : string
            Path representing a folders structure. Default to None.
        
        Returns
        ----------
        path : pathlib.Path
            Path corresponding to given name, or to new temporary one.
    """
    if name is None:
        raise NotImplementedError("name is None")  # TODO https://pypi.org/project/temppathlib/
    else:
        path = pathlib.Path(name)
        path.mkdir(parents=True, exist_ok=True)
    return path


[docs] def write_gadget_file(filename: pathlib.Path, pos: NDArray, mass: NDArray, vel: Optional[NDArray] = None, boxsize: float = 0.0, time: float = 0.0, redshift: float = 0.0, omega0: float = 0.0, omegalambda: float = 0.0, hubble: float = 0.7, particle_type: int = 1) -> None: """ Write a GADGET-1 format snapshot file to serve as input for EnBiD (3D or 6D). Parameters ---------- filename : pathlib.Path Output file path. pos : ndarray, shape (N, 3) Particle positions (float32). mass : ndarray, shape (N,) Particle masses (float32). vel : ndarray, shape (N, 3), optional Particle velocities. If None, zero velocities are written. boxsize : float, optional Size of periodic box. time, redshift, omega0, omegalambda, hubble : float, optional Cosmological parameters (ignored by EnBiD but part of header). particle_type : int, optional Particle type index (0-5). """ N = pos.shape[0] if mass.shape[0] != N: raise ValueError("pos and mass must have same number of particles") # Ensure correct data types pos = np.asarray(pos, dtype=np.float32) mass = np.asarray(mass, dtype=np.float32) # vel = np.zeros((N, 3), dtype=np.float32) # dummy velocities if vel is None: vel = np.zeros((N, 3), dtype=np.float32) # dummy velocities else: vel = np.asarray(vel, dtype=np.float32) if vel.shape[0] != N or vel.shape[1] != 3: raise ValueError("vel must have shape (N, 3)") ids = np.arange(1, N+1, dtype=np.int32) # dummy IDs (1-based) # Build header (256 bytes) npart = np.zeros(6, dtype=np.int32) npart[particle_type] = N massarr = np.zeros(6, dtype=np.float64) nall = npart.copy() # Build header without padding first, then pad to 256 bytes header_data = struct.pack( '<6i6d2d2i6i2i4d3i', npart[0], npart[1], npart[2], npart[3], npart[4], npart[5], massarr[0], massarr[1], massarr[2], massarr[3], massarr[4], massarr[5], time, redshift, 0, 0, # flag_sfr, flag_feedback nall[0], nall[1], nall[2], nall[3], nall[4], nall[5], 0, 1, # flag_cooling, num_files boxsize, omega0, omegalambda, hubble, 0, 0, 1 # flag_id, flag_dim, flag_density ) # Pad to exactly 256 bytes padding_len = 256 - len(header_data) if padding_len < 0: raise RuntimeError("Header exceeds 256 bytes") header_data += b'\x00' * padding_len def write_record(f, data): f.write(struct.pack('<i', len(data))) f.write(data) f.write(struct.pack('<i', len(data))) with open(filename, 'wb') as f: write_record(f, header_data) write_record(f, pos.ravel().tobytes()) write_record(f, vel.ravel().tobytes()) write_record(f, ids.tobytes()) write_record(f, mass.ravel().tobytes())
[docs] def write_for_enbid(points: ArrayLike, velocities: Optional[ArrayLike] = None, mass: Optional[ArrayLike] = None, name: Optional[Union[str, pathlib.Path]] = None, caching: bool = False) -> pathlib.Path: """ Writes the input files for EnBiD from 3D positions (and optional 3D velocities). If velocities are given, the combined input represents 6D phase-space data. The position frame is automatically centred on the most clustered structure to improve numerical stability; velocities are left unchanged. Call signature:: path = write_for_enbid(points, velocities=None, mass=None, name=None, caching=False) Parameters ---------- points : array_like Contains 3D coordinates of the input particles, must be of shape (N,3) for any given N integer. velocities : array_like, optional Particle velocities, shape (N, 3). If provided, EnBiD will run in 6D phase-space mode. mass : array_like, optional Contains the mass of each particle. Must be a 1D array of length N. If provided, a GADGET binary input file is written and EnBiD will compute mass-weighted densities. If None (default), an ASCII file is written and unit masses are assumed. name : string, optional Name of folder where to place EnBiD input files. Default to None. caching : bool, optional If True, check if EnBiD input file already exists and ignore writing if it does. Default to False. Returns ---------- path : pathlib.Path Path of folder where EnBiD input files are located. """ path: pathlib.Path = __make_path_of_name(name) enbid_inputfile: pathlib.Path = path / DEFAULT_FOR_PARAMFILE[TTAGS.fname] enbid_inputhashfile: pathlib.Path = enbid_inputfile.with_suffix(f".{HASH_EXT}") points: NDArray = np.asarray(points) mass_arr: Optional[NDArray] = np.asarray(mass) if mass is not None else None vel_arr: Optional[NDArray] = np.asarray(velocities) if velocities is not None else None # Validate shapes if vel_arr is not None: assert points.ndim == 2 and points.shape[-1] == 3, 'Points must be (N,3)' assert vel_arr.ndim == 2 and vel_arr.shape[-1] == 3, 'Velocities must be (N,3)' assert vel_arr.shape[0] == points.shape[0], 'Points and velocities must have same N' else: assert points.ndim == 2 and points.shape[-1] == 3, 'Points must be (N,3)' if mass_arr is not None: assert mass_arr.ndim == 1 and mass_arr.shape[0] == points.shape[0], 'Mass array length mismatch' # Compute hash including mass and/or velocities if present hash_input = points.tobytes() if vel_arr is not None: hash_input += vel_arr.tobytes() if mass_arr is not None: hash_input += mass_arr.tobytes() inputhash = bytes(hashlib.sha256(hash_input).hexdigest(), HASH_ENCODING) # Check if we need to write the file if ((enbid_inputhashfile.read_bytes() != inputhash # proceed if hashes don't match, if (enbid_inputfile.exists() and # only if enbid_inputfile exists, enbid_inputhashfile.exists()) # and enbid_inputhashfile exists, else True) # otherwise proceed if both don't exist if caching else True): # -> proceed anyway if caching is False assert points.ndim == 2 and points.shape[-1] == 3, 'Array-like input must be of shape (X, 3)' if mass_arr is not None: assert mass_arr.ndim == 1 and mass_arr.shape[0] == points.shape[0], 'mass must be 1D array with same length as points' if points.shape[0]: # center frame on most clustered structure using NN distances NN = nghb.NearestNeighbors(n_neighbors=2) NN.fit(points) NN_distances: NDArray = NN.kneighbors(points)[0][:,1] most_clustered_structure: NDArray = points[NN_distances < np.median(NN_distances)] most_clustered_structure_center: NDArray = np.average(most_clustered_structure, axis=0) # coordinates: NDArray = points - most_clustered_structure_center else: coordinates: NDArray = points if mass_arr is None: # ASCII format: write positions (and velocities if present) if vel_arr is not None: # 6 columns combined = np.column_stack((coordinates, vel_arr)) np.savetxt(enbid_inputfile, combined, delimiter=' ') else: np.savetxt(enbid_inputfile, coordinates, delimiter=' ') else: # GADGET binary format write_gadget_file(enbid_inputfile, coordinates, mass_arr, vel=vel_arr if vel_arr is not None else None) enbid_inputhashfile.write_bytes(inputhash) return path
[docs] def run_enbid(points: ArrayLike, velocities: Optional[ArrayLike] = None, mass: Optional[ArrayLike] = None, name: Optional[Union[str, pathlib.Path]] = None, ngb: int = DEFAULT_NGB, verbose: bool = True, caching: bool = False, **kwargs: Dict[str, Any]) -> pathlib.Path: """ Run the EnBiD kernel density estimator on the supplied particle data. The appropriate pre-compiled EnBiD binary (3D or 6D) is selected automatically based on whether ``velocities`` are provided. Call signature:: path = run_enbid(points, velocities=None, mass=None, name=None, ngb=64, verbose=True, caching=False, **kwargs) Parameters ---------- points : array_like Contains 3D coordinates of the input particles, must be of shape (N,3) for any given N integer. velocities : array_like, optional Particle velocities, shape (N, 3). If provided, EnBiD runs in 6D mode. mass : array_like, optional Contains the mass of each particle. Must be a 1D array of length N. If provided, a GADGET binary input file is written and EnBiD will compute mass-weighted densities. If None (default), an ASCII file is written and unit masses are assumed. name : string, optional Name of folder where EnBiD input files are located. Default to None. ngb : int, optional Number of neighbouring particles EnBiD should consider in the smoothing for the density estimation. Default to {DEFAULT_NGB}. verbose : bool, optional Verbose boolean flag to allow EnBiD to print what it's doing to stdout. Default to True. caching : bool, optional If True, the input data is hashed and compared to a cached hash. If the data (points, velocities, masses) are unchanged, the existing parameter file and density estimates are reused. Default to False. spatial_scale : float, optional Scaling between position and velocity space where the scaling goes as velocity = position/spatial_scale if spatial_scale is set strictly positive, or velocity = position/std(position) if spatial_scale is set to 0 (with std representing the standard deviation for each coordinate). Default to 1 - TODO currently not implemented. part_boundary : int, optional Minimum number of particles which a node must contain to have a boundary correction applied to its surfaces during tree generation. Optimum choice should be the larger of 7 or d+1, where d is the dimensionality of the space considered. Default to 7. node_splitting_criterion : int (0, 1), optional Flag to allow for the node splitting to always split in priority the dimension with lowest Shannon entropy. If set to 0, the criteria splits each dimension alternately. Default to 1. cubic_cells : int (0, 1), optional Flag to allow the node splitting to use position or velocity subspaces rather than individual dimensions when generating cells. Only work for 3 & 6 dimensional spaces. Default to 0 - TODO currently not implemented. median_splitting_on : int (0, 1), optional Flag to allow for cell splitting to happen at the mean of data points when building the tree for faster estimates. Default to 1 (requires EnBiD compiled with -DMEDIAN TODO). type_of_smoothing : int (0, 1, 2, 3, 4, 5), optional Type of smoothing used: 0) None 1) FiEstAS 2) Normal isotropic spherical kernel 3) Adaptive metric spherical kernel 4) Normal isotropic product form kernel 5) Adaptive metric product form kernel Default to 3. vol_corr : int (0, 1), optional Flag to enable a correction that avoid underestimating density when the smoothing box extends outside the boundary. Default to 1. type_of_kernel : int (0, 1, 2, 3, 4, 5), optional Type of the kernel profile used: 0) B-spline 1) Top hat 2) Bi-weight (1-x^2)^2 3) Epanechikov 4) Cloud in cell 5) Triangular shaped cloud Default to 3. kernel_bias_correction : int (0, 1), optional Flag to enable corrections that displace central data points when computing densities, and reduce bias caused by irregularly distributed data. Default to 1. anisotropy_kernel : int (0, 1), optional Flag to enable the use of anisotropic kernels which can have both shear and rotation. Kerels become then rotated ellipsoids in the density computation. With it on, type_of_smoothing should be either 2 or 3. Default to 0. anisotropy : float, optional Minimum allowable minor to major axis ratio of the kernel smoothing lengths for computational management. Default to 0. ngb_a : int, optional Number of neighbouring particles EnBiD should consider when computing the anisotropic kernel. Default to ngb. type_list_on : int (0, 1), optional Flag to extend the number of particle types on which EnBiD can run independent density estimations from the default 6 types of GADGET formated data. Default to 0 - TODO currently not implemented. periodic_boundary_on : int (0, 1), optional Flag to allow periodic boundary conditions. Default to 0 - TODO currently not implemented. Returns ---------- path : pathlib.Path Path of folder where EnBiD output files are located. """ # Determine ICFormat based on whether mass is provided ic_format = 1 if mass is not None else 0 kwargs[TTAGS.ic_format] = ic_format # Determine dimension tag for output file suffix dim_tag = 'd6' if velocities is not None else 'd3' kwargs[TTAGS.snapshot_filebase] = f"_{dim_tag}n{ngb}" # Write input files path = write_for_enbid(points, velocities=velocities, mass=mass, name=name, caching=caching) kwargs[TTAGS.des_num_ngb] = ngb kwargs[TTAGS.des_num_ngb_a] = kwargs.pop('ngb_a', ngb) paramfile_text: str = ENBID_PARAMFILE_TEMPLATE.substitute(DEFAULT_FOR_PARAMFILE, **kwargs) paramfile: pathlib.Path = path / CONSTANTS.enbid_paramfile usedvalfile: pathlib.Path = path / CONSTANTS.usedvalues if ((paramfile.read_text() != paramfile_text # proceed if paramfile_text is not in paramfile, if (paramfile.exists() and # only if paramfile exist, usedvalfile.exists()) # and usedvalfile exists, else True) # otherwise proceed if both don't exist if caching else True): # -> proceed anyway if caching is False paramfile.write_text(paramfile_text) # Choose the right executable enbid_exec = CONSTANTS.enbid6d if velocities is not None else CONSTANTS.enbid3d execute([enbid_exec, CONSTANTS.enbid_paramfile], verbose=verbose, cwd=path) return path
run_enbid.__doc__ = run_enbid.__doc__.format(DEFAULT_NGB=DEFAULT_NGB)
[docs] def read_enbid_binary(filename: pathlib.Path) -> NDArray: """ Read a binary EnBiD density file (GADGET-style output). The file contains a 256-byte header followed by a block of single-precision floats (density for each particle). Each record is preceded and followed by a 4-byte integer indicating the block size (Fortran unformatted). Parameters ---------- filename : pathlib.Path Path to the .est file. Returns ------- density : ndarray Array of density values (float32). """ with open(filename, 'rb') as f: # Read and skip header block blklen_data = f.read(4) if len(blklen_data) != 4: raise IOError("Failed to read block size for header") blklen = struct.unpack('<i', blklen_data)[0] if blklen != 256: raise ValueError(f"Expected header size 256, got {blklen}") f.read(256) # header f.read(4) # trailing block size # Read density block blklen_data = f.read(4) if len(blklen_data) != 4: raise IOError("Failed to read block size for density") blklen = struct.unpack('<i', blklen_data)[0] n_floats = blklen // 4 density_bytes = f.read(blklen) density = np.frombuffer(density_bytes, dtype=np.float32, count=n_floats) return density
[docs] def return_enbid(name: Optional[Union[str, pathlib.Path]] = None) -> NDArray: """ Read EnBiD output file and returns the associated kernel density estimates after running the EnBiD estimator. Automatically detects whether the output is ASCII or binary based on the ICFormat used in the run. Call signature:: rho = return_enbid(name=None) Parameters ---------- name : string Name of folder where EnBiD saved its output files. Default to None. Returns ---------- rho : array_like Array representing the kernel density estimates output by EnBiD """ path: pathlib.Path = __make_path_of_name(name) usedvals = pd.read_table(path / CONSTANTS.usedvalues, header=None, sep="\s+", index_col=0).T.reset_index(drop=True).to_dict('records')[0] output_file = path / f"{DEFAULT_FOR_PARAMFILE[TTAGS.fname]}{usedvals[SNAPSHOT_FILEBASE]}.{ENBID_OUT_EXT}" # Check ICFormat used in this run ic_format = int(usedvals.get('ICFormat', 0)) if ic_format == 1: # Binary GADGET output rho = read_enbid_binary(output_file) else: # ASCII output (default) rho = np.loadtxt(output_file) return rho
[docs] def enbid(points: ArrayLike, velocities: Optional[ArrayLike] = None, mass: Optional[ArrayLike] = None, **kwargs: Dict[str, Any]) -> NDArray: """ Returns kernel density estimates for a set of particles. If only 3D positions are given, the result is a 3D spatial density. If 3D velocities are also provided, the estimate becomes a 6D phase-space density. When a `mass` array is given, the densities are mass-weighted (mass density); otherwise they are number densities. Call signature:: rho = enbid(points, velocities=None, mass=None, name=None, **kwargs) Parameters ---------- points : array_like Contains 3D coordinates of the input particles, must be of shape (N,3) for any given N integer. velocities : array_like, optional Particle velocities, shape (N, 3). If provided, EnBiD runs in 6D phase-space mode. mass : array_like, optional Contains the mass of each particle. Must be a 1D array of length N. If provided, a GADGET binary input file is written and EnBiD will compute mass-weighted densities (i.e., mass density). If None (default), an ASCII file is written and unit masses are assumed (number density). name : string, optional Name of folder where to save the input/output files for the EnBiD estimator. Default to None. caching : bool, optional Only to be used with a given folder under argument name. If True, check if EnBiD had already been used to produce the kernel density estimates. If it hasn't, compute the estimates, otherwise, load the existing data that had previously been cached. Default to False. **kwargs : dict Refer to function run_enbid documentation for additional keyword arguments. Returns ---------- rho : array_like Array of density values. Units: mass density if `mass` is given, number density otherwise. """ # points = args[0] name = kwargs.pop('name', None) caching = False if name is None else kwargs.pop('caching', False) return return_enbid(run_enbid(points, velocities=velocities, mass=mass, name=name, caching=caching, **kwargs))
if __name__ == '__main__': raise NotImplementedError()