Menu

Directly denoising Diffusion model, the Simplest deep learning

iBoxDB
2025-06-01
2025-06-01
  • iBoxDB

    iBoxDB - 2025-06-01
    #!/usr/bin/env python
    # coding: utf-8
    
    '''Directly denoising Diffusion model, the Simplest deep learning.'''
    '''Version 1.0'''
    '''python diff.py'''
    
    __credits__ = ["iBoxDB", "Bruce Yang CL-N", "2025-5"]
    
    
    # 3rd parts
    #https://pytorch.org  CPU version only
    import torch
    
    import torch.nn as nn
    import torch.utils.data as tdata
    from torch.optim import Adam,AdamW
    
    from torchvision.datasets import MNIST, CIFAR10
    from torchvision.utils import save_image, make_grid
    
    import torchvision.transforms as transforms
    import torchvision.transforms.functional as TF
    
    import matplotlib.pyplot as plt
    import numpy as np
    
    import math
    import os
    
    th = torch
    DataLoader = tdata.DataLoader
    
    
    '''
    If the model is over-trained, remove the *.pt files,
    set 'epochs' to 20 and re-train several times until the results
    look best.  Random noise is used, so results vary between runs.
    '''
    epochs = 60  # increase (e.g. 600) for longer training

    train_batch_size = 36
    th.set_default_dtype(th.float32)
    th.set_num_threads(4)


    # Datasets are downloaded to this directory.
    dataset_path = '~/datasets'
    print(os.path.expanduser(dataset_path))

    dataset = 'MNIST'
    # Native MNIST resolution is (1, 28, 28); images are downscaled to
    # 16x16 to keep the model and the training time small.
    img_size = (1, 16, 16)

    transform = transforms.Compose([
            transforms.Resize((img_size[1], img_size[2]), transforms.InterpolationMode.BILINEAR),
            transforms.ToTensor(),
    ])

    train_dataset = MNIST(dataset_path, transform=transform, train=True, download=True)
    #test_dataset  = MNIST(dataset_path, transform=transform, train=False, download=True)


    print(len(train_dataset))
    # Keep a fixed 1% subset (seeded split) for quick CPU-only training.
    generator1 = torch.Generator().manual_seed(69981)
    train_dataset , _ = tdata.random_split(train_dataset, [0.01, 0.99], generator1)

    # Alternative: train on a single digit class only, e.g.
    #   train_dataset = [x for x in train_dataset if x[1] == 9]
    #   train_dataset = train_dataset[0:train_batch_size * 2]

    # Materialise the subset so each epoch reuses the same transformed samples.
    train_dataset = list(train_dataset)
    print(len(train_dataset))
    
    def draw_sample_image(x, postfix, block=True):
        """Display a batch of images as a (roughly square) grid.

        x       : image tensor of shape (N, C, H, W).
        postfix : figure title.
        block   : when False, the window closes itself after 3 seconds
                  instead of blocking until the user closes it.
        """
        plt.close('all')
        plt.figure(figsize=(5, 5))
        plt.axis("off")
        plt.title(postfix)
        im = make_grid(x.detach().cpu(),
                       nrow=int(math.sqrt(len(x))),
                       scale_each=True, normalize=False)
        # (C, H, W) -> (H, W, C) as expected by plt.imshow.
        # (The original resized `im` to its own size here - a no-op, removed.)
        im = np.transpose(im, (1, 2, 0))
        plt.imshow(im)
        plt.show(block=block)
        if not block:
            plt.pause(3)
            plt.close('all')
    
    
    class Denoiser(nn.Module):
        """Small convolutional encoder/decoder used as the noise predictor.

        ``forward`` returns ``x - unet(x)``: the stack of convolutions
        learns a residual, which is subtracted from the input.
        """

        def __init__(self):
            super(Denoiser, self).__init__()
            channels = img_size[0]
            # Encoder: two strided 4x4 convolutions halve the spatial size
            # twice, then 1x1 / 2x2 convolutions reach a 256-channel
            # bottleneck.  The decoder mirrors this with transposed convs.
            # NOTE: layer order must stay exactly as-is so that saved
            # state_dict keys ("unet.0", "unet.1", ...) keep matching.
            self.unet = nn.Sequential(
                nn.Conv2d(channels, 32, 4, 2, 1),
                nn.InstanceNorm2d(32),
                nn.Conv2d(32, 64, 4, 2, 1),
                nn.InstanceNorm2d(64),
                nn.Conv2d(64, 128, 1),
                nn.SiLU(),
                nn.Conv2d(128, 128, 2),
                nn.LeakyReLU(),
                nn.Conv2d(128, 256, 2),
                nn.LeakyReLU(),
                nn.ConvTranspose2d(256, 128, 2),
                nn.LeakyReLU(),
                nn.ConvTranspose2d(128, 64, 2),
                nn.LeakyReLU(),
                nn.ConvTranspose2d(64, 32, 4, 2, 1),
                nn.LeakyReLU(),
                nn.ConvTranspose2d(32, channels, 4, 2, 1),
            )

        def forward(self, x):
            return self.bruce_forward(x)

        def bruce_forward(self, x):
            """Subtract the predicted residual from the input."""
            residual = self.unet(x)
            return x - residual
    
    
    class Diffusion(nn.Module):
        """Directly-denoising diffusion wrapper around :class:`Denoiser`.

        Images are scaled to [-1, 1].  A noisy sample is built as
        ``x0 * alpha + epsilon`` with a blend factor ``alpha`` in
        [0.20, 0.99] and uniform noise ``epsilon`` scaled by (1 - alpha).
        The model is trained to output ``epsilon`` directly.
        """

        def __init__(self):
            super(Diffusion, self).__init__()
            # Attribute name "model" is part of the checkpoint key layout.
            self.model = Denoiser()

        def scale_to_minus_one_to_one(self, x):
            """Map pixel values from [0, 1] to [-1, 1]."""
            return x * 2 - 1

        def reverse_scale_to_zero_to_one(self, x):
            """Map pixel values from [-1, 1] back to [0, 1]."""
            return (x + 1) / 2

        def bruce_noisy(self, x_zeros, ranLen=31):
            """Build ``ranLen`` (noisy sample, noise target) pairs for one image.

            x_zeros : clean image in [0, 1]; detached, then scaled to [-1, 1].
            Returns (rs, es): lists of noisy samples and their noise targets.
            """
            x_zeros = x_zeros.detach()
            x_zeros = self.scale_to_minus_one_to_one(x_zeros)

            rs = []
            es = []

            # First pair: pure noise as the "sample" at the lowest blend
            # factor alpha = 0.20, so the target is noise - x0 * alpha.
            # (Original wrapped this in a single-iteration loop - unrolled.)
            target = torch.rand_like(x_zeros)
            target = self.scale_to_minus_one_to_one(target)

            alpha = 20 / 100
            epsilon = target - x_zeros * alpha
            rs.append(target)
            es.append(epsilon)

            # Remaining pairs: blend the clean image with fresh noise at a
            # random alpha drawn from [0.21, 0.99].
            for _ in range(ranLen - 1):
                alpha = th.randint(21, 100, (1,)).item() / 100
                epsilon = torch.rand_like(x_zeros)
                epsilon = self.scale_to_minus_one_to_one(epsilon)

                epsilon = epsilon * (1 - alpha)
                noisy_sample = x_zeros * alpha + epsilon
                rs.append(noisy_sample)
                es.append(epsilon)

            return rs, es


        @th.no_grad()
        def sample(self, time=64):
            """Generate an image, starting from pure noise.

            Runs ``time`` refinement steps with alpha sweeping 0.20 -> 0.20
            + time/100, and returns the stacked intermediate
            reconstructions (each in [0, 1]); the last entry is the final
            sample.
            """
            target = torch.rand(img_size)
            target = self.scale_to_minus_one_to_one(target)

            rs = []

            target = target.unsqueeze(0)
            for alpha in range(20, 20 + time):
                # Predicted noise for the current sample.
                # (A dead "if alpha == 20: pass" branch was removed here.)
                epsilon = self.model(target).detach()

                alpha = alpha / 100
                # Invert noisy = x0 * alpha + eps  =>  x0 = (noisy - eps) / alpha.
                x_zeros = (target - epsilon) / (alpha)

                a = x_zeros.squeeze(0)
                a = a.clamp(-1, 1)
                a = self.reverse_scale_to_zero_to_one(a)
                rs.append(a)

                # Re-noise the reconstruction at a slightly larger alpha,
                # with the noise amplitude halved, for the next step.
                alpha += 0.01
                epsilon = torch.rand_like(x_zeros)
                epsilon = self.scale_to_minus_one_to_one(epsilon)
                epsilon = epsilon * (1 - alpha)
                epsilon = epsilon * 0.5

                target = x_zeros * alpha + epsilon
                target = target.clamp(-5, 5)

            return th.stack(rs)

        def forward(self, x, y):
            """Expand a batch into noisy variants and run the denoiser.

            y (class labels) is accepted for DataLoader compatibility but
            ignored.  Returns (model predictions, noise targets), both
            stacked along dim 0.
            """
            inputX = []
            inputY = []
            for a, _ in zip(x, y):
                rs, es = self.bruce_noisy(a)
                inputX += rs
                inputY += es
            inputX = th.stack(inputX)
            inputY = th.stack(inputY)
            return self.model(inputX), inputY
    
    
    # Loss, model and optimizer.
    denoising_loss = nn.MSELoss()
    diffusion = Diffusion()
    lr = 0.01
    optimizer = AdamW(diffusion.parameters(), lr=lr)

    # Resume from a previous checkpoint when one exists; a mismatched
    # checkpoint (e.g. after a model change) is reported and skipped.
    if os.path.exists("diff.pt"):
        checkpoint = th.load("diff.pt")
        try:
            diffusion.load_state_dict(checkpoint["d"])
            optimizer.load_state_dict(checkpoint["o"])
            print("load diff.pt")
        except Exception as e:
            print(e)
    
    # Peek at one batch to sanity-check value ranges before training.
    train_loader = DataLoader(dataset=train_dataset, batch_size=train_batch_size, shuffle=True,)
    for batch_images, _labels in train_loader:
        batch_images = batch_images[0:36]
        print(th.min(batch_images), th.max(batch_images))
        #draw_sample_image(batch_images,"Show")

        # Noisy variants of a single image and their noise targets.
        first = batch_images[0]
        rs, es = diffusion.bruce_noisy(first, 36)

        noisy = th.stack(rs)
        print(th.min(noisy), th.max(noisy))
        noisy = (noisy + 1) / 2
        #draw_sample_image(noisy,"Noisy")

        targets = th.stack(es)
        print(th.min(targets), th.max(targets))
        targets = targets * (1 / 1.21)
        targets = (targets + 1) / 2
        #draw_sample_image(targets,"De Noisy")

        break
    
    count_loader = len(train_loader)

    def count_parameters(model):
        """Return the number of trainable parameters of *model*."""
        trainable = (p.numel() for p in model.parameters() if p.requires_grad)
        return sum(trainable)

    print("Model Parameters: ", count_parameters(diffusion), count_loader)
    
    def show_samples(time=70, block=True):
        """Draw an 8x8 grid of freshly generated samples.

        Each cell is the final reconstruction of one independent
        sampling run of ``time`` steps.
        """
        finals = [diffusion.sample(time)[-1] for _ in range(64)]
        draw_sample_image(th.stack(finals), "Samples", block)
    
    # ---- training loop ----------------------------------------------------
    diffusion.train()
    for epoch in range(epochs):
        noise_prediction_loss = 0

        # Fresh shuffled loader each epoch.
        train_loader = DataLoader(dataset=train_dataset, batch_size=train_batch_size, shuffle=True,) 
        for batch_idx, (x, y) in enumerate(train_loader):
            optimizer.zero_grad()

            # x: model predictions on the noisy variants, y: noise targets.
            x,y = diffusion(x,y) 
            loss = denoising_loss(x.view(-1), y.view(-1))
            noise_prediction_loss += loss.item()
            loss.backward()
            optimizer.step() 
            print(f"{batch_idx} / {count_loader}.", loss.item())

        # Mean loss over the epoch.
        noise_prediction_loss = noise_prediction_loss / count_loader    
        print("Epoch", epoch + 1, f"/ {epochs} complete.", " L: ", noise_prediction_loss)
        # Checkpoint model + optimizer state every epoch.
        a = {"d":diffusion.state_dict(),
             "o":optimizer.state_dict()}
        th.save(a, "diff.pt")
        print("save diff.pt")
        # Preview samples at epochs 2, 12, 22, ... (non-blocking window).
        if epoch % 10 == 1:
            show_samples(70,False)

        # Early stop once the mean loss is small enough.
        if noise_prediction_loss < 0.005 :
            print(epoch+1, " Goto Eval, remove diff.pt before re-train ")
            break

    # ---- evaluation / demo ------------------------------------------------
    diffusion.eval()

    # Two 8x8 grids of generated digits.
    for _ in range(2):
        show_samples()

    # Show full denoising trajectories (all 81 steps) of single runs.
    for l in range(10):
        x = diffusion.sample(81)
        print(th.min(x), th.max(x)) 
        draw_sample_image(x,"Sample Single")


    print("End.")
    
     
  • iBoxDB

    iBoxDB - 2025-06-01

    Image

    image2

     

    Last edit: iBoxDB 2025-06-01
  • iBoxDB

    iBoxDB - 2025-06-01

    images

    img5

     

    Last edit: iBoxDB 2025-06-01

Log in to post a comment.

Want the latest updates on software, tech news, and AI?
Get the latest updates about software, tech news, and AI from SourceForge directly in your inbox once a month.