SolidWorks Videos

Wednesday, March 27, 2024

Teaching the Navier-Stokes Equations to Sand: Data-Less Physics Informed Neural Network (Includes PyTorch Code, Validated)

     Here is a simple code I wrote for Data-less 📈 Physics Informed Neural 🧠 Network 🕸️ to solve the steady-state 2D incompressible Navier-Stokes equations in Python using PyTorch. Already validated results will be uploaded here in good time. The code has all the comments 👨‍🏫 you could possibly need. If you have questions, well... 👹

     Verdict, so far: FVM is King 👑, well.

     Cite as: Fahad Butt (2024). NS-PINN (https://whenrobotslove.blogspot.com/2024/03/physics-informed-neural-network-code.html), Blogger. Retrieved Month Date, Year

     Please note that a properly cooled 🧊 GPU is recommended for running this code. If your CPU melts 🔥, don't blame me 😤. If you classify yourself as a poor peasant 👨‍🌾, use: device = torch.device("cpu") # cuda for gpu, cpu for cpu 😝.

The Code:

#%% licence

#Copyright <2024> <Fahad Butt>

# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


#%% import necessary libraries

import os

import numpy as np

import random as rn

import matplotlib.pyplot as plt

import torch

import torch.nn as nn

import torch.nn.init as init

import torch.optim as optim

# import torch_directml # enable for any gpu except CUDA gpu

import glob

device = torch.device("cpu") # cuda for CUDA gpu, cpu for cpu, torch_directml.device() for any other gpu


#%% 2D navier-stokes equations and boundary conditions

def navier_stokes_equation(ns):

    ns.requires_grad_(True) # watch ns for gradient computation

    

    out = model(ns) # output from model

    

    u = out[:, 0] # x velocity

    v = out[:, 1] # y velocity

    p = out[:, 2] # pressure

    

    du = torch.autograd.grad(u, ns, torch.ones_like(u), create_graph=True)[0] # du/dns

    dv = torch.autograd.grad(v, ns, torch.ones_like(v), create_graph=True)[0] # dv/dns

    dp = torch.autograd.grad(p, ns, torch.ones_like(p), create_graph=True)[0] # dp/dns

    

    u_x = du[:, 0] # du/dx

    u_y = du[:, 1] # du/dy

    v_x = dv[:, 0] # dv/dx

    v_y = dv[:, 1] # dv/dy

    p_x = dp[:, 0] # dp/dx

    p_y = dp[:, 1] # dp/dy

    

    u_xx = torch.autograd.grad(u_x, ns, torch.ones_like(u_x), create_graph=True)[0][:, 0] # d2u/dx2

    u_yy = torch.autograd.grad(u_y, ns, torch.ones_like(u_y), create_graph=True)[0][:, 1] # d2u/dy2

    v_xx = torch.autograd.grad(v_x, ns, torch.ones_like(v_x), create_graph=True)[0][:, 0] # d2v/dx2

    v_yy = torch.autograd.grad(v_y, ns, torch.ones_like(v_y), create_graph=True)[0][:, 1] # d2v/dy2

    

    x_mom = (u * u_x) + (v * u_y) + p_x - ((1 / Re) * (u_xx + u_yy)) # x momentum

    y_mom = (u * v_x) + (v * v_y) + p_y - ((1 / Re) * (v_xx + v_yy)) # y momentum

    cont = u_x + v_y # continuity

     

    # apply boundary conditions

    u_left_bc = u[ns[:, 0] == 0] - Uinf # x = 0

    u_right_bc = u_x[ns[:, 0] == L] # x = L

    u_top_bc = u[ns[:, 1] == D] # y = D

    u_bottom_bc = u[ns[:, 1] == 0] # y = 0


    v_left_bc = v[ns[:, 0] == 0] # x = 0

    v_right_bc = v_x[ns[:, 0] == L] # x = L

    v_top_bc = v[ns[:, 1] == D] # y = D

    v_bottom_bc = v[ns[:, 1] == 0] # y = 0

    

    p_left_bc = p_x[ns[:, 0] == 0] # x = 0

    p_right_bc = p[ns[:, 0] == L] # x = L

    p_top_bc = p_y[ns[:, 1] == D] # y = D

    p_bottom_bc = p_y[ns[:, 1] == 0] # y = 0


    return x_mom, y_mom, cont, \

        u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \

            v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \

                p_left_bc, p_right_bc, p_top_bc, p_bottom_bc


#%% define loss function

def loss_fn(ns):

    global x_mom_loss, y_mom_loss, cont_loss, u_bc_loss, v_bc_loss, p_bc_loss, mse_loss, bc_loss

    

    ns = ns.clone().detach().requires_grad_(True).to(device)  # move to "device" and use x32 data type

    

    x_mom, y_mom, cont, u_left_bc, u_right_bc, u_top_bc, u_bottom_bc,\

        v_left_bc, v_right_bc, v_top_bc, v_bottom_bc,\

            p_left_bc, p_right_bc, p_top_bc, p_bottom_bc = navier_stokes_equation(ns) # compute residuals for Navier-Stokes equations with specified boundary and initial conditions

    

    x_mom_loss = nn.MSELoss()(x_mom, torch.zeros_like(x_mom)) # x momentum loss

    y_mom_loss = nn.MSELoss()(y_mom, torch.zeros_like(y_mom)) # y momentum loss

    cont_loss = nn.MSELoss()(cont, torch.zeros_like(cont)) # continuity loss

    

    u_bc_loss = nn.MSELoss()(u_left_bc, torch.zeros_like(u_left_bc)) + nn.MSELoss()(u_right_bc, torch.zeros_like(u_right_bc)) \

        + nn.MSELoss()(u_top_bc, torch.zeros_like(u_top_bc)) + nn.MSELoss()(u_bottom_bc, torch.zeros_like(u_bottom_bc)) # x momentum boundary conditions loss

            

    v_bc_loss = nn.MSELoss()(v_left_bc, torch.zeros_like(v_left_bc)) + nn.MSELoss()(v_right_bc, torch.zeros_like(v_right_bc)) \

        + nn.MSELoss()(v_top_bc, torch.zeros_like(v_top_bc)) + nn.MSELoss()(v_bottom_bc, torch.zeros_like(v_bottom_bc)) # y momentum boundary conditions loss

            

    p_bc_loss = nn.MSELoss()(p_left_bc, torch.zeros_like(p_left_bc)) + nn.MSELoss()(p_right_bc, torch.zeros_like(p_right_bc)) \

        + nn.MSELoss()(p_top_bc, torch.zeros_like(p_top_bc)) + nn.MSELoss()(p_bottom_bc, torch.zeros_like(p_bottom_bc)) # pressure boundary conditions loss

            

    bc_loss = (u_bc_loss + v_bc_loss + p_bc_loss) / 1000 # total boundary condition loss

    mse_loss = (x_mom_loss + y_mom_loss + cont_loss) / 1000 # total equation loss

    

    total_loss =  mse_loss + bc_loss # total loss

            

    return total_loss


#%% save and resume training

def save_checkpoint(epoch, model, optimizer): # save check point

    checkpoint = {

        'epoch': epoch,

        'model_state_dict': model.state_dict(),

        'optimizer_state_dict': optimizer.state_dict()}

    checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}.pt")

    torch.save(checkpoint, checkpoint_path)

    print(f"Checkpoint saved at epoch {epoch}")


