SolidWorks Videos

Wednesday, March 27, 2024

Teaching the Navier-Stokes Equations to Sand: Data-Less Physics Informed Neural Network (Includes PyTorch Code, Validated)

     Here is a simple code I wrote for a data-less Physics Informed Neural Network to solve the steady-state 2D incompressible Navier-Stokes equations in Python using PyTorch. Validated results will be uploaded here in good time. The code has all the comments you could possibly need. If you have questions, well...

     Verdict, so far: FVM is King, still.

     Cite as: Fahad Butt (2024). NS-PINN (https://whenrobotslove.blogspot.com/2024/03/physics-informed-neural-network-code.html), Blogger. Retrieved Month Date, Year

Copyright (c) 2024 Fahad Butt

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

     Please note that a properly cooled GPU is recommended for running this code. If your CPU melts, don't blame me. If you classify yourself as a poor peasant, use: device = torch.device("cpu") # cuda for gpu, cpu for cpu.
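
     If you would rather not edit that line by hand, a minimal sketch of automatic device selection (my addition, not part of the code below) is:

import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # use the GPU when available, otherwise fall back to the CPU
print("Running on:", device)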

The Code:

 #%% import necessary libraries

import os

import numpy as np

import random as rn

import matplotlib.pyplot as plt

import torch

import torch.nn as nn

import torch.nn.init as init

import torch.optim as optim

import glob

device = torch.device("cuda") # cuda for gpu, cpu for cpu


#%% the 2D navier-stokes equations and boundary conditions

def navier_stokes_equation(ns):

    ns.requires_grad_(True) # watch ns for gradient computation


    u = model(ns)[:, 0] # x velocity

    v = model(ns)[:, 1] # y velocity

    p = model(ns)[:, 2] # pressure

    

    du = torch.autograd.grad(u, ns, torch.ones_like(u), create_graph=True)[0] # du/dns

    dv = torch.autograd.grad(v, ns, torch.ones_like(v), create_graph=True)[0] # dv/dns

    dp = torch.autograd.grad(p, ns, torch.ones_like(p), create_graph=True)[0] # dp/dns

    

    u_x = du[:, 0] # du/dx

    u_y = du[:, 1] # du/dy

    v_x = dv[:, 0] # dv/dx

    v_y = dv[:, 1] # dv/dy

    p_x = dp[:, 0] # dp/dx

    p_y = dp[:, 1] # dp/dy

    

    u_xx = torch.autograd.grad(u_x, ns, torch.ones_like(u_x), create_graph=True)[0][:, 0] # d2u/dx2

    u_yy = torch.autograd.grad(u_y, ns, torch.ones_like(u_y), create_graph=True)[0][:, 1] # d2u/dy2

    v_xx = torch.autograd.grad(v_x, ns, torch.ones_like(v_x), create_graph=True)[0][:, 0] # d2v/dx2

    v_yy = torch.autograd.grad(v_y, ns, torch.ones_like(v_y), create_graph=True)[0][:, 1] # d2v/dy2

    

    x_mom = (u * u_x) + (v * u_y) + p_x - ((1 / Re) * (u_xx + u_yy)) # x momentum

    y_mom = (u * v_x) + (v * v_y) + p_y - ((1 / Re) * (v_xx + v_yy)) # y momentum

    cont = u_x + v_y # continuity

     

    # apply boundary conditions

    u_left_bc = u[ns[:, 0] == -0.5] - Uinf # x = 0

    u_right_bc = u_x[ns[:, 0] == 0.5] # x = L

    u_top_bc = u[ns[:, 1] == 0.5] # y = D

    u_bottom_bc = u[ns[:, 1] == -0.5] # y = 0

     

    v_left_bc = v[ns[:, 0] == -0.5] # x = 0

    v_right_bc = v_x[ns[:, 0] == 0.5] # x = L

    v_top_bc = v[ns[:, 1] == 0.5] # y = D

    v_bottom_bc = v[ns[:, 1] == -0.5] # y = 0

     

    p_left_bc = p_x[ns[:, 0] == -0.5] # x = 0

    p_right_bc = p[ns[:, 0] == 0.5] # x = L

    p_top_bc = p_y[ns[:, 1] == 0.5] # y = D

    p_bottom_bc = p_y[ns[:, 1] == -0.5] # y = 0


    return x_mom, y_mom, cont, \
        u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
        v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
        p_left_bc, p_right_bc, p_top_bc, p_bottom_bc


#%% define the loss function

def loss_fn(ns):

    global x_mom_loss, y_mom_loss, cont_loss, u_bc_loss, v_bc_loss, p_bc_loss

    

    ns = ns.clone().detach().requires_grad_(True).to(device)  # Move to GPU

    

    x_mom, y_mom, cont,\
        u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
            v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
                p_left_bc, p_right_bc, p_top_bc, p_bottom_bc = navier_stokes_equation(ns)

    

    x_mom_loss = nn.MSELoss()(x_mom, torch.zeros_like(x_mom)) # x momentum loss

    y_mom_loss = nn.MSELoss()(y_mom, torch.zeros_like(y_mom)) # y momentum loss

    cont_loss = nn.MSELoss()(cont, torch.zeros_like(cont)) # continuity loss

    

    u_bc_loss = nn.MSELoss()(u_left_bc, torch.zeros_like(u_left_bc)) + nn.MSELoss()(u_right_bc, torch.zeros_like(u_right_bc)) \
        + nn.MSELoss()(u_top_bc, torch.zeros_like(u_top_bc)) + nn.MSELoss()(u_bottom_bc, torch.zeros_like(u_bottom_bc)) # u momentum boundary condition loss

    v_bc_loss = nn.MSELoss()(v_left_bc, torch.zeros_like(v_left_bc)) + nn.MSELoss()(v_right_bc, torch.zeros_like(v_right_bc)) \
        + nn.MSELoss()(v_top_bc, torch.zeros_like(v_top_bc)) + nn.MSELoss()(v_bottom_bc, torch.zeros_like(v_bottom_bc)) # v momentum boundary condition loss

    p_bc_loss = nn.MSELoss()(p_left_bc, torch.zeros_like(p_left_bc)) + nn.MSELoss()(p_right_bc, torch.zeros_like(p_right_bc)) \
        + nn.MSELoss()(p_top_bc, torch.zeros_like(p_top_bc)) + nn.MSELoss()(p_bottom_bc, torch.zeros_like(p_bottom_bc)) # pressure boundary condition loss

    

    bc_loss = u_bc_loss + v_bc_loss + p_bc_loss # total boundary condition loss

    mse_loss = x_mom_loss + y_mom_loss + cont_loss # total equation loss

    

    total_loss =  mse_loss + bc_loss # total loss

            

    return total_loss


#%% calculate loss, compute gradients, and print loss for each LBFGS optimization step

def closure():

    optimizer.zero_grad() # clear gradients

    loss = loss_fn(train_points) # compute the loss

    loss.backward() # compute gradients

    return loss


#%% save and resume training

def save_checkpoint(epoch, model, optimizer): # save check point

    checkpoint = {

        'epoch': epoch,

        'model_state_dict': model.state_dict(),

        'optimizer_state_dict': optimizer.state_dict()}

    checkpoint_path = os.path.join(checkpoint_dir, f"checkpoint_epoch_{epoch}.pt")

    torch.save(checkpoint, checkpoint_path)

    print(f"Checkpoint saved at epoch {epoch}")


def load_checkpoint(model, optimizer): # load check point

    latest_checkpoint = max(glob.glob(os.path.join(checkpoint_dir, 'checkpoint_epoch_*.pt')), key=os.path.getctime)

    checkpoint = torch.load(latest_checkpoint)

    epoch = checkpoint['epoch']

    model.load_state_dict(checkpoint['model_state_dict'])

    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    print(f"Checkpoint loaded from epoch {epoch}")

    return model, optimizer, epoch


#%% show model summary

def count_parameters(model):

    return sum(p.numel() for p in model.parameters() if p.requires_grad) # show trainable parameters


#%% set same seeds for initialization

np.random.seed(20) # same random initializations every time numpy

rn.seed(20) # same random initializations every time for python 

torch.manual_seed(20) # set seed for reproducibility


#%% generate sample points (grid)

Uinf = 4/3 # free stream velocity

nu = 0.02733333333333333333333333333333 # kinematic viscosity

rho = 1 # density

L = 2.1 # Length of the plate

D = 0.41 # Width of the plate

Re = round(rho * Uinf * D / nu) # Reynolds number

num_training_points = 625 # per meter


# Generate points for boundaries

x_l = np.random.uniform(0, 0, num_training_points) # left boundary

y_l = np.random.uniform(0, D, num_training_points)


x_r = np.random.uniform(L, L, num_training_points) # right boundary

y_r = np.random.uniform(0, D, num_training_points)


x_b = np.random.uniform(0, L, num_training_points) # bottom boundary

y_b = np.random.uniform(0, 0, num_training_points)


x_t = np.random.uniform(0, L, num_training_points) # top boundary

y_t = np.random.uniform(D, D, num_training_points)


x_m = np.random.uniform(0.002, L - 0.002, 10000) # points inside domain

y_m = np.random.uniform(0.002, D - 0.002, 10000) # points inside domain


x = np.concatenate([x_l, x_r, x_b, x_t, x_m]) # x co-ordinates

y = np.concatenate([y_l, y_r, y_b, y_t, y_m]) # y co-ordinates


x = (x / L) - 0.5 # normalize x

y = (y / D) - 0.5 # normalize y


#%% display mesh

plt.figure(dpi = 300)

plt.scatter(x, y, s = 0.1, c = 'k', alpha = 1) # scatter plot

plt.xlim(-0.5, 0.5) # x is normalized to [-0.5, 0.5]

plt.ylim(-0.5, 0.5) # y is normalized to [-0.5, 0.5]

plt.grid(True)

plt.axis('equal')

plt.grid(False)

plt.show()


#%% create neural network model

model = nn.Sequential(

    nn.Linear(2, 40), # 2 inputs for x and y

    nn.Tanh(), # hidden layer

    nn.Linear(40, 40),

    nn.Tanh(), # hidden layer

    nn.Linear(40, 40),

    nn.Tanh(), # hidden layer

    nn.Linear(40, 40),

    nn.Tanh(), # hidden layer

    nn.Linear(40, 40),

    nn.Tanh(), # hidden layer

    nn.Linear(40, 40),

    nn.Tanh(), # hidden layer

    nn.Linear(40, 3)).to(device) # 3 outputs for u, v, p


for m in model.modules():

    if isinstance(m, nn.Linear):

        init.xavier_normal_(m.weight) # weights initialization

        init.constant_(m.bias, 0.0) # bias initialization


print(model) # show model details

num_trainable_params = count_parameters(model) # show trainable parameters

print("Number of trainable parameters:", num_trainable_params)


#%% prepare input data

train_points = np.vstack((x, y)).T # stack x, y together

train_points = torch.tensor(train_points, dtype=torch.float32).to(device) # convert to tensor and move training points to GPU / CPU


#%% train using Adam

resume_training = False # True if resuming training

start_epoch = 1

optimizer = optim.Adam(model.parameters(), lr = 0.001) # select optimizer and parameters


checkpoint_dir = './pytorch_checkpoints' # directory to save checkpoints

os.makedirs(checkpoint_dir, exist_ok=True) # create a directory to save checkpoints

if resume_training:

    model, optimizer, start_epoch = load_checkpoint(model, optimizer)

    start_epoch += 1 # start from the next epoch

    

for epoch in range(5000):

    optimizer.zero_grad() # clear the gradients of all optimized variables

    loss_value = loss_fn(train_points) # compute prediction by passing inputs to the model

    loss_value.backward() # compute gradient of loss with respect to model parameters

    optimizer.step() # perform parameter update

    if epoch % 1000 == 0:

        print(f"{epoch} {loss_value.item()}")

        # print(f"Adam Epoch {epoch}")


save_checkpoint(epoch, model, optimizer) # save model details

print(f"{epoch} {loss_value.item()}")


#%% train using LBFGS optimizer

optimizer = optim.LBFGS(model.parameters(), line_search_fn = 'strong_wolfe') # LBFGS optimizer

resume_training = False # True if resuming training

start_epoch = 0

checkpoint_dir = './pytorch_checkpoints'  # directory to save checkpoints

os.makedirs(checkpoint_dir, exist_ok=True)  # create a directory to save checkpoints

if resume_training:

    model, optimizer, start_epoch = load_checkpoint(model, optimizer)

    start_epoch += 1  # start from the next epoch


for epoch in range(20):

    optimizer.step(closure) # perform parameter update

    if epoch % 5 == 0:

        print(f"Loss: {closure().item()}")


save_checkpoint(epoch, model, optimizer)

print(f"{epoch} {closure().item()}")


#%% prediction

x_t = np.linspace(0, L, 20) # initialize x

y_t = np.linspace(0, D, 20) # initialize y

x_t = (x_t / L) - 0.5  # normalize test data x-axis

y_t = (y_t / D) - 0.5  # normalize test data y-axis


# test_points = np.vstack((x_t, y_t)).T # stack x, y together

test_points_x, test_points_y = np.meshgrid(x_t, y_t) # create combinations of sample points along the domain

test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T # stack x, y together

test_points = torch.tensor(test_points, dtype=torch.float32).to(device) # convert to tensor and move test points to GPU / CPU


predict = model(test_points).detach().cpu().numpy() # predict and move back to CPU


u = predict[:, 0] * Uinf # predict and bring back to dimensional form

v = predict[:, 1] * Uinf 

p = predict[:, 2] * Uinf * Uinf * rho

velocity_magnitude = np.sqrt(u**2 + v**2)  # calculate velocity magnitude


# plotting

plt.figure(dpi = 300)  # make a nice crisp image :)

plt.contourf((test_points_x + 0.5) * L, (test_points_y + 0.5) * D, u.reshape(test_points_x.shape), cmap='jet', levels=256)  # contour plot (shift normalized co-ordinates back to 0..L and 0..D)

# plt.clim(0, 1)

plt.colorbar(orientation='vertical')

plt.axis('equal')

plt.show()


# plotting

plt.figure(dpi = 300)  # make a nice crisp image :)

plt.contourf((test_points_x + 0.5) * L, (test_points_y + 0.5) * D, v.reshape(test_points_x.shape), cmap='jet', levels=256)  # contour plot

# plt.clim(0, 1) 

plt.colorbar(orientation='vertical')

plt.axis('equal')

plt.show()


# plotting

plt.figure(dpi = 300)  # make a nice crisp image :)

plt.contourf((test_points_x + 0.5) * L, (test_points_y + 0.5) * D, p.reshape(test_points_x.shape), cmap='jet', levels=256)  # contour plot

plt.colorbar(orientation='vertical')

plt.axis('equal')

plt.show()


# plotting

plt.figure(dpi = 300)  # make a nice crisp image :)

plt.streamplot((test_points_x + 0.5) * L, (test_points_y + 0.5) * D, u.reshape(test_points_x.shape),\
                v.reshape(test_points_x.shape), color = 'black', cmap = 'jet', density = 1, linewidth = 0.5,\
                    arrowstyle='->', arrowsize = 1)  # plot streamlines

plt.colorbar(orientation='vertical')

plt.axis('equal')

plt.show()


     The boundary conditions are taken from [1]. The results presented in Fig. 1 are for flow between two parallel flat plates. For these boundary conditions, analytical solutions to the Navier-Stokes equations are available (a small comparison sketch follows below). As this is "just another blog", and judging just by eye-balling, the results look very promising: in Fig. 1, the velocities at the left side are captured well-ish!
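
     For reference, here is a minimal sketch of the analytical fully developed (plane Poiseuille) profile between the plates, assuming Uinf plays the role of the bulk (mean) velocity and D of the channel height, both as set in the script above:

import numpy as np

y_exact = np.linspace(0, D, 100) # wall-to-wall coordinate, D as defined in the script above
u_exact = 6 * Uinf * (y_exact / D) * (1 - y_exact / D) # parabolic profile with a peak of 1.5 * Uinf at mid-channel

     Plotting the predicted u along a vertical line near the outlet against u_exact gives a check that goes beyond eye-balling.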

Fig. 1, Post processing

     Thank you for reading! If you want to hire me as your next PhD researcher, or to collaborate on research, please reach out!

References

[1] Cahya Amalinadhi, Pramudita S. Palar, Rafael Stevenson and Lavi Zuhal. "On Physics-Informed Deep Learning for Solving Navier-Stokes Equations," AIAA 2022-1436. AIAA SCITECH 2022 Forum. January 2022. https://doi.org/10.2514/6.2022-1436

Sunday, February 4, 2024

Solution to the Transient, Incompressible Navier–Stokes Equations in 2D via a Data-Less Physics-Informed Neural Network (Includes Free Code)

     Here I share a data-less Physics-Informed Neural Network code written in Python using TensorFlow Keras. In the current configuration, the code will try to learn flow through a pipe. The output at various time instances is shown in Fig. 1. To increase the neural network's capacity to learn complex features, such as vortex-dominated flows, increase the number of neurons in the hidden layers. To increase its capacity to learn flow physics such as boundary-layer mechanics, increase the number of hidden layers (a hedged example of such a change is sketched below). This is as far as I have understood via self-learning. Nice little alien technology, this.
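
     For example, widening and deepening the network defined later in this post only means editing the Sequential definition; the numbers below (60 neurons, 5 hidden layers) are arbitrary illustrations, not tuned recommendations:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

wider_deeper_model = keras.Sequential([
    layers.Dense(60, activation='tanh', input_shape=(3,), kernel_initializer='GlorotNormal'), # wider hidden layer (3 inputs: x, y, t)
    layers.Dense(60, activation='tanh'), # extra hidden layer
    layers.Dense(60, activation='tanh'), # extra hidden layer
    layers.Dense(60, activation='tanh'), # extra hidden layer
    layers.Dense(60, activation='tanh'), # extra hidden layer
    layers.Dense(3, activation=None)]) # output layer: u, v, p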

     Please note that a properly cooled GPU is recommended for running this code. If your CPU melts, don't blame me.

MIT License

Copyright (c) 2024 Fahad Butt

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Code

#%% import stuff

import os

import random as rn

import numpy as np

import matplotlib.pyplot as plt

import tensorflow as tf

from tensorflow import keras

from tensorflow.keras import layers

from tensorflow.keras.layers.experimental import preprocessing


#%% define 2D transient in-compressible navier stokes equations

def navier_stokes_residual(ns):

    with tf.GradientTape(persistent=True) as tape:

        tape.watch(ns)  # set the input variable ns for gradient computation

        

        # extract components from the input tensor ns

        u = model(ns)[:, 0]  # x velocity

        v = model(ns)[:, 1]  # y velocity

        p = model(ns)[:, 2]  # pressure


        # compute derivatives with respect to x, y, t, and second derivatives with respect to x, y

        u_x = tape.gradient(u, ns)[:, 0] # first derivative of neural network output (u) with respect to x

        u_y = tape.gradient(u, ns)[:, 1] # first derivative of neural network output (u) with respect to y

        u_t = tape.gradient(u, ns)[:, 2] # first derivative of neural network output (u) with respect to t

        v_x = tape.gradient(v, ns)[:, 0] # first derivative of neural network output (v) with respect to x

        v_y = tape.gradient(v, ns)[:, 1] # first derivative of neural network output (v) with respect to y

        v_t = tape.gradient(v, ns)[:, 2] # first derivative of neural network output (v) with respect to t

        p_x = tape.gradient(p, ns)[:, 0] # first derivative of neural network output (p) with respect to x

        p_y = tape.gradient(p, ns)[:, 1] # first derivative of neural network output (p) with respect to y

        u_xx = tape.gradient(u_x, ns)[:, 0] # second derivative of neural network output (u) with respect to x

        u_yy = tape.gradient(u_y, ns)[:, 1] # second derivative of neural network output (u) with respect to y

        v_xx = tape.gradient(v_x, ns)[:, 0] # second derivative of neural network output (v) with respect to x

        v_yy = tape.gradient(v_y, ns)[:, 1] # second derivative of neural network output (v) with respect to y

        

        # define the equations to solve

        x_mom = u_t + (u * u_x) + (v * u_y) + ((1 / rho) * p_x) - (nu * (u_xx + u_yy))  # x momentum

        y_mom = v_t + (u * v_x) + (v * v_y) + ((1 / rho) * p_y) - (nu * (v_xx + v_yy))  # y momentum

        cont = u_x + v_y # continuity


        eq_residual = x_mom + y_mom + cont  # combined residuals


        # boundary conditions for x momentum

        u_left_bc = u[ns[:, 0] == 0] - Uinf # left boundary condition: u = Uinf (free stream velocity)

        u_right_bc = u_x[ns[:, 0] == L] - 0 # right boundary condition: du/dx = 0

        u_top_bc = u[ns[:, 1] == D] - 0 # top boundary condition: u = 0

        u_bottom_bc = u[ns[:, 1] == 0] - 0 # bottom boundary condition: u = 0


        # boundary conditions for y momentum

        v_left_bc = v[ns[:, 0] == 0] - 0 # left boundary condition: v = 0

        v_right_bc = v_x[ns[:, 0] == L] - 0 # right boundary condition: dv/dx = 0

        v_top_bc = v[ns[:, 1] == D] - 0 # top boundary condition: v = 0

        v_bottom_bc = v[ns[:, 1] == 0] - 0 # bottom boundary condition: v = 0


        # boundary conditions for pressure

        p_left_bc = p_x[ns[:, 0] == 0] - 0  # left boundary condition: dp/dx = 0

        p_right_bc = p_x[ns[:, 0] == L] - 0  # right boundary condition: dp/dx = 0

        p_top_bc = p_y[ns[:, 1] == D] - 0  # top boundary condition: dp/dy = 0

        p_bottom_bc = p_y[ns[:, 1] == 0] - 0  # bottom boundary condition: dp/dy = 0

        

        # initial conditions

        i_c_u = u[ns[:, 2] == 0] - 0  # initial condition u = 0

        i_c_v = v[ns[:, 2] == 0] - 0  # initial condition v = 0

        i_c_p = p[ns[:, 2] == 0] - 0  # initial condition p = 0


    return eq_residual, u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, v_left_bc, v_right_bc,\
        v_top_bc, v_bottom_bc, p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, i_c_u, i_c_v, i_c_p


#%% create a custom loss function

def navier_stokes_loss(ns):

    ns = tf.convert_to_tensor(ns, dtype=tf.float32)  # convert ns to TensorFlow tensor

    

    eq_residual, u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \
        v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \
        p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, \
            i_c_u, i_c_v, i_c_p = navier_stokes_residual(ns) # compute residuals for Navier-Stokes equations with specified boundary and initial conditions


    mse_loss = tf.reduce_mean(tf.square(eq_residual))  # compute mean squared error of the Navier-Stokes equation residuals

    

    u_bc_loss = tf.reduce_mean(tf.square(u_left_bc)) + tf.reduce_mean(tf.square(u_right_bc)) \
        + tf.reduce_mean(tf.square(u_top_bc)) + tf.reduce_mean(tf.square(u_bottom_bc))  # compute mean squared error of the u boundary conditions

        

    v_bc_loss = tf.reduce_mean(tf.square(v_left_bc)) + tf.reduce_mean(tf.square(v_right_bc)) \
        + tf.reduce_mean(tf.square(v_top_bc)) + tf.reduce_mean(tf.square(v_bottom_bc))  # compute mean squared error of the v boundary conditions

        

    p_bc_loss = tf.reduce_mean(tf.square(p_left_bc)) + tf.reduce_mean(tf.square(p_right_bc)) \
        + tf.reduce_mean(tf.square(p_top_bc)) + tf.reduce_mean(tf.square(p_bottom_bc))  # compute mean squared error of the p boundary conditions

        

    u_ic_loss = tf.reduce_mean(tf.square(i_c_u)) # compute mean squared error of u initial condition

    v_ic_loss = tf.reduce_mean(tf.square(i_c_v)) # compute mean squared error of v initial condition

    p_ic_loss = tf.reduce_mean(tf.square(i_c_p)) # compute mean squared error of p initial condition

    

    u_loss = u_ic_loss + u_bc_loss # total loss u momentum initial and boundary conditions

    v_loss = v_ic_loss + v_bc_loss # total loss v momentum initial and boundary conditions

    p_loss = p_ic_loss + p_bc_loss # total loss pressure initial and boundary conditions

               

    weight_mse = 1 # weight for MSE

    weight_u = 1 # weight for u

    weight_v = 1 # weight for v

    weight_p = 1 # weight for p

    

    total_loss = weight_u * u_loss + weight_v * v_loss + weight_p * p_loss + weight_mse * mse_loss

        

    return total_loss


#%% function to load and save model and optimizer states (for resume)

def save_checkpoint():

    checkpoint.save(file_prefix=checkpoint_prefix)

    

def load_checkpoint():

    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

    

#%% set same seeds for initialization

np.random.seed(0) # same random initializations every time numpy

rn.seed(0) # same random initializations every time for python 

tf.random.set_seed(0) # same random initializations every time for TensorFlow


#%% generate sample points (grid)

nu = 0.01 # kinematic viscosity

rho = 1 # density

Uinf = 1 # free stream velocity

l_square = 1 # length for Reynolds number calculator

Re = l_square * Uinf / nu # Reynolds number

travel = 0.5 # times the disturbance travels entire length of computational domain

L = 1 # length of domain

D = 1 # width of domain

TT = travel * L / Uinf # total time

Nx = 20 # number of points in x-axis

Ny = 20 # number of points in y-axis

Nt = 20 # number of steps in time

x = np.linspace(0, L, Nx) # initialize x

y = np.linspace(0, D, Ny) # initialize y

t = np.linspace(0, TT, Nt) # initialize t


# visualize space and time meshes

# X, Y = np.meshgrid(x, y)

# pcm = plt.pcolormesh(X, Y, np.zeros_like(X), edgecolors='k', linewidth=0.5, cmap='jet', shading='auto', alpha=1)

# plt.plot(t, t, 'o')


#%% define the physics informed neural network structure

model = keras.Sequential([

    preprocessing.Normalization(input_shape=(3,)), # normalize inputs (2 for x, y; 1 for t)

    layers.Dense(20, activation='tanh', input_shape=(3, ), kernel_initializer='GlorotNormal'), # hidden layer

    layers.Dense(20, activation='tanh'), # hidden layer

    layers.Dense(20, activation='tanh'), # hidden layer

    layers.Dense(3, activation=None) # output layer

])


#%% Neural network parameters

train_points_x, train_points_y, train_points_t = np.meshgrid(x, y, t) # create combinations of sample points along the domain

train_points = np.vstack((train_points_x.flatten(), train_points_y.flatten(), train_points_t.flatten())).T # flatten the grid to feed to neural network

model.layers[0].adapt(train_points) # set up the normalization layer

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01) # specify learning rate and the optimizer

checkpoint_dir = './checkpoints' # check point directory (same as script)

checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer) # checkpoint to save model and optimizer states

resume_training = False # set to True if resume is required

start_epoch = 1

if resume_training:

    load_checkpoint() # load the latest checkpoint to resume training

    start_epoch = int(tf.train.latest_checkpoint(checkpoint_dir).split('-')[-1]) # determine the starting epoch based on the loaded checkpoint

max_epochs = 20000 # maximum iterations

epoch = 0 # iteration counter

loss_value = float('inf') # initial loss

min_loss = 1e-30 # minimum loss (any acceptable value)


#%% Train the neural network

while epoch <= max_epochs and loss_value >= min_loss:

    with tf.GradientTape() as tape: # use tf.GradientTape to record operations for automatic differentiation

        loss_value = navier_stokes_loss(train_points) # compute total loss by evaluating Navier-Stokes loss function on training points

    gradients = tape.gradient(loss_value, model.trainable_variables) # calculate gradients of total loss with respect to trainable variables of model

    optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # update model trainable variables using calculated gradients and specified optimizer

    epoch += 1  # increases iteration counter by 1

    if epoch % 100 == 0:

        print(f"Epoch {epoch}, Loss: {loss_value.numpy()}") # output loss while training


save_checkpoint() # save training progress for resume


#%% predicting and plotting

target_time_step = TT # in seconds

x_t = np.linspace(0, L, 50) # initialize x

y_t = np.linspace(0, D, 50) # initialize y


test_points_x, test_points_y = np.meshgrid(x_t, y_t) # create pairs of sample points along the domain (for testing)

test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T # flatten the grid to feed to neural network model

test_points_t = np.column_stack([test_points, np.full_like(test_points[:, 0], target_time_step)]) # generate test points for the target time step


# predict velocity components and pressure at target time step

u_predicted_t = model.predict(test_points_t)[:, 0] # predict u velocity

v_predicted_t = model.predict(test_points_t)[:, 1] # predict v velocity

p_predicted_t = model.predict(test_points_t)[:, 2] # predict pressure

velocity_magnitude_t = np.sqrt(u_predicted_t**2 + v_predicted_t**2) # calculate velocity magnitude


# plotting

plt.figure(dpi=600) # make nice crisp image :)

plt.contourf(test_points_x, test_points_y, velocity_magnitude_t.reshape(test_points_x.shape), cmap='jet', levels=256) # contour plot

# plt.streamplot(test_points_x, test_points_y, u_predicted_t.reshape(test_points_x.shape),\

#                 v_predicted_t.reshape(test_points_x.shape), color='black', cmap='jet') # plot streamlines

plt.colorbar(label="Velocity Magnitude", orientation='horizontal')

plt.xlabel('Length [m]')

plt.ylabel('Width [m]')

plt.title(f'Velocity Magnitude Prediction at Time {target_time_step}s using PINN for Transient Navier-Stokes')

plt.axis('equal')

plt.clim(0, 1)

plt.show()


Fig. 1, PINN solution progression

     Cite as: Fahad Butt (2024). NS-PINN (https://whenrobotslove.blogspot.com/2024/02/solution-to-transient-in-complressible.html), Blogger. Retrieved Month Date, Year

     I tried to add as many comments in the code as possible. If you still can't understand, open a book? Thank you for reading! If you want to hire me as your next PhD student, please reach out!

Sunday, January 21, 2024

Dataless Physics Informed Neural Network (PINN) to Solve Laplace Equation using TensorFlow Keras in Python (Free Code)

     Here is code to solve the 2D Laplace equation using a PINN. This alien technology works with or without previous data. Soon I will add a data loss term using finite differences to improve the neural network's accuracy. Soon. To convert this to 3D, add T_z and T_zz terms in the laplace_equation and loss_fn functions, as sketched below.
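
     A hedged sketch of that 3D extension follows; it mirrors the 2D laplace_equation function in the code below and assumes the model is rebuilt with three inputs (x, y, z). The function name laplace_equation_3d is mine, for illustration only.

def laplace_equation_3d(le): # le holds columns (x, y, z)
    with tf.GradientTape(persistent=True) as tape:
        tape.watch(le) # watch the input variable for gradient computation
        T = model(le) # predict the temperature field T using the neural network model
        T_x = tape.gradient(T, le)[:, 0] # dT/dx
        T_y = tape.gradient(T, le)[:, 1] # dT/dy
        T_z = tape.gradient(T, le)[:, 2] # dT/dz (new term)
        T_xx = tape.gradient(T_x, le)[:, 0] # d2T/dx2
        T_yy = tape.gradient(T_y, le)[:, 1] # d2T/dy2
        T_zz = tape.gradient(T_z, le)[:, 2] # d2T/dz2 (new term)
        eq_residual = T_xx + T_yy + T_zz # 3D Laplace equation: T_xx + T_yy + T_zz = 0
    return eq_residual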


import random as rn

import numpy as np

import matplotlib.pyplot as plt

import tensorflow as tf

from tensorflow import keras

from tensorflow.keras import layers

from tensorflow.keras.layers.experimental import preprocessing


# set same seeds for initialization

np.random.seed(0) # same random initializations every time numpy

rn.seed(0) # same random initializations every time for python 

tf.random.set_seed(0) # same random initializations every time for TensorFlow


# define the physics informed neural network structure

model = keras.Sequential([ # define a feedforward neural network with backpropagation and gradient descent with ADAM

    preprocessing.Normalization(input_shape=(2,)), # normalize inputs (2 for 2D, 3 for 3D)

    layers.Dense(50, activation='tanh', input_shape=(2, ), kernel_initializer='zeros'), # first hidden layer

    layers.Dense(50, activation='tanh'), # second hidden layer

    layers.Dense(1, activation=None) # output layer

])


# define the 2D Laplace equation and boundary conditions

def laplace_equation(le):

    with tf.GradientTape(persistent=True) as tape:

        tape.watch(le) # set the input variable le for gradient computation

        T = model(le) # predict the temperature field T using the neural network model

        T_x = tape.gradient(T, le)[:, 0]  # first derivative of neural network output with respect to x

        T_y = tape.gradient(T, le)[:, 1]  # first derivative of neural network output with respect to y

        T_xx = tape.gradient(T_x, le)[:, 0]  # second derivative of neural network output with respect to x

        T_yy = tape.gradient(T_y, le)[:, 1]  # second derivative of neural network output with respect to y

        eq_residual = T_xx + T_yy  # 2D Laplace equation: T_xx + T_yy = 0

        # apply boundary conditions

        left_bc = T[le[:, 0] == 0] - 0  # left boundary condition: T(x=0, y) = 0

        right_bc = T[le[:, 0] == 1] - 5  # right boundary condition: T(x=1, y) = 5

        top_bc = T[le[:, 1] == 1] - 20  # top boundary condition: T(x, y=1) = 20

        bottom_bc = T[le[:, 1] == 0] - 10  # bottom boundary condition: T(x, y=0) = 10

    return eq_residual, left_bc, right_bc, top_bc, bottom_bc


# define the loss function for 2D Laplace equation

def loss_fn(le):

    le = tf.convert_to_tensor(le, dtype=tf.float32) # convert le to TensorFlow tensor

    eq_residual, left_bc, right_bc, top_bc, \
        bottom_bc = laplace_equation(le)  # calculate the residuals and boundary conditions using the laplace_equation function

    mse_loss = tf.reduce_mean(tf.square(eq_residual)) # compute the mean squared error of the Laplace equation residuals

    bc_loss = tf.reduce_mean(tf.square(left_bc)) + tf.reduce_mean(tf.square(right_bc)) \
        + tf.reduce_mean(tf.square(top_bc)) + tf.reduce_mean(tf.square(bottom_bc)) # compute the mean squared error of the boundary conditions

    return mse_loss + bc_loss


# train the network

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01) # specify learning rate and optimizer

max_epochs = 2000 # maximum iterations

epoch = 0 # iteration counter

loss_value = float('inf') # initial loss

min_loss = 0.01 # minimum loss (any acceptable value)


# generate sample points (grid)

L = 1 # length of the plate

D = 1 # width of the plate

Nx = 100 # number of points in x-axis

Ny = 100 # number of points in y-axis


train_points_x, train_points_y = np.meshgrid(np.linspace(0, L, Nx), np.linspace(0, D, Ny)) # create pairs of sample points along the domain

train_points = np.vstack((train_points_x.flatten(), train_points_y.flatten())).T # flatten the grid to feed to neural network

model.layers[0].adapt(train_points) # set up the normalization layer


# train the neural network

while epoch <= max_epochs and loss_value >= min_loss:

    with tf.GradientTape() as tape:

        loss_value = loss_fn(train_points)

    gradients = tape.gradient(loss_value, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    epoch += 1 # increases iteration counter by 1

    if epoch % 100 == 0:

        print(f"Epoch {epoch}, Loss: {loss_value.numpy()}") # output loss while training


# prediction

test_points_x, test_points_y = np.meshgrid(np.linspace(0, L, Nx), np.linspace(0, D, Ny)) # create pairs of sample points along the domain (for testing)

test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T # flatten the grid to feed to neural network

predicted_temperature = model(test_points).numpy().reshape(test_points_x.shape) # neural network output


# plotting

plt.figure(dpi=600) # make nice crisp image :)

plt.contourf(test_points_x, test_points_y, predicted_temperature, cmap='turbo', levels=256) # contours of temperature

plt.colorbar(label='Temperature')

plt.xlabel('Length [m]')

plt.ylabel('Width [m]')

plt.title('2D Laplace Equation Solution using PINN')

plt.clim(0, 20) # Set legend limits

plt.show() # make plot


     Fig. 1 shows a comparison with FDM using both Neumann and Dirichlet boundary conditions. To apply a Neumann boundary condition, simply put T_x instead of T in the laplace_equation function, as in the snippet below. Within Fig. 1, one side shows the Dirichlet case and the other the Neumann case. Personally, FVM is still the king.
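
     For example, using the variable names from the code above, switching the left boundary from a fixed temperature to a zero-gradient condition is a one-line change inside laplace_equation:

# replace the Dirichlet line
#   left_bc = T[le[:, 0] == 0] - 0 # left boundary condition: T(x=0, y) = 0
# with the Neumann version
left_bc = T_x[le[:, 0] == 0] - 0 # left boundary condition: dT/dx(x=0, y) = 0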


Fig. 1, Top row: PINN; bottom row: FDM

Update 01

     This new code uses functions and handles the normalization of inputs manually. This version also has fewer derivatives to compute thanks to some trickery, so it runs faster. A Navier-Stokes version is coming soon! I am not sure whether a PINN for the Navier-Stokes equations goes in the CFD blog or in this blog? Also, this Update 01 code uses Keras 3, which is available in tensorflow 2.16.1.

#%% import stuff
import random as rn
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
from keras import layers

#%% define the 2D Laplace equation and boundary conditions
def laplace_equation(le):
    with tf.GradientTape(persistent=True) as tape2:
        tape2.watch(le) # watch the input variable le for 2nd order gradient computation
        
        with tf.GradientTape(persistent=True) as tape1:
            tape1.watch(le) # set the input variable le for 1st order gradient computation
            
            T = model(le) # predict the temperature field T using the neural network model
        
        dT = tape1.gradient(T, le) # dT/dle
        T_x = dT[:, 0] # dT/dx
        T_y = dT[:, 1] # dT/dy
        
    T_xx = tape2.gradient(T_x, le)[:, 0] # d2T/dx2
    T_yy = tape2.gradient(T_y, le)[:, 1] # d2T/dy2

    eq_residual = T_xx + T_yy # 2D Laplace equation: T_xx + T_yy = 0
        
    # apply boundary conditions
    left_bc = T[le[:, 0] == 0] - 0 # left boundary condition: T(x = 0, y) = 0
    right_bc = T_x[le[:, 0] == 1] - 0 # right boundary condition: dT/dx(x = L, y) = 0
    top_bc = T[le[:, 1] == 1] - 1 # top boundary condition: T(x, y = 1) = 1
    bottom_bc = T_y[le[:, 1] == 0] - 0 # bottom boundary condition: dT/dy(x, y = 0) = 0

    return eq_residual, left_bc, right_bc, top_bc, bottom_bc

#%% define the loss function for 2D Laplace equation
def loss_fn(le):
    le = tf.convert_to_tensor(le, dtype=tf.float32) # convert le to TensorFlow tensor

    eq_residual, left_bc, right_bc, top_bc,\
        bottom_bc = laplace_equation(le) # calculate the residuals and boundary conditions using the laplace_equation function
    
    mse_loss = tf.reduce_mean(tf.square(eq_residual)) # compute the mean squared error of the Laplace equation residuals
    
    bc_loss = tf.reduce_mean(tf.square(left_bc)) + tf.reduce_mean(tf.square(right_bc)) \
        + tf.reduce_mean(tf.square(top_bc)) + tf.reduce_mean(tf.square(bottom_bc)) # compute the mean squared error of the boundary conditions
    
    total_loss = mse_loss + bc_loss # total loss
    
    return total_loss

#%% set same seeds for initialization
np.random.seed(42) # same random initializations every time numpy
rn.seed(42) # same random initializations every time for python 
tf.random.set_seed(42) # same random initializations every time for TensorFlow / Keras

#%% generate sample points (grid)
L = 3.25 # length of the plate
D = 2.75 # width of the plate
Nx = 20 # number of points in x-axis
Ny = 20 # number of points in y-axis
x = np.linspace(0, L, Nx) # mesh in x axis
y = np.linspace(0, D, Ny) # mesh in y axis
x = x / L # normalize x
y = y / D # normalize y

#%% create the neural network
model = keras.Sequential([layers.Input(shape=(2,)), # input layer, 2 for x and y
                          layers.Dense(20, activation = 'tanh', kernel_initializer='GlorotNormal', bias_initializer='zeros'), # hidden layer
                          layers.Dense(1, activation = None)]) # output layer, 1 for T
model.summary() # display model

#%% define the physics informed neural network structure
train_points_x, train_points_y = np.meshgrid(x, y) # create pairs of sample points along the domain
train_points = np.vstack((train_points_x.flatten(), train_points_y.flatten())).T # flatten the grid to feed to neural network
optimizer = tf.keras.optimizers.Adam() # specify the optimizer (default learning rate)
max_epochs = 10000 # maximum iterations

for epoch in range(1, max_epochs + 1): # training loop
    with tf.GradientTape() as tape3:
        loss_value = loss_fn(train_points) # compute total loss by evaluating the Laplace loss function on the training points
    gradients = tape3.gradient(loss_value, model.trainable_variables) # calculate gradients of total loss with respect to trainable variables of model
    optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # update model trainable variables using calculated gradients and specified optimizer
    if epoch % 100 == 0:
        # print(f"Epoch {epoch}")
        print(f"Epoch {epoch}, Loss: {loss_value.numpy()}")

#%% prediction
x_t = np.linspace(0, L, 10)  # initialize x
y_t = np.linspace(0, D, 10)  # initialize y
x_t = x_t / L # normalize test data x-axis
y_t = y_t / D # normalize test data y-axis
test_points_x, test_points_y = np.meshgrid(x_t, y_t)  # create pairs of sample points along the domain (for testing)
test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T  # flatten the grid to feed to the neural network model
predict = model.predict(test_points) # predict temperature

# plotting
plt.figure(dpi = 300) # make nice crisp image
plt.contourf(test_points_x * L, test_points_y * D, predict.reshape(test_points_x.shape), cmap = 'jet', levels = 32) # contours of temperature
plt.colorbar(orientation='vertical')
plt.clim(0, 1) # set legend limits
plt.axis("equal")
plt.show() # show plot

     If you want to hire me as your PhD student or collaborate in cutting-edge research, please reach out!

Wednesday, January 17, 2024

Feed Forward Back Propagating Neural Network (Regression) Theory

     This post covers the theory behind the code shared in Feed Forward Back Propagating Neural Network (Regression), as far as I understand this alien technology. A minimal sketch of the idea follows.
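
     As a minimal, hedged illustration of the idea (my own sketch of a one-hidden-layer regression network trained with plain gradient descent, not the material from the original figures):

import numpy as np

np.random.seed(0) # reproducible weight initialization
x = np.linspace(0, np.pi, 32).reshape(-1, 1) # inputs
y = x**2 * np.sin(x) # target function to learn

W1 = 0.5 * np.random.randn(1, 5) # hidden layer weights (5 neurons)
b1 = np.zeros((1, 5)) # hidden layer biases
W2 = 0.5 * np.random.randn(5, 1) # output layer weights
b2 = np.zeros((1, 1)) # output layer bias
lr = 0.01 # learning rate

for epoch in range(5000):
    h = np.tanh(x @ W1 + b1) # forward pass: hidden activations
    y_hat = h @ W2 + b2 # forward pass: output (no activation for regression)
    d_out = 2 * (y_hat - y) / len(x) # gradient of the mean squared error loss
    dW2 = h.T @ d_out # backpropagate to output layer weights
    db2 = d_out.sum(axis=0, keepdims=True) # and to output layer biases
    d_hidden = (d_out @ W2.T) * (1 - h**2) # chain rule through tanh (tanh' = 1 - tanh^2)
    dW1 = x.T @ d_hidden # backpropagate to hidden layer weights
    db1 = d_hidden.sum(axis=0, keepdims=True) # and to hidden layer biases
    W1 -= lr * dW1; b1 -= lr * db1 # gradient descent update, hidden layer
    W2 -= lr * dW2; b2 -= lr * db2 # gradient descent update, output layer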

     Thank you for reading! If you want to hire me as your brand new PhD student or want to collaborate on research, please reach out!

Sunday, January 14, 2024

      After successfully creating my own neural network from scratch using math and calculus only [here], I have taught myself to do the same in TensorFlow using the CPU only, so one can run this without a CUDA GPU (a snippet to force CPU-only execution is given below). I have added comments; if the readers are like me and migrating from MATLAB, the comments will probably help. The results are shown in Fig. 1.
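
     If a CUDA GPU is present but CPU-only execution is wanted anyway, one option (not part of the original script) is to hide the GPUs before building the model:

import tensorflow as tf

tf.config.set_visible_devices([], 'GPU') # hide all GPUs so TensorFlow runs on the CPU only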

# import libraries

import numpy as np

import matplotlib.pyplot as plt

import tensorflow as tf

from tensorflow.keras import layers, models, optimizers, callbacks


# generate data

x = np.linspace(0, 1 * np.pi, round((1 * np.pi) / (np.pi / 16))).reshape(-1, 1)  # reshape is like x' in MATLAB, create neural network inputs from 0 to pi, with spacing of pi/16

y = x**2 * np.sin(x) # the function we want neural network to learn


# create neural network

tf.random.set_seed(42) # initialize neural network weights and biases to same values everytime to ensure reproducibility

model = models.Sequential() # create feed forward back propagating neural network

model.add(layers.Dense(5, activation='sigmoid', input_shape=(1,))) # 1st hidden layer, input_shape=(1,) means input must be column vector

model.add(layers.Dense(1)) # output layer, no activation in case of regression problems

custom_optimizer = optimizers.Adam(learning_rate=0.1) # learning rate for Adam optimizer

model.compile(optimizer=custom_optimizer, loss='mean_squared_error') # choose loss function

early_stopping = callbacks.EarlyStopping( # stopping criterion 

    monitor='loss', # monitor loss

    min_delta=0.0001, # minimum change to qualify as an improvement

    patience=100, # number of epochs with no improvement to stop training

    mode='min' # stop when the loss has stopped decreasing

)


# train the model

model.fit(x, y, epochs=1000, verbose=0, callbacks=[early_stopping]) 


# make predictions

x_predict = np.linspace(0, 1 * np.pi, round((1 * np.pi) / (np.pi / 31))).reshape(-1, 1) # create prediction data, entirely different from training data

y_predict = model.predict(x_predict) # neural network prediction


# plotting

plt.figure(dpi=300) # image resolution

plt.scatter(x, y, label='Actual Data', facecolors='none', edgecolors='red', alpha=1)

plt.plot(x_predict, y_predict, color='blue', label='Neural Network Predictions')

plt.xlabel('x')

plt.ylabel('f(x)')

plt.legend()

plt.show()


Fig. 1, Comparison of results

     Results are compared between TensorFlow and my own code [here] in Fig. 2. For the simple case shown, TensorFlow takes 2.5 s from pressing the run button to plotting, while my own code takes 0.5 s. It's neck and neck. Obviously, the learning rate, hidden layers, hidden-layer neurons, epochs and optimizer are the same in both cases. A minimal timing sketch is given below.
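
     To reproduce the timing comparison, a minimal sketch (my addition; model, x, y and early_stopping are the objects defined in the script above) is:

import time

start = time.perf_counter() # start the wall-clock timer
model.fit(x, y, epochs=1000, verbose=0, callbacks=[early_stopping]) # train exactly as above
print(f"Training took {time.perf_counter() - start:.2f} s") # elapsed wall-clock time in seconds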


Fig. 2, Own code vs. analytical vs. TensorFlow

     If you want to hire me as your awesome PhD student, please reach out! Thank you very much for reading!