forked from jinhojsk515/SPMM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSPMM_models_rxn.py
69 lines (61 loc) · 3.21 KB
/
SPMM_models_rxn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import copy
from xbert import BertConfig, BertForMaskedLM
import torch
from torch import nn
from torch.distributions.categorical import Categorical
class SPMM_rxn(nn.Module):
def __init__(self, cp=None, config=None,):
super().__init__()
self.text_encoder = BertForMaskedLM(config=BertConfig.from_json_file(config['bert_config_text']))
self.text_encoder2 = BertForMaskedLM(config=BertConfig.from_json_file(config['bert_config_smiles']))
# copy weights of checkpoint's SMILES encoder to text_encoder2
if cp:
checkpoint = torch.load(cp, map_location='cpu')
try:
state_dict = copy.deepcopy(checkpoint['model'])
except:
state_dict = copy.deepcopy(checkpoint['state_dict'])
for key in list(state_dict.keys()):
if 'text_encoder.' in key:
new_key = key.replace('text_encoder.', '')
state_dict[new_key] = state_dict[key]
del state_dict[key]
msg = self.text_encoder2.load_state_dict(state_dict, strict=False)
# print(msg)
del state_dict
def forward(self, text_input_ids, text_attention_mask, product_input_ids, product_attention_mask):
input_ids = product_input_ids.clone()
labels = input_ids.clone()[:, 1:]
text_embeds = self.text_encoder2.bert(text_input_ids, attention_mask=text_attention_mask, return_dict=True, mode='text').last_hidden_state
mlm_output = self.text_encoder(input_ids,
attention_mask=product_attention_mask,
encoder_hidden_states=text_embeds,
encoder_attention_mask=text_attention_mask,
return_dict=True,
is_decoder=True,
return_logits=True,
)[:, :-1, :]
loss_fct = nn.CrossEntropyLoss(ignore_index=0)
loss_mlm = loss_fct(mlm_output.permute((0, 2, 1)), labels)
return loss_mlm
def generate(self, text_embeds, text_mask, product_input, stochastic=False, k=None):
product_atts = torch.where(product_input == 0, 0, 1)
token_output = self.text_encoder(product_input,
attention_mask=product_atts,
encoder_hidden_states=text_embeds,
encoder_attention_mask=text_mask,
return_dict=True,
is_decoder=True,
return_logits=True,
)[:, -1, :] # batch*300
if k:
p = torch.softmax(token_output, dim=-1)
output = torch.topk(p, k=k, dim=-1) # batch*k
return torch.log(output.values), output.indices
if stochastic:
p = torch.softmax(token_output, dim=-1)
m = Categorical(p)
token_output = m.sample()
else:
token_output = torch.argmax(token_output, dim=-1)
return token_output.unsqueeze(1) # batch*1