def load_checkpoint(model, optimizer): # load check point

    latest_checkpoint = max(glob.glob(os.path.join(checkpoint_dir, 'checkpoint_epoch_*.pt')), key=os.path.getctime)

    checkpoint = torch.load(latest_checkpoint)

    epoch = checkpoint['epoch']

    model.load_state_dict(checkpoint['model_state_dict'])

    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    print(f"Checkpoint loaded from epoch {epoch}")

    return model, optimizer, epoch


#%% show model summary

def count_parameters(model):

    return sum(p1.numel() for p1 in model.parameters() if p1.requires_grad) # show trainable parameters


#%% set same seeds for initialization

np.random.seed(20) # same random initializations every time numpy

rn.seed(20) # same random initializations every time for python 

torch.manual_seed(20) # set seed for reproducibility


#%% generate sample points (grid)

Uinf = 1 # free stream velocity

nu = 0.1 # kinematic viscosity

rho = 1 # density

length_square = 1 # length of square 

L = round(5 * length_square) # Length of plate

D = round(1 * length_square) # Width of plate

Re = round(rho * Uinf * length_square / nu) # Reynolds number


num_training_points = 250 # per length


# Generate points for boundaries

x_l = np.random.uniform(0, 0, num_training_points) # left boundary

y_l = np.random.uniform(0, D, num_training_points)


x_r = np.random.uniform(L, L, num_training_points) # right boundary

y_r = np.random.uniform(0, D, num_training_points)


x_b = np.random.uniform(0, L, num_training_points) # bottom boundary

