|
import json
import os
import sys

import numpy as np
import torch
import wandb
from accelerate import infer_auto_device_map, init_empty_weights
from datasets import concatenate_datasets, load_dataset
from peft import (LoraConfig,
                  PeftConfig,
                  get_peft_model,
                  prepare_model_for_kbit_training)
from transformers import (AutoModelForCausalLM,
                          AutoTokenizer,
                          BitsAndBytesConfig,
                          TrainerCallback,
                          TrainingArguments,
                          logging,
                          pipeline)
from trl import SFTTrainer

sys.path.append(os.path.join(os.path.dirname(__file__), '../../'))

device = 'cuda'
np.random.seed(42)
output_dir = os.path.join(os.path.dirname(__file__), '../')
datapath = os.path.join(os.path.dirname(__file__), '../NL2TL-dataset/collect2')
exp_name = "_mid_ascii_0327_eos_2"

# Map each raw LTL formula to one or more natural-logic paraphrases: the
# 'translate' field plus the clause following "means that" in the 'explain'
# field. Preprocessing samples one paraphrase per training example.
explainer_files = ['LTLexplain_0.json', 'LTLexplain_1.json', 'LTLexplain_2.json', 'LTLexplain_3.json']
explainer_dic = {}
for path in explainer_files:
    with open(os.path.join(datapath, path)) as f:
        LTLlist = json.load(f)
    for key in LTLlist.keys():
        if isinstance(LTLlist[key], dict):
            if key not in explainer_dic:
                explainer_dic[key] = []
            explainer_dic[key].append(LTLlist[key]['translate'])
            sp = LTLlist[key]['explain'].split("means that")
            if len(sp) > 1:
                explainer_dic[key].append(sp[1])

base_model_name = "mistralai/Mistral-7B-Instruct-v0.2"
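# Two BitsAndBytes configs are created back to back; the second (8-bit)
# assignment overrides the first (4-bit NF4), so the model is actually loaded
# in 8-bit for this run (matching the 'quat8' suffix added to output_dir below).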
|
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=False,
    bnb_4bit_quant_type='nf4',
    bnb_4bit_compute_dtype=torch.float16
)
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True
)
|
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'

device_map = "auto"

# Train/test JSONL splits; each record provides a 'natural' English description
# and its target 'raw_ltl' formula.
dataset = load_dataset(
    "json",
    data_files={
        "train": os.path.join(datapath, "ltl_eng_train_mid_ascii_gptAuged.jsonl"),
        "test": os.path.join(datapath, "ltl_eng_test_mid_ascii_gptAuged.jsonl"),
    },
)
print(dataset)
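
# Mistral-7B-Instruct ships no pad token, so EOS is reused for padding and
# right-padding is used for fine-tuning.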
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = 'right'
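
# Two prompt builders follow. preprocess_function renders a plain
# "### Instruction / ### Natural Language Task / ..." template (defined but not
# used below); preprocess_function2, the one actually mapped over the dataset,
# renders the same content through the tokenizer's chat template as a single
# user/assistant exchange. Both sample one logic paraphrase per example from
# explainer_dic.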
def preprocess_function(sample, padding="max_length"):
    inputs = [
        f"""### Instruction:
translate natural description to linear temporal logic, first translate into a logical way, and then translate into linear temporal logic, pay specific attention to brackets '()'

### Natural Language Task:
{sample['natural'][i].strip()}

### Logic Translation:
{explainer_dic[sample['raw_ltl'][i].strip()][np.random.randint(0, len(explainer_dic[sample['raw_ltl'][i].strip()]))]}

### linear temporal logic:
{sample['raw_ltl'][i].strip()}
</s>""".lower()
        for i in range(len(sample['natural']))]

    sample["complete_text"] = inputs
    return sample

def preprocess_function2(sample, padding="max_length"):
    # Chat-template variant used for fine-tuning: one user turn with the task
    # description, one assistant turn with the sampled logic paraphrase
    # followed by the target LTL formula.
    inputs = [
        tokenizer.apply_chat_template(
            [
                {"role": "user", "content": "translate natural description to linear temporal logic, first translate into a logical expression, and then translate into linear temporal logic, please pay specific attention to logic grammar, the natural language task is {}".format(sample['natural'][i].strip())},
                {"role": "assistant", "content": "logic expression is {}, and LTL is {} .".format(
                    explainer_dic[sample['raw_ltl'][i].strip()][np.random.randint(0, len(explainer_dic[sample['raw_ltl'][i].strip()]))],
                    sample['raw_ltl'][i].strip()
                )},
            ],
            tokenize=False)
        for i in range(len(sample['natural']))]

    sample["complete_text"] = inputs
    return sample


tokenized_dataset = dataset.map(preprocess_function2, batched=True)
print(f"Keys of tokenized dataset: {list(tokenized_dataset['train'].features)}")
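
# SFTTrainer trains on the rendered 'complete_text' strings; the labels are the
# tokens themselves (standard causal-LM supervised fine-tuning).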
class PeftSavingCallback(TrainerCallback):
    # Save only the PEFT adapter at each checkpoint and drop the full
    # pytorch_model.bin that the Trainer writes alongside it.
    def on_save(self, args, state, control, **kwargs):
        checkpoint_path = os.path.join(args.output_dir, f"checkpoint-{state.global_step}")
        kwargs["model"].save_pretrained(checkpoint_path)

        if "pytorch_model.bin" in os.listdir(checkpoint_path):
            os.remove(os.path.join(checkpoint_path, "pytorch_model.bin"))


callbacks = [PeftSavingCallback]
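
# LoRA configuration: rank-128 adapters with alpha 16 and dropout 0.05,
# applied to the attention query/value projections only.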
peft_config = LoraConfig(
    lora_alpha=16,
    lora_dropout=0.05,
    r=128,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=["q_proj", "v_proj"]
)
|
training_arguments = TrainingArguments(
    output_dir=output_dir,
    logging_dir=os.path.join(output_dir, "logs"),
    per_device_train_batch_size=1,
    num_train_epochs=3,
    gradient_accumulation_steps=8,
    optim="paged_adamw_32bit",
    save_strategy='epoch',
    logging_steps=25,
    learning_rate=2e-4,
    weight_decay=0.001,
    fp16=True,
    bf16=False,
    max_grad_norm=0.3,
    max_steps=-1,
    warmup_ratio=0.05,
    group_by_length=True,
    lr_scheduler_type="cosine",
    report_to="wandb",
    evaluation_strategy="epoch",
    do_eval=True,
    run_name=base_model_name + exp_name,
    disable_tqdm=False
)
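# Effective batch size per device is 1 x 8 (gradient accumulation); evaluation
# and checkpointing happen once per epoch, with metrics reported to wandb.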
|
# The final adapter and tokenizer are written to a run-specific subdirectory;
# the TrainingArguments above keep writing epoch checkpoints to the parent output_dir.
output_dir = os.path.join(output_dir, "mistral7b" + exp_name + 'aug1_quat8')
|
|
|
base_model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    from_tf=bool(".ckpt" in base_model_name),
    quantization_config=bnb_config,
    device_map=device_map,
    trust_remote_code=True,
    use_auth_token=True
)
# The KV cache must be disabled for training (gradient checkpointing is enabled below).
base_model.config.use_cache = False
base_model.config.pretraining_tp = 1

# Gradient checkpointing + k-bit preparation, then wrap the model with the LoRA adapters.
base_model.gradient_checkpointing_enable()
base_model = prepare_model_for_kbit_training(base_model)
base_model = get_peft_model(base_model, peft_config)
|
trainer = SFTTrainer(
    model=base_model,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['test'],
    peft_config=peft_config,
    dataset_text_field="complete_text",
    max_seq_length=512,
    tokenizer=tokenizer,
    args=training_arguments,
    callbacks=callbacks,
    packing=False,
)
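# Train, save the LoRA adapter and tokenizer to the run directory, then close
# the wandb run before reloading the adapter for evaluation.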
|
wandb.login()
trainer.train()
trainer.model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
wandb.finish()

print('model dir', output_dir)
|
from peft import AutoPeftModelForCausalLM

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoPeftModelForCausalLM.from_pretrained(
    output_dir,
    from_tf=bool(".ckpt" in output_dir),
    quantization_config=bnb_config,
    device_map=device_map,
    trust_remote_code=True,
    use_auth_token=True
)
tokenizer = AutoTokenizer.from_pretrained(base_model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
print(tokenizer.default_chat_template)
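
# Two generation helpers: evaluate_model uses the plain-text instruction prompt,
# while evaluate_model2 (used for the test loop below) goes through the chat
# template, matching the format the adapter was fine-tuned on.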
|
def evaluate_model(input_text):
    input_text = f"""### Instruction:
translate natural description to linear temporal logic, first translate into a logical way, and then translate into linear temporal logic, pay specific attention to brackets '()' ### Natural Language Task:
{input_text}""".lower()
    inputs = tokenizer(input_text, return_tensors="pt").to(device)
    print(inputs)
    outputs = model.generate(input_ids=inputs["input_ids"].to("cuda"),
                             attention_mask=inputs["attention_mask"].to("cuda"),
                             max_new_tokens=512,
                             pad_token_id=tokenizer.eos_token_id)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

def evaluate_model2(input_text):
    # NB: the wording here differs slightly from the training prompt in
    # preprocess_function2 ("logical expression" / "logic grammar").
    messages = [
        {"role": "user", "content": "translate natural description to linear temporal logic, first translate into a logical way, and then translate into linear temporal logic, pay specific attention to brackets '()', natural language task: {}".format(input_text)},
    ]
    encodeds = tokenizer.apply_chat_template(messages, return_tensors="pt").to(device)
    outputs = model.generate(encodeds, max_new_tokens=512)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

import re

import evaluate
import pandas as pd
from datasets import load_from_disk
from tqdm import tqdm

metric = evaluate.load("rouge")
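
# For each test example: generate with the chat-template prompt, pull the LTL
# string out of the decoded text, and score it against the reference formula
# with ROUGE (a string-overlap proxy rather than a logical-equivalence check).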
# The fine-tuned assistant reply has the form "logic expression is ..., and LTL is <formula> ."
# (see preprocess_function2), so extract the formula after "LTL is".
pattern = re.compile(r"LTL is ([\S ]*)")
predictions, references, input_sentence, output_sentence = [], [], [], []
for idx in range(len(tokenized_dataset['test']['natural'])):
    nl = tokenized_dataset['test']['natural'][idx]
    p = evaluate_model2(nl)

    input_sentence.append(nl)

    transLTL = pattern.findall(p)
    print(p)
    if not transLTL:
        # No match in the generated text; score an empty prediction.
        transLTL = ['']
    if transLTL[0].endswith('.'):
        transLTL[0] = transLTL[0][:-1].strip()
    else:
        transLTL[0] = transLTL[0].strip()
    predictions.append(transLTL[0])
    output_sentence.append(p)
    references.append(tokenized_dataset['test']['raw_ltl'][idx].strip())
    print(input_sentence[-1], '\nout::\n', output_sentence[-1], '\npre::\n', predictions[-1], '\nref::\n', references[-1], '\n', '-' * 20, '\n')

rouge = metric.compute(predictions=predictions, references=references, use_stemmer=True)

print(f"rouge1: {rouge['rouge1'] * 100:.2f}%")
print(f"rouge2: {rouge['rouge2'] * 100:.2f}%")
print(f"rougeL: {rouge['rougeL'] * 100:.2f}%")
print(f"rougeLsum: {rouge['rougeLsum'] * 100:.2f}%")

eval_output = np.array([input_sentence, predictions, references]).T
eval_output = pd.DataFrame(eval_output)
eval_output.to_csv(os.path.join(output_dir, 'output'))
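
# Everything below exit() never runs; it is the standard Mistral chat-template
# demo, kept as a reference snippet.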
|
|
|
exit()

messages = [
    {"role": "user", "content": "What is your favourite condiment?"},
    {"role": "assistant", "content": "Well, I'm quite partial to a good squeeze of fresh lemon juice. It adds just the right amount of zesty flavour to whatever I'm cooking up in the kitchen!"},
    {"role": "user", "content": "Do you have mayonnaise recipes?"}
]

encodeds = tokenizer.apply_chat_template(messages, return_tensors="pt")
model_inputs = encodeds.to(device)
model.to(device)

generated_ids = model.generate(model_inputs, max_new_tokens=1000, do_sample=True)
decoded = tokenizer.batch_decode(generated_ids)
print(decoded[0])
|
|