Module phi.physics.advect
Container for different advection schemes for grids and particles.
Examples:
 semi_lagrangian (grid)
 mac_cormack (grid)
 runge_kutta_4 (particle)
Expand source code
"""
Container for different advection schemes for grids and particles.
Examples:
* semi_lagrangian (grid)
* mac_cormack (grid)
* runge_kutta_4 (particle)
"""
import warnings
from phi import math
from phi.field import SampledField, ConstantField, Field, PointCloud, extrapolate_valid, Grid, sample, reduce_sample
from phi.field._field_math import GridType
from phi.geom import Geometry
def euler(elements: Geometry, velocity: Field, dt: float, v0: math.Tensor = None) > Geometry:
""" Euler integrator. """
if v0 is None:
v0 = sample(velocity, elements)
return elements.shifted(v0 * dt)
def rk4(elements: Geometry, velocity: Field, dt: float, v0: math.Tensor = None) > Geometry:
""" RungeKutta4 integrator. """
if v0 is None:
v0 = sample(velocity, elements)
vel_half = sample(velocity, elements.shifted(0.5 * dt * v0))
vel_half2 = sample(velocity, elements.shifted(0.5 * dt * vel_half))
vel_full = sample(velocity, elements.shifted(dt * vel_half2))
vel_rk4 = (1 / 6.) * (v0 + 2 * (vel_half + vel_half2) + vel_full)
return elements.shifted(dt * vel_rk4)
def advect(field: SampledField,
velocity: Field,
dt: float or math.Tensor,
integrator=euler) > Field:
"""
Advect `field` along the `velocity` vectors using the specified integrator.
The behavior depends on the type of `field`:
* `phi.field.PointCloud`: Points are advected forward, see `points`.
* `phi.field.Grid`: Sample points are traced backward, see `semi_lagrangian`.
Args:
field: Field to be advected as `phi.field.SampledField`.
velocity: Any `phi.field.Field` that can be sampled in the elements of `field`.
dt: Time increment
integrator: ODE integrator for solving the movement.
Returns:
Advected field of same type as `field`
"""
if isinstance(field, PointCloud):
return points(field, velocity, dt=dt, integrator=integrator)
elif isinstance(field, Grid):
return semi_lagrangian(field, velocity, dt=dt, integrator=integrator)
elif isinstance(field, ConstantField):
return field
raise NotImplementedError(field)
def points(field: PointCloud, velocity: Field, dt: float, integrator=euler):
"""
Advects the sample points of a point cloud using a simple Euler step.
Each point moves by an amount equal to the local velocity times `dt`.
Args:
field: point cloud to be advected
velocity: velocity sampled at the same points as the point cloud
dt: Euler step time increment
integrator: ODE integrator for solving the movement.
Returns:
Advected point cloud
"""
new_elements = integrator(field.elements, velocity, dt)
return field.with_elements(new_elements)
def semi_lagrangian(field: GridType,
velocity: Field,
dt: float,
integrator=euler) > GridType:
"""
SemiLagrangian advection with simple backward lookup.
This method samples the `velocity` at the grid points of `field`
to determine the lookup location for each grid point by walking backwards along the velocity vectors.
The new values are then determined by sampling `field` at these lookup locations.
Args:
field: quantity to be advected, stored on a grid (CenteredGrid or StaggeredGrid)
velocity: vector field, need not be compatible with with `field`.
dt: time increment
integrator: ODE integrator for solving the movement.
Returns:
Field with same sample points as `field`
"""
lookup = integrator(field.elements, velocity, dt)
interpolated = reduce_sample(field, lookup)
return field.with_values(interpolated)
def mac_cormack(field: GridType,
velocity: Field,
dt: float,
correction_strength=1.0,
integrator=euler) > GridType:
"""
MacCormack advection uses a forward and backward lookup to determine the firstorder error of semiLagrangian advection.
It then uses that error estimate to correct the field values.
To avoid overshoots, the resulting value is bounded by the neighbouring grid cells of the backward lookup.
Args:
field: Field to be advected, one of `(CenteredGrid, StaggeredGrid)`
velocity: Vector field, need not be sampled at same locations as `field`.
dt: Time increment
correction_strength: The estimated error is multiplied by this factor before being applied.
The case correction_strength=0 equals semilagrangian advection. Set lower than 1.0 to avoid oscillations.
integrator: ODE integrator for solving the movement.
Returns:
Advected field of type `type(field)`
"""
v0 = sample(velocity, field.elements)
points_bwd = integrator(field.elements, velocity, dt, v0=v0)
points_fwd = integrator(field.elements, velocity, dt, v0=v0)
# SemiLagrangian advection
field_semi_la = field.with_values(reduce_sample(field, points_bwd))
# Inverse semiLagrangian advection
field_inv_semi_la = field.with_values(reduce_sample(field_semi_la, points_fwd))
# correction
new_field = field_semi_la + correction_strength * 0.5 * (field  field_inv_semi_la)
# Address overshoots
limits = field.closest_values(points_bwd)
lower_limit = math.min(limits, [f'closest_{dim}' for dim in field.shape.spatial.names])
upper_limit = math.max(limits, [f'closest_{dim}' for dim in field.shape.spatial.names])
values_clamped = math.clip(new_field.values, lower_limit, upper_limit)
return new_field.with_values(values_clamped)
def runge_kutta_4(cloud: SampledField, velocity: Field, dt: float, accessible: Field = None, occupied: Field = None):
"""
Lagrangian advection of particles using a fourthorder rungekutta scheme. If `accessible` and `occupied` are specified,
the advection uses velocitydependent extrapolation of `velocity`.
Args:
cloud: PointCloud holding the particle positions as elements
velocity: velocity Grid which should get used for advection
dt: Time step for rungekutta
accessible: Boundary conditions for restricting extrapolation to accessible positions
occupied: Binary Grid indicating particle positions on the grid for extrapolation
Returns:
PointCloud with advected particle positions and their corresponding values.
"""
warnings.warn("runge_kutta_4 is deprecated. Use points() with integrator=rk4 instead.", DeprecationWarning)
assert isinstance(velocity, Grid), 'runge_kutta advection with extrapolation works for Grids only.'
def extrapolation_helper(elements, t_shift, v_field, mask):
shift = math.ceil(math.max(math.abs(elements.center  points.center)))  t_shift
t_shift += shift
v_field, mask = extrapolate_valid(v_field, mask, int(shift))
v_field *= accessible
return v_field, mask, t_shift
points = cloud.elements
total_shift = 0
extrapolate = accessible is not None and occupied is not None
#  Sample velocity at intermediate points and adjust velocitydependent
# extrapolation to maximum shift of corresponding component 
if extrapolate:
assert isinstance(occupied, type(velocity)), 'occupation mask must have same type as velocity.'
velocity, occupied = extrapolate_valid(velocity, occupied, 2)
velocity *= accessible
vel_k1 = sample(velocity, points)
shifted_points = points.shifted(0.5 * dt * vel_k1)
if extrapolate:
velocity, occupied, total_shift = extrapolation_helper(shifted_points, total_shift, velocity, occupied)
vel_k2 = sample(velocity, shifted_points)
shifted_points = points.shifted(0.5 * dt * vel_k2)
if extrapolate:
velocity, occupied, total_shift = extrapolation_helper(shifted_points, total_shift, velocity, occupied)
vel_k3 = sample(velocity, shifted_points)
shifted_points = points.shifted(dt * vel_k3)
if extrapolate:
velocity, _, _ = extrapolation_helper(shifted_points, total_shift, velocity, occupied)
vel_k4 = sample(velocity, shifted_points)
#  Combine points with RK4 scheme 
vel = (1/6.) * (vel_k1 + 2 * (vel_k2 + vel_k3) + vel_k4)
new_points = points.shifted(dt * vel)
return cloud.with_elements(new_points)
Functions
def advect(field: phi.field._field.SampledField, velocity: phi.field._field.Field, dt: float, integrator=<function euler>) ‑> phi.field._field.Field