y_b = np.random.uniform(0, 0, num_training_points)


x_t = np.random.uniform(0, L, num_training_points) # top boundary

y_t = np.random.uniform(D, D, num_training_points)


x_m = np.random.uniform(0, L, 1000) # mid field

y_m = np.random.uniform(0, D, 1000)


x = np.concatenate([x_l, x_r, x_b, x_t, x_m]) # x co-ordinates

y = np.concatenate([y_l, y_r, y_b, y_t, y_m]) # y co-ordinates


#%% display mesh

plt.figure(dpi = 300)

plt.scatter(x, y, s = 0.1, c = 'k', alpha = 1) # scatter plot

plt.xlim(0, L)

plt.ylim(0, D)

plt.grid(True)

plt.axis('equal')

plt.grid(False)

plt.show()


#%% create neural network model

h = 50 # number of neurons per hidden layer

model = nn.Sequential(

    nn.Linear(2, h), # 2 inputs for x and y

    nn.Tanh(), # hidden layer

    nn.Linear(h, h),

    nn.Tanh(), # hidden layer

    nn.Linear(h, h),

    nn.Tanh(), # hidden layer

    nn.Linear(h, h),

    nn.Tanh(), # hidden layer

    nn.Linear(h, h),

    nn.Tanh(), # hidden layer

    nn.Linear(h, 3)).to(device) # 3 outputs for u, v, p


for m in model.modules():

    if isinstance(m, nn.Linear):

        init.xavier_normal_(m.weight) # weights initialization

        init.constant_(m.bias, 0.0) # bias initialization


print(model) # show model details

num_trainable_params = count_parameters(model) # show trainable parameters

print("Number of trainable parameters:", num_trainable_params)


#%% prepare input data

train_points = np.vstack((x, y)).T # stack x, y together

