SolidWorks Videos

Sunday, February 4, 2024

Solution to Transient, In-Compressible Navier–Stokes Equations in 2D via Data-Less Physics-Informed Neural Network. (Includes Free Code)

     Here I share ๐Ÿ’ป Data-less Physics-Informed Neural ๐Ÿง  Network code written in Python ๐Ÿ using TensorFlow ๐Ÿงฎ Keras. In the current configuration, the code will try to learn flow through a pipe. The output at various time instances is shown in Fig. 1. To increase neural network's capacity to learn complex features, like vortex dominated flows increase neurons in the hidden layers. To increase neural network's capacity to learn flow physics like boundary layer mechanics, increase the hidden layers. This is as far as I have understood via self-learning. Nice little alien ๐Ÿ‘ฝ technology this ๐Ÿ˜Š.

     Please note that a properly cooled ๐ŸงŠ GPU is recommended for running this code. If your CPU melts ๐Ÿ”ฅ, don't blame me ๐Ÿ˜ค.

MIT License

Copyright (c) 2024 Fahad Butt

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Code

#%% import stuff

import os

import random as rn

import numpy as np

import matplotlib.pyplot as plt

import tensorflow as tf

from tensorflow import keras

from tensorflow.keras import layers

from tensorflow.keras.layers.experimental import preprocessing


#%% define 2D transient in-compressible navier stokes equations

def navier_stokes_residual(ns):

    with tf.GradientTape(persistent=True) as tape:

        tape.watch(ns)  # set the input variable ns for gradient computation

        

        # extract components from the input tensor ns

        u = model(ns)[:, 0]  # x velocity

        v = model(ns)[:, 1]  # y velocity

        p = model(ns)[:, 2]  # pressure


        # compute derivatives with respect to x, y, t, and second derivatives with resoect to x, y

        u_x = tape.gradient(u, ns)[:, 0] # first derivative of neural network output (u) with respect to x

        u_y = tape.gradient(u, ns)[:, 1] # first derivative of neural network output (u) with respect to y

        u_t = tape.gradient(u, ns)[:, 2] # first derivative of neural network output (u) with respect to t

        v_x = tape.gradient(v, ns)[:, 0] # first derivative of neural network output (v) with respect to x

        v_y = tape.gradient(v, ns)[:, 1] # first derivative of neural network output (v) with respect to y

        v_t = tape.gradient(v, ns)[:, 2] # first derivative of neural network output (v) with respect to t

        p_x = tape.gradient(p, ns)[:, 0] # first derivative of neural network output (p) with respect to x

        p_y = tape.gradient(p, ns)[:, 1] # first derivative of neural network output (p) with respect to y

        u_xx = tape.gradient(u_x, ns)[:, 0] # second derivative of neural network output (u) with respect to x

        u_yy = tape.gradient(u_y, ns)[:, 1] # second derivative of neural network output (u) with respect to y

        v_xx = tape.gradient(v_x, ns)[:, 0] # second derivative of neural network output (v) with respect to x

        v_yy = tape.gradient(v_y, ns)[:, 1] # second derivative of neural network output (v) with respect to y

        

        # define the equations to solve

        x_mom = u_t + (u * u_x) + (v * u_y) + ((1 / rho) * p_x) - (nu * (u_xx + u_yy))  # x momentum

        y_mom = v_t + (u * v_x) + (v * v_y) + ((1 / rho) * p_y) - (nu * (v_xx + v_yy))  # y momentum

        cont = u_x + v_y # continuity


        eq_residual = x_mom + y_mom + cont  # combined residuals


        # boundary conditions for x momentum

        u_left_bc = u[ns[:, 0] == 0] - Uinf # left boundary condition: u = Uinf (free stream velocity)

        u_right_bc = u_x[ns[:, 0] == L] - 0 # right boundary condition: du/dx = 0

        u_top_bc = u[ns[:, 1] == D] - 0 # top boundary condition: u = 0

        u_bottom_bc = u[ns[:, 1] == 0] - 0 # bottom boundary condition: u = 0


        # boundary conditions for y momentum

        v_left_bc = v[ns[:, 0] == 0] - 0 # left boundary condition: v = 0

        v_right_bc = v_x[ns[:, 0] == L] - 0 # right boundary condition: dv/dx = 0

        v_top_bc = v[ns[:, 1] == D] - 0 # top boundary condition: v = 0

        v_bottom_bc = v[ns[:, 1] == 0] - 0 # bottom boundary condition: v = 0


        # boundary conditions for pressure

        p_left_bc = p_x[ns[:, 0] == 0] - 0  # left boundary condition: dp/dx = 0

        p_right_bc = p_x[ns[:, 0] == L] - 0  # right boundary condition: dp/dx = 0

        p_top_bc = p_y[ns[:, 1] == D] - 0  # top boundary condition: dp/dy = 0

        p_bottom_bc = p_y[ns[:, 1] == 0] - 0  # bottom boundary condition: dp/dy = 0

        

        # initial conditions

        i_c_u = u[ns[:, 2] == 0] - 0  # initial condition u = 0

        i_c_v = v[ns[:, 2] == 0] - 0  # initial condition v = 0

        i_c_p = p[ns[:, 2] == 0] - 0  # initial condition p = 0


    return eq_residual, u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, v_left_bc, v_right_bc,\

        v_top_bc, v_bottom_bc, p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, i_c_u, i_c_v, i_c_p


