# Source: Douzero_Resnet/douzero/dmc/dmc.py (363 lines, 15 KiB, Python)

import os
import threading
import time
import timeit
import pprint
from collections import deque
import numpy as np
import torch
from torch import multiprocessing as mp
from torch import nn
import douzero.dmc.models
import douzero.env.env
from .file_writer import FileWriter
from .models import Model, OldModel, UnifiedModel
from .utils import get_batch, log, create_optimizers, act, infer_logic
import psutil
import shutil
import requests
# One bounded history (at most the 100 most recent entries) of mean episode
# returns per seat; `learn` appends to it and averages it for the stats.
mean_episode_return_buf = {
    position: deque(maxlen=100)
    for position in ('landlord', 'landlord_up', 'landlord_front', 'landlord_down')
}
def compute_loss(logits, targets):
    """Return the mean squared difference between `logits` and `targets`.

    The trailing axis of `logits` is squeezed before subtracting `targets`.
    """
    residual = logits.squeeze(-1) - targets
    return residual.pow(2).mean()
def learn(position, actor_model, model, batch, optimizer, flags, lock):
    """Performs a learning (optimization) step.

    Args:
        position: One of 'landlord', 'landlord_up', 'landlord_front',
            'landlord_down'; selects the return buffer and the stat key suffix.
        actor_model: Model container whose `get_model(position)` network
            receives the updated weights when ONNX is disabled.
        model: The learner network being optimized; called as
            `model(obs_z, obs_x)` and expected to return a mapping with a
            'values' entry.
        batch: Mapping of tensors. Reads 'obs_x_batch', 'obs_z', 'target',
            'episode_return', 'done', 'obs_type' and, when `flags.old_model`
            is set, 'obs_x_no_action'.
        optimizer: Optimizer stepping `model`'s parameters.
        flags: Run configuration; reads `training_device`, `old_model`,
            `max_grad_norm` and `enable_onnx`.
        lock: Lock held for the whole forward/backward/step/sync section.

    Returns:
        dict with 'mean_episode_return_<position>' and 'loss_<position>'.
    """
    # obs_type code that marks a transition as belonging to each seat.
    position_index = {"landlord": 31, "landlord_up": 32, 'landlord_front': 33, "landlord_down": 34}
    print("Learn", position)
    if flags.training_device != "cpu":
        device = torch.device('cuda:' + str(flags.training_device))
    else:
        device = torch.device('cpu')

    if flags.old_model:
        # The old model takes the state features and the action features
        # concatenated along axis 2.
        obs_x_no_action = batch['obs_x_no_action'].to(device)
        obs_action = batch['obs_x_batch'].to(device)
        obs_x = torch.cat((obs_x_no_action, obs_action), dim=2).float()
        obs_x = torch.flatten(obs_x, 0, 1)
    else:
        obs_x = batch["obs_x_batch"]
        obs_x = torch.flatten(obs_x, 0, 1).to(device)
    obs_z = torch.flatten(batch['obs_z'].to(device), 0, 1).float()
    target = torch.flatten(batch['target'].to(device), 0, 1)

    # Only finished episodes tagged with this position's obs_type contribute
    # to the rolling mean-return statistic.
    finished_here = batch['done'] & (batch["obs_type"] == position_index[position])
    episode_returns = batch['episode_return'][finished_here]
    if len(episode_returns) > 0:
        mean_episode_return_buf[position].append(torch.mean(episode_returns).to(device))

    with lock:
        learner_outputs = model(obs_z, obs_x)
        loss = compute_loss(learner_outputs['values'], target)

        # torch.stack rejects an empty list, and the buffer stays empty until
        # some batch contains a finished episode for this position. Report 0.0
        # until then instead of crashing the learner thread.
        recent_returns = list(mean_episode_return_buf[position])
        if recent_returns:
            mean_return = torch.mean(torch.stack(recent_returns)).item()
        else:
            mean_return = 0.0
        stats = {
            'mean_episode_return_' + position: mean_return,
            'loss_' + position: loss.item(),
        }

        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), flags.max_grad_norm)
        optimizer.step()

        # Without ONNX the actor-side network is refreshed directly from the
        # freshly updated learner weights.
        if not flags.enable_onnx:
            actor_model.get_model(position).load_state_dict(model.state_dict())
        return stats
