import json
from collections import namedtuple
from typing import Dict, List, Tuple

import numpy as np
import torch
from pypinyin import Style, pinyin
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer

# One hypothesis in the beam: the text generated so far, its accumulated
# log-probability, and how many rhyme positions have been filled.
BeamEntry = namedtuple('BeamEntry', ['sequence', 'log_prob', 'position'])


def is_pinyin(syllable):
    """Return True when ``syllable`` contains only ASCII characters.

    This is a cheap heuristic used to tell romanised (pinyin) input apart
    from Chinese characters; it does not check the text against the table
    of valid pinyin syllables.
    """
    return syllable.isascii()


class CiJiangRhymer:
    """Beam-search generator of Chinese text that rhymes with a target.

    A causal language model proposes next tokens; only single-character
    CJK tokens whose final (yunmu) and tone are compatible with the
    corresponding target syllable are kept.
    """

    # Divisor applied to the accumulated log-probability before it is
    # exponentiated into the score returned by ``get_rhymes``.
    # NOTE(review): presumably a display temperature that keeps scores of
    # multi-character sequences from collapsing toward zero - confirm.
    _SCORE_TEMPERATURE = 10

    def __init__(self, strict=True, tone=True, heteronym=False):
        """Load the model and rhyme rules and build the token cache.

        Args:
            strict: Use the 'strict' rhyme groups when true, otherwise the
                'blurry' ones.
            tone: Require candidate characters to match the target tone.
            heteronym: Consider every reading of a candidate character
                instead of only its default reading.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._load_model()
        self._load_rules()
        self.tone = tone
        self.heteronym = heteronym
        self.mode = 'strict' if strict else 'blurry'

        # Pre-compute character mappings so filtering is a dict lookup.
        self._build_character_cache()

    def _load_model(self):
        """Load the tokenizer and base language model in eval mode."""
        model_name = "Qwen/Qwen3-0.6B-Base"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Fall back to the EOS token when the tokenizer has no pad token.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype="auto",
            device_map="auto"
        )
        self.model.eval()
        self.vocab = self.tokenizer.get_vocab()

    def _load_rules(self):
        """Load the rhyme tables from the ``rules`` directory.

        The paths are relative to the current working directory.
        ``syllable_to_yunmu`` is looked up by toneless syllable, and
        ``rhymes`` is looked up by yunmu and then by mode
        ('strict' / 'blurry') to get the allowed yunmu.
        """
        with open('rules/syllable_to_yunmu.json', 'r', encoding='utf-8') as f:
            self.syllable_to_yunmu = json.load(f)
        with open('rules/rhymes.json', 'r', encoding='utf-8') as f:
            self.rhymes = json.load(f)

    def _build_character_cache(self):
        """Pre-compute character to pinyin mappings for all vocabulary tokens.

        Fills ``token_to_char`` (token id -> single CJK character) and
        ``char_to_pinyins`` (character -> readings with and without
        heteronyms).
        """
        print("Building character cache for faster lookup...")
        self.char_to_pinyins: Dict[str, Dict[str, List[str]]] = {}
        self.token_to_char: Dict[int, str] = {}

        # Walk the ids the tokenizer actually defines rather than assuming
        # they are exactly 0..len(vocab)-1.
        for token_id in tqdm(sorted(self.vocab.values()), desc="Caching characters"):
            char = self.tokenizer.decode(token_id).strip()
            # Keep only tokens that decode to one CJK Unified Ideograph.
            if len(char) != 1 or not ('\u4e00' <= char <= '\u9fff'):
                continue
            self.token_to_char[token_id] = char

            if char in self.char_to_pinyins:
                continue
            hetero_pinyins = pinyin(char, style=Style.TONE3, heteronym=True,
                                    neutral_tone_with_five=True)[0]
            pinyins = pinyin(char, style=Style.TONE3, heteronym=False,
                             neutral_tone_with_five=True)[0]
            self.char_to_pinyins[char] = {
                "hetero": hetero_pinyins,
                "single": pinyins
            }

    @staticmethod
    def _split_tone(syl: str) -> Tuple[str, str]:
        """Split a TONE3 syllable such as 'ming4' into ('ming', '4').

        A syllable without a trailing digit is returned whole with tone
        '5', which the filter treats as matching any tone.
        """
        if syl and syl[-1].isdigit():
            return syl[:-1], syl[-1]
        return syl, '5'

    def _prefilter_tokens_by_rhyme(self, top_tokens: torch.Tensor, top_log_probs: torch.Tensor,
                                   allowed_rhymes: set, target_tone: str) -> List[Tuple[str, float, int]]:
        """Pre-filter tokens that match rhyming requirements using cached data.

        Args:
            top_tokens: Candidate token ids.
            top_log_probs: Log-probabilities aligned with ``top_tokens``.
            allowed_rhymes: Yunmu values accepted at this position.
            target_tone: Tone digit of the target syllable.

        Returns:
            ``(character, log_prob, token_id)`` for every candidate that is
            a cached single character with an accepted yunmu and tone, in
            the order of ``top_tokens``.
        """
        matching_candidates = []
        # Ids stay integral; only the log-probs need a float conversion.
        token_ids = top_tokens.cpu().tolist()
        log_probs = top_log_probs.to(torch.float32).cpu().tolist()
        reading_key = "hetero" if self.heteronym else "single"

        for token_id, log_prob in zip(token_ids, log_probs):
            token_id = int(token_id)
            char = self.token_to_char.get(token_id)
            if char is None:
                continue

            for candidate_pinyin in self.char_to_pinyins[char][reading_key]:
                # Too short to hold both a syllable and a tone digit.
                if len(candidate_pinyin) < 2:
                    continue
                candidate_syllable = candidate_pinyin[:-1]
                candidate_tone = candidate_pinyin[-1]
                if self.syllable_to_yunmu.get(candidate_syllable) not in allowed_rhymes:
                    continue
                # Tone '5' on either side acts as a wildcard.
                tone_ok = (not self.tone
                           or candidate_tone == target_tone
                           or target_tone == '5'
                           or candidate_tone == '5')
                if tone_ok:
                    matching_candidates.append((char, float(log_prob), token_id))
                    # One matching reading is enough for this character.
                    break

        return matching_candidates

    def _get_next_token_probabilities(self, prompt: str, num_candidates: int = 200) -> Tuple[torch.Tensor, torch.Tensor]:
        """Get log-probabilities for the next token using the base model.

        Args:
            prompt: Text to continue; it is passed to the model as-is.
            num_candidates: Maximum number of top tokens to return.

        Returns:
            ``(top_tokens, top_log_probs)``: the ids of the highest-scoring
            next tokens and their log-softmax values.
        """
        model_inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        with torch.inference_mode():
            outputs = self.model(**model_inputs)

        # Logits at the last position, upcast so the softmax over the whole
        # vocabulary is not computed in a reduced-precision model dtype.
        next_token_logits = outputs.logits[0, -1, :].float()

        top_k_result = next_token_logits.topk(min(num_candidates, next_token_logits.size(0)))
        top_tokens = top_k_result.indices
        top_log_probs = torch.log_softmax(next_token_logits, dim=-1)[top_tokens]

        return top_tokens, top_log_probs

    def get_rhymes(self, text_with_placeholder: str, target_rhyme: str, beam_width: int = 5,
                   num_candidates: int = 200) -> List[Tuple[str, float]]:
        """
        Generate rhyming text using Qwen3 base language model

        Args:
            text_with_placeholder: Text with placeholders (e.g., "恰似一江春水[M][M][M]").
                Every '[M]' is removed and the generated characters are
                appended to the end of the remaining text.
            target_rhyme: Target rhyme pattern, either Chinese characters or
                whitespace-separated TONE3 pinyin (e.g. "bai3 tan1").
            beam_width: Number of beams to maintain during search
            num_candidates: Number of top candidates to consider at each step

        Returns:
            List of (sequence, score) tuples sorted by descending score,
            where score is ``exp(log_prob / 10)`` of the sequence's
            accumulated log-probability. If no candidate fits some
            position, the sequences built so far are returned.
        """
        if is_pinyin(target_rhyme):
            # split() without an argument ignores repeated, leading and
            # trailing whitespace, so no empty syllables are produced.
            target_rhyme_pinyin = target_rhyme.split()
        else:
            target_rhyme_pinyin = [
                pinyin(rhyme, style=Style.TONE3, heteronym=False, neutral_tone_with_five=True)[0][0]
                for rhyme in target_rhyme
            ]

        placeholder_count = text_with_placeholder.count('[M]')
        if placeholder_count != len(target_rhyme_pinyin):
            print(f"Warning: Number of placeholders ({placeholder_count}) doesn't match target rhyme length ({len(target_rhyme_pinyin)})")

        # Start the beam from the text with the placeholders removed.
        base_text = text_with_placeholder.replace('[M]', '')
        if not base_text:
            # Give the model some context when nothing else was supplied.
            base_text = "一个常见词汇是:"
        beam = [BeamEntry(sequence=base_text, log_prob=0.0, position=0)]

        for i, syl in enumerate(tqdm(target_rhyme_pinyin, desc="Generating rhymes")):
            syllable, tone = self._split_tone(syl)
            target_yunmu = self.syllable_to_yunmu.get(syllable)
            allowed_rhymes = set(self.rhymes.get(target_yunmu, {}).get(self.mode, []))

            # Best log-probability per extended sequence. Keying by sequence
            # stops different tokens that decode to the same character from
            # filling several beam slots with identical text.
            best_by_sequence: Dict[str, float] = {}

            for beam_entry in beam:
                try:
                    top_tokens, top_log_probs = self._get_next_token_probabilities(
                        beam_entry.sequence, num_candidates
                    )
                except Exception as e:
                    # Best effort: drop this hypothesis and keep the others.
                    print(f"Error getting probabilities: {e}")
                    continue

                matching_candidates = self._prefilter_tokens_by_rhyme(
                    top_tokens, top_log_probs, allowed_rhymes, tone
                )

                for char, log_prob_value, _token_id in matching_candidates:
                    new_sequence = beam_entry.sequence + char
                    new_log_prob = beam_entry.log_prob + log_prob_value
                    if new_log_prob > best_by_sequence.get(new_sequence, float('-inf')):
                        best_by_sequence[new_sequence] = new_log_prob

            if not best_by_sequence:
                print(f"Warning: No valid candidates found for position {i} (syllable: {syl})")
                break

            # Keep only the top beam_width hypotheses.
            ranked = sorted(best_by_sequence.items(), key=lambda item: item[1], reverse=True)
            beam = [
                BeamEntry(sequence=sequence, log_prob=log_prob, position=i + 1)
                for sequence, log_prob in ranked[:beam_width]
            ]

        if not beam:
            return []

        final_results = [
            (entry.sequence, np.exp(entry.log_prob / self._SCORE_TEMPERATURE))
            for entry in beam
        ]
        final_results.sort(key=lambda x: x[1], reverse=True)

        return final_results


# Example usage:
if __name__ == "__main__":
    # Initialize the rhymer
    rhymer = CiJiangRhymer(strict=False, tone=True)

    # Example: generate a four-character rhyming completion.
    base_text = "没人给你[M][M][M][M]"
    # The target may also be given as Chinese characters, e.g. "摆摊算命".
    target_rhyme = "bai3 tan1 suan4 ming4"

    results = rhymer.get_rhymes(base_text, target_rhyme, beam_width=10, num_candidates=5000)

    print("Generated rhyming completions:")
    for rank, (sequence, prob) in enumerate(results, start=1):
        print(f"{rank}. {sequence} (probability: {prob:.4f})")