train_points = torch.tensor(train_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU, use float64 if mroe precision is required


#%% train using Adam

resume_training = False # True if resuming training

start_epoch = 1

optimizer = optim.Adam(model.parameters(), lr = 0.00001) # select optimizer and parameters


checkpoint_dir = './pytorch_checkpoints' # directory to save checkpoints

os.makedirs(checkpoint_dir, exist_ok=True) # create a directory to save checkpoints

if resume_training:

    model, optimizer, start_epoch = load_checkpoint(model, optimizer)

    start_epoch += 1 # start from next epoch

    

for epoch in range(10001):

    optimizer.zero_grad() # clear gradients of all optimized variables

    loss_value = loss_fn(train_points) # compute prediction by passing inputs to model

    loss_value.backward() # compute gradient of loss with respect to model parameters

    optimizer.step() # perform parameter up date

    if epoch % 100 == 0:

        print(f"{epoch} {loss_value.item()}")


save_checkpoint(epoch, model, optimizer) # save model details

print(f"{epoch} {loss_value.item()}")

print("Final Loss Values:")

print("x_mom_loss:", x_mom_loss.item())

print("y_mom_loss:", y_mom_loss.item())

print("cont_loss:", cont_loss.item())

print("u_bc_loss:", u_bc_loss.item())

print("v_bc_loss:", v_bc_loss.item())

print("p_bc_loss:", p_bc_loss.item())

print("mse_loss:", mse_loss.item())

print("bc_loss:", bc_loss.item())


#%% prediction

num_testing_points = 2500 # per length


# Generate points for boundaries

x_l_te = np.random.uniform(0, 0, num_testing_points) # left boundary

y_l_te = np.random.uniform(0, D, num_testing_points)


x_r_te = np.random.uniform(L, L, num_testing_points) # right boundary

y_r_te = np.random.uniform(0, D, num_testing_points)


x_b_te = np.random.uniform(0, L, num_testing_points) # bottom boundary

y_b_te = np.random.uniform(0, 0, num_testing_points)


x_t_te = np.random.uniform(0, L, num_testing_points) # top boundary

y_t_te = np.random.uniform(D, D, num_testing_points)


x_m_te = np.random.uniform(0, L, 50000) # mid field

y_m_te = np.random.uniform(0, D, 50000)


x_te = np.concatenate([x_l_te, x_r_te, x_b_te, x_t_te, x_m_te]) # x co-ordinates

y_te = np.concatenate([y_l_te, y_r_te, y_b_te, y_t_te, y_m_te]) # y co-ordinates


test_points = np.vstack((x_te, y_te)).T # stack x, y together

test_points = torch.tensor(test_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU, use float64 if mroe precision is required


predict = model(test_points).detach().cpu().numpy() # predict and move back to CPU


u = predict[:, 0] * Uinf # predict and bring back to dimensional form

v = predict[:, 1] * Uinf 

p = predict[:, 2] * Uinf * Uinf * rho

velocity_magnitude = np.sqrt(u**2 + v**2)  # calculate velocity magnitude


# plotting

plt.figure(dpi = 300)

sc = plt.scatter(x_te, y_te, c = u, cmap = 'jet', s = 1, edgecolor = 'none', alpha = 1)

plt.colorbar(orientation = 'vertical')

plt.gca().set_aspect('equal', adjustable = 'box')

plt.xticks([0, L])

plt.yticks([0, D])

plt.xlabel('x [m]')

plt.ylabel('y [m]')

plt.axis('off')

plt.show()


plt.figure(dpi = 300)

sc = plt.scatter(x_te, y_te, c = v, cmap = 'jet', s = 1, edgecolor = 'none', alpha = 1)

plt.colorbar(orientation = 'vertical')

plt.gca().set_aspect('equal', adjustable = 'box')

plt.xticks([0, L])

plt.yticks([0, D])

plt.xlabel('x [m]')

plt.ylabel('y [m]')

plt.axis('off')

plt.show()


plt.figure(dpi = 300)

sc = plt.scatter(x_te, y_te, c = p, cmap = 'jet', s = 1, edgecolor = 'none', alpha = 1)

plt.colorbar(orientation = 'vertical')

plt.gca().set_aspect('equal', adjustable = 'box')

plt.xticks([0, L])

plt.yticks([0, D])

plt.xlabel('x [m]')

plt.ylabel('y [m]')

plt.axis('off')

plt.show()

Flow between Flat - Plates

     The boundary conditions are taken from [2]. The results presented in Fig. 1 are from flow between two parallel flat plates. For these boundary conditions, analytical solutions to the Navier-Stokes equations are available. As this is "just another blog" 😕; just by eye-balling 😆, the results look very promising. I mean from Fig. 1, the velocities at left side are captured well-ish!

Fig. 1, post processing

Lid-Driven Cavity

     The case of flow between two flat plates shown in Fig. 1 is self-validating as area times velocity should be same for inlet and outlet, which it is. Lid-Driven Cavity however, is a benchmark problem for many numerical methods in fluid dynamics because of complex vortex dynamics. Here I present, the results of Lid-Driven Cavity problem. The results are very good 😎. The results are presented in Fig. 2. The comparison of PINN results with [1] is shown in Fig. 3. The validation of CFD is available to be read here. While the CFD code is available here.

     

Fig. 2, Post processing

Fig. 3, A comparison

External Fluid Dynamics

Here is a video of the PINN learning flow around obstacle i.e. a square cylinder.

Fig. 4, The animation


     The PINN is now capable to predict flow around curved objects. The flow around circular cylinder is mentioned. I use polar coordinates for applying Neuman boundary conditions for pressure to satisfy the no-slip wall. The results are shown in Fig. 5. The code is also made available for fellow researchers to improve upon and use in the research, please cite properly. The results show u and v velocities at the left portion while pressure and streamlines are shown on the right side. As compared to the commercial software that shall not be named 😃, the results are very accurate. The Reynolds number is kept at 10to avoid making the loss landscape complex. Please refer to this post for more details!

Fig. 5, The post processing

Code

# Copyright <2024> <FAHAD BUTT>
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

#%% import necessary libraries
import os
import numpy as np
import random as rn
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
# import torch_directml # enable for AMD and Intel gpu
import glob
from scipy.interpolate import griddata
device = torch.device("cpu") # cuda for CUDA gpu, cpu for cpu, mps for Apple GPU, torch_directml.device() for any other gpu

#%% 2D navier-stokes equations and boundary conditions
def navier_stokes_equation(ns):
    ns.requires_grad_(True) # watch ns for gradient computation
    
    out = model(ns) # output from model
    
    u = out[:, 0] # x velocity
    v = out[:, 1] # y velocity
    p = out[:, 2] # pressure
    
    du = torch.autograd.grad(u, ns, torch.ones_like(u), create_graph = True)[0] # du/dns
    dv = torch.autograd.grad(v, ns, torch.ones_like(v), create_graph = True)[0] # dv/dns
    dp = torch.autograd.grad(p, ns, torch.ones_like(p), create_graph = True)[0] # dp/dns
    
    u_x = du[:, 0] # du/dx
    u_y = du[:, 1] # du/dy
    v_x = dv[:, 0] # dv/dx
    v_y = dv[:, 1] # dv/dy
    p_x = dp[:, 0] # dp/dx
    p_y = dp[:, 1] # dp/dy
    
    u_xx = torch.autograd.grad(u_x, ns, torch.ones_like(u_x), create_graph = True)[0][:, 0] # d2u/dx2
    u_yy = torch.autograd.grad(u_y, ns, torch.ones_like(u_y), create_graph = True)[0][:, 1] # d2u/dy2
    v_xx = torch.autograd.grad(v_x, ns, torch.ones_like(v_x), create_graph = True)[0][:, 0] # d2v/dx2
    v_yy = torch.autograd.grad(v_y, ns, torch.ones_like(v_y), create_graph = True)[0][:, 1] # d2v/dy2
    
    x_mom = (u * u_x) + (v * u_y) + p_x - ((1 / Re) * (u_xx + u_yy)) # x momentum
    y_mom = (u * v_x) + (v * v_y) + p_y - ((1 / Re) * (v_xx + v_yy)) # y momentum
    cont = u_x + v_y # continuity
     
    # apply boundary conditions
    u_left_bc = u[ns[:, 0] == 0] - Uinf # x = 0
    u_right_bc = u_x[ns[:, 0] == L] # x = L
    u_top_bc = u[ns[:, 1] == D] - Uinf # y = D
    u_bottom_bc = u[ns[:, 1] == 0] - Uinf # y = 0
         
    v_left_bc = v[ns[:, 0] == 0] # x = 0
    v_right_bc = v_x[ns[:, 0] == L] # x = L
    v_top_bc = v[ns[:, 1] == D] # y = D
    v_bottom_bc = v[ns[:, 1] == 0] # y = 0
    
    p_left_bc = p_x[ns[:, 0] == 0] # x = 0
    p_right_bc = p[ns[:, 0] == L] # x = L
    p_top_bc = p_y[ns[:, 1] == D] # y = D
    p_bottom_bc = p_y[ns[:, 1] == 0] # y = 0
    
    distance = torch.sqrt((ns[:, 0] - center_x) ** 2 + (ns[:, 1] - center_y) ** 2) # mark radius of circle
    on_circle_boundary = (torch.abs(distance - radius) <= tolerance) # filter points near circle boundary
    p_r = ((p_x * (ns[:, 0] - center_x)) / radius) + ((p_y * (ns[:, 1] - center_y)) / radius) # dp/dr
    
    # apply no-slip boundary conditions on circle boundary
    u_boundary = u[on_circle_boundary] 
    v_boundary = v[on_circle_boundary]
    p_gradient_bc = p_r[on_circle_boundary]
    
    return x_mom, y_mom, cont, \
        u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
            v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
                p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, \
                    u_boundary, v_boundary, p_gradient_bc
                
#%% define loss function
def loss_fn(ns):
    global x_mom_loss, y_mom_loss, cont_loss, u_bc_loss, v_bc_loss, p_bc_loss, mse_loss, bc_loss
    
    ns = ns.clone().detach().requires_grad_(True).to(device) # move to "device" and use x32 data type
    
    x_mom, y_mom, cont, \
        u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
            v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
                p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, \
                    u_boundary, v_boundary, p_gradient_bc \
                        = navier_stokes_equation(ns) # compute residuals for Navier-Stokes equations with specified boundary and initial conditions
    
    x_mom_loss = nn.MSELoss()(x_mom, torch.zeros_like(x_mom)) # x momentum loss
    y_mom_loss = nn.MSELoss()(y_mom, torch.zeros_like(y_mom)) # y momentum loss
    cont_loss = nn.MSELoss()(cont, torch.zeros_like(cont)) # continuity loss
    
    u_bc_loss = nn.MSELoss()(u_left_bc, torch.zeros_like(u_left_bc)) + nn.MSELoss()(u_right_bc, torch.zeros_like(u_right_bc)) \
        + nn.MSELoss()(u_top_bc, torch.zeros_like(u_top_bc)) + nn.MSELoss()(u_bottom_bc, torch.zeros_like(u_bottom_bc)) \
            + nn.MSELoss()(u_boundary, torch.zeros_like(u_boundary)) # u boundary conditions loss
        
    v_bc_loss = nn.MSELoss()(v_left_bc, torch.zeros_like(v_left_bc)) + nn.MSELoss()(v_right_bc, torch.zeros_like(v_right_bc)) \
        + nn.MSELoss()(v_top_bc, torch.zeros_like(v_top_bc)) + nn.MSELoss()(v_bottom_bc, torch.zeros_like(v_bottom_bc)) \
            + nn.MSELoss()(v_boundary, torch.zeros_like(v_boundary)) # v boundary conditions loss
        
    p_bc_loss = nn.MSELoss()(p_left_bc, torch.zeros_like(p_left_bc)) + nn.MSELoss()(p_right_bc, torch.zeros_like(p_right_bc)) \
        + nn.MSELoss()(p_top_bc, torch.zeros_like(p_top_bc)) + nn.MSELoss()(p_bottom_bc, torch.zeros_like(p_bottom_bc)) \
            + nn.MSELoss()(p_gradient_bc, torch.zeros_like(p_gradient_bc)) # contunuity boundary conditions loss
    
    bc_loss = (u_bc_loss + v_bc_loss + p_bc_loss) / 216 # total boundary condition loss
    mse_loss = (x_mom_loss + y_mom_loss + cont_loss) / 2500 # total equation loss
    
    total_loss =  mse_loss + bc_loss # total loss
            
    return total_loss

#%% save and resume training
def save_checkpoint(epoch, model, optimizer): # save check point
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict()}
    checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}.pt")
    torch.save(checkpoint, checkpoint_path)
    print(f"Checkpoint saved at epoch {epoch}")