def train(flags):
    """
    This is the main function for training. It will first
    initialize everything, such as buffers, optimizers, etc.
    Then it will start subprocesses as actors. Then, it will call
    the learning function with multiple threads.

    While the learner threads run, the calling thread wakes up every five
    seconds to log throughput, save a checkpoint every
    `flags.save_interval` minutes, re-export the ONNX models every
    `flags.onnx_sync_interval` seconds, and restart any actor or inference
    process that is no longer alive.

    Args:
        flags: Run configuration object. Attributes read here include
            `training_device`, `infer_devices`, `xpid`, `savedir`,
            `unroll_length`, `batch_size`, `old_model`, `unified_model`,
            `lite_model`, `enable_onnx`, `onnx_model_path`, `load_model`,
            `num_actors`, `num_actors_thread`, `num_infer`, `num_threads`,
            `total_frames`, `disable_checkpoint`, `enable_upload`,
            `upload_url`, `save_interval` and `onnx_sync_interval`.
            `enable_upload` is set to False when training is interrupted.

    Raises:
        AssertionError: If a non-CPU device is requested but CUDA is not
            available.
    """
    if flags.training_device != 'cpu' or flags.infer_devices != 'cpu':
        if not torch.cuda.is_available():
            raise AssertionError("CUDA not available. If you have GPUs, please specify the ID after `--gpu_devices`. Otherwise, please train with CPU with `python3 train.py --infer_devices cpu --training_device cpu`")

    plogger = FileWriter(
        xpid=flags.xpid,
        xp_args=flags.__dict__,
        rootdir=flags.savedir,
    )
    checkpointpath = os.path.expandvars(
        os.path.expanduser('%s/%s/%s' % (flags.savedir, flags.xpid, 'model.tar')))

    T = flags.unroll_length
    B = flags.batch_size

    positions = ['landlord', 'landlord_up', 'landlord_front', 'landlord_down']
    # Names under which the learner networks are exported and saved.
    model_positions = ['uni'] if flags.unified_model else positions
    # str() keeps this working when the device id is given as an int.
    if flags.training_device == 'cpu':
        training_device_name = 'cpu'
    else:
        training_device_name = 'cuda:' + str(flags.training_device)

    # Initialize actor models
    if flags.old_model:
        model_cls = OldModel
    elif flags.unified_model:
        model_cls = UnifiedModel
    else:
        model_cls = Model
    actor_model = model_cls(device="cpu", flags=flags, lite_model=flags.lite_model)
    actor_model.eval()

    # Initialize queues
    actor_processes = []
    ctx = mp.get_context('spawn')
    batch_queues = {position: ctx.SimpleQueue() for position in positions}
    # Shared value handed to the inference processes; written by
    # sync_onnx_model with the frame count of the latest sync.
    onnx_frame = ctx.Value('d', -1)

    # Learner model for training
    learner_model = model_cls(device=flags.training_device, lite_model=flags.lite_model)

    # Stat Keys
    stat_keys = [
        key
        for position in positions
        for key in ('mean_episode_return_' + position, 'loss_' + position)
    ]
    frames, stats = 0, {k: 0 for k in stat_keys}
    position_frames = {position: 0 for position in positions}

    if flags.unified_model:
        # A single network serves every seat, so every seat (and the 'uni'
        # export) must share one lock.
        shared_lock = threading.Lock()
        position_locks = {name: shared_lock for name in positions + ['uni']}
    else:
        position_locks = {name: threading.Lock() for name in positions}

    def sync_onnx_model(frames):
        """Export the learner networks to ONNX (if enabled) and publish `frames`."""
        export_dir = '%s/%s' % (flags.onnx_model_path, flags.xpid)
        os.makedirs(export_dir, exist_ok=True)
        if flags.enable_onnx:
            for position in model_positions:
                model_path = '%s/%s/model_%s.onnx' % (flags.onnx_model_path, flags.xpid, position)
                net = learner_model.get_model(position)
                onnx_params = net.get_onnx_params(torch.device(training_device_name))
                # Hold the position lock so the weights are not being updated
                # by a learner thread while they are exported.
                with position_locks[position]:
                    torch.onnx.export(
                        net,
                        onnx_params['args'],
                        model_path,
                        export_params=True,
                        opset_version=10,
                        do_constant_folding=True,
                        input_names=onnx_params['input_names'],
                        output_names=onnx_params['output_names'],
                        dynamic_axes=onnx_params['dynamic_axes']
                    )
        # Written once, after every export has finished.
        # NOTE(review): presumably the inference processes watch this value to
        # pick up new ONNX files - confirm in infer_logic.
        onnx_frame.value = frames

    # Create optimizers
    optimizers = create_optimizers(flags, learner_model)

    # Load models if any
    if flags.load_model and os.path.exists(checkpointpath):
        checkpoint_states = torch.load(checkpointpath, map_location=training_device_name)
        for k in model_positions:
            learner_model.get_model(k).load_state_dict(checkpoint_states["model_state_dict"][k])
            optimizers[k].load_state_dict(checkpoint_states["optimizer_state_dict"][k])
            if not flags.enable_onnx:
                actor_model.get_model(k).load_state_dict(checkpoint_states["model_state_dict"][k])
        stats = checkpoint_states["stats"]
        frames = checkpoint_states["frames"]
        position_frames = checkpoint_states["position_frames"]
        sync_onnx_model(frames)
        log.info(f"Resuming preempted job, current stats:\n{stats}")

    num_actors = flags.num_actors
    infer_queues = [
        {'input': ctx.Queue(maxsize=100), 'output': ctx.Queue(maxsize=100)}
        for _ in range(flags.num_actors_thread)
        for _ in range(num_actors)
    ]

    def start_infer(index, device):
        """Start and return one daemon inference process."""
        process = mp.Process(
            target=infer_logic,
            args=(index, device, infer_queues, actor_model, flags, onnx_frame))
        process.daemon = True
        process.start()
        return process

    def start_actor(index):
        """Start and return one daemon actor process using its four inference queues."""
        process = mp.Process(
            target=act,
            args=(index, infer_queues[index * 4: (index + 1) * 4], batch_queues, flags))
        process.daemon = True
        process.start()
        return process

    infer_processes = []
    for device in flags.infer_devices.split(','):
        for i in range(flags.num_infer if device != 'cpu' else 1):
            infer_processes.append({
                'device': device,
                'i': i,
                'infer': start_infer(i, device)
            })

    # Starting actor processes
    for i in range(num_actors):
        actor_processes.append({
            'i': i,
            'actor': start_actor(i)
        })

    # The *_PRIORITY_CLASS constants exist only in psutil's Windows build;
    # skip the priority tweak elsewhere instead of raising AttributeError.
    if hasattr(psutil, 'NORMAL_PRIORITY_CLASS'):
        parent = psutil.Process()
        parent.nice(psutil.NORMAL_PRIORITY_CLASS)
        for child in parent.children():
            child.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)

    # One lock shared by every learner thread to guard stats, the logger and
    # the frame counters.
    stats_lock = threading.Lock()

    def batch_and_learn(i, position, local_lock, position_lock, lock=stats_lock):
        """Thread target for the learning process."""
        nonlocal frames, position_frames, stats
        optimizer_key = 'uni' if 'uni' in optimizers else position
        while frames < flags.total_frames:
            batch = get_batch(batch_queues, position, flags, local_lock)
            _stats = learn(position, actor_model, learner_model.get_model(position), batch,
                           optimizers[optimizer_key], flags, position_lock)
            with lock:
                for k in _stats:
                    stats[k] = _stats[k]
                to_log = dict(frames=frames)
                to_log.update({k: stats[k] for k in stat_keys})
                plogger.log(to_log)
                frames += T * B
                position_frames[position] += T * B

    threads = []
    # Per-position locks passed to get_batch; shared by all threads of a seat.
    batch_locks = {position: threading.Lock() for position in positions}
    for i in range(flags.num_threads):
        for position in positions:
            thread = threading.Thread(
                target=batch_and_learn,
                name='batch-and-learn-%d' % i,
                args=(i, position, batch_locks[position], position_locks[position]),
                daemon=True)
            thread.start()
            threads.append(thread)

    def checkpoint(frames):
        """Save the full training state and per-position weights; optionally upload them."""
        if flags.disable_checkpoint:
            return
        log.info('Saving checkpoint to %s', checkpointpath)
        _models = learner_model.get_models()
        # Write to a temporary name first; it is moved over the real
        # checkpoint only after everything below has succeeded.
        torch.save({
            'model_state_dict': {k: _models[k].state_dict() for k in _models},
            'optimizer_state_dict': {k: optimizers[k].state_dict() for k in optimizers},
            "stats": stats,
            'flags': vars(flags),
            'frames': frames,
            'position_frames': position_frames
        }, checkpointpath + '.new')

        # Label sent with uploads: optional 'lite_' prefix plus architecture.
        model_type = 'lite_' if flags.lite_model else ''
        if flags.old_model:
            model_type += 'vanilla'
        elif flags.unified_model:
            model_type += 'unified_v2'
        else:
            model_type += 'resnet'

        # Save the weights for evaluation purpose
        for position in model_positions:
            model_weights_dir = os.path.expandvars(os.path.expanduser(
                '%s/%s/%s' % (flags.savedir, flags.xpid, "general_" + position + '_' + str(frames) + '.ckpt')))
            torch.save(learner_model.get_model(position).state_dict(), model_weights_dir)
            if flags.enable_upload:
                try:
                    # Close the handle before the file is removed below; an
                    # open handle blocks deletion on Windows.
                    with open(model_weights_dir, 'rb') as model_file:
                        requests.post(flags.upload_url, data={
                            'type': model_type,
                            'position': position,
                            'frame': frames
                        }, files={'model_file': ('model.ckpt', model_file)})
                except Exception:
                    # Upload is best effort; Ctrl-C still propagates because
                    # only Exception is caught. The message reads
                    # "model upload failed".
                    print("模型上传失败")
                os.remove(model_weights_dir)
        shutil.move(checkpointpath + '.new', checkpointpath)

    # Sliding window of the most recent 240 throughput samples.
    fps_log = deque(maxlen=240)
    timer = timeit.default_timer
    try:
        # Back-date the last checkpoint so one is written on the first pass.
        last_checkpoint_time = timer() - flags.save_interval * 60
        last_onnx_sync_time = timer()
        while frames < flags.total_frames:
            start_frames = frames
            position_start_frames = {k: position_frames[k] for k in position_frames}
            start_time = timer()
            time.sleep(5)

            if timer() - last_checkpoint_time > flags.save_interval * 60:
                checkpoint(frames)
                last_checkpoint_time = timer()
            if timer() - last_onnx_sync_time > flags.onnx_sync_interval:
                sync_onnx_model(frames)
                last_onnx_sync_time = timer()

            end_time = timer()
            elapsed = end_time - start_time
            fps = (frames - start_frames) / elapsed
            fps_log.append(fps)
            fps_avg = np.mean(fps_log)
            position_fps = {
                k: (position_frames[k] - position_start_frames[k]) / elapsed
                for k in position_frames
            }
            log.info('After %i (L:%i U:%i F:%i D:%i) frames: @ %.1f fps (avg@ %.1f fps) (L:%.1f U:%.1f F:%.1f D:%.1f) Stats:\n%s',
                     frames,
                     position_frames['landlord'],
                     position_frames['landlord_up'],
                     position_frames['landlord_front'],
                     position_frames['landlord_down'],
                     fps,
                     fps_avg,
                     position_fps['landlord'],
                     position_fps['landlord_up'],
                     position_fps['landlord_front'],
                     position_fps['landlord_down'],
                     pprint.pformat(stats))

            # Restart any worker process that has died.
            for proc in actor_processes:
                if not proc['actor'].is_alive():
                    proc['actor'] = start_actor(proc['i'])
            for proc in infer_processes:
                if not proc['infer'].is_alive():
                    # Track the new inference process itself so the next
                    # liveness check looks at the right process.
                    proc['infer'] = start_infer(proc['i'], proc['device'])
    except KeyboardInterrupt:
        # Save progress on Ctrl-C, but skip uploading.
        flags.enable_upload = False
        checkpoint(frames)
        return
    else:
        for thread in threads:
            thread.join()
        log.info('Learning finished after %d frames.', frames)

    checkpoint(frames)
    plogger.close()