Menu

Directly Denoising Diffusion Model — the Simplest Deep Learning, Ver 1.1

iBoxDB
2025-06-15
2025-06-15
  • iBoxDB

    iBoxDB - 2025-06-15
    #!/usr/bin/env python
    # coding: utf-8
    
    '''Directly denoising Diffusion model, the Simplest deep learning.'''
    '''Version 1.1'''
    '''python diff_c.py'''
    
    __credits__ = ["iBoxDB", "Bruce Yang CL-N", "2025-6"]
    
    
    # 3rd parts
    #https://pytorch.org  CPU version only
    import torch
    
    import torch.nn as nn
    import torch.utils.data as tdata
    from torch.optim import Adam,AdamW,SGD
    
    from torchvision.datasets import MNIST, CIFAR10
    from torchvision.utils import save_image, make_grid
    from PIL import ImageEnhance, Image
    
    import torchvision.transforms as transforms
    import torchvision.transforms.functional as TF
    
    import matplotlib.pyplot as plt
    import numpy as np
    
    import math
    import os 
    
    th = torch                    # short alias used throughout the script
    DataLoader = tdata.DataLoader
    
    
    '''
    if over trained, remove the *.pt files.
    set 'epochs' to 20
    and re-train several times till best results.
    it using random noisy, results varying.
    '''
    epochs = 400  #0 
    
    train_batch_size = 36
    th.set_num_threads(4)         # cap CPU threads; this script is CPU-only
    
    
    th.set_default_dtype(th.float32)
    device = th.device("cpu")
    th.set_default_device(device)
    
    dataset_path = '~/datasets'   #download to here
    print(os.path.expanduser(dataset_path))
    
    dataset = 'CIFAR10'
    img_size = (3, 32, 32)        # NOTE(review): dead assignment -- immediately overridden below
    img_size = (3, 34, 34)        # (C, H, W); CIFAR images are upscaled from 32x32 to 34x34
    print(img_size)
    
    
    # Resize to img_size and convert PIL images to float tensors in [0, 1].
    transform = transforms.Compose([
            transforms.Resize( (img_size[1],img_size[2]) ,  transforms.InterpolationMode.BILINEAR,),
            transforms.ToTensor(),
    ])
    
    # Download online 170MB
    train_dataset = CIFAR10(dataset_path, transform=transform, train=True, download=True)  
    print(len(train_dataset))
    
    
    # Shrink the training set: take a fixed 2% split (seeded generator for
    # reproducibility), then keep only label 8 (CIFAR-10 class 'ship').
    generator1 = torch.Generator(device=device).manual_seed(53782)
    train_dataset , _ = tdata.random_split(train_dataset,[0.02,0.98], generator1)
    train_dataset = [(x,y) for x,y in train_dataset if y in (8,)]
    train_dataset = [(x.to(device),y) for x,y in train_dataset]
    print(len(train_dataset))
    
    
    def draw_sample_image(x, postfix, block=True):
        """Display a batch of images as a square grid via matplotlib.
    
        Args:
            x: image tensor (N, C, H, W); values expected in [0, 1].
            postfix: title text shown above the grid.
            block: if True, block until the window is closed; otherwise
                show the figure for 3 seconds, then close it.
        """
        plt.close('all')
        plt.figure(figsize=(5,5))
        plt.axis("off")
        plt.title(postfix)
        im = make_grid(x.detach().cpu().to(th.float32),
                        nrow=int(math.sqrt(len(x))),
                        scale_each=True, normalize=False)
    
        im = im.numpy() * 255
        im = np.transpose(im, (1, 2, 0))
        # BUG FIX: was astype(np.int8), which overflows for pixel values
        # above 127; PIL "RGB" images require uint8.  Clip first to guard
        # against inputs slightly outside [0, 1].
        im = np.clip(im, 0, 255).astype(np.uint8)
        im = Image.fromarray(im,mode="RGB")
        im = ImageEnhance.Contrast(im).enhance(1.2)
        im = ImageEnhance.Sharpness(im).enhance(1.2)
    
        plt.imshow(im)
        plt.show(block=block)
        if not block:
            plt.pause(3)
            plt.close('all')
    
    
    class Denoiser(nn.Module):
        """Convolutional encoder/decoder used as the denoising network.
    
        forward(x) returns x - unet(x): the stacked conv/deconv layers
        produce an estimate that is subtracted from the (detached) input.
        """
    
        def __init__(self):
            super().__init__()
            in_channels = img_size[0]
    
            # Downsampling half: strided convs shrink the 34x34 input.
            encoder = [
                nn.Conv2d(in_channels, 32, 4, 2),
                nn.InstanceNorm2d(32),
                nn.Conv2d(32, 64, 4, 2, 1),
                nn.InstanceNorm2d(64),
                nn.Conv2d(64, 128, 4, 2, 1),
                nn.SiLU(),
                nn.Conv2d(128, 128, 2),
                nn.LeakyReLU(),
                nn.Conv2d(128, 256, 2),
                nn.LeakyReLU(),
                nn.Conv2d(256, 512, 2),
                nn.LeakyReLU(),
            ]
            # Upsampling half: transposed convs restore the input size.
            decoder = [
                nn.ConvTranspose2d(512, 256, 2),
                nn.InstanceNorm2d(256),
                nn.ConvTranspose2d(256, 128, 2),
                nn.InstanceNorm2d(128),
                nn.ConvTranspose2d(128, 128, 2),
                nn.LeakyReLU(),
                nn.ConvTranspose2d(128, 64, 4, 2, 1),
                nn.LeakyReLU(),
                nn.ConvTranspose2d(64, 32, 4, 2, 1),
                nn.LeakyReLU(),
                nn.ConvTranspose2d(32, in_channels, 4, 2),
            ]
            self.unet = nn.Sequential(*encoder, *decoder)
    
        def forward(self, x):
            return self.bruce_forward(x)
    
        def bruce_forward(self, x):
            # Detach so gradients never flow back into the input tensor.
            detached = x.detach()
            estimate = self.unet(detached)
            return detached - estimate
    
    
    class Diffusion(nn.Module):
        """Wraps the Denoiser with training-pair generation (bruce_noisy)
        and iterative sampling over an increasing signal level alpha."""
    
        def __init__(self):
            super(Diffusion, self).__init__()
            self.model = Denoiser()
    
        def scale_to_minus_one_to_one(self, x): 
            # Map pixel values from [0, 1] to [-1, 1].
            return x * 2 - 1
    
        def reverse_scale_to_zero_to_one(self, x):
            # Inverse of scale_to_minus_one_to_one: [-1, 1] -> [0, 1].
            return (x + 1) / 2
    
        def bruce_noisy(self, x_zeros, ranLen=31): 
            """Build ranLen (noisy_sample, epsilon) training pairs for one
            image, where noisy_sample = alpha * x_zeros + epsilon.
    
            Returns (rs, es): lists of noisy samples and their noise
            residuals, both in the [-1, 1]-scaled space.
            """
            x_zeros = x_zeros.detach() 
            x_zeros = self.scale_to_minus_one_to_one(x_zeros) 
    
            rs = []
            es = []
    
            # One pair at the fixed minimum alpha = 0.20: the sample is
            # pure uniform noise and epsilon is defined as the exact
            # residual target - alpha * x_zeros.
            for _ in range(1):
                target = torch.rand_like(x_zeros)
                target = self.scale_to_minus_one_to_one(target)
    
                alpha = 20 / 100
                epsilon = target - x_zeros * alpha
                rs.append(target)
                es.append(epsilon)
    
            # Remaining pairs draw a random signal level alpha in
            # [0.21, 0.99]; the noise amplitude shrinks as alpha grows.
            for _ in range(ranLen-1):
                alpha = th.randint(21,100,(1,)).item() / 100
                epsilon = torch.rand_like(x_zeros)
                epsilon = self.scale_to_minus_one_to_one(epsilon)
    
                epsilon = epsilon * (1-alpha)
                noisy_sample = x_zeros * alpha + epsilon
                rs.append(noisy_sample)
                es.append(epsilon)
    
            return rs, es
    
    
        @th.no_grad()
        def sample(self,time=64):
            """Generate one image by iterative denoising.
    
            Runs `time` steps with alpha climbing from 0.20 in 0.01
            increments; returns a (time, C, H, W) tensor holding the
            per-step clean-image estimates rescaled to [0, 1].
            """
            target = torch.rand(img_size)       
            target = self.scale_to_minus_one_to_one(target)
    
            rs = []
    
            target = target.unsqueeze(0)
            for alpha in range(20,20+time):
    
                # Model predicts the noise residual of the current sample.
                epsilon = self.model(target).detach()
                if alpha == 20:    
                    pass
    
                alpha = alpha / 100
                # Invert noisy = alpha * x0 + eps  =>  x0 = (noisy - eps) / alpha.
                x_zeros = (target - epsilon) / (alpha) 
    
                a = x_zeros.squeeze(0)
                a = a.clamp(-1,1)
                a =  self.reverse_scale_to_zero_to_one(a) 
                rs.append(a)
    
                # Re-noise the estimate at a slightly higher signal level
                # (alpha + 0.01) using half-strength fresh noise.
                alpha += 0.01
                epsilon = torch.rand_like(x_zeros)
                epsilon = self.scale_to_minus_one_to_one(epsilon)
                epsilon = epsilon * (1-alpha)
                epsilon = epsilon * 0.5
    
                target = x_zeros * alpha + epsilon
                target = target.clamp(-5,5)
    
            return th.stack(rs)    
    
        def forward(self,x,y):
            """Expand each image in batch x into its noisy training pairs;
            return (model predictions, true noise targets).  Labels y are
            iterated but otherwise ignored."""
            inputX = []
            inputY = []
            for a,_ in zip(x,y):
                rs,es = self.bruce_noisy(a) 
                inputX += rs
                inputY += es           
            inputX = th.stack(inputX)
            inputY = th.stack(inputY)
            return self.model(inputX), inputY
    
    
    denoising_loss = nn.MSELoss()
    diffusion = Diffusion()
    lr = 0.001
    optimizer = AdamW(diffusion.parameters(), lr=lr)
    
    # Resume model + optimizer state from a previous run, if present.
    # NOTE(review): th.load unpickles arbitrary objects -- only load
    # checkpoints you created yourself (consider weights_only=True).
    if os.path.exists("diff_c.pt"):
        a = th.load("diff_c.pt")
        try:
            diffusion.load_state_dict(a["d"])
            optimizer.load_state_dict(a["o"])
            print("load diff_c.pt")
        except Exception as e:
            print(e)
    
    
    # One-batch sanity check: print value ranges of the raw images, their
    # noisy versions, and the noise residuals (draw calls are disabled).
    train_loader = DataLoader(dataset=train_dataset, batch_size=train_batch_size, shuffle=True,
                              generator=th.Generator(device))
    for x,y in train_loader:
        x = x[0:16] 
        print(th.min(x), th.max(x))   
        #draw_sample_image(x,"Show",False)
    
        x = x[0]
        rs,es = diffusion.bruce_noisy(x,16)
        x = th.stack(rs)
        print(th.min(x), th.max(x))
        x = (x+1)/2
        #draw_sample_image(x,"Noisy",False)
    
        x = th.stack(es)
        print(th.min(x), th.max(x)) 
        # Residuals can reach about +/-1.2 (noise minus alpha-scaled image);
        # rescale toward [-1, 1] for display -- presumably why 1/1.21.
        x *= (1/1.21)
        x = (x+1)/2
        #draw_sample_image(x,"De Noisy",False)
    
        break
    
    count_loader = len(train_loader)
    
    def count_parameters(model):
        """Return the number of trainable parameters in *model*."""
        total = 0
        for p in model.parameters():
            if p.requires_grad:
                total += p.numel()
        return total
    
    print("Model Parameters: ", count_parameters(diffusion), count_loader)
    
    def show_samples(time=70,block=True):
        """Draw a 4x4 grid of final denoised samples from the model.
    
        Each of the 16 images is the last step of an independent
        diffusion.sample(time) trajectory.
        """
        finals = [diffusion.sample(time)[-1] for _ in range(16)]
        grid = th.stack(finals)
        draw_sample_image(grid, "Samples", block)
    
    diffusion.train()
    for epoch in range(epochs):
        noise_prediction_loss = 0
    
        # Rebuild the DataLoader each epoch so shuffling draws a new order.
        train_loader = DataLoader(dataset=train_dataset, batch_size=train_batch_size, shuffle=True,
                                  generator=th.Generator(device)) 
        for batch_idx, (x, y) in enumerate(train_loader):
            optimizer.zero_grad()
    
            # Diffusion.forward expands each image into noisy variants:
            # x becomes the model's noise predictions, y the true noise.
            x,y = diffusion(x,y) 
            loss = denoising_loss(x.view(-1), y.view(-1))
            noise_prediction_loss += loss.item()
            loss.backward()
            optimizer.step() 
            print(f"{batch_idx} / {count_loader}.", loss.item())
    
        noise_prediction_loss = noise_prediction_loss / count_loader    
        print("Epoch", epoch + 1, f"/ {epochs} complete.", " L: ", noise_prediction_loss)
        # Checkpoint model + optimizer state after every epoch.
        a = {"d":diffusion.state_dict(),
             "o":optimizer.state_dict()}
        th.save(a, "diff_c.pt")
        print("save diff_c.pt")
        # Preview generated samples at epochs 1, 11, 21, ...
        if epoch % 10 == 1:
            show_samples(70,False)
    
        # Early stop once the mean epoch loss is small enough.
        # NOTE(review): message says "diff.pt" but the checkpoint file is
        # "diff_c.pt".
        if noise_prediction_loss < 0.001 :
            print(epoch+1, " Goto Eval, remove diff.pt before re-train ")
            break
    
    diffusion.eval()
    
    # Generate 5 trajectories of 80 denoising steps each; display only the
    # later slices (i = 3, 4 -> steps 48-63 and 64-79), where the estimates
    # are most refined.
    for v in range(5):  
        x = diffusion.sample(80)
        print(th.min(x), th.max(x)) 
        for i in range(5):
            if i in (0,1,2):
                continue
            x0 = x[i*16 : i*16+16]
            draw_sample_image(x0, f"Sample {v}-{i}",False)
    
    # Two final 16-sample grids (blocking windows).
    for _ in range(2):
        show_samples()
    
    print("End.")
    
     
  • iBoxDB

    iBoxDB - 2025-06-15

    Img

     

    Last edit: iBoxDB 2025-06-15

Log in to post a comment.

Want the latest updates on software, tech news, and AI?
Get the latest updates about software, tech news, and AI from SourceForge directly in your inbox once a month.