
LunarLander-v3 Continuous v1.5: The simplest deep learning.

iBoxDB
2025-03-11
    #!/usr/bin/env python
    # coding: utf-8
    
    '''The simplest deep learning, Continuous reinforcement, Run in desktop UI'''
    '''for LunarLander-v3 Continuous v1.5'''
    '''python gym_lunar.py'''
    
    __credits__ = ["iBoxDB", "Bruce Yang CL-N", "2025-2"]
    
    # 3rd-party packages
    # https://pytorch.org (the CPU-only build is enough)
    import torch 
    #pip install "gymnasium[box2d]"
    #pip install pygame
    import gymnasium 
    #from gymnasium.envs.box2d.lunar_lander import LunarLander, LunarLanderContinuous
    
    # numpy is pulled in with the torch installation
    import numpy
    
    th = torch
    nn = th.nn
    np = numpy
    gym = gymnasium
    
    ndarray = np.ndarray
    Tensor = th.Tensor
    
    #np.random.seed(0)
    #th.manual_seed(0)
    
    # limit CPU thread usage; float32 is the default dtype used throughout
    th.set_num_threads(4)
    th.set_default_dtype(th.float32)
    
    with th.no_grad():
        '''The simplest deep learning, Continuous reinforcement, Run in desktop UI'''
        pass
    
    #gym.pprint_registry()
    #gymnasium.envs.box2d.lunar_lander
    #print(gym.envs.__file__) 
    
    #check which LunarLander versions (-v2, -v3, ...) are registered on this machine
    #gym.pprint_registry() 
    gameName = "LunarLander-v3"
    _continuous = True
    _wind_power = 3.0
    enable_wind = True
    env = gym.make(gameName, continuous=_continuous,  enable_wind=enable_wind,
                   wind_power=_wind_power)
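    # continuous=True selects the 2-dimensional Box action space; enabling wind
    # (wind_power=3.0 here) adds a disturbance force that makes landing a bit harder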
    
    env.reset() 
    
    in_dim = env.observation_space.shape[0]
    action_dim = env.action_space.shape[0]
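    # for continuous LunarLander these should be: in_dim == 8 (x, y, vx, vy, angle,
    # angular velocity, two leg-contact flags) and action_dim == 2
    # (main engine throttle, left/right booster)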
    
    print(env.action_space)
    print(action_dim, in_dim)
    
    device = torch.device("cpu")
    
    modelActor = nn.Sequential(
        nn.Linear(in_dim,256),
        nn.ReLU(),
        nn.Linear(256,256),
        nn.ReLU(),
        nn.Linear(256,action_dim),   
    ) 
    
    # learnable, state-independent std parameter, optimized jointly with the actor network
    modelActor_scale = nn.Parameter(th.ones(action_dim)*10)
    t = list(modelActor.parameters())
    t.append(modelActor_scale)
    modelActorOptim = torch.optim.AdamW(t,lr=0.01)
    
    
    def modelActor_forward(state):
        # squash the raw network output with sin(x/10) so the Normal mean stays
        # inside the env's action range (-1, +1)
        loc = modelActor(state)
        loc = (loc/10).sin() 
        # learnable std, squashed into [0, 1] with sin(x/10)**2
        scale = modelActor_scale/10 
        scale = scale.sin() ** 2
        return loc,scale
    
    
    def modelActor_getaction(state,show=False): 
        loc,scale = modelActor_forward(state)
        if show :
            # when rendering, use a small fixed std so the policy acts almost deterministically
            scale = th.ones_like(loc) * 0.05
        dist = torch.distributions.Normal (loc, scale)
        action = dist.sample()
        # clamp to the env's action range [-1, +1]
        action = action.clamp(-1,+1)
        # log-probability of the action, stored as the "old" policy log-prob for PPO
        logprob = dist.log_prob(action)   
        return action, logprob
    
    def modelActor_logp_entropy(state,action): 
        # re-evaluate the stored actions under the current policy; the log-probs
        # feed the PPO ratio and the entropy term rewards exploration
        loc,scale = modelActor_forward(state)
        dist = torch.distributions.Normal (loc, scale)
        logprob = dist.log_prob(action) 
        entropy = dist.entropy() 
        return logprob, entropy 
    
    
    def modelActor_printNormal(state):
        loc,scale = modelActor_forward(state)  
        return loc.clone(),scale.clone()
    
    
    def runHuman(steps,Show=False):
        '''Render the current policy for `steps` steps in a pygame window.'''
        t_env = gym.make(gameName, continuous=_continuous,  enable_wind=enable_wind,
                         wind_power=_wind_power,
                          render_mode="human") 
        ary_state, _ = t_env.reset() 
        for i in range(steps): 
            state = torch.tensor(ary_state)
            if i == 0 :
                # on the first step only, feed a batched (2D) input as a shape check
                state = state.unsqueeze(0)
            action, _ = modelActor_getaction(state,Show)
            if i == 0 :
                action = action.squeeze(0)
            ary_action = action.numpy()
    
            ary_state, reward, terminal, truncate, _ = t_env.step(ary_action)
            t_env.render()
            #print(ary_action, reward, terminal, truncate)
            if terminal or truncate : 
              #print(reward)
              ary_state, _ = t_env.reset()         
        t_env.close()  
    
    runHuman(2)
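
    # modelCritic maps a state to a scalar value estimate V(s); it is trained in
    # the loop below against the GAE return targets (reward_sums)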
    
    modelCritic = nn.Sequential(
        nn.Linear(in_dim,256),
        nn.ReLU(),
        nn.Linear(256,256),
        nn.ReLU(),
        nn.Linear(256,1),
    ) 
    
    modelCriticOptim = torch.optim.AdamW(modelCritic.parameters(),lr=0.01)
    
    
    
    # train_agent: repeat (collect a rollout, run several update epochs, render briefly)
    train_agent_time_total = 50
    for train_agent_time in range(train_agent_time_total): 
        horizon_len = 10000
    
        print(f"{train_agent_time}/{train_agent_time_total} : explore_env({gameName}, {horizon_len}, Wind={enable_wind})")
    
        # rollout buffers for one on-policy batch of horizon_len transitions
        states = th.zeros((horizon_len,in_dim,))
        actions = th.zeros((horizon_len,action_dim,))
        logprobs = th.zeros((horizon_len,action_dim,))
        rewards = th.zeros(horizon_len)
        terminals = th.zeros(horizon_len)
        # truncation is folded into `terminal` during collection, so this buffer is unused
        truncates = th.zeros(horizon_len)
    
    
        ary_state, _ = env.reset()
        for i in range(horizon_len):
            state = torch.tensor(ary_state)
            action, logprob = modelActor_getaction(state)
            ary_action = action.numpy()
            ary_state, reward, terminal, truncate, _ = env.step(ary_action)
            #print(ary_action, reward, terminal, truncate)
            if truncate :
                # treat time-limit truncation as a failed episode with a fixed penalty
                terminal = True
                reward = -150
    
            # reward shaping: bonus for staying close to the landing pad at the
            # origin (state[0], state[1] are the lander's x/y position)
            target = 2 - np.sqrt(state[0] * state[0] + state[1] * state[1])
            target *= 8
    
            reward += target
    
            if terminal: 
                ary_state, _ = env.reset()
    
            states[i] = state
            actions[i] = action
            logprobs[i] = logprob
            rewards[i] = reward
            terminals[i] = terminal
            truncates[i] = truncate
    
    
        states = states.detach()
        actions = actions.detach()
        logprobs = logprobs.detach()
        rewards = rewards.detach()
        terminals = terminals.detach()
        truncates = truncates.detach()
    
        # undones masks out value bootstrapping across episode boundaries in the GAE loop
        undones = th.logical_not(terminals).detach() 
    
        truncates = None
    
    
        print(f"end explore_env()")
    
    
        # several update epochs over the collected on-policy batch
        for objective_time in range(10):
            '''advantages'''
    
            #th.set_grad_enabled(True)
            #torch.autograd.set_detect_anomaly(True)
    
            values = modelCritic(states)
            values = values.squeeze(1)
            # bootstrap value V(s) of the state observed after the last collected step
            state = torch.tensor(ary_state)
            next_value = modelCritic(state)
            print(f"initial {next_value.item()}")
    
            gamma = 0.97
            masks = undones * gamma
    
            lambda_gae_adv = 0.95
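            # GAE (Generalized Advantage Estimation), computed backwards in time:
            #   delta_t = r_t + gamma * (1 - done_t) * V(s_{t+1}) - V(s_t)
            #   A_t     = delta_t + gamma * lambda * (1 - done_t) * A_{t+1}
            # here masks[t] == gamma * (1 - done_t)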
            advantages = th.empty_like(values)
            advantage = 0
    
            for t in range(horizon_len-1,-1,-1):
                delta = rewards[t] + masks[t]*next_value - values[t]
                advantages[t] = advantage = delta + masks[t] * lambda_gae_adv * advantage    
                next_value = values[t]
    
            reward_sums = advantages + values
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-5)
    
            reward_sums = reward_sums.detach()
            advantages = advantages.detach()
            advantage = None
            #print(advantages.shape)
    
    
            '''update network'''
    
            criterion = torch.nn.MSELoss() 
    
            obj_critic = (criterion(values, reward_sums)).mean()
            modelCriticOptim.zero_grad()
            obj_critic.backward()
            modelCriticOptim.step()
    
            new_logprobs, obj_entropys = modelActor_logp_entropy(states, actions)
    
            ratio_clip = 0.25
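            # PPO clipped surrogate objective:
            #   ratio_t = exp(log pi_new(a_t|s_t) - log pi_old(a_t|s_t))
            #   L_t     = min(ratio_t * A_t, clip(ratio_t, 1-eps, 1+eps) * A_t)
            # taking the element-wise min keeps the policy update inside the trust region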
            ratio = (new_logprobs - logprobs.detach()).exp()
            #print(ratio.shape, advantages.shape )
            advantages = advantages.unsqueeze(1).expand_as(ratio)
            #print("ratio " + str(ratio.mean().item()))
            surrogate1 = advantages * ratio
            surrogate2 = advantages * ratio.clamp(1 - ratio_clip, 1 + ratio_clip)
            obj_surrogate = th.min(surrogate1, surrogate2)
    
            lambda_entropy = 0.01
            # maximize surrogate + entropy bonus, i.e. minimize its negative
            obj_actor = obj_surrogate.sum() + obj_entropys.mean() * lambda_entropy
            obj_actor = -obj_actor
            modelActorOptim.zero_grad()
            obj_actor.backward()
            modelActorOptim.step()
            # log: critic loss, actor loss (negated objective), raw std parameter
            print( obj_critic.item(), obj_actor.item(), modelActor_scale.detach().numpy() )
    
        # quick visual check of the current policy after each training iteration
        runHuman(200)
    
    print("END.")
    input("Show UI?")
    runHuman( 25000 , True )
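
    # Optional sketch, not part of the original script: the trained weights can be
    # persisted with the standard torch.save / torch.load API (the file name is
    # arbitrary), e.g.
    #   torch.save({"actor": modelActor.state_dict(),
    #               "actor_scale": modelActor_scale.detach(),
    #               "critic": modelCritic.state_dict()}, "lunar_actor.pt")
    # and restored later with modelActor.load_state_dict(...) / 
    # modelCritic.load_state_dict(...) plus copying "actor_scale" back into
    # modelActor_scale.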
    
     
