|
import os |
|
import json |
|
import re |
|
from collections import Counter |
|
|
|
def target_speaker(segments): |
|
speakers = [seg.get("speaker", "UNKNOWN") for seg in segments] |
|
most_common = Counter(speakers).most_common(1) |
|
return most_common[0][0] if most_common else "UNKNOWN" |
|
|
|
def pause_feature(session_id, base_dir="session_data"): |
|
|
|
json_file = os.path.join(base_dir, session_id, f"{session_id}_transcriptionCW.json") |
|
feature_file = os.path.join(base_dir, session_id, f"{session_id}_feature.json") |
|
|
|
if not os.path.exists(json_file): |
|
print(f"[Error] transcriptionCW file not found: {json_file}") |
|
return |
|
|
|
with open(json_file, "r", encoding="utf-8") as f: |
|
data = json.load(f) |
|
|
|
segments = data.get("segments", []) |
|
tgt_speaker = target_speaker(segments) |
|
tgt_segments = [seg for seg in segments if seg.get("speaker") == tgt_speaker] |
|
|
|
total_words = 0 |
|
total_duration = 0.0 |
|
all_pauses = [] |
|
|
|
for seg in tgt_segments: |
|
words = seg.get("words", []) |
|
total_words += len(words) |
|
duration = seg.get("end", 0) - seg.get("start", 0) |
|
total_duration += duration |
|
pauses = seg.get("pauses", []) |
|
all_pauses.extend(pauses) |
|
|
|
total_pauses = len(all_pauses) |
|
pause_durations = [p["duration"] for p in all_pauses] |
|
pause_total_duration = sum(pause_durations) |
|
avg_pause_duration = sum(pause_durations) / total_pauses if total_pauses > 0 else 0 |
|
longest_pause = max(pause_durations) if pause_durations else 0 |
|
|
|
pause_density_1 = (total_pauses / total_words * 100) if total_words > 0 else 0 |
|
pause_density_2 = (total_pauses / total_duration * 60) if total_duration > 0 else 0 |
|
pause_proportion = (pause_total_duration / total_duration) * 100 if total_duration > 0 else 0 |
|
|
|
pause_feat = { |
|
"Pause Density I": round(pause_density_1, 3), |
|
"Pause Density II": round(pause_density_2, 3), |
|
"Pause Proportion": round(pause_proportion, 3), |
|
"Average Pause Duration": round(avg_pause_duration, 3), |
|
"Longest Pause Duration": round(longest_pause, 3) |
|
} |
|
|
|
if os.path.exists(feature_file): |
|
with open(feature_file, "r", encoding="utf-8") as f: |
|
feature_data = json.load(f) |
|
else: |
|
feature_data = {} |
|
|
|
feature_data.setdefault("pause", []).append(pause_feat) |
|
|
|
with open(feature_file, "w", encoding="utf-8") as f: |
|
json.dump(feature_data, f, indent=4, ensure_ascii=False) |
|
|
|
print(f"[Done] Pause features written to {feature_file}") |
|
|
|
|
|
|
|
def get_syllable_weight(cv_pattern): |
|
weight1_patterns = {'CV', 'CVC', 'VC', 'V'} |
|
weight2_patterns = {'VCC', 'CVCC', 'CCV', 'CCVC'} |
|
if cv_pattern in weight1_patterns: |
|
return 1 |
|
elif cv_pattern in weight2_patterns: |
|
return 10 |
|
else: |
|
return 100 |
|
|
|
|
|
def syllable_feature(session_id, base_dir="session_data"): |
|
json_file = os.path.join(base_dir, session_id, f"{session_id}_transcriptionCW.json") |
|
feature_file = os.path.join(base_dir, session_id, f"{session_id}_feature.json") |
|
|
|
if not os.path.exists(json_file): |
|
print(f"[Error] transcriptionCW file not found: {json_file}") |
|
return |
|
|
|
with open(json_file, "r", encoding="utf-8") as f: |
|
data = json.load(f) |
|
|
|
segments = data.get("segments", []) |
|
tgt_speaker = target_speaker(segments) |
|
tgt_segments = [seg for seg in segments if seg.get("speaker") == tgt_speaker] |
|
|
|
total_duration = sum(seg["end"] - seg["start"] for seg in tgt_segments) |
|
syllable_types = set() |
|
total_syllables = 0 |
|
total_weight = 0 |
|
max_syllable_len = 0 |
|
word_syllable_count = {} |
|
word_counter = 0 |
|
|
|
for seg in tgt_segments: |
|
syllables = seg.get("syllables", []) |
|
for syl in syllables: |
|
cv = syl.get("CV_pattern", "") |
|
phonemes = syl.get("phonemes", []) |
|
syllable_types.add(cv) |
|
total_syllables += 1 |
|
total_weight += get_syllable_weight(cv) |
|
max_syllable_len = max(max_syllable_len, len(phonemes)) |
|
|
|
local_idx = syl.get("word_index") |
|
global_idx = word_counter + local_idx |
|
word_syllable_count[global_idx] = word_syllable_count.get(global_idx, 0) + 1 |
|
word_counter += len(seg.get("words", [])) |
|
|
|
all_words = [] |
|
for seg in tgt_segments: |
|
all_words.extend(seg.get("words", [])) |
|
total_words = len(all_words) |
|
|
|
multi_syll_2 = sum(1 for c in word_syllable_count.values() if c >= 2) |
|
multi_syll_3 = sum(1 for c in word_syllable_count.values() if c >= 3) |
|
|
|
feat = { |
|
"Number of Syllable Types": len(syllable_types), |
|
"Syllable Complexity Index": round(total_weight / total_syllables, 3) if total_syllables > 0 else 0, |
|
"Average Syllable Rate": round(total_syllables / total_duration, 3) if total_duration > 0 else 0, |
|
"Longest Syllable Length": max_syllable_len, |
|
"Proportion of Multisyllabic Words I": round((multi_syll_2 / total_words) * 100, 3) if total_words > 0 else 0, |
|
"Proportion of Multisyllabic Words II": round((multi_syll_3 / total_words) * 100, 3) if total_words > 0 else 0 |
|
} |
|
|
|
if os.path.exists(feature_file): |
|
with open(feature_file, "r", encoding="utf-8") as f: |
|
feature_data = json.load(f) |
|
else: |
|
feature_data = {} |
|
|
|
feature_data.setdefault("syllable", []).append(feat) |
|
|
|
with open(feature_file, "w", encoding="utf-8") as f: |
|
json.dump(feature_data, f, indent=4, ensure_ascii=False) |
|
|
|
print(f"[Done] Syllable features written to {feature_file}") |
|
|
|
|
|
|
|
def get_rep_weight(length): |
|
if length == 1: |
|
return 1 |
|
elif length == 2: |
|
return 5 |
|
elif length == 3: |
|
return 10 |
|
elif length == 4: |
|
return 20 |
|
else: |
|
return 40 |
|
|
|
|
|
def repetition_feature(session_id, base_dir="session_data"): |
|
json_file = os.path.join(base_dir, session_id, f"{session_id}_transcriptionCW.json") |
|
feature_file = os.path.join(base_dir, session_id, f"{session_id}_feature.json") |
|
|
|
if not os.path.exists(json_file): |
|
print(f"[Error] transcriptionCW file not found: {json_file}") |
|
return |
|
|
|
with open(json_file, "r", encoding="utf-8") as f: |
|
data = json.load(f) |
|
|
|
segments = data.get("segments", []) |
|
tgt_speaker = target_speaker(segments) |
|
tgt_segments = [seg for seg in segments if seg.get("speaker") == tgt_speaker] |
|
|
|
total_duration = sum(seg["end"] - seg["start"] for seg in tgt_segments) |
|
total_words = sum(len(seg.get("words", [])) for seg in tgt_segments) |
|
|
|
rep_count = 0 |
|
rep_weights = 0 |
|
rep_lengths = [] |
|
rep_durations = [] |
|
|
|
for seg in tgt_segments: |
|
words = seg.get("words", []) |
|
repetitions = seg.get("repetitions", []) |
|
for rep in repetitions: |
|
indices = rep.get("words", []) |
|
if not indices: |
|
continue |
|
length = len(indices) |
|
rep_lengths.append(length) |
|
rep_weights += get_rep_weight(length) |
|
rep_count += 1 |
|
|
|
|
|
start_idx = indices[0] |
|
end_idx = indices[-1] |
|
if 0 <= start_idx < len(words) and 0 <= end_idx < len(words): |
|
start_time = words[start_idx]["start"] |
|
end_time = words[end_idx]["end"] |
|
rep_durations.append(end_time - start_time) |
|
|
|
total_rep_time = sum(rep_durations) |
|
longest_rep = max(rep_lengths) if rep_lengths else 0 |
|
avg_rep_len = sum(rep_lengths) / len(rep_lengths) if rep_lengths else 0 |
|
|
|
rep_feat = { |
|
"Word Repetition Index": round(rep_weights / rep_count, 3) if rep_count > 0 else 0, |
|
"Repetition Density I": round(rep_count / total_words * 100, 3) if total_words > 0 else 0, |
|
"Repetition Density II": round(rep_count / total_duration * 60, 3) if total_duration > 0 else 0, |
|
"Repetition Proportion": round((total_rep_time / total_duration) * 100, 3) if total_duration > 0 else 0, |
|
"Longest Repetition Length": longest_rep, |
|
"Average Repetition Length": round(avg_rep_len, 3) |
|
} |
|
|
|
if os.path.exists(feature_file): |
|
with open(feature_file, "r", encoding="utf-8") as f: |
|
feature_data = json.load(f) |
|
else: |
|
feature_data = {} |
|
|
|
feature_data.setdefault("repetition", []).append(rep_feat) |
|
|
|
with open(feature_file, "w", encoding="utf-8") as f: |
|
json.dump(feature_data, f, indent=4, ensure_ascii=False) |
|
|
|
print(f"[Done] Repetition features written to {feature_file}") |
|
|
|
|
|
|
|
def fillerword_feature(session_id, base_dir="session_data"): |
|
json_file = os.path.join(base_dir, session_id, f"{session_id}_transcriptionCW.json") |
|
feature_file = os.path.join(base_dir, session_id, f"{session_id}_feature.json") |
|
|
|
if not os.path.exists(json_file): |
|
print(f"[Error] transcriptionCW file not found: {json_file}") |
|
return |
|
|
|
with open(json_file, "r", encoding="utf-8") as f: |
|
data = json.load(f) |
|
|
|
segments = data.get("segments", []) |
|
tgt_speaker = target_speaker(segments) |
|
tgt_segments = [seg for seg in segments if seg.get("speaker") == tgt_speaker] |
|
|
|
total_duration = sum(seg["end"] - seg["start"] for seg in tgt_segments) |
|
total_words = sum(len(seg.get("words", [])) for seg in tgt_segments) |
|
|
|
filler_total = 0 |
|
filler_durations = [] |
|
filler_intervals = [] |
|
|
|
for seg in tgt_segments: |
|
fillers = seg.get("fillerwords", []) |
|
words = seg.get("words", []) |
|
|
|
filler_total += len(fillers) |
|
filler_durations += [fw["duration"] for fw in fillers] |
|
|
|
if len(fillers) >= 2: |
|
indices = [] |
|
for fw in fillers: |
|
for i, w in enumerate(words): |
|
if abs(w.get("start", 0) - fw["start"]) < 0.01: |
|
indices.append(i) |
|
break |
|
indices.sort() |
|
intervals = [indices[i+1] - indices[i] - 1 for i in range(len(indices)-1)] |
|
if intervals: |
|
filler_intervals.append(sum(intervals) / len(intervals)) |
|
|
|
avg_duration = sum(filler_durations) / len(filler_durations) if filler_durations else 0 |
|
longest_duration = max(filler_durations) if filler_durations else 0 |
|
avg_interval = sum(filler_intervals) / len(filler_intervals) if filler_intervals else 0 |
|
|
|
feat = { |
|
"Filler Word Density I": round(filler_total / total_words * 100, 3) if total_words > 0 else 0, |
|
"Filler Word Density II": round(filler_total / total_duration * 60, 3) if total_duration > 0 else 0, |
|
"Filler Word Proportion": round((sum(filler_durations) / total_duration) * 100, 3) if total_duration > 0 else 0, |
|
"Average Filler Word Duration": round(avg_duration, 3), |
|
"Longest Filler Word Duration": round(longest_duration, 3), |
|
"Average Filler Word Interval": round(avg_interval, 3) |
|
} |
|
|
|
if os.path.exists(feature_file): |
|
with open(feature_file, "r", encoding="utf-8") as f: |
|
feature_data = json.load(f) |
|
else: |
|
feature_data = {} |
|
|
|
feature_data.setdefault("fillerword", []).append(feat) |
|
|
|
with open(feature_file, "w", encoding="utf-8") as f: |
|
json.dump(feature_data, f, indent=4, ensure_ascii=False) |
|
|
|
print(f"[Done] Filler word features written to {feature_file}") |
|
|
|
|
|
def plm_feature(session_id, base_dir="session_data"): |
|
|
|
|
|
json_file = os.path.join(base_dir, session_id, f"{session_id}_transcriptionCW.json") |
|
feature_file = os.path.join(base_dir, session_id, f"{session_id}_feature.json") |
|
|
|
if not os.path.exists(json_file): |
|
print(f"[Error] transcriptionCW file not found: {json_file}") |
|
return |
|
|
|
with open(json_file, "r", encoding="utf-8") as f: |
|
data = json.load(f) |
|
|
|
segments = data.get("segments", []) |
|
tgt_speaker = target_speaker(segments) |
|
tgt_segments = [seg for seg in segments if seg.get("speaker") == tgt_speaker] |
|
|
|
sequence = ''.join(seg.get("mispronunciation", "") for seg in tgt_segments) |
|
sequence = re.sub(r"[^CE]", "", sequence.upper()) |
|
|
|
n = len(sequence) |
|
if n == 0: |
|
feat = { |
|
"Mispronunciation Density": 0, |
|
"Normalized Transition Count": 0, |
|
"Average Common Correct": 0, |
|
"Average Common Error": 0, |
|
"Longest Common Correct": 0, |
|
"Longest Common Error": 0 |
|
} |
|
else: |
|
transitions = sum(1 for i in range(1, n) if sequence[i] != sequence[i - 1]) |
|
MPD = sum(1 for ch in sequence if ch == 'E') / n |
|
NTC = transitions / n |
|
|
|
def run_lengths(s, ch): |
|
return [len(g) for g in re.findall(f"{ch}+", s)] |
|
|
|
c_runs = run_lengths(sequence, 'C') |
|
e_runs = run_lengths(sequence, 'E') |
|
|
|
ACC = sum(c_runs) / len(c_runs) if c_runs else 0 |
|
ACE = sum(e_runs) / len(e_runs) if e_runs else 0 |
|
LCC = max(c_runs) if c_runs else 0 |
|
LCE = max(e_runs) if e_runs else 0 |
|
|
|
feat = { |
|
"Mispronunciation Density": round(MPD, 3), |
|
"Normalized Transition Count": round(NTC, 3), |
|
"Average Common Correct": round(ACC, 3), |
|
"Average Common Error": round(ACE, 3), |
|
"Longest Common Correct": LCC, |
|
"Longest Common Error": LCE |
|
} |
|
|
|
if os.path.exists(feature_file): |
|
with open(feature_file, "r", encoding="utf-8") as f: |
|
feature_data = json.load(f) |
|
else: |
|
feature_data = {} |
|
|
|
feature_data.setdefault("plm", []).append(feat) |
|
|
|
with open(feature_file, "w", encoding="utf-8") as f: |
|
json.dump(feature_data, f, indent=4, ensure_ascii=False) |
|
|
|
print(f"[Done] PLM features written to {feature_file}") |
|
|
|
|
|
|
|
def feature_extraction(session_id, base_dir="session_data"): |
|
|
|
feature_file = os.path.join(base_dir, session_id, f"{session_id}_feature.json") |
|
|
|
if not os.path.exists(feature_file): |
|
with open(feature_file, "w", encoding="utf-8") as f: |
|
json.dump({}, f, indent=4) |
|
|
|
pause_feature(session_id, base_dir=base_dir) |
|
syllable_feature(session_id, base_dir=base_dir) |
|
repetition_feature(session_id, base_dir=base_dir) |
|
fillerword_feature(session_id, base_dir=base_dir) |
|
plm_feature(session_id, base_dir=base_dir) |
|
|
|
print(f"[All Analysis Done] Feature extraction complete for session {session_id}") |
|
|
|
|
|
if __name__ == "__main__": |
|
feature_extraction("000030") |
|
|