""" 出力ディレクトリの古いファイルをクリーンアップする機能 """ import os import time from pathlib import Path from datetime import datetime, timedelta import threading import logging # ロギング設定 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class FileCleanupManager: """定期的に古いファイルを削除するマネージャー""" def __init__(self, output_dir: Path, max_age_hours: float = 24, check_interval_minutes: int = 60): """ Args: output_dir: 監視対象のディレクトリ max_age_hours: ファイルを保持する最大時間(時間) check_interval_minutes: チェック間隔(分) """ self.output_dir = output_dir self.max_age_seconds = max_age_hours * 3600 self.check_interval = check_interval_minutes * 60 self._running = False self._thread = None def start(self): """クリーンアップ処理を開始""" if self._running: logger.warning("Cleanup manager is already running") return self._running = True self._thread = threading.Thread(target=self._cleanup_loop, daemon=True) self._thread.start() logger.info(f"Started cleanup manager for {self.output_dir}") def stop(self): """クリーンアップ処理を停止""" self._running = False if self._thread: self._thread.join() logger.info("Stopped cleanup manager") def _cleanup_loop(self): """定期的にクリーンアップを実行""" while self._running: try: self.cleanup_now() except Exception as e: logger.error(f"Error during cleanup: {e}") # 次のチェックまで待機 time.sleep(self.check_interval) def cleanup_now(self): """今すぐクリーンアップを実行""" if not self.output_dir.exists(): logger.warning(f"Output directory does not exist: {self.output_dir}") return current_time = time.time() deleted_count = 0 deleted_size = 0 # MP4ファイルをチェック for file_path in self.output_dir.glob("*.mp4"): try: # ファイルの最終変更時刻を取得 file_age = current_time - file_path.stat().st_mtime if file_age > self.max_age_seconds: file_size = file_path.stat().st_size file_path.unlink() deleted_count += 1 deleted_size += file_size logger.debug(f"Deleted old file: {file_path.name} (age: {file_age/3600:.1f} hours)") except Exception as e: logger.error(f"Failed to delete {file_path}: {e}") if deleted_count > 0: logger.info(f"Cleanup completed: deleted {deleted_count} files, freed {deleted_size/1024/1024:.1f} MB") def get_directory_stats(self): """ディレクトリの統計情報を取得""" if not self.output_dir.exists(): return { "total_files": 0, "total_size_mb": 0, "oldest_file_hours": None } files = list(self.output_dir.glob("*.mp4")) if not files: return { "total_files": 0, "total_size_mb": 0, "oldest_file_hours": None } total_size = sum(f.stat().st_size for f in files) current_time = time.time() oldest_age = max((current_time - f.stat().st_mtime) for f in files) return { "total_files": len(files), "total_size_mb": total_size / 1024 / 1024, "oldest_file_hours": oldest_age / 3600 } # グローバルインスタンス(アプリケーションで使用) cleanup_manager = None def initialize_cleanup(output_dir: Path, max_age_hours: float = 24): """クリーンアップマネージャーを初期化して開始""" global cleanup_manager if cleanup_manager is not None: cleanup_manager.stop() cleanup_manager = FileCleanupManager(output_dir, max_age_hours) cleanup_manager.start() return cleanup_manager def get_cleanup_status(): """クリーンアップの状態を取得""" if cleanup_manager is None: return "Cleanup manager not initialized" stats = cleanup_manager.get_directory_stats() return f""" クリーンアップ状態: - ファイル数: {stats['total_files']} - 合計サイズ: {stats['total_size_mb']:.1f} MB - 最古のファイル: {stats['oldest_file_hours']:.1f} 時間前 (最古のファイルがある場合) """ if __name__ == "__main__": # テスト実行 output_dir = Path("./output") output_dir.mkdir(exist_ok=True) # テスト用のダミーファイルを作成 test_file = output_dir / "test.mp4" test_file.write_text("dummy content") # 古いファイルをシミュレート(2日前) old_time = time.time() - (48 * 3600) os.utime(test_file, (old_time, old_time)) # クリーンアップマネージャーを初期化 manager = FileCleanupManager(output_dir, max_age_hours=24) # 即座にクリーンアップを実行 print("Before cleanup:", manager.get_directory_stats()) manager.cleanup_now() print("After cleanup:", manager.get_directory_stats())