def load_checkpoint(model, optimizer): # load check point
    latest_checkpoint = max(glob.glob(os.path.join(checkpoint_dir, 'checkpoint_epoch_*.pt')), key=os.path.getctime)
    checkpoint = torch.load(latest_checkpoint)
    epoch = checkpoint['epoch']
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    print(f"Checkpoint loaded from epoch {epoch}")
    return model, optimizer, epoch

#%% show model summary
def count_parameters(model):
    return sum(p1.numel() for p1 in model.parameters() if p1.requires_grad) # show trainable parameters

#%% set same seeds for initialization
np.random.seed(20) # same random initializations every time numpy
rn.seed(20) # same random initializations every time for python 
torch.manual_seed(20) # set seed for reproducibility

#% generate sample points (grid)
Uinf = 1 # free stream velocity
nu = 1/10 # kinematic viscosity
rho = 1 # density
length_square = 1 # length of square / diameter of cylinder
L = 10 * length_square # Length of plate
D = 5 * length_square # Width of plate
Re = rho * Uinf * length_square / nu # Reynolds number
center_x, center_y = L / 3, D / 2 # center of circle
radius = 0.5 # circle radius

num_training_points = 50 # points per length

num_boundary_points = 17 # points on circle

angles = np.linspace(0, 2 * np.pi, num_boundary_points) # angle for each point

