优化批量换脸批处理程序
v 1.0
# python multi_swap_duration_1.1.py
import os
import subprocess
from tqdm import tqdm
source_videos_dir = "/home/nvidia/data/video/HDTF/10s"
source_images_dir = "/home/nvidia/data/image/CelebA-HQ/300/0"
output_dir = source_images_dir
# Generate lists of files directly from the source directories
video_files_list = [
os.path.join(source_videos_dir, f)
for f in os.listdir(source_videos_dir)
if os.path.isfile(os.path.join(source_videos_dir, f)) and f.endswith('.mp4') and not any(char.isalpha() for char in f.split('.')[0])
]
image_files_list = [
os.path.join(source_images_dir, f)
for f in os.listdir(source_images_dir)
if os.path.isfile(os.path.join(source_images_dir, f)) and f.endswith('.jpg')
]
# Extract model identifier once
model_id = 'c'
# Calculate total pairs
total_pairs = len(video_files_list) * len(image_files_list)
# Use tqdm with a generator expression to update the progress bar dynamically
with tqdm(total=total_pairs, desc="Processing") as pbar:
for vid_file in video_files_list:
for img_file in image_files_list:
output_video = f"{os.path.splitext(os.path.basename(vid_file))[0]}_{os.path.splitext(os.path.basename(img_file))[0]}_{model_id}.mp4"
output_video_path = os.path.join(output_dir, output_video)
cmd = [
"python", "multi_face_single_source.py",
"--retina_path", "retinaface/RetinaFace-Res50.h5",
"--arcface_path", "arcface_model/ArcFace-Res50.h5",
"--facedancer_path", "model_zoo/FaceDancer_config_c_HQ.h5",
"--vid_path", vid_file,
"--swap_source", img_file,
"--output", output_video_path,
"--compare", "False",
"--sample_rate", "1",
"--length", "1",
"--align_source", "True",
"--device_id", "0"
]
subprocess.run(cmd)
pbar.update(1)
v 1.1
使用ProcessPoolExecutor来并行运行subprocess.run调用,每个任务在一个单独的进程中执行。这可以极大地提高处理速度,但要注意的是,进程间通信和上下文切换也会带来开销,所以最好根据具体任务的特性来调整max_workers参数。此外,确保你的硬件资源(如CPU核心数和内存)足以支持这种级别的并行处理。
import os
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
source_videos_dir = "/home/nvidia/data/video/HDTF/10s"
source_images_dir = "/home/nvidia/data/image/CelebA-HQ/300/0"
output_dir = source_images_dir
video_files_list = [
os.path.join(source_videos_dir, f)
for f in os.listdir(source_videos_dir)
if os.path.isfile(os.path.join(source_videos_dir, f)) and f.endswith('.mp4') and not any(char.isalpha() for char in f.split('.')[0])
]
image_files_list = [
os.path.join(source_images_dir, f)
for f in os.listdir(source_images_dir)
if os.path.isfile(os.path.join(source_images_dir, f)) and f.endswith('.jpg')
]
model_id = 'c'
# Create a list of tasks
tasks = []
for vid_file in video_files_list:
for img_file in image_files_list:
output_video = f"{os.path.splitext(os.path.basename(vid_file))[0]}_{os.path.splitext(os.path.basename(img_file))[0]}_{model_id}.mp4"
output_video_path = os.path.join(output_dir, output_video)
cmd = [
"python", "multi_face_single_source.py",
"--retina_path", "retinaface/RetinaFace-Res50.h5",
"--arcface_path", "arcface_model/ArcFace-Res50.h5",
"--facedancer_path", "model_zoo/FaceDancer_config_c_HQ.h5",
"--vid_path", vid_file,
"--swap_source", img_file,
"--output", output_video_path,
"--compare", "False",
"--sample_rate", "1",
"--length", "1",
"--align_source", "True",
"--device_id", "0"
]
tasks.append(cmd)
# Use a process pool to execute tasks concurrently
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
futures = {executor.submit(subprocess.run, task): task for task in tasks}
for future in tqdm(as_completed(futures), total=len(tasks), desc="Processing"):
future.result()
v 1.2
确保每次只分配一个任务到空闲的GPU上。为此,我们需要引入一个锁机制来同步GPU的使用状态,确保在任何时刻,每个GPU上只有一个任务在运行。
我们为每个GPU创建了一个锁。在worker函数中,我们使用with lock:语句来确保在执行任务之前锁定GPU。这意味着在任何时刻,每个GPU上只会有一个任务在运行。同时,通过在构建命令时使用gpu_id % 2来交替选择GPU,我们确保了任务会在两个GPU之间均匀分布。
import os
import subprocess
import queue
from multiprocessing import Process, Lock, Queue
from tqdm import tqdm
# Locks for each GPU to ensure only one task runs at a time per GPU
gpu_locks = [Lock(), Lock()]
def worker(gpu_id, lock, task_queue):
while True:
try:
# Acquire the lock for this GPU before getting a task
with lock:
# Get a task from the queue
cmd = task_queue.get(timeout=1)
except queue.Empty:
# If no more tasks, exit the worker
break
# Run the subprocess
subprocess.run(cmd)
# Notify the queue that the task is complete
task_queue.task_done()
def main():
source_videos_dir = "/home/nvidia/data/video/HDTF/10s"
source_images_dir = "/home/nvidia/data/image/CelebA-HQ/300/0"
output_dir = source_images_dir
video_files_list = [
os.path.join(source_videos_dir, f)
for f in os.listdir(source_videos_dir)
if os.path.isfile(os.path.join(source_videos_dir, f)) and f.endswith('.mp4') and not any(char.isalpha() for char in f.split('.')[0])
]
image_files_list = [
os.path.join(source_images_dir, f)
for f in os.listdir(source_images_dir)
if os.path.isfile(os.path.join(source_images_dir, f)) and f.endswith('.jpg')
]
model_id = 'c'
# Create a task queue
task_queue = Queue()
# Fill the task queue
for vid_file in video_files_list:
for img_file in image_files_list:
output_video = f"{os.path.splitext(os.path.basename(vid_file))[0]}_{os.path.splitext(os.path.basename(img_file))[0]}_{model_id}.mp4"
output_video_path = os.path.join(output_dir, output_video)
cmd = [
"python", "multi_face_single_source.py",
"--retina_path", "retinaface/RetinaFace-Res50.h5",
"--arcface_path", "arcface_model/ArcFace-Res50.h5",
"--facedancer_path", "model_zoo/FaceDancer_config_c_HQ.h5",
"--vid_path", vid_file,
"--swap_source", img_file,
"--output", output_video_path,
"--compare", "False",
"--sample_rate", "1",
"--length", "1",
"--align_source", "True",
"--device_id", str(gpu_id % 2) # Alternate between GPUs
]
task_queue.put(cmd)
# Create worker processes for each GPU
workers = []
for gpu_id in range(2): # Assuming you have 2 GPUs
p = Process(target=worker, args=(gpu_id, gpu_locks[gpu_id], task_queue))
p.start()
workers.append(p)
# Wait for all tasks to be processed
task_queue.join()
# Terminate the workers
for _ in workers:
task_queue.put(None) # Signal the workers to terminate
for p in workers:
p.join()
if __name__ == '__main__':
main()
v 1.3
为了避免某个GPU闲置过久,采用一种更动态的调度策略,即当一个GPU上的任务完成时,立刻为其分配下一个任务。这通常涉及到使用一个共享的队列和锁机制,以确保任务的正确分配和同步。
import os
import subprocess
import queue
from multiprocessing import Process, Lock, Queue
from tqdm import tqdm
# Locks for each GPU to ensure only one task runs at a time per GPU
gpu_locks = [Lock(), Lock()]
# A shared queue for all tasks
task_queue = Queue()
def worker(gpu_id, lock):
while True:
# Acquire the lock for this GPU
with lock:
# Check if there are more tasks
try:
cmd = task_queue.get_nowait()
except queue.Empty:
# No more tasks available, exit the worker
break
# Update the progress bar outside the lock to avoid contention
tqdm.write(f"GPU {gpu_id} starting task: {' '.join(cmd)}")
# Run the subprocess
subprocess.run(cmd)
# Notify the queue that the task is complete
task_queue.task_done()
def main():
source_videos_dir = "/home/nvidia/data/video/HDTF/10s"
source_images_dir = "/home/nvidia/data/image/CelebA-HQ/300/0"
output_dir = source_images_dir
video_files_list = [
os.path.join(source_videos_dir, f)
for f in os.listdir(source_videos_dir)
if os.path.isfile(os.path.join(source_videos_dir, f)) and f.endswith('.mp4') and not any(char.isalpha() for char in f.split('.')[0])
]
image_files_list = [
os.path.join(source_images_dir, f)
for f in os.listdir(source_images_dir)
if os.path.isfile(os.path.join(source_images_dir, f)) and f.endswith('.jpg')
]
model_id = 'c'
# Fill the task queue
for vid_file in video_files_list:
for img_file in image_files_list:
output_video = f"{os.path.splitext(os.path.basename(vid_file))[0]}_{os.path.splitext(os.path.basename(img_file))[0]}_{model_id}.mp4"
output_video_path = os.path.join(output_dir, output_video)
cmd = [
"python", "multi_face_single_source.py",
"--retina_path", "retinaface/RetinaFace-Res50.h5",
"--arcface_path", "arcface_model/ArcFace-Res50.h5",
"--facedancer_path", "model_zoo/FaceDancer_config_c_HQ.h5",
"--vid_path", vid_file,
"--swap_source", img_file,
"--output", output_video_path,
"--compare", "False",
"--sample_rate", "1",
"--length", "1",
"--align_source", "True",
"--device_id", str(gpu_id % 2) # Alternate between GPUs
]
task_queue.put(cmd)
# Create worker processes for each GPU
workers = []
for gpu_id in range(2): # Assuming you have 2 GPUs
p = Process(target=worker, args=(gpu_id, gpu_locks[gpu_id]))
p.start()
workers.append(p)
# Wait for all tasks to be processed
task_queue.join()
# Terminate the workers
for p in workers:
p.terminate()
if __name__ == '__main__':
main()
worker函数使用get_nowait()方法尝试从队列中获取任务,而不是阻塞等待。这样,一旦一个GPU完成当前任务,它就会立即尝试获取队列中的下一个任务,而无需等待其他GPU完成它们的任务。这有助于最大化GPU的利用率,减少闲置时间。
然而,使用terminate()来结束进程可能不是最优雅的方式,因为这可能会导致一些资源没有得到适当的清理。一个更好的做法是在所有任务完成后,向队列中放入一个特殊标记(如None),让worker函数能够识别到没有更多任务可做,并自行退出。以下是修改后的终止部分:
# Wait for all tasks to be processed
task_queue.join()
# Signal workers to exit by adding None to the queue
for _ in workers:
task_queue.put(None)
# Wait for all workers to finish
for p in workers:
p.join()
这样,当worker函数检测到队列中的None值时,它会知道没有更多的任务需要处理,并且可以安全地退出。这确保了所有资源都得到了适当的释放,同时保持了代码的整洁性和效率。
原文地址:https://blog.csdn.net/Ppandaer/article/details/140682225
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!