#%% create a custom loss function

def navier_stokes_loss(ns):

    ns = tf.convert_to_tensor(ns, dtype=tf.float32)  # convert ns to TensorFlow tensor

    

    eq_residual, u_left_bc, u_right_bc, u_top_bc, u_bottom_bc, \

        v_left_bc, v_right_bc, v_top_bc, v_bottom_bc, \

        p_left_bc, p_right_bc, p_top_bc, p_bottom_bc, \

            i_c_u, i_c_v, i_c_p = navier_stokes_residual(ns) # compute residuals for Navier-Stokes equations with specified boundary and initial conditions


    mse_loss = tf.reduce_mean(tf.square(eq_residual))  # compute mean squared error of the Navier-Stokes equation residuals

    

    u_bc_loss = tf.reduce_mean(tf.square(u_left_bc)) + tf.reduce_mean(tf.square(u_right_bc)) \

        + tf.reduce_mean(tf.square(u_top_bc)) + tf.reduce_mean(tf.square(u_bottom_bc))  # compute mean squared error of the u boundary conditions

        

    v_bc_loss = tf.reduce_mean(tf.square(v_left_bc)) + tf.reduce_mean(tf.square(v_right_bc)) \

        + tf.reduce_mean(tf.square(v_top_bc)) + tf.reduce_mean(tf.square(v_bottom_bc))  # compute mean squared error of the v boundary conditions

        

    p_bc_loss = tf.reduce_mean(tf.square(p_left_bc)) + tf.reduce_mean(tf.square(p_right_bc)) \

        + tf.reduce_mean(tf.square(p_top_bc)) + tf.reduce_mean(tf.square(p_bottom_bc))  # compute mean squared error of the p boundary conditions

        

    u_ic_loss = tf.reduce_mean(tf.square(i_c_u)) # compute mean squared error of u initial condition

    v_ic_loss = tf.reduce_mean(tf.square(i_c_v)) # compute mean squared error of v initial condition

    p_ic_loss = tf.reduce_mean(tf.square(i_c_p)) # compute mean squared error of p initial condition

    

    u_loss = u_ic_loss + u_bc_loss # total loss u momentum initial and boundary conditions

    v_loss = v_ic_loss + v_bc_loss # total loss v momentum initial and boundary conditions

    p_loss = p_ic_loss + p_bc_loss # total loss pressure initial and boundary conditions

               

    weight_mse = 1 # weight for MSE

    weight_u = 1 # weight for u

    weight_v = 1 # weight for v

    weight_p = 1 # weight for p

    

    total_loss = weight_u * u_loss + weight_v * v_loss + weight_p * p_loss + weight_mse * mse_loss

        

    return total_loss


#%% function to load and save model and optimizer states (for resume)

def save_checkpoint():

    checkpoint.save(file_prefix=checkpoint_prefix)

    

def load_checkpoint():

    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

    

#%% set same seeds for initialization

np.random.seed(0) # same random initializations every time numpy

rn.seed(0) # same random initializations every time for python 

tf.random.set_seed(0) # same random initializations every time for TensorFlow


#%% generate sample points (grid)

nu = 0.01 # kinematic viscosity

rho = 1 # density

Uinf = 1 # free stream velocity

l_square = 1 # length for Reynolds number calculator

Re = l_square * Uinf / nu # Reynolds number

travel = 0.5 # times the disturbance travels entire length of computational domain

L = 1 # length of domain

D = 1 # width of domain

TT = travel * L / Uinf # total time

Nx = 20 # number of points in x-axis

Ny = 20 # number of points in y-axis

Nt = 20 # number of steps in time

x = np.linspace(0, L, Nx) # initialize x

y = np.linspace(0, D, Ny) # initialize y

t = np.linspace(0, TT, Nt) # initialize t


# visualize space and time meshes

# X, Y = np.meshgrid(x, y)

