import functools

import numpy as np
import jax
import jax.numpy as jnp
import matplotlib.pyplot as plt
import jax.scipy
from jaxtyping import Array, Float  # https://github.com/google/jaxtyping

import jax_dataclasses as jdc

import scipy.optimize
import pandas as pd

# Global flag to set a specific platform, must be used at startup.
jax.config.update('jax_platform_name', 'cpu')
# use 64-bit precision for better accuracy
jax.config.update('jax_enable_x64', True)

Simulation of the 1D Heat Equation with Advection¶

This notebook simulates the 1D heat equation with advection and a point source. It numerically solves the equation to determine the temperature distribution along the length of a fluid tube and estimates the phase shift at different sensor locations.

The governing equation used for the simulation is:

$$ \frac{\partial T}{\partial t} = \alpha \frac{\partial^2 T}{\partial x^2} - v \frac{\partial T}{\partial x} + \frac{A}{\rho c} (1 + \sin(\omega_d t)) \delta(x) $$

where ( T(x, t) ) is the temperature distribution, ( \alpha ) is the thermal diffusivity, ( v ) is the fluid velocity, and ( \delta(x) ) represents the point heat source at ( x = 0 ).

Approach Overview¶

The 1D heat equation is solved in the frequency domain using Fourier Transforms. We simulate the system by calculating the temperature distribution at different positions downstream from the point source. The phase of the temperature signal at various sensor locations is extracted and analyzed.

Key Steps:¶

  • Define the simulation parameters such as the fluid properties, drive frequency, and sensor locations.
  • Simulate the temperature distribution along the tube over time.
  • Use Fourier Transform to compute the phase of the temperature signal at each sensor location.
@jdc.pytree_dataclass
class Fluid:
    thermal_conductivity: Float
    density: Float
    thermal_capacitance: Float

    @property
    def heat_diffusivity(self):
        return self.thermal_conductivity / (self.density * self.thermal_capacitance)

@jdc.pytree_dataclass
class SimulationParameters:
    fluid: Fluid
    drive_freq: float
    velocity: float = 0.1
    drive_amplitude: float = 1
    sensor_x: jdc.Static[float] = 0.03

Read some gas parameters from a csv file.

df = pd.read_csv("gas_thermal_properties_25C.csv")
# Convert df to dict of Fluids
fluids = {}
for i, row in df.iterrows():
    fluids[row["Gas"]] = Fluid(
        thermal_conductivity=row["Thermal Conductivity (W/m·K)"],
        density=row["Density (kg/m³)"],
        thermal_capacitance=row["Thermal Capacitance (kJ/kg·K)"]*1e3,
    )
del df

Solving the 1D Heat Equation Numerically¶

We use a trapzoidal integration scheme to solve the 1D heat equation numerically.

@functools.partial(jnp.vectorize, signature='()->()', excluded={0})
def temperature(params: SimulationParameters, t, num_points: int = 100000) -> Float:
  """Temperature at the sensor location at time t.
  
  Args:
    params: Simulation parameters.
    t: Time.
    num_points: Number of points to use in the integral.
  
  Returns:
    Temperature at the sensor location at time t.
  """
  dt = t / num_points
  ts = jnp.arange(num_points) * dt
  Q = params.drive_amplitude * (1 + jnp.sin(2 * jnp.pi * params.drive_freq * ts))
  tau = jnp.maximum(t - ts, dt)
  ys = Q / ((4 * jnp.pi * params.fluid.heat_diffusivity * tau)**0.5) * jnp.exp(-(params.sensor_x - params.velocity * tau)**2 / (4 * params.fluid.heat_diffusivity * tau))
  return jax.scipy.integrate.trapezoid(ys, ts) / (params.fluid.density * params.fluid.thermal_capacitance)

Run the Simulation and Plot Time-domain Results¶

params = SimulationParameters(fluid=fluids["CO2"], drive_freq=10, velocity=0.1, sensor_x=0.03)

ts = jnp.linspace(0, 2, 1000)
temperatures1 = temperature(params, ts)

params = SimulationParameters(fluid=fluids["N2"], drive_freq=10, velocity=0.1, sensor_x=0.03)
ts = jnp.linspace(0, 2, 1000)
temperatures2 = temperature(params, ts)

plt.plot(ts, temperatures1, label="CO2")
plt.plot(ts, temperatures2, label="N2")
plt.xlabel("Time (s)")
plt.ylabel("Temperature (K)")
plt.legend()

plt.grid()
plt.show()
@functools.cache
def simulate_frequency_response(params, measurement_time=0.2, sample_duration=1, dt=1e-3):
    """Simulate the system and find the phase of the signal at the sensor location.
    
    Args:
      params: SimulationParameters
      measurement_time: float, time to start measuring the signal
      sample_duration: float, duration to sample the signal
      dt: float, time step for the simulation

    Returns:
      complex: the response of the system at the drive frequency
    """

    ts = jnp.arange(0, measurement_time + sample_duration, dt)
    temperatures = temperature(params, ts)

    # slice after the equilibrium time
    sliced_ts = ts[ts >= measurement_time]
    sliced_temperatures = temperatures[ts >= measurement_time]

    ts, temperatures = sliced_ts, sliced_temperatures

    return jnp.sum(temperatures * jnp.exp(-2j * jnp.pi * params.drive_freq * ts) * dt)