# x and y coordinates for each point on circle boundary
x_boundary = center_x + radius * np.cos(angles)
y_boundary = center_y + radius * np.sin(angles)

# Generate points for boundaries
x_l = np.random.uniform(0, 0, num_training_points) # left boundary
y_l = np.random.uniform(0, D, num_training_points)

x_r = np.random.uniform(L, L, num_training_points) # right boundary
y_r = np.random.uniform(0, D, num_training_points)

x_b = np.random.uniform(0, L, num_training_points) # bottom boundary
y_b = np.random.uniform(0, 0, num_training_points)

x_t = np.random.uniform(0, L, num_training_points) # top boundary
y_t = np.random.uniform(D, D, num_training_points)

x_m = np.random.uniform(0, L, 2500) # points inside domain
y_m = np.random.uniform(0, D, 2500)

x = np.concatenate([x_l, x_r, x_b, x_t, x_m, x_boundary]) # x co-ordinates
y = np.concatenate([y_l, y_r, y_b, y_t, y_m, y_boundary]) # y co-ordinates

distance = np.sqrt((x - center_x) ** 2 + (y - center_y) ** 2) # distance of each point from center

outside_circle = distance >= radius # filter out points inside circle
x_filtered = x[outside_circle]
y_filtered = y[outside_circle]

tolerance = 0.01 # mark circumference
on_hole_boundary = (np.abs(distance - radius) <= tolerance)

x_train = np.concatenate([x_filtered, x_boundary]) # x co-ordinates
y_train = np.concatenate([y_filtered, y_boundary]) # y co-ordinates

distance_train = np.sqrt((x_train - center_x) ** 2 + (y_train - center_y) ** 2) # mark points on circle boundary
on_hole_boundary = (np.abs(distance_train - radius) <= tolerance)

