|
import os |
|
import json |
|
|
|
def _find_repetitions(tokens):
    """Locate immediately repeated word sequences in ``tokens``.

    The list is scanned left to right. At each position the longest unit
    that is directly followed by an identical copy of itself wins, so
    ``a a a a`` is reported as the unit ``a a`` occurring twice rather than
    ``a`` occurring four times. Every occurrence except the last one is
    considered redundant.

    Args:
        tokens: List of word strings of one segment.

    Returns:
        A list of dicts, one per detected repetition, with the keys
        ``"content"`` (the redundant words joined by single spaces),
        ``"words"`` (indices into ``tokens`` of the redundant words) and
        ``"mark_location"`` (index of the last redundant word).
    """
    repetitions = []
    total = len(tokens)
    start = 0
    while start < total:
        # A unit has to fit at least twice into what is left of the segment.
        longest_unit = (total - start) // 2
        for unit_len in range(longest_unit, 0, -1):
            unit = tokens[start:start + unit_len]
            if unit != tokens[start + unit_len:start + 2 * unit_len]:
                continue

            # Two copies are confirmed; extend while further copies follow.
            occurrences = 2
            next_end = start + 3 * unit_len
            while next_end <= total and tokens[next_end - unit_len:next_end] == unit:
                occurrences += 1
                next_end += unit_len

            # The final occurrence is the one that is kept, so only the
            # preceding ones are annotated as repeated material.
            redundant = occurrences - 1
            span_end = start + redundant * unit_len
            repetitions.append({
                "content": " ".join(unit * redundant),
                "words": list(range(start, span_end)),
                "mark_location": span_end - 1,
            })
            # Resume scanning after the whole run, kept occurrence included.
            start += occurrences * unit_len
            break
        else:
            # No unit of any length repeats at this position.
            start += 1
    return repetitions


def annotate_repetitions(session_id, base_dir="session_data"):
    """Annotate every segment of a session transcription with repetitions.

    Reads ``<base_dir>/<session_id>/<session_id>_transcriptionCW.json``,
    replaces the ``"repetitions"`` entry of each item in ``"segments"`` with
    a freshly computed list (see ``_find_repetitions``), and writes the
    result back to the same file.

    Args:
        session_id: Name of the session directory and prefix of the JSON file.
        base_dir: Directory that contains the session directories.

    Returns:
        The updated, parsed JSON document, or ``None`` when the JSON file
        does not exist (an error message is printed in that case).
    """
    session_dir = os.path.join(base_dir, session_id)
    json_file = os.path.join(session_dir, f"{session_id}_transcriptionCW.json")

    if not os.path.exists(json_file):
        print(f"Error: could not find {json_file}")
        return None

    with open(json_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # ``or []`` also covers an explicit JSON null, which ``.get(key, [])``
    # would pass through and which would then fail on iteration.
    for segment in data.get("segments") or []:
        # Remove any stale annotation first so that the refreshed key is
        # re-inserted at the end of the segment, as before.
        segment.pop("repetitions", None)

        tokens = [w.get("word", "") for w in segment.get("words") or []]
        segment["repetitions"] = _find_repetitions(tokens)

    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

    print(f"Session {session_id} repetition annotation done: {json_file}")
    return data
|
|
|
if __name__ == "__main__":
    # Script entry point: annotate the hard-coded session "000030" using the
    # default base directory.
    annotate_repetitions(session_id="000030")
|
|