
CarRacing-v3 v1.1 reinforcement learning: the simplest deep learning example.

iBoxDB
2025-04-07
  • iBoxDB

    iBoxDB - 2025-04-07
    #!/usr/bin/env python
    # coding: utf-8
    
    '''The simplest deep learning: CarRacing reinforcement learning, run in a desktop UI.
    For CarRacing-v3 (v1.1).
    Usage: python gym_car.py
    '''
    
    __credits__ = ["iBoxDB", "Bruce Yang CL-N", "2025-3"]
    
    # 3rd-party packages
    #https://pytorch.org  CPU version only
    import torch 
    #pip install "gymnasium[box2d]"
    #pip install pygame
    import gymnasium 
    #from gymnasium.envs.box2d.car_racing import CarRacing
    
    # installed as a dependency of torch
    import numpy
    from copy import deepcopy
    
    th = torch
    nn = th.nn
    np = numpy
    gym = gymnasium
    
    ndarray = np.ndarray
    Tensor = th.Tensor
    
    #np.random.seed(0)
    #th.manual_seed(0)
    #torch.autograd.set_detect_anomaly(True)
    
    
    th.set_num_threads(8)
    th.set_default_dtype(th.float64)
    
    # list the registered CarRacing versions (CarRacing-v2, CarRacing-v3, ...) on this machine
    #gym.pprint_registry() 
    gameName = "CarRacing-v3" 
    _continuous=True
    _domain_randomize=False
    class ImageEnv(gymnasium.Wrapper):
        def __init__(self,**kwargs):
            _env = gym.make(gameName, continuous=_continuous,
                            domain_randomize = _domain_randomize,
                            **kwargs)
            super().__init__(_env)
    
            in_dim_W,in_dim_H,in_dim_C = _env.observation_space.shape
            self.in_dim = (in_dim_C,in_dim_W,in_dim_H,)
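            # override: the raw (96, 96, 3) frame is replaced by a handcrafted
            # (1, 1, 186) feature built in step(): 96 per-column + 90 per-row road positions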
            self.in_dim = (1,1,186)
            self.observation_space = gym.spaces.Box(
                low=0, high=255, shape=self.in_dim, dtype=np.float32
            )
    
    
        def reset(self):       
            self.env.reset()
            act = th.zeros((3,))
            buf = self.step(act.numpy())  
            return buf[0], act   
    
        def step(self,act): 
            outp = th.empty(self.in_dim)
            t_reward = 0
            terminal = truncate = False
            for i in range(outp.shape[0]):
                state, reward, terminal, truncate, _ = self.env.step(act)
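                # build the feature vector from the 96x96 RGB frame: grayscale it,
                # blank the HUD rows (84 and below), keep rows 0-89 flipped bottom-up,
                # set road-colored pixels (gray between 102 and 108) to 0 and the rest
                # to 255, then take the first road-pixel index per column (96)
                # and per row (90), scaled by 1/100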
                state = np.transpose(state,(2,0,1)) 
                g = state[0]*0.3333 + state[1]*0.3333 + state[2]*0.3333
                g[84:,:] = 255
                g = (g[:90,:])[::-1,:]
                a = np.bitwise_and(g>102 , g<108)
                g[a] = 0 
                a = g != 0
                g[a] = 255
                g = np.concatenate((np.argmin(g, axis=0), np.argmin(g, axis=1))).astype(np.float64)                 
                outp[i] = th.tensor((g/100.0)[np.newaxis,:])     
                t_reward += reward 
                if terminal or truncate :
                    break
                if self.env.render_mode:
                    self.env.render()
    
            return outp,t_reward,terminal,truncate,None
    
    env = ImageEnv()
    
    action_dim = env.action_space.shape[0]
    in_dim = env.observation_space.shape[0]
    print(env.action_space)
    print(env.observation_space)
    print(action_dim, in_dim)
    
    device = torch.device("cpu")
    
    flatten = nn.Sequential(
                nn.Flatten(start_dim=-3),
            )
    
    def flattenWithAct(_model):
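        # build a callable that flattens the observation, concatenates the previous
        # action, and feeds the result to _model; also return the combined parameter list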
        d_flatten = deepcopy(flatten)
        def _add(state, act):
            r = d_flatten(state)
            r = th.cat((act,r), dim=-1)
            r = _model(r)
            return r
        return lambda state, act : _add(state, act), list(d_flatten.parameters())+list(_model.parameters())    
    
    in_dim = flatten(env.reset()[0]).shape[0]
    print(in_dim)
    
    #== Begin ==
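    # actor network: input = flattened observation (186) + previous action (3);
    # output = one value per action dimension, turned into the policy mean in modelActor_forward()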
    modelActor = nn.Sequential(
        nn.Linear(in_dim+action_dim,512),
        nn.ReLU(),
        nn.Linear(512,512),
        nn.ReLU(),
        nn.Linear(512,action_dim),   
    )  
    modelActor, ps = flattenWithAct(modelActor)
    modelActor_scale = nn.Parameter(th.ones(action_dim)*10)
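    # learnable per-dimension scale, mapped to the policy std-dev in modelActor_forward()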
    modelActorOptim = torch.optim.Adam(ps + [modelActor_scale],lr=0.01)
    
    
    modelCritic = nn.Sequential(
        nn.Linear(in_dim+action_dim,512),
        nn.ReLU(),
        nn.Linear(512,512),
        nn.ReLU(),
        nn.Linear(512,1),
    )
    modelCritic, ps = flattenWithAct(modelCritic)
    modelCriticOptim = torch.optim.Adam(ps,lr=0.01)
    
    
    def modelActor_forward(state,act):
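        # sin() squashes the network output into a mean in [-1, 1];
        # sin()**2 maps the learnable scale to a std-dev in [0, 1]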
        loc = modelActor(state,act)
        loc = (loc/10).sin() 
        scale = modelActor_scale/10 
        scale = scale.sin() ** 2
        return loc,scale
    
    
    def modelActor_getaction(state,act,train=False): 
        loc,scale = modelActor_forward(state,act)
        dist = torch.distributions.Normal (loc, scale)
    
        action = None
        if train and th.rand(1) < 0.2 :
            action = torch.tensor( env.action_space.sample() ) 
        else:
            action = dist.sample()
    
        one_d = len(action.shape) == 1 
        if one_d :
            action = action.unsqueeze(0)
    
        min = th.tensor( [-1,+0.1,  0] )
        max = th.tensor( [+1,  +1,0.1] )
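        # per-dimension bounds: steering [-1, 1], gas [0.1, 1], brake [0, 0.1]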
        min = min.unsqueeze(0)
        max = max.unsqueeze(0)
        action = action.clamp(min, max)
    
        if one_d :
            action = action.squeeze(0)
    
        logprob = dist.log_prob(action)   
        return action, logprob
    
    def modelActor_logp_entropy(state,ary_act,action): 
        loc,scale = modelActor_forward(state,ary_act)
        dist = torch.distributions.Normal (loc, scale)
        logprob = dist.log_prob(action) 
        entropy = dist.entropy() 
        return logprob, entropy 
    
    
    def runHuman(steps):
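        # drive a human-rendered environment with the current policy for `steps` steps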
        t_env = ImageEnv(render_mode="human") 
        state, act = t_env.reset() 
        for i in range(steps):  
            if i == 0 :
                state = state.unsqueeze(0)
                act = act.unsqueeze(0)
            action, _ = modelActor_getaction(state,act)
            if i == 0 :
                action = action.squeeze(0)
            state, reward, terminal, truncate, _ = t_env.step(action.numpy())
            if terminal or truncate : 
                state, act = t_env.reset()
            else:
                act = action                
        t_env.close()  
    
    runHuman(4)
    
    in_dim = env.observation_space.shape
    
    # train_agent
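    # each iteration: collect a 10000-step rollout with the current policy,
    # run up to 10 actor/critic updates on it, then show a short rendered demo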
    train_agent_time_total = 35
    for train_agent_time in range(train_agent_time_total): 
        horizon_len = 10000
    
        print(f"{train_agent_time}/{train_agent_time_total} : explore_env({gameName}, {horizon_len})")
    
        ary_actions = th.zeros((horizon_len,action_dim,))
        states = th.zeros((horizon_len,*in_dim,))
        actions = th.zeros((horizon_len,action_dim,))
        logprobs = th.zeros((horizon_len,action_dim,))
        rewards = th.zeros(horizon_len)
        terminals = th.zeros(horizon_len)
        # don't use truncate here
        truncates = th.zeros(horizon_len)
    
    
        ary_state, ary_action = env.reset()
        ng_count = bg_count = 0 
        for i in range(horizon_len):
            ary_actions[i] = ary_action
            state = ary_state
            action, logprob = modelActor_getaction(state,ary_action,True)
            ary_state, reward, terminal, truncate, _ = env.step(action.numpy())
    
            if truncate :
                terminal = True
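            # reward shaping: consecutive on-track steps earn a growing bonus,
            # boosted when the previous action applied gas; consecutive off-track
            # steps earn a growing penalty, made worse by braking, and the episode
            # is cut off after 50 bad steps in a row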
    
            if reward > 0:
                reward += bg_count
                bg_count +=1
                ng_count = 0
                if ary_action[1] > 0:            
                    reward *= (1+ary_action[1])
    
            else: 
                reward -= (ng_count/10.0)
                ng_count += 1
                bg_count = 0
                terminal = terminal or (ng_count >= 50)      
                if ary_action[2] > 0:
                    reward *= (1+ary_action[2])  
    
            actions[i] = action
            states[i] = state
            logprobs[i] = logprob
            rewards[i] = reward
            terminals[i] = terminal
            truncates[i] = truncate
    
            if terminal:
                ary_state, ary_action = env.reset()
                ng_count = bg_count = 0 
            else:
                ary_action = action 
    
        ary_actions = ary_actions.detach()
        states = states.detach()
        actions = actions.detach()
        logprobs = logprobs.detach()
        rewards = rewards.detach()
        terminals = terminals.detach()
        truncates = truncates.detach()
    
        undones = th.logical_not(terminals).detach() 
    
        truncates = None
    
        print(f"end explore_env()")
    
        pos_len = len(states)
        objective_time_run = 0 if pos_len==0 else pos_len // 10 + 1 
        objective_time_run = np.min( (10,objective_time_run) )
        for objective_time in range(objective_time_run):
            '''advantages'''
    
            values = modelCritic(states,ary_actions)
            values = values.squeeze(1)
    
            next_value = modelCritic(ary_state,ary_action)
            print(f"next_value {next_value.item()}")
    
            gamma = 0.97    
            masks = undones * gamma
    
            lambda_gae_adv = 0.6
    
            advantages = th.empty_like(values)
            advantage = 0
    
            rw_horizon_len = len(rewards)
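            # generalized advantage estimation (GAE), computed backwards:
            #   delta_t = r_t + gamma * undone_t * V(s_{t+1}) - V(s_t)
            #   A_t     = delta_t + gamma * undone_t * lambda * A_{t+1}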
            for t in range(rw_horizon_len-1,-1,-1):
                delta = rewards[t] + masks[t]*next_value - values[t]
                advantages[t] = advantage = delta + masks[t] * lambda_gae_adv * advantage    
                next_value = values[t]
    
            reward_sums = advantages + values
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-5)
    
            reward_sums = reward_sums.detach()
            advantages = advantages.detach()
            advantage = None
    
            '''update network'''
    
            criterion = torch.nn.MSELoss() 
    
            obj_critic = (criterion(values, reward_sums)).mean()
            modelCriticOptim.zero_grad()
            obj_critic.backward()
            modelCriticOptim.step()
    
            new_logprobs, obj_entropys = modelActor_logp_entropy(states,ary_actions ,actions)
    
            ratio_clip = 0.25
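            # PPO-style clipped surrogate: probability ratio between the new and old
            # policy, clipped to [1 - ratio_clip, 1 + ratio_clip], plus an entropy bonus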
            ratio = (new_logprobs - logprobs).exp()
    
            advantages = advantages.unsqueeze(1).expand_as(ratio)
    
            surrogate1 = advantages * ratio
            surrogate2 = advantages * ratio.clamp(1 - ratio_clip, 1 + ratio_clip)
            obj_surrogate = th.min(surrogate1, surrogate2)
    
            lambda_entropy = 0.01
            obj_actor = obj_surrogate.sum() + obj_entropys.sum() * lambda_entropy
            obj_actor = -obj_actor
    
            modelActorOptim.zero_grad()
            obj_actor.backward()
            modelActorOptim.step()
    
            print( obj_critic.item(), obj_actor.item(), modelActor_scale.detach().numpy() )
    
            if (modelActor_scale<8).any():   
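                # std-dev fell below sin(0.8)**2 ~ 0.51; reset to 10 (~0.71) to keep exploring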
                print("reset modelActor_scale")
                modelActor_scale.data =  (modelActor_scale.data * 0) + 10.0
    
        runHuman(250)
    
    print("END.")
    input("Show UI?")
    modelActor_scale.data =  (modelActor_scale.data * 0) + 3.0
    runHuman( 25000 )
    
     
  • iBoxDB

    iBoxDB - 2025-04-07

    CR01

    CR02

     

    Last edit: iBoxDB 2025-04-07
