| import cv2 |
| import os |
| import shutil |
| import time |
| from SinglePhoto import FaceSwapper |
|
|
| def extract_frames(video_path, frames_dir): |
| if not os.path.exists(frames_dir): |
| os.makedirs(frames_dir) |
| last_idx = -1 |
| else: |
| |
| existing = [f for f in os.listdir(frames_dir) if f.startswith("frame_") and f.endswith(".jpg")] |
| if existing: |
| last_idx = max([int(f.split("_")[1].split(".")[0]) for f in existing]) |
| else: |
| last_idx = -1 |
|
|
| cap = cv2.VideoCapture(video_path) |
| frame_paths = [] |
| idx = 0 |
| while True: |
| ret, frame = cap.read() |
| if not ret: |
| break |
| frame_path = os.path.join(frames_dir, f"frame_{idx:05d}.jpg") |
| if idx > last_idx: |
| cv2.imwrite(frame_path, frame) |
| frame_paths.append(frame_path) |
| idx += 1 |
| cap.release() |
| return frame_paths |
|
|
| def frames_to_video(frames_dir, output_video_path, fps): |
| frames = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) |
| if not frames: |
| print("No frames found in directory.") |
| return |
| first_frame = cv2.imread(frames[0]) |
| height, width, layers = first_frame.shape |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
| out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) |
| for frame_path in frames: |
| frame = cv2.imread(frame_path) |
| out.write(frame) |
| out.release() |
|
|
| def main(): |
| |
| video_path = os.path.join("VideoSwapping", "data_dst.mp4") |
| source_image_path = os.path.join("VideoSwapping", "data_src.jpg") |
| frames_dir = os.path.join("VideoSwapping", "video_frames") |
| swapped_dir = os.path.join("VideoSwapping", "swapped_frames") |
| output_video_path = os.path.join("VideoSwapping", "output_swapped_video.mp4") |
| source_face_idx = 1 |
|
|
| |
| while True: |
| try: |
| user_input = input("Enter target_face_idx (default is 1): ").strip() |
| if user_input == "": |
| dest_face_idx = 1 |
| break |
| dest_face_idx = int(user_input) |
| break |
| except ValueError: |
| print("Invalid input. Please enter an integer value.") |
|
|
| print("Choose an option:") |
| print("1. Extract frames only") |
| print("2. Face swap only (requires extracted frames)") |
| print("3. Both extract frames and face swap") |
| choice = input("Enter 1, 2, or 3: ").strip() |
|
|
| frame_paths = [] |
| if choice == "1" or choice == "3": |
| print("Extracting frames from video (resuming if needed)...") |
| frame_paths = extract_frames(video_path, frames_dir) |
| print(f"Extracted {len(frame_paths)} frames to {frames_dir}.") |
| if choice == "1": |
| return |
|
|
| if choice == "2": |
| |
| if not os.path.exists(frames_dir): |
| print("Frames directory does not exist. Please extract frames first.") |
| return |
| frame_paths = sorted([os.path.join(frames_dir, f) for f in os.listdir(frames_dir) if f.endswith('.jpg')]) |
|
|
| if choice == "2" or choice == "3": |
| |
| if not os.path.exists(swapped_dir): |
| os.makedirs(swapped_dir) |
|
|
| |
| swapper = FaceSwapper() |
|
|
| |
| print("Swapping faces on frames...") |
| start_time = time.time() |
| for idx, frame_path in enumerate(frame_paths): |
| frame_name = os.path.basename(frame_path) |
| |
| if frame_name.startswith("frame_"): |
| swapped_name = "swapped_" + frame_name[len("frame_"):] |
| else: |
| swapped_name = "swapped_" + frame_name |
| out_path = os.path.join(swapped_dir, swapped_name) |
| if os.path.exists(out_path): |
| |
| print(f"Frame {idx+1}/{len(frame_paths)} already swapped, skipping...", end='\r') |
| continue |
| try: |
| try: |
| swapped = swapper.swap_faces( |
| source_path=source_image_path, |
| source_face_idx=source_face_idx, |
| target_path=frame_path, |
| target_face_idx=dest_face_idx |
| ) |
| except ValueError as ve: |
| if "Target image contains" in str(ve): |
| print(f"\nFrame {idx}: Target face idx {dest_face_idx} not found, trying with idx 1.",end='\r') |
| swapped = swapper.swap_faces( |
| source_path=source_image_path, |
| source_face_idx=source_face_idx, |
| target_path=frame_path, |
| target_face_idx=1 |
| ) |
| else: |
| raise ve |
| cv2.imwrite(out_path, swapped) |
| except Exception as e: |
| print(f"\nFrame {idx}: {e}") |
| |
| cv2.imwrite(out_path, cv2.imread(frame_path)) |
| |
| elapsed = time.time() - start_time |
| avg_time = elapsed / (idx + 1) |
| remaining = avg_time * (len(frame_paths) - (idx + 1)) |
| mins, secs = divmod(int(remaining), 60) |
| print(f"Swapping frame {idx+1}/{len(frame_paths)} | Est. time left: {mins:02d}:{secs:02d}", end='\r') |
| print() |
|
|
| |
| cap = cv2.VideoCapture(video_path) |
| fps = cap.get(cv2.CAP_PROP_FPS) |
| cap.release() |
|
|
| |
| print("Combining swapped frames into video...") |
| frames_to_video(swapped_dir, output_video_path, fps) |
| print(f"Done! Output video saved as {output_video_path}") |
|
|
| |
| answer = input("Do you want to keep the extracted frames and swapped images? (y/n): ").strip().lower() |
| if answer == 'n': |
| try: |
| shutil.rmtree(frames_dir) |
| shutil.rmtree(swapped_dir) |
| print("Temporary folders deleted.") |
| except Exception as e: |
| print(f"Error deleting folders: {e}") |
| else: |
| print("Temporary folders kept.") |
|
|
| if __name__ == "__main__": |
| main() |