# iBoxDB - 2025-04-19
#!/usr/bin/env python
# coding: utf-8

'''The simplest deep learning, Seq2Seq, Sequence to Sequence. Version 1.3'''
'''python seq2seq.py'''

__credits__ = ["iBoxDB", "Bruce Yang CL-N", "2025-4"]


# Third-party dependency:
# PyTorch (https://pytorch.org), CPU-only build
import torch 
import torch.utils.data as data
import os
th = torch
nn = torch.nn


class BrucePredictor(nn.Module):
    '''Fixed-length sequence-to-sequence classifier.

    Every input position is embedded, mixed across positions by 1x1
    convolutions that use the sequence axis as the channel axis, and then
    classified independently into one of ``out_categories`` tokens.

    Because ``max_seq`` is the channel count of the first convolution, every
    input must contain exactly ``max_seq`` token ids.

    Args:
        max_seq: Number of positions in every input/output sequence.
        input_categories: Size of the input vocabulary (embedding rows).
        out_categories: Number of output classes predicted per position.
        d_dim: Embedding width; the hidden width is ``2 * d_dim``.
    '''

    def __init__(self, max_seq, input_categories, out_categories, d_dim=128):
        super().__init__()

        self.embed = nn.Embedding(input_categories, d_dim)

        lin_dim = d_dim * 2
        # Shapes below assume a batched input of (batch, max_seq) token ids.
        # NOTE: layer order is part of the checkpoint format (state_dict keys
        # are "encode.0", "encode.1", ...), so do not reorder.
        self.encode = nn.Sequential(
            nn.Conv1d(max_seq, d_dim, 1),    # (B, max_seq, d_dim) -> (B, d_dim, d_dim)
            nn.Linear(d_dim, lin_dim),       # -> (B, d_dim, lin_dim)
            nn.ReLU(),
            nn.Conv1d(d_dim, max_seq, 1),    # -> (B, max_seq, lin_dim)
            nn.Linear(lin_dim, lin_dim),
            nn.LayerNorm(lin_dim),
        )

        self.decode = nn.Sequential(
            nn.Linear(lin_dim, lin_dim),
            nn.ReLU(),
            nn.Linear(lin_dim, lin_dim),
            nn.LayerNorm(lin_dim),
            nn.Linear(lin_dim, out_categories),  # per-position class logits
        )

        self.cal = torch.nn.CrossEntropyLoss()

    def forward(self, x):
        '''Return per-position class logits for a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids, shape ``(batch, max_seq)``.

        Returns:
            Tensor of logits, shape ``(batch, max_seq, out_categories)``.
        '''
        x = self.embed(x)
        x = self.encode(x)
        return self.decode(x)

    @th.no_grad()
    def one(self, x: th.Tensor):
        '''Predict the output token ids for a single sequence without gradients.

        Args:
            x: A single un-batched sequence of token ids (the script passes a
               1-D tensor of ``max_seq`` ids).

        Returns:
            Integer tensor holding the arg-max class for every position.
        '''
        # Insert the batch axis in front of the sequence axis.
        logits = self.forward(x.unsqueeze(-2))
        best = th.argmax(logits, dim=-1, keepdim=True)
        # Drop the class axis, then the batch axis added above.
        return best.squeeze(-1)[0]

    def optim(self, lr=0.01):
        '''Create an AdamW optimizer over all parameters of this model.

        Args:
            lr: Learning rate (defaults to the original value of 0.01).
        '''
        return torch.optim.AdamW(self.parameters(), lr=lr)

    def loss(self, x0, y0) -> th.Tensor:
        '''Compute the mean cross-entropy between predictions and targets.

        Args:
            x0: Input token ids, shape ``(batch, max_seq)``.
            y0: Target class ids with one entry per input position.

        Returns:
            Scalar loss tensor.
        '''
        logits = self.forward(x0)

        # CrossEntropyLoss wants (N, classes) vs (N,). ``reshape`` is used
        # instead of ``view`` so non-contiguous tensors (e.g. a transposed or
        # sliced target) are accepted instead of raising a RuntimeError.
        logits = logits.reshape(-1, logits.size(-1))
        targets = y0.reshape(-1)
        return self.cal(logits, targets)

    def save(self, f="./seq.pt"):
        '''Write the model parameters (state dict) to the file ``f``.'''
        th.save(self.state_dict(), f)

    def load(self, f="./seq.pt"):
        '''Load parameters from ``f`` if it exists; otherwise do nothing.'''
        if os.path.exists(f):
            # map_location keeps checkpoints written on another device loadable
            # on a CPU-only install; load_state_dict copies the values into the
            # existing parameters afterwards.
            state = th.load(f, map_location="cpu")
            self.load_state_dict(state)