Advect
field
along thevelocity
vectors using the specified integrator.The behavior depends on the type of
field
:PointCloud
: Points are advected forward, seepoints()
.Grid
: Sample points are traced backward, seesemi_lagrangian()
.
Args
field
 Field to be advected as
SampledField
. velocity
 Any
Field
that can be sampled in the elements offield
. dt
 Time increment
integrator
 ODE integrator for solving the movement.
Returns
Advected field of same type as
field
Expand source code
def advect(field: SampledField, velocity: Field, dt: float or math.Tensor, integrator=euler) > Field: """ Advect `field` along the `velocity` vectors using the specified integrator. The behavior depends on the type of `field`: * `phi.field.PointCloud`: Points are advected forward, see `points`. * `phi.field.Grid`: Sample points are traced backward, see `semi_lagrangian`. Args: field: Field to be advected as `phi.field.SampledField`. velocity: Any `phi.field.Field` that can be sampled in the elements of `field`. dt: Time increment integrator: ODE integrator for solving the movement. Returns: Advected field of same type as `field` """ if isinstance(field, PointCloud): return points(field, velocity, dt=dt, integrator=integrator) elif isinstance(field, Grid): return semi_lagrangian(field, velocity, dt=dt, integrator=integrator) elif isinstance(field, ConstantField): return field raise NotImplementedError(field)
def euler(elements: phi.geom._geom.Geometry, velocity: phi.field._field.Field, dt: float, v0: phi.math._tensors.Tensor = None) ‑> phi.geom._geom.Geometry