x_hole = x_train[on_hole_boundary] # x co ordinates with points on boundary included
y_hole = y_train[on_hole_boundary] # y co ordinates with points on boundary included

#%% display mesh
plt.figure(dpi = 300)
plt.scatter(x_train, y_train, s = 0.1, c = 'k', alpha = 1) # scatter plot
plt.xlim(0, L)
plt.ylim(0, D)
plt.gca().set_aspect('equal', adjustable = 'box')
plt.grid(False)
plt.axis('off')
plt.show()

#%% create neural network model
h = 50 # number of neurons per hidden layer
model = nn.Sequential(
    nn.Linear(2, h), # 2 inputs for x and y
    nn.Tanh(), # hidden layer
    nn.Linear(h, h),
    nn.Tanh(), # hidden layer
    nn.Linear(h, h),
    nn.Tanh(), # hidden layer
    nn.Linear(h, h),
    nn.Tanh(), # hidden layer
    nn.Linear(h, h),
    nn.Tanh(), # hidden layer
    nn.Linear(h, 3)).to(device) # 3 outputs for u, v, p

for m in model.modules():
    if isinstance(m, nn.Linear):
        init.xavier_normal_(m.weight) # weights initialization
        init.constant_(m.bias, 0.0) # bias initialization

print(model) # show model details
num_trainable_params = count_parameters(model) # show trainable parameters
print("Number of trainable parameters:", num_trainable_params)

