import os import torch import numpy as np from collections import Counter Card2Column = {3: 0, 4: 1, 5: 2, 6: 3, 7: 4, 8: 5, 9: 6, 10: 7, 11: 8, 12: 9, 13: 10, 14: 11, 17: 12, 20: 13, 30: 14} NumOnes2Array = {0: np.array([0, 0, 0, 0, 0, 0, 0, 0]), 1: np.array([1, 0, 0, 0, 0, 0, 0, 0]), 2: np.array([1, 1, 0, 0, 0, 0, 0, 0]), 3: np.array([1, 1, 1, 0, 0, 0, 0, 0]), 4: np.array([1, 1, 1, 1, 0, 0, 0, 0]), 5: np.array([1, 1, 1, 1, 1, 0, 0, 0]), 6: np.array([1, 1, 1, 1, 1, 1, 0, 0]), 7: np.array([1, 1, 1, 1, 1, 1, 1, 0]), 8: np.array([1, 1, 1, 1, 1, 1, 1, 1])} def _get_one_hot_bomb(bomb_num, use_legacy = False): """ A utility function to encode the number of bombs into one-hot representation. """ if use_legacy: one_hot = np.zeros(29) one_hot[bomb_num[0] + bomb_num[1]] = 1 else: one_hot = np.zeros(56) # 14 + 15 + 27 one_hot[bomb_num[0]] = 1 one_hot[14 + bomb_num[1]] = 1 one_hot[29 + bomb_num[2]] = 1 return one_hot def _load_model(position, model_dir, use_onnx): if not use_onnx or not os.path.isfile(os.path.join(model_dir, position+'.onnx')) : from models import model_dict_new model = model_dict_new[position]() model_state_dict = model.state_dict() model_path = os.path.join(model_dir, position+'.ckpt') if torch.cuda.is_available(): pretrained = torch.load(model_path, map_location='cuda:0') else: pretrained = torch.load(model_path, map_location='cpu') pretrained = {k: v for k, v in pretrained.items() if k in model_state_dict} model_state_dict.update(pretrained) model.load_state_dict(model_state_dict) if torch.cuda.is_available(): model.cuda() model.eval() if use_onnx: z = torch.randn(1, 5, 162, requires_grad=True) if position == 'landlord': x = torch.randn(1, 373, requires_grad=True) else: x = torch.randn(1, 484, requires_grad=True) torch.onnx.export(model, (z,x), os.path.join(model_dir, position+'.onnx'), export_params=True, opset_version=10, do_constant_folding=True, input_names = ['z', 'x'], output_names = ['y'], dynamic_axes={'z' : {0 : 'batch_size'}, 'x' : {0 : 'batch_size'}, 'y' : {0 : 'batch_size'}}) if use_onnx: import onnxruntime model = onnxruntime.InferenceSession(os.path.join(model_dir, position+'.onnx')) return model def _process_action_seq(sequence, length=20, new_model=True): """ A utility function encoding historical moves. We encode 20 moves. If there is no 20 moves, we pad with zeros. """ sequence = sequence[-length:].copy() if new_model: sequence = sequence[::-1] if len(sequence) < length: empty_sequence = [[] for _ in range(length - len(sequence))] empty_sequence.extend(sequence) sequence = empty_sequence return sequence class DeepAgent: def __init__(self, position, model_dir, use_onnx=False): self.model = _load_model(position, model_dir, use_onnx) self.use_onnx = use_onnx def cards2array(self, list_cards): if len(list_cards) == 0: return np.zeros(108, dtype=np.int8) matrix = np.zeros([8, 13], dtype=np.int8) jokers = np.zeros(4, dtype=np.int8) counter = Counter(list_cards) for card, num_times in counter.items(): if card < 20: matrix[:, Card2Column[card]] = NumOnes2Array[num_times] elif card == 20: jokers[0] = 1 if num_times == 2: jokers[1] = 1 elif card == 30: jokers[2] = 1 if num_times == 2: jokers[3] = 1 return np.concatenate((matrix.flatten('F'), jokers)) def get_one_hot_array(self, num_left_cards, max_num_cards): one_hot = np.zeros(max_num_cards, dtype=np.float32) one_hot[num_left_cards - 1] = 1 return one_hot def action_seq_list2array(self, action_seq_list, new_model=True): """ A utility function to encode the historical moves. We encode the historical 20 actions. If there is no 20 actions, we pad the features with 0. Since three moves is a round in DouDizhu, we concatenate the representations for each consecutive three moves. Finally, we obtain a 5x432 matrix, which will be fed into LSTM for encoding. """ if new_model: # position_map = {"landlord": 0, "landlord_up": 1, "landlord_front": 2, "landlord_down": 3} action_seq_array = np.ones((len(action_seq_list), 108)) * -1 # Default Value -1 for not using area for row, list_cards in enumerate(action_seq_list): if list_cards != []: action_seq_array[row, :108] = self.cards2array(list_cards) else: action_seq_array = np.zeros((len(action_seq_list), 108)) for row, list_cards in enumerate(action_seq_list): if list_cards != []: action_seq_array[row, :] = self.cards2array(list_cards) action_seq_array = action_seq_array.reshape(5, 432) return action_seq_array def get_obs_general(self, infoset, position, use_legacy = False): num_legal_actions = len(infoset.legal_actions) my_handcards = self.cards2array(infoset.player_hand_cards) my_handcards_batch = np.repeat(my_handcards[np.newaxis, :], num_legal_actions, axis=0) other_handcards = self.cards2array(infoset.other_hand_cards) bid_info = np.array(infoset.bid_info).flatten() bid_info_batch = np.repeat(bid_info[np.newaxis, :], num_legal_actions, axis=0) multiply_info = np.array(infoset.multiply_info) multiply_info_batch = np.repeat(multiply_info[np.newaxis, :], num_legal_actions, axis=0) my_action_batch = np.zeros(my_handcards_batch.shape) for j, action in enumerate(infoset.legal_actions): my_action_batch[j, :] = self.cards2array(action) landlord_num_cards_left = self.get_one_hot_array( infoset.num_cards_left_dict['landlord'], 33) landlord_up_num_cards_left = self.get_one_hot_array( infoset.num_cards_left_dict['landlord_up'], 25) landlord_front_num_cards_left = self.get_one_hot_array( infoset.num_cards_left_dict['landlord_front'], 25) landlord_down_num_cards_left = self.get_one_hot_array( infoset.num_cards_left_dict['landlord_down'], 25) landlord_played_cards = self.cards2array( infoset.played_cards[0]) landlord_up_played_cards = self.cards2array( infoset.played_cards[1]) landlord_front_played_cards = self.cards2array( infoset.played_cards[2]) landlord_down_played_cards = self.cards2array( infoset.played_cards[3]) bomb_num = _get_one_hot_bomb( infoset.bomb_num, use_legacy) bomb_num_batch = np.repeat( bomb_num[np.newaxis, :], num_legal_actions, axis=0) num_cards_left = np.hstack(( landlord_num_cards_left, # 33 landlord_up_num_cards_left, # 25 landlord_front_num_cards_left, # 25 landlord_down_num_cards_left)) if use_legacy: x_batch = np.hstack(( bid_info_batch, # 20 multiply_info_batch)) # 4 x_no_action = np.hstack(( bid_info, multiply_info)) else: x_batch = np.hstack(( bomb_num_batch, # 56 bid_info_batch, # 20 multiply_info_batch)) # 4 x_no_action = np.hstack(( bomb_num, # 56 bid_info, multiply_info)) z =np.vstack(( num_cards_left, my_handcards, # 108 other_handcards, # 108 # three_landlord_cards, # 108 landlord_played_cards, # 108 landlord_up_played_cards, # 108 landlord_front_played_cards, # 108 landlord_down_played_cards, # 108 self.action_seq_list2array(_process_action_seq(infoset.card_play_action_seq, 32)) )) _z_batch = np.repeat( z[np.newaxis, :, :], num_legal_actions, axis=0) my_action_batch = my_action_batch[:,np.newaxis,:] z_batch = np.zeros([len(_z_batch),40,108],int) for i in range(0,len(_z_batch)): z_batch[i] = np.vstack((my_action_batch[i],_z_batch[i])) obs = { 'position': position, 'x_batch': x_batch.astype(np.float32), 'z_batch': z_batch.astype(np.float32), 'legal_actions': infoset.legal_actions, 'x_no_action': x_no_action.astype(np.int8), 'z': z.astype(np.int8), } return obs def act(self, infoset): obs = self.get_obs_general(infoset, infoset.player_position) z_batch = obs['z_batch'] x_batch = obs['x_batch'] if self.use_onnx: ort_inputs = {'z': z_batch, 'x': x_batch} y_pred = self.model.run(None, ort_inputs)[0] elif torch.cuda.is_available(): y_pred = self.model.forward(torch.from_numpy(z_batch).float().cuda(), torch.from_numpy(x_batch).float().cuda()) y_pred = y_pred.cpu().detach().numpy() else: y_pred = self.model.forward(torch.from_numpy(z_batch).float(), torch.from_numpy(x_batch).float())['values'] y_pred = y_pred.detach().numpy() y_pred = y_pred.flatten() #best_action_index = np.argmax(y_pred, axis=0)[0] size = min(3, len(y_pred)) best_action_index = np.argpartition(y_pred, -size)[-size:] best_action_confidence = y_pred[best_action_index] best_action = [infoset.legal_actions[index] for index in best_action_index] return best_action, best_action_confidence