#%% import stuff
import os
import random as rn
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
#%% define 2D transient in-compressible navier stokes equations
def navier_stokes_residual(ns):
with tf.GradientTape(persistent=True) as tape: # set the input variable ns for gradient computation
# extract components from the input tensor ns
u = model(ns)[:, 0] # x velocity
v = model(ns)[:, 1] # y velocity
p = model(ns)[:, 2] # pressure
# compute derivatives with respect to x, y, t, and second derivatives with resoect to x, y
u_x = tape.gradient(u, ns)[:, 0] # first derivative of neural network output (u) with respect to x
u_y = tape.gradient(u, ns)[:, 1] # first derivative of neural network output (u) with respect to y
u_t = tape.gradient(u, ns)[:, 2] # first derivative of neural network output (u) with respect to t
v_x = tape.gradient(v, ns)[:, 0] # first derivative of neural network output (v) with respect to x
v_y = tape.gradient(v, ns)[:, 1] # first derivative of neural network output (v) with respect to y
v_t = tape.gradient(v, ns)[:, 2] # first derivative of neural network output (v) with respect to t
p_x = tape.gradient(p, ns)[:, 0] # first derivative of neural network output (p) with respect to x
p_y = tape.gradient(p, ns)[:, 1] # first derivative of neural network output (p) with respect to y
u_xx = tape.gradient(u_x, ns)[:, 0] # second derivative of neural network output (u) with respect to x
u_yy = tape.gradient(u_y, ns)[:, 1] # second derivative of neural network output (u) with respect to y
v_xx = tape.gradient(v_x, ns)[:, 0] # second derivative of neural network output (v) with respect to x
v_yy = tape.gradient(v_y, ns)[:, 1] # second derivative of neural network output (v) with respect to y
# define the equations to solve
x_mom = u_t + (u * u_x) + (v * u_y) + ((1 / rho) * p_x) - (nu * (u_xx + u_yy)) # x momentum
y_mom = v_t + (u * v_x) + (v * v_y) + ((1 / rho) * p_y) - (nu * (v_xx + v_yy)) # y momentum
cont = u_x + v_y # continuity
eq_residual = x_mom + y_mom + cont # combined residuals
# boundary conditions for x momentum
u_left_bc = u[ns[:, 0] == 0] - Uinf # left boundary condition: u = Uinf (free stream velocity)
u_right_bc = u_x[ns[:, 0] == L] - 0 # right boundary condition: du/dx = 0
u_top_bc = u[ns[:, 1] == D] - 0 # top boundary condition: u = 0
u_bottom_bc = u[ns[:, 1] == 0] - 0 # bottom boundary condition: u = 0
# boundary conditions for y momentum
v_left_bc = v[ns[:, 0] == 0] - 0 # left boundary condition: v = 0
v_right_bc = v_x[ns[:, 0] == L] - 0 # right boundary condition: dv/dx = 0
v_top_bc = v[ns[:, 1] == D] - 0 # top boundary condition: v = 0
v_bottom_bc = v[ns[:, 1] == 0] - 0 # bottom boundary condition: v = 0
# boundary conditions for pressure
p_left_bc = p_x[ns[:, 0] == 0] - 0 # left boundary condition: dp/dx = 0
p_right_bc = p_x[ns[:, 0] == L] - 0 # right boundary condition: dp/dx = 0
p_top_bc = p_y[ns[:, 1] == D] - 0 # top boundary condition: dp/dy = 0
p_bottom_bc = p_y[ns[:, 1] == 0] - 0 # bottom boundary condition: dp/dy = 0
# initial conditions
i_c_u = u[ns[:, 2] == 0] - 0 # initial condition u = 0
i_c_v = v[ns[:, 2] == 0] - 0 # initial condition v = 0
i_c_p = p[ns[:, 2] == 0] - 0 # initial condition p = 0
return eq_residual, u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, v_left_bc, v_right_bc,\
v_top_bc, v_bottom_bc, p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, i_c_u, i_c_v, i_c_p
#%% create a custom loss function
def navier_stokes_loss(ns):
ns = tf.convert_to_tensor(ns, dtype=tf.float32) # convert ns to TensorFlow tensor
eq_residual, u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, \
i_c_u, i_c_v, i_c_p = navier_stokes_residual(ns) # compute residuals for Navier-Stokes equations with specified boundary and initial conditions
mse_loss = tf.reduce_mean(tf.square(eq_residual)) # compute mean squared error of the Navier-Stokes equation residuals
u_bc_loss = tf.reduce_mean(tf.square(u_left_bc)) + tf.reduce_mean(tf.square(u_right_bc)) \
+ tf.reduce_mean(tf.square(u_top_bc)) + tf.reduce_mean(tf.square(u_bottom_bc)) # compute mean squared error of the u boundary conditions
v_bc_loss = tf.reduce_mean(tf.square(v_left_bc)) + tf.reduce_mean(tf.square(v_right_bc)) \
+ tf.reduce_mean(tf.square(v_top_bc)) + tf.reduce_mean(tf.square(v_bottom_bc)) # compute mean squared error of the v boundary conditions
p_bc_loss = tf.reduce_mean(tf.square(p_left_bc)) + tf.reduce_mean(tf.square(p_right_bc)) \
+ tf.reduce_mean(tf.square(p_top_bc)) + tf.reduce_mean(tf.square(p_bottom_bc)) # compute mean squared error of the p boundary conditions
u_ic_loss = tf.reduce_mean(tf.square(i_c_u)) # compute mean squared error of u initial condition
v_ic_loss = tf.reduce_mean(tf.square(i_c_v)) # compute mean squared error of v initial condition
p_ic_loss = tf.reduce_mean(tf.square(i_c_p)) # compute mean squared error of p initial condition
u_loss = u_ic_loss + u_bc_loss # total loss u momentum initial and boundary conditions
v_loss = v_ic_loss + v_bc_loss # total loss v momentum initial and boundary conditions
p_loss = p_ic_loss + p_bc_loss # total loss pressure initial and boundary conditions
weight_mse = 1 # weight for MSE
weight_u = 1 # weight for u
weight_v = 1 # weight for v
weight_p = 1 # weight for p
total_loss = weight_u * u_loss + weight_v * v_loss + weight_p * p_loss + weight_mse * mse_loss
return total_loss
#%% function to load and save model and optimizer states (for resume)
def save_checkpoint():
def load_checkpoint():
#%% set same seeds for initialization
np.random.seed(0) # same random initializations every time numpy
rn.seed(0) # same random initializations every time for python
tf.random.set_seed(0) # same random initializations every time for TensorFlow
#%% generate sample points (grid)
nu = 0.01 # kinematic viscosity
rho = 1 # density
Uinf = 1 # free stream velocity
l_square = 1 # length for Reynolds number calculator
Re = l_square * Uinf / nu # Reynolds number
travel = 0.5 # times the disturbance travels entire length of computational domain
L = 1 # length of domain
D = 1 # width of domain
TT = travel * L / Uinf # total time
Nx = 20 # number of points in x-axis
Ny = 20 # number of points in y-axis
Nt = 20 # number of steps in time
x = np.linspace(0, L, Nx) # initialize x
y = np.linspace(0, D, Ny) # initialize y
t = np.linspace(0, TT, Nt) # initialize t
# visualize space and time meshes
# X, Y = np.meshgrid(x, y)
# pcm = plt.pcolormesh(X, Y, np.zeros_like(X), edgecolors='k', linewidth=0.5, cmap='jet', shading='auto', alpha=1)
# plt.plot(t, t, 'o')
#%% define the physics informed neural network structure
model = keras.Sequential([
preprocessing.Normalization(input_shape=(3,)), # normalize inputs (2 for x, y; 1 for t)
layers.Dense(20, activation='tanh', input_shape=(3, ), kernel_initializer='GlorotNormal'), # hidden layer
layers.Dense(20, activation='tanh'), # hidden layer
layers.Dense(20, activation='tanh'), # hidden layer
layers.Dense(3, activation=None) # output layer
#%% Neural network parameters
train_points_x, train_points_y, train_points_t = np.meshgrid(x, y, t) # create combinations of sample points along the domain
train_points = np.vstack((train_points_x.flatten(), train_points_y.flatten(), train_points_t.flatten())).T # flatten the grid to feed to neural network
model.layers[0].adapt(train_points) # set up the normalization layer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01) # specify learning rate and the optimizer
checkpoint_dir = './checkpoints' # check point directory (same as script)
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer) # checkpoint to save model and optimizer states
resume_training = False # set to True if resume is required
start_epoch = 1
if resume_training:
load_checkpoint() # load the latest checkpoint to resume training
start_epoch = int(tf.train.latest_checkpoint(checkpoint_dir).\
split('-')[-1]) # determine the starting epoch based on the loaded checkpoint
max_epochs = 20000 # maximum iterations
epoch = 0 # iteration counter
loss_value = float('inf') # initial loss
min_loss = 1e-30 # minimum loss (any acceptable value)
#%% Train the neural network
while epoch <= max_epochs and loss_value >= min_loss:
with tf.GradientTape() as tape: # use tf.GradientTape to record operations for automatic differentiation
loss_value = navier_stokes_loss(train_points) # compute total loss by evaluating Navier-Stokes loss function on training points
gradients = tape.gradient(loss_value, model.trainable_variables) # calculate gradients of total loss with respect to trainable variables of model
optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # update model trainable variables using calculated gradients and specified optimizer
epoch += 1 # increases iteration counter by 1
if epoch % 100 == 0:
print(f"Epoch {epoch}, Loss: {loss_value.numpy()}") # output loss while training
save_checkpoint() # save training progress for resume
#%% predicting and plotting
target_time_step = TT # in seconds
x_t = np.linspace(0, L, 50) # initialize x
y_t = np.linspace(0, D, 50) # initialize y
test_points_x, test_points_y = np.meshgrid(x_t, y_t) # create pairs of sample points along the domain (for testing)
test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T # flatten the grid to feed to neural network model
test_points_t = np.column_stack([test_points, np.full_like(test_points[:, 0], target_time_step)]) # generate test points for the target time step
# predict velocity components and pressure at target time step
u_predicted_t = model.predict(test_points_t)[:, 0] # predict u velocity
v_predicted_t = model.predict(test_points_t)[:, 1] # predict v velocity
p_predicted_t = model.predict(test_points_t)[:, 2] # predict pressure
velocity_magnitude_t = np.sqrt(u_predicted_t**2 + v_predicted_t**2) # calculate velocity magnitude
# plotting
plt.figure(dpi=600) # make nice crisp image :)
plt.contourf(test_points_x, test_points_y, velocity_magnitude_t.reshape(test_points_x.shape), cmap='jet', levels=256) # contour plot
# plt.streamplot(test_points_x, test_points_y, u_predicted_t.reshape(test_points_x.shape),\
# v_predicted_t.reshape(test_points_x.shape), color='black', cmap='jet') # plot streamlines
plt.colorbar(label="Velocity Magnitude", orientation='horizontal')
plt.xlabel('Length [m]')
plt.ylabel('Width [m]')
plt.title(f'Velocity Magnitude Prediction at Time {target_time_step}s using PINN for Transient Navier-Stokes')
plt.clim(0, 1)
Fig. 1, PINN solution progression |
Cite as: Fahad Butt (2024). NS-PINN (, Blogger.
If you still can't understand, open a book ๐? Thank you for reading! If you want to hide me as your next PhD student, please reach out!
