#!/usr/bin/env python
# coding: utf-8
'''The simplest deep learning: CarRacing reinforcement learning, run in a desktop UI'''
'''for CarRacing-v3 v1.1'''
'''python gym_car.py'''
__credits__ = ["iBoxDB", "Bruce Yang CL-N", "2025-3"]

# 3rd-party packages
# https://pytorch.org  CPU version only
import torch
# pip install "gymnasium[box2d]"
# pip install pygame
import gymnasium
# from gymnasium.envs.box2d.car_racing import CarRacing
# numpy is included with torch
import numpy
from copy import deepcopy

th = torch
nn = th.nn
np = numpy
gym = gymnasium
ndarray = np.ndarray
Tensor = th.Tensor

# np.random.seed(0)
# th.manual_seed(0)
# torch.autograd.set_detect_anomaly(True)
th.set_num_threads(8)
th.set_default_dtype(th.float64)

# look for CarRacing-v2, CarRacing-v3, CarRacing-v4 ... on your PC
# gym.pprint_registry()
gameName = "CarRacing-v3"
_continuous = True
_domain_randomize = False


class ImageEnv(gymnasium.Wrapper):
    def __init__(self, **kwargs):
        _env = gym.make(gameName, continuous=_continuous,
                        domain_randomize=_domain_randomize, **kwargs)
        super().__init__(_env)
        in_dim_W, in_dim_H, in_dim_C = _env.observation_space.shape
        self.in_dim = (in_dim_C, in_dim_W, in_dim_H,)
        self.in_dim = (1, 1, 186)
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=self.in_dim, dtype=np.float32)

    def reset(self):
        self.env.reset()
        act = th.zeros((3,))
        buf = self.step(act.numpy())
        return buf[0], act

    def step(self, act):
        outp = th.empty(self.in_dim)
        t_reward = 0
        terminal = truncate = False
        for i in range(outp.shape[0]):
            state, reward, terminal, truncate, _ = self.env.step(act)
            state = np.transpose(state, (2, 0, 1))
            g = state[0] * 0.3333 + state[1] * 0.3333 + state[2] * 0.3333
            g[84:, :] = 255          # blank out the HUD rows at the bottom
            g = (g[:90, :])[::-1, :]
            a = np.bitwise_and(g > 102, g < 108)
            g[a] = 0                 # grey band -> 0, everything else -> 255
            a = g != 0
            g[a] = 255
            g = np.concatenate((np.argmin(g, axis=0),
                                np.argmin(g, axis=1))).astype(np.float64)
            outp[i] = th.tensor((g / 100.0)[np.newaxis, :])
            t_reward += reward
            if terminal or truncate:
                break
        if self.env.render_mode:
            self.env.render()
        return outp, t_reward, terminal, truncate, None


env = ImageEnv()
action_dim = env.action_space.shape[0]
in_dim = env.observation_space.shape[0]
print(env.action_space)
print(env.observation_space)
print(action_dim, in_dim)

device = torch.device("cpu")

flatten = nn.Sequential(
    nn.Flatten(start_dim=-3),
)


def flattenWithAct(_model):
    d_flatten = deepcopy(flatten)

    def _add(state, act):
        r = d_flatten(state)
        r = th.cat((act, r), dim=-1)
        r = _model(r)
        return r

    return (lambda state, act: _add(state, act),
            list(d_flatten.parameters()) + list(_model.parameters()))


in_dim = flatten(env.reset()[0]).shape[0]
print(in_dim)
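
# Note (added): ImageEnv.step() turns each 96x96 RGB frame into a 186-value
# feature vector: the frame is grey-averaged, rows 84+ (the HUD) are blanked,
# pixels whose grey value lies in (102, 108) -- presumably the road surface --
# become 0 and everything else becomes 255, and np.argmin then gives the index
# of the first road pixel in each of the 96 columns and 90 rows, scaled by
# 1/100.  A quick sanity check on that size (assumes the hard-coded
# self.in_dim = (1, 1, 186) above is unchanged):
assert in_dim == 96 + 90, "expected 186 flattened features per observation"
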
# == Begin ==
modelActor = nn.Sequential(
    nn.Linear(in_dim + action_dim, 512), nn.ReLU(),
    nn.Linear(512, 512), nn.ReLU(),
    nn.Linear(512, action_dim),
)
modelActor, ps = flattenWithAct(modelActor)
modelActor_scale = nn.Parameter(th.ones(action_dim) * 10)
modelActorOptim = torch.optim.Adam(ps + [modelActor_scale], lr=0.01)

modelCritic = nn.Sequential(
    nn.Linear(in_dim + action_dim, 512), nn.ReLU(),
    nn.Linear(512, 512), nn.ReLU(),
    nn.Linear(512, 1),
)
modelCritic, ps = flattenWithAct(modelCritic)
modelCriticOptim = torch.optim.Adam(ps, lr=0.01)


def modelActor_forward(state, act):
    loc = modelActor(state, act)
    loc = (loc / 10).sin()
    scale = modelActor_scale / 10
    scale = scale.sin() ** 2
    return loc, scale


def modelActor_getaction(state, act, train=False):
    loc, scale = modelActor_forward(state, act)
    dist = torch.distributions.Normal(loc, scale)
    action = None
    if train and th.rand(1) < 0.2:
        # 20% of training steps use a random action for extra exploration
        action = torch.tensor(env.action_space.sample())
    else:
        action = dist.sample()
    one_d = len(action.shape) == 1
    if one_d:
        action = action.unsqueeze(0)
    min = th.tensor([-1, +0.1, 0])   # [steering, gas, brake] lower bounds
    max = th.tensor([+1, +1, 0.1])   # [steering, gas, brake] upper bounds
    min = min.unsqueeze(0)
    max = max.unsqueeze(0)
    action = action.clamp(min, max)
    if one_d:
        action = action.squeeze(0)
    logprob = dist.log_prob(action)
    return action, logprob


def modelActor_logp_entropy(state, ary_act, action):
    loc, scale = modelActor_forward(state, ary_act)
    dist = torch.distributions.Normal(loc, scale)
    logprob = dist.log_prob(action)
    entropy = dist.entropy()
    return logprob, entropy


def runHuman(steps):
    t_env = ImageEnv(render_mode="human")
    state, act = t_env.reset()
    for i in range(steps):
        if i / 2 == 0:  # only True when i == 0: the first step uses an extra batch dim
            state = state.unsqueeze(0)
            act = act.unsqueeze(0)
        action, _ = modelActor_getaction(state, act)
        if i / 2 == 0:
            action = action.squeeze(0)
        state, reward, terminal, truncate, _ = t_env.step(action.numpy())
        if terminal or truncate:
            state, act = t_env.reset()
        else:
            act = action
    t_env.close()


runHuman(4)

in_dim = env.observation_space.shape

# train_agent
train_agent_time_total = 35
for train_agent_time in range(train_agent_time_total):
    horizon_len = 10000
    print(f"{train_agent_time}/{train_agent_time_total} : explore_env({gameName}, {horizon_len})")
    ary_actions = th.zeros((horizon_len, action_dim,))
    states = th.zeros((horizon_len, *in_dim,))
    actions = th.zeros((horizon_len, action_dim,))
    logprobs = th.zeros((horizon_len, action_dim,))
    rewards = th.zeros(horizon_len)
    terminals = th.zeros(horizon_len)
    # don't use truncate here
    truncates = th.zeros(horizon_len)

    ary_state, ary_action = env.reset()
    ng_count = bg_count = 0
    for i in range(horizon_len):
        ary_actions[i] = ary_action
        state = ary_state
        action, logprob = modelActor_getaction(state, ary_action, True)
        ary_state, reward, terminal, truncate, _ = env.step(action.numpy())
        if truncate:
            terminal = True
        # reward shaping: growing bonus while rewards stay positive,
        # growing penalty (and early termination) while they stay negative
        if reward > 0:
            reward += bg_count
            bg_count += 1
            ng_count = 0
            if ary_action[1] > 0:
                reward *= (1 + ary_action[1])
        else:
            reward -= (ng_count / 10.0)
            ng_count += 1
            bg_count = 0
            terminal = terminal or (ng_count >= 50)
            if ary_action[2] > 0:
                reward *= (1 + ary_action[2])
        actions[i] = action
        states[i] = state
        logprobs[i] = logprob
        rewards[i] = reward
        terminals[i] = terminal
        truncates[i] = truncate
        if terminal:
            ary_state, ary_action = env.reset()
            ng_count = bg_count = 0
        else:
            ary_action = action

    ary_actions = ary_actions.detach()
    states = states.detach()
    actions = actions.detach()
    logprobs = logprobs.detach()
    rewards = rewards.detach()
    terminals = terminals.detach()
    truncates = truncates.detach()
    undones = th.logical_not(terminals).detach()
    truncates = None
    print(f"end explore_env()")

    pos_len = len(states)
    objective_time_run = 0 if pos_len == 0 else pos_len // 10 + 1
    objective_time_run = np.min((10, objective_time_run))
    for objective_time in range(objective_time_run):
        '''advantages'''
        values = modelCritic(states, ary_actions)
        values = values.squeeze(1)
        next_value = modelCritic(ary_state, ary_action)
        print(f"next_value {next_value.item()}")
        # GAE(lambda) advantages; gamma is folded into `masks`
        gamma = 0.97
        masks = undones * gamma
        lambda_gae_adv = 0.6
        advantages = th.empty_like(values)
        advantage = 0
        rw_horizon_len = len(rewards)
        for t in range(rw_horizon_len - 1, -1, -1):
            delta = rewards[t] + masks[t] * next_value - values[t]
            advantages[t] = advantage = delta + masks[t] * lambda_gae_adv * advantage
            next_value = values[t]
        reward_sums = advantages + values
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-5)
        reward_sums = reward_sums.detach()
        advantages = advantages.detach()
        advantage = None

        '''update network'''
        # critic: regress the value estimates onto the GAE return targets
        criterion = torch.nn.MSELoss()
        obj_critic = (criterion(values, reward_sums)).mean()
        modelCriticOptim.zero_grad()
        obj_critic.backward()
        modelCriticOptim.step()

        # actor: PPO clipped-surrogate objective plus an entropy bonus
        new_logprobs, obj_entropys = modelActor_logp_entropy(states, ary_actions, actions)
        ratio_clip = 0.25
        ratio = (new_logprobs - logprobs).exp()
        advantages = advantages.unsqueeze(1).expand_as(ratio)
        surrogate1 = advantages * ratio
        surrogate2 = advantages * ratio.clamp(1 - ratio_clip, 1 + ratio_clip)
        obj_surrogate = th.min(surrogate1, surrogate2)
        lambda_entropy = 0.01
        obj_actor = obj_surrogate.sum() + obj_entropys.sum() * lambda_entropy
        obj_actor = -obj_actor
        modelActorOptim.zero_grad()
        obj_actor.backward()
        modelActorOptim.step()

        print(obj_critic.item(), obj_actor.item(), modelActor_scale.detach().numpy())

    # keep the exploration scale from collapsing between epochs
    if (modelActor_scale < 8).any():
        print("reset modelActor_scale")
        modelActor_scale.data = (modelActor_scale.data * 0) + 10.0

    runHuman(250)

print("END.")
input("Show UI?")
modelActor_scale.data = (modelActor_scale.data * 0) + 3.0
runHuman(25000)
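

# Added reference (not part of the original script and never called here):
# the advantage loop inside the training epoch above is generalized advantage
# estimation, GAE(lambda), with gamma already folded into `masks`
# (masks = undones * gamma).  A standalone restatement of that recursion,
# under those same conventions:
def compute_gae_reference(rewards, values, next_value, masks, lambda_gae=0.6):
    """masks[t] must equal gamma * (1 - terminal_t); returns per-step advantages."""
    advantages = th.empty_like(values)
    advantage = 0.0
    for t in range(len(rewards) - 1, -1, -1):
        delta = rewards[t] + masks[t] * next_value - values[t]
        advantage = delta + masks[t] * lambda_gae * advantage
        advantages[t] = advantage
        next_value = values[t]
    return advantages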
# Last edit: iBoxDB 2025-04-07