#%% prepare input data
train_points = np.vstack((x_train, y_train)).T # stack x, y together
train_points = torch.tensor(train_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU, use float64 if more precision is required

#%% train using Adam
resume_training = False # True if resuming training
start_epoch = 1
optimizer = optim.Adam(model.parameters(), lr = 0.0001) # select optimizer and parameters

checkpoint_dir = './pytorch_checkpoints' # directory to save checkpoints
os.makedirs(checkpoint_dir, exist_ok = True) # create a directory to save checkpoints
if resume_training:
    model, optimizer, start_epoch = load_checkpoint(model, optimizer)
    start_epoch += 1 # start from next epoch
    
for epoch in range(100001):
    optimizer.zero_grad() # clear gradients of all optimized variables
    loss_value = loss_fn(train_points) # compute prediction by passing inputs to model
    loss_value.backward() # compute gradient of loss with respect to model parameters
    optimizer.step() # perform parameter up date
    if epoch % 100 == 0:
        print(f"{epoch} {loss_value.item()}")

save_checkpoint(epoch, model, optimizer) # save model details
print(f"{epoch} {loss_value.item()}") # show loss values
print("Final Loss Values:")
print("x_mom_loss:", x_mom_loss.item())
print("y_mom_loss:", y_mom_loss.item())
print("cont_loss:", cont_loss.item())
print("u_bc_loss:", u_bc_loss.item())
print("v_bc_loss:", v_bc_loss.item())
print("p_bc_loss:", p_bc_loss.item())
print("mse_loss:", mse_loss.item())
print("bc_loss:", bc_loss.item())

#%% prediction
num_training_points = 1000 # per length

# Generate points for boundaries
x_l_te = np.random.uniform(0, 0, num_training_points) # left boundary
y_l_te = np.random.uniform(1, D, num_training_points)

x_r_te = np.random.uniform(L, L, num_training_points) # right boundary
y_r_te = np.random.uniform(0, D, num_training_points)

x_b_te = np.random.uniform(1, L, num_training_points) # bottom boundary
y_b_te = np.random.uniform(0, 0, num_training_points)

x_t_te = np.random.uniform(0, L, num_training_points) # top boundary
y_t_te = np.random.uniform(D, D, num_training_points)

x_m_te = np.random.uniform(0, L, 100000)
y_m_te = np.random.uniform(0, D, 100000)

x_te = np.concatenate([x_l_te, x_r_te, x_b_te, x_t_te, x_m_te]) # x co-ordinates
y_te = np.concatenate([y_l_te, y_r_te, y_b_te, y_t_te, y_m_te]) # y co-ordinates

distance_te = np.sqrt((x_te - center_x) ** 2 + (y_te - center_y) ** 2) # distance of each point from center

outside_circle_te = distance_te >= radius # filter out points inside circle

x_te_filtered = x_te[outside_circle_te]
y_te_filtered = y_te[outside_circle_te]

test_points = np.vstack((x_te_filtered, y_te_filtered)).T # stack x, y together
test_points = torch.tensor(test_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU, use float64 if more precision is required

predict = model(test_points).detach().cpu().numpy() # predict and move back to CPU

u = predict[:, 0] * Uinf # predict and bring back to dimensional form
v = predict[:, 1] * Uinf 
p = predict[:, 2] * Uinf * Uinf * rho
velocity_magnitude = np.sqrt(u**2 + v**2) # calculate velocity magnitude

# plotting
plt.figure(dpi = 300)
sc = plt.scatter(x_te_filtered, y_te_filtered, c = u, cmap = 'jet', s = 1, edgecolor='none', alpha = 1)
plt.colorbar(orientation='vertical')
plt.gca().set_aspect('equal', adjustable = 'box')
plt.axis('off')
plt.show()

plt.figure(dpi = 300)
sc = plt.scatter(x_te_filtered, y_te_filtered, c = v, cmap = 'jet', s = 1, edgecolor='none', alpha = 1)
plt.colorbar(orientation='vertical')
plt.gca().set_aspect('equal', adjustable = 'box')
plt.axis('off')
plt.show()

plt.figure(dpi = 300)
sc = plt.scatter(x_te_filtered, y_te_filtered, c = p, cmap = 'jet', s = 1, edgecolor = 'none', alpha = 1)
plt.colorbar(orientation='vertical')
plt.gca().set_aspect('equal', adjustable = 'box')
plt.axis('off')
plt.show()

#%% save PINN output
np.save('X_PINN.npy', x_te)
np.save('Y_PINN.npy', y_te)
np.save('u_PINN.npy', u)
np.save('v_PINN.npy', v)
np.save('p_PINN.npy', p)

#%% streamlines
x_t_mesh = np.linspace(0, L, 200) # initialize x
y_t_mesh = np.linspace(0, D, 100) # initialize y

test_points_x, test_points_y = np.meshgrid(x_t_mesh, y_t_mesh) # create combinations of sample points along the domain
test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T # stack x, y together
test_points = torch.tensor(test_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU

distance_test = np.sqrt((test_points[:, 0] - center_x)**2 + (test_points[:, 1] - center_y)**2) # distance of each test point from center of circle
outside_circle_test = distance_test >= radius # exclude points of circle
test_points_filtered = test_points[outside_circle_test]

predict_filtered = model(test_points_filtered).detach().cpu().numpy() # predict using the filtered points

test_points_x_filtered = test_points_filtered[:, 0].cpu().numpy() # prepare for reshaping
test_points_y_filtered = test_points_filtered[:, 1].cpu().numpy() # prepare for reshaping

# filtered predictions
u_filtered = predict_filtered[:, 0] * Uinf
v_filtered = predict_filtered[:, 1] * Uinf 
p_filtered = predict_filtered[:, 2] * Uinf * Uinf * rho
velocity_magnitude_filtered = np.sqrt(u_filtered**2 + v_filtered**2)

# define regular grid
x_grid = np.linspace(0, L, 200)
y_grid = np.linspace(0, D, 100)
x_mesh, y_mesh = np.meshgrid(x_grid, y_grid)

# interpolate u and v components onto regular grid
u_grid = griddata((test_points_x_filtered, test_points_y_filtered), u_filtered, (x_mesh, y_mesh), method = 'cubic')
v_grid = griddata((test_points_x_filtered, test_points_y_filtered), v_filtered, (x_mesh, y_mesh), method = 'cubic')

mask = np.sqrt((x_mesh - center_x)**2 + (y_mesh - center_y)**2) < radius # create mask for inside of cylinder
u_grid[mask] = np.nan  # create void
v_grid[mask] = np.nan

plt.figure(dpi=300)
plt.streamplot(x_grid, y_grid, u_grid, v_grid, color='black', cmap='jet', density = 4, linewidth = 0.5,
               arrowstyle = '->', arrowsize = 1) # plot streamlines
plt.gca().set_aspect('equal', adjustable='box')
plt.axis('off')
plt.show()

     Thank you for reading! If you want to hire me as your next PostDoc researcher, reach out for collaboration in research, please do so!

References

[1] U. Ghia, K. N. Ghia, and C. T. Shin, “High-Re Solutions for Incompressible Flow Using the Navier-Stokes Equations and a Multigrid Method”, Journal of Computational Physics 48, 387-411 (1982)

[2] Cahya Amalinadhi, Pramudita S. Palar, Rafael Stevenson and Lavi Zuhal. "On Physics-Informed Deep Learning for Solving Navier-Stokes Equations," AIAA 2022-1436. AIAA SCITECH 2022 Forum. January 2022. https://doi.org/10.2514/6.2022-1436