Euler integrator.
Expand source code
def euler(elements: Geometry, velocity: Field, dt: float, v0: math.Tensor = None) > Geometry: """ Euler integrator. """ if v0 is None: v0 = sample(velocity, elements) return elements.shifted(v0 * dt)
def mac_cormack(field: ~GridType, velocity: phi.field._field.Field, dt: float, correction_strength=1.0, integrator=<function euler>) ‑> ~GridType

MacCormack advection uses a forward and backward lookup to determine the firstorder error of semiLagrangian advection. It then uses that error estimate to correct the field values. To avoid overshoots, the resulting value is bounded by the neighbouring grid cells of the backward lookup.
Args
field
 Field to be advected, one of
(CenteredGrid, StaggeredGrid)
velocity
 Vector field, need not be sampled at same locations as
field
. dt
 Time increment
correction_strength
 The estimated error is multiplied by this factor before being applied. The case correction_strength=0 equals semilagrangian advection. Set lower than 1.0 to avoid oscillations.
integrator
 ODE integrator for solving the movement.
Returns
Advected field of type
type(field)
Expand source code
def mac_cormack(field: GridType, velocity: Field, dt: float, correction_strength=1.0, integrator=euler) > GridType: """ MacCormack advection uses a forward and backward lookup to determine the firstorder error of semiLagrangian advection. It then uses that error estimate to correct the field values. To avoid overshoots, the resulting value is bounded by the neighbouring grid cells of the backward lookup. Args: field: Field to be advected, one of `(CenteredGrid, StaggeredGrid)` velocity: Vector field, need not be sampled at same locations as `field`. dt: Time increment correction_strength: The estimated error is multiplied by this factor before being applied. The case correction_strength=0 equals semilagrangian advection. Set lower than 1.0 to avoid oscillations. integrator: ODE integrator for solving the movement. Returns: Advected field of type `type(field)` """ v0 = sample(velocity, field.elements) points_bwd = integrator(field.elements, velocity, dt, v0=v0) points_fwd = integrator(field.elements, velocity, dt, v0=v0) # SemiLagrangian advection field_semi_la = field.with_values(reduce_sample(field, points_bwd)) # Inverse semiLagrangian advection field_inv_semi_la = field.with_values(reduce_sample(field_semi_la, points_fwd)) # correction new_field = field_semi_la + correction_strength * 0.5 * (field  field_inv_semi_la) # Address overshoots limits = field.closest_values(points_bwd) lower_limit = math.min(limits, [f'closest_{dim}' for dim in field.shape.spatial.names]) upper_limit = math.max(limits, [f'closest_{dim}' for dim in field.shape.spatial.names]) values_clamped = math.clip(new_field.values, lower_limit, upper_limit) return new_field.with_values(values_clamped)
def points(field: phi.field._point_cloud.PointCloud, velocity: phi.field._field.Field, dt: float, integrator=<function euler>)

Advects the sample points of a point cloud using a simple Euler step. Each point moves by an amount equal to the local velocity times
dt
.Args
field
 point cloud to be advected
velocity
 velocity sampled at the same points as the point cloud
dt
 Euler step time increment
integrator
 ODE integrator for solving the movement.
Returns
Advected point cloud
Expand source code
def points(field: PointCloud, velocity: Field, dt: float, integrator=euler): """ Advects the sample points of a point cloud using a simple Euler step. Each point moves by an amount equal to the local velocity times `dt`. Args: field: point cloud to be advected velocity: velocity sampled at the same points as the point cloud dt: Euler step time increment integrator: ODE integrator for solving the movement. Returns: Advected point cloud """ new_elements = integrator(field.elements, velocity, dt) return field.with_elements(new_elements)
def rk4(elements: phi.geom._geom.Geometry, velocity: phi.field._field.Field, dt: float, v0: phi.math._tensors.Tensor = None) ‑> phi.geom._geom.Geometry

RungeKutta4 integrator.
Expand source code
def rk4(elements: Geometry, velocity: Field, dt: float, v0: math.Tensor = None) > Geometry: """ RungeKutta4 integrator. """ if v0 is None: v0 = sample(velocity, elements) vel_half = sample(velocity, elements.shifted(0.5 * dt * v0)) vel_half2 = sample(velocity, elements.shifted(0.5 * dt * vel_half)) vel_full = sample(velocity, elements.shifted(dt * vel_half2)) vel_rk4 = (1 / 6.) * (v0 + 2 * (vel_half + vel_half2) + vel_full) return elements.shifted(dt * vel_rk4)
def runge_kutta_4(cloud: phi.field._field.SampledField, velocity: phi.field._field.Field, dt: float, accessible: phi.field._field.Field = None, occupied: phi.field._field.Field = None)