class SeqDataset(data.Dataset):
    '''Synthetic dataset of random token sequences and derived target sequences.

    Each input row holds 1 .. ``max_seq - 1`` random word ids (values
    ``2 .. input_categories - 1``), followed by one ``EOS_token`` and then
    ``SOS_token`` padding up to ``max_seq``.  The target is produced on the
    fly in ``__getitem__`` from a fixed random lookup table.

    Args:
        max_seq: Length of every stored sequence (must be at least 2).
        input_categories: Input vocabulary size, including the two specials.
        out_categories: Output vocabulary size, including the two specials.
        size: Number of sequences to generate.
    '''

    SOS_token = 0   # Empty slot / padding
    EOS_token = 1   # End of sequence
    Index2Word = {0: "SOS", 1: "EOS"}

    def __init__(self, max_seq, input_categories, out_categories, size):
        super().__init__()

        # Allocate as int64 and pre-fill with padding in one step, rather than
        # converting an uninitialised floating-point buffer to integers.
        self.data = th.full((size, max_seq), SeqDataset.SOS_token, dtype=th.long)

        for i in range(size):
            # Number of real words in this row; leaves room for the EOS marker.
            l = int(th.randint(1, max_seq, (1,)))
            # "+ 2" skips the two reserved ids (SOS, EOS).
            self.data[i, :l] = th.randint(input_categories - 2, size=(l,)) + 2
            self.data[i, l] = SeqDataset.EOS_token

        # Mapping one word to another word: indexed as
        # lanmap[input token, key k, position] -> output word id (>= 2).
        self.lanmap = th.randint(out_categories - 2, size=(input_categories, max_seq, max_seq)) + 2

    def __len__(self):
        '''Return the number of stored sequences.'''
        return self.data.size(0)

    def __getitem__(self, index):
        '''Return ``(x, y)``: the stored input row and its computed target row.

        The target has ``offset`` mapped tokens, then ``EOS_token``, then
        zeros; both tensors are int64 and have ``max_seq`` entries.
        '''
        x = self.data[index]
        seq_len = x.size(0)

        # argmin returns the first minimum: the first padding slot (value 0)
        # if the row has padding, otherwise the position of EOS (value 1).
        l = int(x.argmin())
        # The token two places before that position selects which slice of
        # the mapping table is used for the whole row.
        k = int(x[l - 2]) % seq_len

        # The target length is the input length shifted by a third, with the
        # direction of the shift alternating; wrapped into [0, seq_len).
        offset = l // 3
        if offset % 2 == 1:
            offset = -offset
        offset = (l + offset) % seq_len

        # Vectorised gather of lanmap[x[i], k, i] for i < offset; stays in
        # int64 throughout (no float round-trip, no per-element Python loop).
        table = self.lanmap[:, k]
        y = th.zeros_like(x)
        y[:offset] = table[x[:offset], th.arange(offset)]
        y[offset] = SeqDataset.EOS_token
        return x, y


if __name__ == "__main__":
    # Script entry point: build the synthetic data, train the predictor,
    # then print a sample of predictions next to their inputs and targets.
    th.set_num_threads(4)
    th.set_default_dtype(th.float64)

    # Model / data dimensions.
    hidden_dim = 64
    max_seq = 11
    input_categories = 9
    out_categories = 10

    # 110000 generated sequences, split 90% / 10% into train and test.
    dataset = SeqDataset(max_seq, input_categories, out_categories, 110000)
    dataset_train, dataset_test = data.random_split(dataset, [0.9, 0.1])
    train_Loader = data.DataLoader(
        dataset_train, batch_size=256, shuffle=True, drop_last=True
    )
    test_Loader = data.DataLoader(
        dataset_test, batch_size=1, shuffle=True, drop_last=True
    )

    predictor = BrucePredictor(max_seq, input_categories, out_categories, hidden_dim)
    opt = predictor.optim()
    predictor.train()

    total = 10
    for e in range(total):
        print(f"train: {e}/{total}")

        batch_losses = []
        for x0, y0 in train_Loader:
            loss = predictor.loss(x0, y0)
            loss_value = loss.item()
            print(f"Loss: {loss_value}")

            opt.zero_grad()
            loss.backward()
            opt.step()

            batch_losses.append(loss_value)

        # Per-epoch summary: summed loss and mean loss per batch.
        e_loss = sum(batch_losses)
        e_avg = e_loss / len(batch_losses)
        print(f"{e} Total Loss: {e_loss}, Avg:{e_avg}")

        # Stop training early once the mean batch loss is small enough.
        if e_avg < 0.01:
            print("goto Eval")
            break

    predictor.eval()
    for i, (x, y) in enumerate(test_Loader):
        sample = x[0]
        p = predictor.one(sample)
        print(i)
        print("X:", sample.detach().numpy())
        print("P:", p.detach().numpy())
        print("Y:", y[0].detach().numpy())
        # Samples 0..100 are shown before stopping.
        if i >= 100:
            break

    print("End.")