import functools
import numpy as np
import jax
import jax.numpy as jnp
import matplotlib.pyplot as plt
import jax.scipy
from jaxtyping import Array, Float # https://github.com/google/jaxtyping
import jax_dataclasses as jdc
import scipy.optimize
import pandas as pd
# Global flag to set a specific platform, must be used at startup.
jax.config.update('jax_platform_name', 'cpu')
# use 64-bit precision for better accuracy
jax.config.update('jax_enable_x64', True)
This notebook simulates the 1D heat equation with advection and a point source. It numerically solves the equation to determine the temperature distribution along the length of a fluid tube and estimates the phase shift at different sensor locations.
The governing equation used for the simulation is:
$$ \frac{\partial T}{\partial t} = \alpha \frac{\partial^2 T}{\partial x^2} - v \frac{\partial T}{\partial x} + \frac{A}{\rho c} (1 + \sin(\omega_d t)) \delta(x) $$where ( T(x, t) ) is the temperature distribution, ( \alpha ) is the thermal diffusivity, ( v ) is the fluid velocity, and ( \delta(x) ) represents the point heat source at ( x = 0 ).
The 1D heat equation is solved in the frequency domain using Fourier Transforms. We simulate the system by calculating the temperature distribution at different positions downstream from the point source. The phase of the temperature signal at various sensor locations is extracted and analyzed.
@jdc.pytree_dataclass
class Fluid:
thermal_conductivity: Float
density: Float
thermal_capacitance: Float
@property
def heat_diffusivity(self):
return self.thermal_conductivity / (self.density * self.thermal_capacitance)
@jdc.pytree_dataclass
class SimulationParameters:
fluid: Fluid
drive_freq: float
velocity: float = 0.1
drive_amplitude: float = 1
sensor_x: jdc.Static[float] = 0.03
Read some gas parameters from a csv file.
df = pd.read_csv("gas_thermal_properties_25C.csv")
# Convert df to dict of Fluids
fluids = {}
for i, row in df.iterrows():
fluids[row["Gas"]] = Fluid(
thermal_conductivity=row["Thermal Conductivity (W/m·K)"],
density=row["Density (kg/m³)"],
thermal_capacitance=row["Thermal Capacitance (kJ/kg·K)"]*1e3,
)
del df
We use a trapzoidal integration scheme to solve the 1D heat equation numerically.
@functools.partial(jnp.vectorize, signature='()->()', excluded={0})
def temperature(params: SimulationParameters, t, num_points: int = 100000) -> Float:
"""Temperature at the sensor location at time t.
Args:
params: Simulation parameters.
t: Time.
num_points: Number of points to use in the integral.
Returns:
Temperature at the sensor location at time t.
"""
dt = t / num_points
ts = jnp.arange(num_points) * dt
Q = params.drive_amplitude * (1 + jnp.sin(2 * jnp.pi * params.drive_freq * ts))
tau = jnp.maximum(t - ts, dt)
ys = Q / ((4 * jnp.pi * params.fluid.heat_diffusivity * tau)**0.5) * jnp.exp(-(params.sensor_x - params.velocity * tau)**2 / (4 * params.fluid.heat_diffusivity * tau))
return jax.scipy.integrate.trapezoid(ys, ts) / (params.fluid.density * params.fluid.thermal_capacitance)
params = SimulationParameters(fluid=fluids["CO2"], drive_freq=10, velocity=0.1, sensor_x=0.03)
ts = jnp.linspace(0, 2, 1000)
temperatures1 = temperature(params, ts)
params = SimulationParameters(fluid=fluids["N2"], drive_freq=10, velocity=0.1, sensor_x=0.03)
ts = jnp.linspace(0, 2, 1000)
temperatures2 = temperature(params, ts)
plt.plot(ts, temperatures1, label="CO2")
plt.plot(ts, temperatures2, label="N2")
plt.xlabel("Time (s)")
plt.ylabel("Temperature (K)")
plt.legend()
plt.grid()
plt.show()
@functools.cache
def simulate_frequency_response(params, measurement_time=0.2, sample_duration=1, dt=1e-3):
"""Simulate the system and find the phase of the signal at the sensor location.
Args:
params: SimulationParameters
measurement_time: float, time to start measuring the signal
sample_duration: float, duration to sample the signal
dt: float, time step for the simulation
Returns:
complex: the response of the system at the drive frequency
"""
ts = jnp.arange(0, measurement_time + sample_duration, dt)
temperatures = temperature(params, ts)
# slice after the equilibrium time
sliced_ts = ts[ts >= measurement_time]
sliced_temperatures = temperatures[ts >= measurement_time]
ts, temperatures = sliced_ts, sliced_temperatures
return jnp.sum(temperatures * jnp.exp(-2j * jnp.pi * params.drive_freq * ts) * dt)
# for sensor_x in [0.04, 0.03, 0.02, 0.01, 0.0]:
# response = simulate_frequency_response(
# SimulationParameters(fluid=fluids["CO2"], drive_freq=10, sensor_x=sensor_x))
# print(f"Sensor position: {sensor_x}, response={response}")
def simulate_sensing(fluid, drive_freq=10, x0=0.01, x1=0.03, velocity=0.1):
"""Simulate sensing the velocity and heat diffusivity of a fluid using two sensors at different locations.
Args:
fluid: Fluid, the fluid to sense
drive_freq: float, the frequency of the drive signal
x0: float, the position of the first sensor
x1: float, the position of the second sensor
velocity: float, the velocity of the fluid
Returns:
Tuple[float, float]: the estimated heat diffusivity and velocity
"""
# Simulate sensing at two different locations
response0 = simulate_frequency_response(SimulationParameters(fluid=fluid, drive_freq=drive_freq, sensor_x=x0, velocity=velocity))
response1 = simulate_frequency_response(SimulationParameters(fluid=fluid, drive_freq=drive_freq, sensor_x=x1, velocity=velocity))
print(f"response0: {response0}, response1: {response1}")
ratio = response0 / response1
K = np.log(ratio)
omega = 2 * np.pi * drive_freq
if x0 > 0 and x1 > 0:
K /= (x0 - x1)
a, b = K.real, K.imag
solved_heat_diffusivity = omega * a / (b**3 + a**2 * b)
solved_velocity = omega * (a**2 - b**2) / (b**3 + a**2 * b)
elif x0 < 0 and x1 > 0:
a, b = K.real, K.imag
solved_heat_diffusivity = ((x0 + x1)**2 * (a * omega * (x0 + x1)**2 +
((-x0 + x1) * np.sqrt(omega**2 * (-4 * b**2 * x0 * x1 + a**2 * (x0 + x1)**2))) /
np.sign(a))) / (2 * b * (b**2 * (x0 - x1)**2 + a**2 * (x0 + x1)**2))
solved_velocity = -(((x0 + x1)**2 * (b**2 * omega * (x0 - x1) +
np.sqrt(omega**2 * (-4 * b**2 * x0 * x1 + a**2 * (x0 + x1)**2)) *
np.abs(a))) /
(b * (b**2 * (x0 - x1)**2 + a**2 * (x0 + x1)**2)))
else:
raise ValueError("Invalid sensor positions")
delta = (solved_velocity**2 + (4j * solved_heat_diffusivity * omega))**0.5 / (2 * solved_heat_diffusivity)
lambda0 = solved_velocity / (2 * solved_heat_diffusivity) + delta
lambda1 = solved_velocity / (2 * solved_heat_diffusivity) - delta
solved_thermal_conductivity = jnp.abs(jnp.exp(lambda1 * x1) / (response1 * -2j) / (lambda1 - lambda0)) # assuming sine wave
print(f"true heat_diffusivity: {fluid.heat_diffusivity}, true velocity: {velocity}, true thermal_conductivity: {fluid.thermal_conductivity}")
print(f"heat_diffusivity: {solved_heat_diffusivity}, velocity: {solved_velocity} thermal_conductivity: {solved_thermal_conductivity}")
print(f"heat_diffusivity error: {jnp.abs(solved_heat_diffusivity - fluid.heat_diffusivity) / fluid.heat_diffusivity * 100} %")
print(f"velocity error: {jnp.abs(solved_velocity - velocity) / velocity * 100} %")
print(f"thermal_conductivity error: {jnp.abs(solved_thermal_conductivity - fluid.thermal_conductivity) / fluid.thermal_conductivity * 100} %")
print()
return solved_heat_diffusivity, solved_velocity
# Need 2 * drive_freq < v / |x1 - x0|
simulate_sensing(fluids["N2"], velocity=0.2, drive_freq=10, x0=-0.011, x1=0.01)
response0: (1.1813609246656124e-47+3.0077171245154053e-47j), response1: (0.00011414578201715294+0.0018739209451578561j) true heat_diffusivity: 2.1834061135371177e-05, true velocity: 0.2, true thermal_conductivity: 0.026 heat_diffusivity: 2.1834061135371188e-05, velocity: 0.20000000000000015 thermal_conductivity: 0.025999999999999985 heat_diffusivity error: 4.6552930781096347e-14 % velocity error: 6.938893903907228e-14 % thermal_conductivity error: 5.3376106953132526e-14 %
(2.1834061135371188e-05, 0.20000000000000015)