Lagrangian advection of particles using a fourthorder rungekutta scheme. If
accessible
andoccupied
are specified, the advection uses velocitydependent extrapolation ofvelocity
.Args
cloud
 PointCloud holding the particle positions as elements
velocity
 velocity Grid which should get used for advection
dt
 Time step for rungekutta
accessible
 Boundary conditions for restricting extrapolation to accessible positions
occupied
 Binary Grid indicating particle positions on the grid for extrapolation
Returns
PointCloud with advected particle positions and their corresponding values.
Expand source code
def runge_kutta_4(cloud: SampledField, velocity: Field, dt: float, accessible: Field = None, occupied: Field = None): """ Lagrangian advection of particles using a fourthorder rungekutta scheme. If `accessible` and `occupied` are specified, the advection uses velocitydependent extrapolation of `velocity`. Args: cloud: PointCloud holding the particle positions as elements velocity: velocity Grid which should get used for advection dt: Time step for rungekutta accessible: Boundary conditions for restricting extrapolation to accessible positions occupied: Binary Grid indicating particle positions on the grid for extrapolation Returns: PointCloud with advected particle positions and their corresponding values. """ warnings.warn("runge_kutta_4 is deprecated. Use points() with integrator=rk4 instead.", DeprecationWarning) assert isinstance(velocity, Grid), 'runge_kutta advection with extrapolation works for Grids only.' def extrapolation_helper(elements, t_shift, v_field, mask): shift = math.ceil(math.max(math.abs(elements.center  points.center)))  t_shift t_shift += shift v_field, mask = extrapolate_valid(v_field, mask, int(shift)) v_field *= accessible return v_field, mask, t_shift points = cloud.elements total_shift = 0 extrapolate = accessible is not None and occupied is not None #  Sample velocity at intermediate points and adjust velocitydependent # extrapolation to maximum shift of corresponding component  if extrapolate: assert isinstance(occupied, type(velocity)), 'occupation mask must have same type as velocity.' velocity, occupied = extrapolate_valid(velocity, occupied, 2) velocity *= accessible vel_k1 = sample(velocity, points) shifted_points = points.shifted(0.5 * dt * vel_k1) if extrapolate: velocity, occupied, total_shift = extrapolation_helper(shifted_points, total_shift, velocity, occupied) vel_k2 = sample(velocity, shifted_points) shifted_points = points.shifted(0.5 * dt * vel_k2) if extrapolate: velocity, occupied, total_shift = extrapolation_helper(shifted_points, total_shift, velocity, occupied) vel_k3 = sample(velocity, shifted_points) shifted_points = points.shifted(dt * vel_k3) if extrapolate: velocity, _, _ = extrapolation_helper(shifted_points, total_shift, velocity, occupied) vel_k4 = sample(velocity, shifted_points) #  Combine points with RK4 scheme  vel = (1/6.) * (vel_k1 + 2 * (vel_k2 + vel_k3) + vel_k4) new_points = points.shifted(dt * vel) return cloud.with_elements(new_points)
def semi_lagrangian(field: ~GridType, velocity: phi.field._field.Field, dt: float, integrator=<function euler>) ‑> ~GridType

SemiLagrangian advection with simple backward lookup.
This method samples the
velocity
at the grid points offield
to determine the lookup location for each grid point by walking backwards along the velocity vectors. The new values are then determined by samplingfield
at these lookup locations.Args
field
 quantity to be advected, stored on a grid (CenteredGrid or StaggeredGrid)
velocity
 vector field, need not be compatible with with
field
. dt
 time increment
integrator
 ODE integrator for solving the movement.
Returns
Field with same sample points as
field
Expand source code
def semi_lagrangian(field: GridType, velocity: Field, dt: float, integrator=euler) > GridType: """ SemiLagrangian advection with simple backward lookup. This method samples the `velocity` at the grid points of `field` to determine the lookup location for each grid point by walking backwards along the velocity vectors. The new values are then determined by sampling `field` at these lookup locations. Args: field: quantity to be advected, stored on a grid (CenteredGrid or StaggeredGrid) velocity: vector field, need not be compatible with with `field`. dt: time increment integrator: ODE integrator for solving the movement. Returns: Field with same sample points as `field` """ lookup = integrator(field.elements, velocity, dt) interpolated = reduce_sample(field, lookup) return field.with_values(interpolated)