# for sensor_x in [0.04, 0.03, 0.02, 0.01, 0.0]:
#   response = simulate_frequency_response(
#     SimulationParameters(fluid=fluids["CO2"], drive_freq=10, sensor_x=sensor_x))
#   print(f"Sensor position: {sensor_x}, response={response}")

Simulate Sensing and Parameter Estimation¶

  • Simulate and find the responses at two different sensor locations
  • Estimate the velocity and fluid's thermal diffusivity using the equation derived in the write-up
def simulate_sensing(fluid, drive_freq=10, x0=0.01, x1=0.03, velocity=0.1):
  """Simulate sensing the velocity and heat diffusivity of a fluid using two sensors at different locations.
  
  Args:
    fluid: Fluid, the fluid to sense
    drive_freq: float, the frequency of the drive signal
    x0: float, the position of the first sensor
    x1: float, the position of the second sensor
    velocity: float, the velocity of the fluid
  
  Returns:
    Tuple[float, float]: the estimated heat diffusivity and velocity
  """
  # Simulate sensing at two different locations
  response0 = simulate_frequency_response(SimulationParameters(fluid=fluid, drive_freq=drive_freq, sensor_x=x0, velocity=velocity))
  response1 = simulate_frequency_response(SimulationParameters(fluid=fluid, drive_freq=drive_freq, sensor_x=x1, velocity=velocity))

  print(f"response0: {response0}, response1: {response1}")

  ratio = response0 / response1
  K = np.log(ratio)
  omega = 2 * np.pi * drive_freq
  if x0 > 0 and x1 > 0:
    K /= (x0 - x1)
    a, b = K.real, K.imag
    solved_heat_diffusivity = omega * a / (b**3 + a**2 * b)
    solved_velocity = omega * (a**2 - b**2) / (b**3 + a**2 * b)
  elif x0 < 0 and x1 > 0:
    a, b = K.real, K.imag
    solved_heat_diffusivity = ((x0 + x1)**2 * (a * omega * (x0 + x1)**2 + 
        ((-x0 + x1) * np.sqrt(omega**2 * (-4 * b**2 * x0 * x1 + a**2 * (x0 + x1)**2))) /
        np.sign(a))) / (2 * b * (b**2 * (x0 - x1)**2 + a**2 * (x0 + x1)**2))
    solved_velocity = -(((x0 + x1)**2 * (b**2 * omega * (x0 - x1) + 
         np.sqrt(omega**2 * (-4 * b**2 * x0 * x1 + a**2 * (x0 + x1)**2)) * 
         np.abs(a))) / 
        (b * (b**2 * (x0 - x1)**2 + a**2 * (x0 + x1)**2)))
  else:
      raise ValueError("Invalid sensor positions")

  delta = (solved_velocity**2 + (4j * solved_heat_diffusivity * omega))**0.5 / (2 * solved_heat_diffusivity)
  lambda0 = solved_velocity / (2 * solved_heat_diffusivity) + delta
  lambda1 = solved_velocity / (2 * solved_heat_diffusivity) - delta
  solved_thermal_conductivity = jnp.abs(jnp.exp(lambda1 * x1) / (response1 * -2j) / (lambda1 - lambda0)) # assuming sine wave
  print(f"true heat_diffusivity: {fluid.heat_diffusivity}, true velocity: {velocity}, true thermal_conductivity: {fluid.thermal_conductivity}")
  print(f"heat_diffusivity: {solved_heat_diffusivity}, velocity: {solved_velocity} thermal_conductivity: {solved_thermal_conductivity}")
  print(f"heat_diffusivity error: {jnp.abs(solved_heat_diffusivity - fluid.heat_diffusivity) / fluid.heat_diffusivity * 100} %")
  print(f"velocity error: {jnp.abs(solved_velocity - velocity) / velocity * 100} %")
  print(f"thermal_conductivity error: {jnp.abs(solved_thermal_conductivity - fluid.thermal_conductivity) / fluid.thermal_conductivity * 100} %")
  print()
  return solved_heat_diffusivity, solved_velocity

# Need 2 * drive_freq < v / |x1 - x0|
simulate_sensing(fluids["N2"], velocity=0.2, drive_freq=10, x0=-0.011, x1=0.01)
response0: (1.1813609246656124e-47+3.0077171245154053e-47j), response1: (0.00011414578201715294+0.0018739209451578561j)
true heat_diffusivity: 2.1834061135371177e-05, true velocity: 0.2, true thermal_conductivity: 0.026
heat_diffusivity: 2.1834061135371188e-05, velocity: 0.20000000000000015 thermal_conductivity: 0.025999999999999985
heat_diffusivity error: 4.6552930781096347e-14 %
velocity error: 6.938893903907228e-14 %
thermal_conductivity error: 5.3376106953132526e-14 %

(2.1834061135371188e-05, 0.20000000000000015)