# pcm = plt.pcolormesh(X, Y, np.zeros_like(X), edgecolors='k', linewidth=0.5, cmap='jet', shading='auto', alpha=1)

# plt.plot(t, t, 'o')


#%% define the physics informed neural network structure

model = keras.Sequential([

    preprocessing.Normalization(input_shape=(3,)), # normalize inputs (2 for x, y; 1 for t)

    layers.Dense(20, activation='tanh', input_shape=(3, ), kernel_initializer='GlorotNormal'), # hidden layer

    layers.Dense(20, activation='tanh'), # hidden layer

    layers.Dense(20, activation='tanh'), # hidden layer

    layers.Dense(3, activation=None) # output layer

])


#%% Neural network parameters

train_points_x, train_points_y, train_points_t = np.meshgrid(x, y, t) # create combinations of sample points along the domain

train_points = np.vstack((train_points_x.flatten(), train_points_y.flatten(), train_points_t.flatten())).T # flatten the grid to feed to neural network

model.layers[0].adapt(train_points) # set up the normalization layer

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01) # specify learning rate and the optimizer

checkpoint_dir = './checkpoints' # check point directory (same as script)

checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer) # checkpoint to save model and optimizer states

resume_training = False # set to True if resume is required

start_epoch = 1

if resume_training:

    load_checkpoint() # load the latest checkpoint to resume training

    start_epoch = int(tf.train.latest_checkpoint(checkpoint_dir).\

                      split('-')[-1]) # determine the starting epoch based on the loaded checkpoint

max_epochs = 20000 # maximum iterations

epoch = 0 # iteration counter

loss_value = float('inf') # initial loss

min_loss = 1e-30 # minimum loss (any acceptable value)


#%% Train the neural network

while epoch <= max_epochs and loss_value >= min_loss:

    with tf.GradientTape() as tape: # use tf.GradientTape to record operations for automatic differentiation

        loss_value = navier_stokes_loss(train_points) # compute total loss by evaluating Navier-Stokes loss function on training points

    gradients = tape.gradient(loss_value, model.trainable_variables) # calculate gradients of total loss with respect to trainable variables of model

    optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # update model trainable variables using calculated gradients and specified optimizer

    epoch += 1  # increases iteration counter by 1

    if epoch % 100 == 0:

        print(f"Epoch {epoch}, Loss: {loss_value.numpy()}") # output loss while training


save_checkpoint() # save training progress for resume


#%% predicting and plotting

target_time_step = TT # in seconds

x_t = np.linspace(0, L, 50) # initialize x

y_t = np.linspace(0, D, 50) # initialize y


test_points_x, test_points_y = np.meshgrid(x_t, y_t) # create pairs of sample points along the domain (for testing)

test_points = np.vstack((test_points_x.flatten(), test_points_y.flatten())).T # flatten the grid to feed to neural network model

test_points_t = np.column_stack([test_points, np.full_like(test_points[:, 0], target_time_step)]) # generate test points for the target time step


# predict velocity components and pressure at target time step

u_predicted_t = model.predict(test_points_t)[:, 0] # predict u velocity

v_predicted_t = model.predict(test_points_t)[:, 1] # predict v velocity

p_predicted_t = model.predict(test_points_t)[:, 2] # predict pressure

velocity_magnitude_t = np.sqrt(u_predicted_t**2 + v_predicted_t**2) # calculate velocity magnitude


# plotting

plt.figure(dpi=600) # make nice crisp image :)

plt.contourf(test_points_x, test_points_y, velocity_magnitude_t.reshape(test_points_x.shape), cmap='jet', levels=256) # contour plot

# plt.streamplot(test_points_x, test_points_y, u_predicted_t.reshape(test_points_x.shape),\

#                 v_predicted_t.reshape(test_points_x.shape), color='black', cmap='jet') # plot streamlines

plt.colorbar(label="Velocity Magnitude", orientation='horizontal')

plt.xlabel('Length [m]')

plt.ylabel('Width [m]')

plt.title(f'Velocity Magnitude Prediction at Time {target_time_step}s using PINN for Transient Navier-Stokes')

plt.axis('equal')

plt.clim(0, 1)

plt.show()


Fig. 1, PINN solution progression

     Cite as: Fahad Butt (2024). NS-PINN (https://whenrobotslove.blogspot.com/2024/02/solution-to-transient-in-complressible.html), Blogger. Retrieved Month Date, Year

     I tried to add as many comments in the code as possible. If you still can't understand, open a book ๐Ÿ“–? Thank you for reading! If you want to hide me as your next PhD student, please reach out!

No comments:

Post a Comment