Here is a simple code I for Data-less ๐ Physics Informed Neural ๐ง Network ๐ธ️ to solve the steady-state 2D incompressible Navier-Stokes equations in Python using PyTorch. Already validated results will be uploaded here in good time. The code has all the comments ๐จ๐ซ you could possibly need. If you have questions, well... ๐น
Verdict, so far: FVM is King ๐, still.
Cite as: Fahad Butt (2024). NS-PINN (https://whenrobotslove.blogspot.com/2024/03/physics-informed-neural-network-code.html), Blogger. Retrieved Month Date, Year
Copyright (c) 2024 Fahad Butt
The Code:
#%% import necessary libraries
import os
import numpy as np
import random as rn
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
import glob
device = torch.device("cuda") # cuda for gpu, cpu for cpu
#%% the 2D navier-stokes equations and boundary conditions
def navier_stokes_equation(ns):
ns.requires_grad_(True) # watch ns for gradient computation
u = model(ns)[:, 0] # x velocity
v = model(ns)[:, 1] # y velocity
p = model(ns)[:, 2] # pressure
du = torch.autograd.grad(u, ns, torch.ones_like(u), create_graph=True)[0] # du/dns
dv = torch.autograd.grad(v, ns, torch.ones_like(v), create_graph=True)[0] # dv/dns
dp = torch.autograd.grad(p, ns, torch.ones_like(p), create_graph=True)[0] # dp/dns
u_x = du[:, 0] # du/dx
u_y = du[:, 1] # du/dy
v_x = dv[:, 0] # dv/dx
v_y = dv[:, 1] # dv/dy
p_x = dp[:, 0] # dp/dx
p_y = dp[:, 1] # dp/dy
u_xx = torch.autograd.grad(u_x, ns, torch.ones_like(u_x), create_graph=True)[0][:, 0] # d2u/dx2
u_yy = torch.autograd.grad(u_y, ns, torch.ones_like(u_y), create_graph=True)[0][:, 1] # d2u/dy2
v_xx = torch.autograd.grad(v_x, ns, torch.ones_like(v_x), create_graph=True)[0][:, 0] # d2v/dx2
v_yy = torch.autograd.grad(v_y, ns, torch.ones_like(v_y), create_graph=True)[0][:, 1] # d2v/dy2
x_mom = (u * u_x) + (v * u_y) + p_x - ((1 / Re) * (u_xx + u_yy)) # x momentum
y_mom = (u * v_x) + (v * v_y) + p_y - ((1 / Re) * (v_xx + v_yy)) # y momentum
cont = u_x + v_y # continuity
# apply boundary conditions
u_left_bc = u[ns[:, 0] == -0.5] - Uinf # x = 0
u_right_bc = u_x[ns[:, 0] == 0.5] # x = L
u_top_bc = u[ns[:, 1] == 0.5] # y = D
u_bottom_bc = u[ns[:, 1] == -0.5] # y = 0
v_left_bc = v[ns[:, 0] == -0.5] # x = 0
v_right_bc = v_x[ns[:, 0] == 0.5] # x = L
v_top_bc = v[ns[:, 1] == 0.5] # y = D
v_bottom_bc = v[ns[:, 1] == -0.5] # y = 0
p_left_bc = p_x[ns[:, 0] == -0.5] # x = 0
p_right_bc = p[ns[:, 0] == 0.5] # x = L
p_top_bc = p_y[ns[:, 1] == 0.5] # y = D
p_bottom_bc = p_y[ns[:, 1] == -0.5] # y = 0
return x_mom, y_mom, cont, \
u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
p_left_bc, p_right_bc, p_top_bc, p_bottom_bc
#%% define the loss function
def loss_fn(ns):
global x_mom_loss, y_mom_loss, cont_loss, u_bc_loss, v_bc_loss, p_bc_loss
ns = ns.clone().detach().requires_grad_(True).to(device) # Move to GPU
x_mom, y_mom, cont,\
u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
p_left_bc, p_right_bc, p_top_bc, p_bottom_bc = navier_stokes_equation(ns)
x_mom_loss = nn.MSELoss()(x_mom, torch.zeros_like(x_mom)) # x momentum loss
y_mom_loss = nn.MSELoss()(y_mom, torch.zeros_like(y_mom)) # y momentum loss
cont_loss = nn.MSELoss()(cont, torch.zeros_like(cont)) # continuity loss
u_bc_loss = nn.MSELoss()(u_left_bc, torch.zeros_like(u_left_bc)) + nn.MSELoss()(u_right_bc, torch.zeros_like(u_right_bc)) \
+ nn.MSELoss()(u_top_bc, torch.zeros_like(u_top_bc)) + nn.MSELoss()(u_bottom_bc, torch.zeros_like(u_bottom_bc)) # u momentum boundary condition loss
v_bc_loss = nn.MSELoss()(v_left_bc, torch.zeros_like(v_left_bc)) + nn.MSELoss()(v_right_bc, torch.zeros_like(v_right_bc)) \
+ nn.MSELoss()(v_top_bc, torch.zeros_like(v_top_bc)) + nn.MSELoss()(v_bottom_bc, torch.zeros_like(v_bottom_bc)) # v momentum boundary condition loss
p_bc_loss = nn.MSELoss()(p_left_bc, torch.zeros_like(p_left_bc)) + nn.MSELoss()(p_right_bc, torch.zeros_like(p_right_bc)) \
+ nn.MSELoss()(p_top_bc, torch.zeros_like(p_top_bc)) + nn.MSELoss()(p_bottom_bc, torch.zeros_like(p_bottom_bc)) # continuity boundary condition loss
bc_loss = u_bc_loss + v_bc_loss + p_bc_loss # total boundary condition loss
mse_loss = x_mom_loss + y_mom_loss + cont_loss # total equation loss
total_loss = mse_loss + bc_loss # total loss
return total_loss
#%% calculate loss, compute gradients, and print loss for each LBFGS optimization step
def closure():
optimizer.zero_grad() # clear gradients
loss = loss_fn(train_points) # compute the loss
loss.backward() # compute gradients
return loss
#%% save and resume training
def save_checkpoint(epoch, model, optimizer): # save check point
checkpoint = {
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict()}
checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}.pt")
torch.save(checkpoint, checkpoint_path)
print(f"Checkpoint saved at epoch {epoch}")
def load_checkpoint(model, optimizer): # load check point
latest_checkpoint = max(glob.glob(os.path.join(checkpoint_dir, 'checkpoint_epoch_*.pt')), key=os.path.getctime)
checkpoint = torch.load(latest_checkpoint)
epoch = checkpoint['epoch']
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
print(f"Checkpoint loaded from epoch {epoch}")
return model, optimizer, epoch
#%% show model summary
def count_parameters(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad) # show trainable parameters
#%% set same seeds for initialization
np.random.seed(20) # same random initializations every time numpy
rn.seed(20) # same random initializations every time for python
torch.manual_seed(20) # set seed for reproducibility
#%% generate sample points (grid)
Uinf = 4/3 # free stream velocity
nu = 0.02733333333333333333333333333333 # kinematic viscosity
rho = 1 # density
L = 2.1 # Length of the plate
D = 0.41 # Width of the plate
Re = round(rho * Uinf * D / nu) # Reynolds number
num_training_points = 625 # per meter
# Generate points for boundaries
x_l = np.random.uniform(0, 0, num_training_points) # left boundary
y_l = np.random.uniform(0, D, num_training_points)
x_r = np.random.uniform(L, L, num_training_points) # right boundary
y_r = np.random.uniform(0, D, num_training_points)
x_b = np.random.uniform(0, L, num_training_points) # bottom boundary
y_b = np.random.uniform(0, 0, num_training_points)
x_t = np.random.uniform(0, L, num_training_points) # top boundary
y_t = np.random.uniform(D, D, num_training_points)
x_m = np.random.uniform(0.002, L - 0.002, 10000) # points inside domain
y_m = np.random.uniform(0.002, D - 0.002, 10000) # points inside domain
x = np.concatenate([x_l, x_r, x_b, x_t, x_m]) # x co-ordinates
y = np.concatenate([y_l, y_r, y_b, y_t, y_m]) # y co-ordinates
x = (x / L) - 0.5 # normalize x
y = (y / D) - 0.5 # normalize y
#%% display mesh
plt.figure(dpi = 300)
plt.scatter(x, y, s = 0.1, c = 'k', alpha = 1) # scatter plot
plt.xlim(0, L)
plt.ylim(0, D)
plt.grid(True)
plt.axis('equal')
plt.grid(False)
plt.show()
#%% create neural network model
model = nn.Sequential(
nn.Linear(2, 40), # 2 inputs for x and y
nn.Tanh(), # hidden layer
nn.Linear(40, 40),
nn.Tanh(), # hidden layer
nn.Linear(40, 40),
nn.Tanh(), # hidden layer
nn.Linear(40, 40),
nn.Tanh(), # hidden layer
nn.Linear(40, 40),
nn.Tanh(), # hidden layer
nn.Linear(40, 40),
nn.Tanh(), # hidden layer
nn.Linear(40, 3)).to(device) # 3 outputs for u, v, p
for m in model.modules():
if isinstance(m, nn.Linear):
init.xavier_normal_(m.weight) # weights initialization
init.constant_(m.bias, 0.0) # bias initialization
print(model) # show model details
num_trainable_params = count_parameters(model) # show trainable parameters
print("Number of trainable parameters:", num_trainable_params)
#%% prepare input data
train_points = np.vstack((x, y)).T # stack x, y together
train_points = torch.tensor(train_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU
#%% train using Adam
resume_training = False # True if resuming training
start_epoch = 1
optimizer = optim.Adam(model.parameters(), lr = 0.001) # select optimizer and parameters
checkpoint_dir = './pytorch_checkpoints' # directory to save checkpoints
os.makedirs(checkpoint_dir, exist_ok=True) # create a directory to save checkpoints
if resume_training:
model, optimizer, start_epoch = load_checkpoint(model, optimizer)
start_epoch += 1 # start from the next epoch
for epoch in range(5000):
optimizer.zero_grad() # clear the gradients of all optimized variables
loss_value = loss_fn(train_points) # compute prediction by passing inputs to the model
loss_value.backward() # compute gradient of loss with respect to model parameters
optimizer.step() # perform parameter up date
if epoch % 1000 == 0:
print(f"{epoch} {loss_value.item()}")
# print(f"Adam Epoch {epoch}")
save_checkpoint(epoch, model, optimizer) # save model details
print(f"{epoch} {loss_value.item()}")
#%% train using LBFGS optimizer
optimizer = optim.LBFGS(model.parameters(), line_search_fn = 'strong_wolfe') # LBFGS optimizer
resume_training = False # True if resuming training
start_epoch = 0
checkpoint_dir = './pytorch_checkpoints' # directory to save checkpoints
os.makedirs(checkpoint_dir, exist_ok=True) # create a directory to save checkpoints
if resume_training:
model, optimizer, start_epoch = load_checkpoint(model, optimizer)
start_epoch += 1 # start from the next epoch
for epoch in range(20):
optimizer.step(closure) # perform parameter up date
if epoch % 5 == 0:
print(f"Loss: {closure().item()}")
save_checkpoint(epoch, model, optimizer)
print(f"{epoch} {closure().item()}")
#%% prediction
x_t = np.linspace(0, L, 20) # initialize x
y_t = np.linspace(0, D, 20) # initialize y
x_t = (x_t / L) - 0.5 # normalize test data x-axis
y_t = (y_t / D) - 0.5 # normalize test data y-axis
# test_points = np.vstack((x_t, y_t)).T # stack x, y together
test_points_x, test_points_y = np.meshgrid(x_t, y_t) # create combinations of sample points along the domain
test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T # stack x, y together
test_points = torch.tensor(test_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU
predict = model(test_points).detach().cpu().numpy() # predict and move back to CPU
u = predict[:, 0] * Uinf # predict and bring back to dimensional form
v = predict[:, 1] * Uinf
p = predict[:, 2] * Uinf * Uinf * rho
velocity_magnitude = np.sqrt(u**2 + v**2) # calculate velocity magnitude
# plotting
plt.figure(dpi = 300) # make a nice crisp image :)
plt.contourf(test_points_x * L, test_points_y * D, u.reshape(test_points_x.shape), cmap='jet', levels=256) # contour plot
# plt.clim(0, 1)
plt.colorbar(orientation='vertical')
plt.axis('equal')
plt.show()
# plotting
plt.figure(dpi = 300) # make a nice crisp image :)
plt.contourf(test_points_x * L, test_points_y * D, v.reshape(test_points_x.shape), cmap='jet', levels=256) # contour plot
# plt.clim(0, 1)
plt.colorbar(orientation='vertical')
plt.axis('equal')
plt.show()
# plotting
plt.figure(dpi = 300) # make a nice crisp image :)
plt.contourf(test_points_x * L, test_points_y * D, p.reshape(test_points_x.shape), cmap='jet', levels=256) # contour plot
plt.colorbar(orientation='vertical')
plt.axis('equal')
plt.show()
# plotting
plt.figure(dpi = 300) # make a nice crisp image :)
plt.streamplot(test_points_x * L, test_points_y * D, u.reshape(test_points_x.shape),\
v.reshape(test_points_x.shape), color = 'black', cmap = 'jet', density = 1, linewidth = 0.5,\
arrowstyle='->', arrowsize = 1) # plot streamlines
plt.colorbar(orientation='vertical')
plt.axis('equal')
plt.show()
The boundary conditions are taken from [1]. The results presented in Fig. 1 are from flow between two parallel flat plates. For these boundary conditions, analytical solutions to the Navier-Stokes equations are available. As this is "just another blog" ๐; just by eye-balling ๐, the results look very promising. I mean from Fig. 1, the velocities at left side are captured well-ish!
Fig. 1, Post processing |
Thank you for reading! If you want to hire me as your next PhD researcher, reach out for collaboration in research, please do so!
References
[1] Cahya Amalinadhi, Pramudita S. Palar, Rafael Stevenson and Lavi Zuhal. "On Physics-Informed Deep Learning for Solving Navier-Stokes Equations," AIAA 2022-1436. AIAA SCITECH 2022 Forum. January 2022. https://doi.org/10.2514/6.2022-1436