import ffmpeg, typer, os, sys, json, shutil from loguru import logger logger.remove() logger.add( sys.stderr, format="{time:YYYY-MM-DD ddd HH:mm:ss} | {level} | {message}", ) app = typer.Typer(pretty_exceptions_show_locals=False) def parse_frame_name(fname: str): """return a tuple of frame_type and frame_index""" fn, fext = os.path.splitext(os.path.basename(fname)) frame_type, frame_index = fn.split("_") return frame_type, int(frame_index) def get_fps_ffmpeg(video_path: str): probe = ffmpeg.probe(video_path) # Find the first video stream video_stream = next( (stream for stream in probe["streams"] if stream["codec_type"] == "video"), None ) if video_stream is None: raise ValueError("No video stream found") # Frame rate is given as a string fraction, e.g., '30000/1001' r_frame_rate = video_stream["r_frame_rate"] num, denom = map(int, r_frame_rate.split("/")) return num / denom @app.command() def extract_keyframes_greedy( video_path: str, output_dir: str = None, threshold: float = 0.2, overwrite: bool = False, ): """ run i-frames extractions and keyframes extraction and return a list of keyframe's paths """ assert ( threshold > 0 ), f"threshold must be no negative, for i-frame extraction use extract-keyframes instead" iframes = extract_keyframes( video_path, output_dir=output_dir, threshold=0, overwrite=overwrite, append=False, ) assert type(iframes) != type(None), f"i-frames extraction failed" kframes = extract_keyframes( video_path, output_dir=output_dir, threshold=threshold, overwrite=False, append=True, ) assert type(kframes) != type(None), f"keyframes extraction failed" # remove kframes that are also iframes removed_kframes = [] for fn in kframes: fname = os.path.basename(fn) if os.path.isfile( os.path.join(os.path.dirname(fn), fname.replace("kframe_", "iframe_")) ): os.remove(fn) removed_kframes.append(fn) if len(removed_kframes) > 0: logger.warning(f"removed {len(removed_kframes)} redundant kframes") kframes = [kf for kf in kframes if kf not in removed_kframes] frames = iframes + kframes logger.success(f"extracted {len(frames)} total frames") return frames @app.command() def extract_keyframes( video_path: str, output_dir: str = None, threshold: float = 0.3, overwrite: bool = False, append: bool = False, ): """extract keyframes as images into output_dir and return a list of keyframe's paths Args: output_dir: if not provided, will be in video_name/keyframes/ """ # Create output directory if it doesn't exist output_dir = output_dir if output_dir else os.path.dirname(video_path) vname, vext = os.path.splitext(os.path.basename(video_path)) output_dir = os.path.join(output_dir, vname, "keyframes") if os.path.isdir(output_dir): if overwrite: shutil.rmtree(output_dir) logger.warning(f"removed existing data: {output_dir}") elif not append: logger.error(f"overwrite is false and data already exists!") return None os.makedirs(output_dir, exist_ok=True) # Construct the ffmpeg-python pipeline stream = ffmpeg.input(video_path) config_dict = { "vsync": "0", "frame_pts": "true", } if threshold: # always add in the first frame by default filter_value = f"eq(n,0)+gt(scene,{threshold})" frame_name = "kframe" logger.info(f"Extracting Scene-changing frames with {filter_value}") else: filter_value = f"eq(pict_type,I)" # config_dict["skip_frame"] = "nokey" frame_name = "iframe" logger.info(f"Extracting I-Frames since no threshold provided: {filter_value}") stream = ffmpeg.filter(stream, "select", filter_value) stream = ffmpeg.output(stream, f"{output_dir}/{frame_name}_%d.jpg", **config_dict) # Execute the ffmpeg command try: ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) frames = [ os.path.join(output_dir, f) for f in os.listdir(output_dir) if f.endswith(".jpg") and frame_name in f ] logger.success(f"{len(frames)} {frame_name} extracted to {output_dir}") return frames except ffmpeg.Error as e: logger.error(f"Error executing FFmpeg command: {e.stderr.decode()}") return None @app.command() def extract_audio(video_path: str, output_dir: str = None, overwrite: bool = False): """extracting audio of a video file into m4a without re-encoding ref: https://www.baeldung.com/linux/ffmpeg-audio-from-video#1-extracting-audio-without-re-encoding """ # Create output directory if it doesn't exist output_dir = output_dir if output_dir else os.path.dirname(video_path) vname, vext = os.path.splitext(os.path.basename(video_path)) output_dir = os.path.join(output_dir, vname) output_fname = os.path.join(output_dir, vname + ".m4a") if os.path.isfile(output_fname): if overwrite: os.remove(output_fname) logger.warning(f"removed existing data: {output_fname}") else: logger.error(f"overwrite is false and data already exists!") return None os.makedirs(output_dir, exist_ok=True) # Construct the ffmpeg-python pipeline stream = ffmpeg.input(video_path) config_dict = {"map": "0:a", "acodec": "copy"} stream = ffmpeg.output(stream, output_fname, **config_dict) # Execute the ffmpeg command try: ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) logger.success(f"audio extracted to {output_fname}") return output_fname except ffmpeg.Error as e: logger.error(f"Error executing FFmpeg command: {e.stderr.decode()}") return None @app.command() def extract_frames( video_path: str, output_dir: str = None, fps: int = None, every_x: int = None, overwrite: bool = False, append: bool = False, im_name_pattern: str = "frame_%05d.jpg", ): """extract frames as images into output_dir and return the list of frames' paths Args: output_dir: if not provided, will be in video_name/keyframes/ """ # Create output directory if it doesn't exist vname, vext = os.path.splitext(os.path.basename(video_path)) output_dir = output_dir if output_dir else os.path.dirname(video_path) output_dir = os.path.join(output_dir, vname, "keyframes") if os.path.isdir(output_dir): if overwrite: shutil.rmtree(output_dir) logger.warning(f"removed existing data: {output_dir}") elif not append: logger.error(f"overwrite is false and data already exists in {output_dir}!") return None os.makedirs(output_dir, exist_ok=True) # Construct the ffmpeg-python pipeline stream = ffmpeg.input(video_path) config_dict = { "vsync": 0, # preserves the original timestamps "frame_pts": 1, # set output file's %d to the frame's PTS } if fps: # check FPS vid_fps = get_fps_ffmpeg(video_path) fps = min(vid_fps, fps) logger.info(f"{vname}{vext} FPS: {vid_fps}, extraction FPS: {fps}") config_dict["vf"] = f"fps={fps}" elif every_x: config_dict["vf"] = f"select=not(mod(n\,{every_x}))" logger.info( f"Extracting Frames into {output_dir} with these configs: \n{config_dict}" ) stream = ffmpeg.output(stream, f"{output_dir}/{im_name_pattern}", **config_dict) # Execute the ffmpeg command try: ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) frames = [ os.path.join(output_dir, f) for f in os.listdir(output_dir) if f.endswith(".jpg") ] logger.success(f"{len(frames)} frames extracted to {output_dir}") return frames except ffmpeg.Error as e: logger.error(f"Error executing FFmpeg command: {e.stderr.decode()}") return None if __name__ == "__main__": app()