PythonからFFmpegを呼び出す最もシンプルな方法は subprocess.run(["ffmpeg", "-i", "input.mov", "-c:v", "libx264", "output.mp4"]) だ。バッチ処理なら pathlib.glob() でファイルを列挙し、tqdm で進捗表示、concurrent.futures で並列エンコードまで組み合わせられる。
動画の変換作業を手作業でやっているなら、Pythonで自動化する価値は十分ある。100本のMP4をまとめてWebM変換したり、解像度を一括で変更したりするのも、スクリプト1本で片付けられる。一度セットアップすると手作業には戻れなくなる。
この記事では、PythonからFFmpegを呼び出す方法を基礎から説明する。subprocess で直接叩く方法から、ffmpeg-python ライブラリを使った書き方、フォルダ内を一括変換するスクリプト、進捗表示、並列エンコード、エラーハンドリングまで順番に解説する。
なぜPythonでFFmpegを使うのか
FFmpegはCLIツールとして完成度が高く、単体でも強力だ。でも、ファイルが大量にあったり、処理の組み合わせが複雑になってくると、シェルスクリプトだけでは辛くなってくる。
Pythonを組み合わせることで、次のことが楽になる。
- ファイルの列挙と条件フィルタリング: 特定の拡張子、特定サイズ以上のファイルだけを対象にする
- エラーハンドリングとリトライ: 失敗したファイルを記録して再実行する
- 進捗表示: 何本中何本が終わったかをリアルタイムで把握する
- ログ出力: 変換結果を後からレビューできる形で保存する
シェルスクリプトでもできなくはないが、Pythonのほうがロジックが複雑になったときに読みやすく保ちやすい。FFmpegの公式ドキュメントがCLIオプションをカバーし、Pythonがそのオーケストレーションを担当する形だ。
subprocessで基本を押さえる
PythonからFFmpegを呼び出す一番シンプルな方法は subprocess.run() を使うことだ。FFmpegはCLIツールなので、コマンドライン引数をリストで渡すだけで動く。
import subprocess
from pathlib import Path
def convert_to_mp4(input_path: Path, output_path: Path) -> bool:
"""MP4に変換する。成功したらTrue、失敗したらFalseを返す。"""
cmd = [
"ffmpeg",
"-i", str(input_path),
"-c:v", "libx264",
"-crf", "23",
"-preset", "medium",
"-c:a", "aac",
"-b:a", "128k",
"-y", # 上書き確認なし
str(output_path),
]
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return result.returncode == 0
if __name__ == "__main__":
src = Path("input.mov")
dst = Path("output.mp4")
if convert_to_mp4(src, dst):
print(f"変換成功: {dst}")
else:
print("変換失敗")
いくつかポイントを説明する。
"-y"フラグで出力ファイルが既存でも上書きする。バッチ処理では基本的に付けておくstdout=subprocess.PIPEとstderr=subprocess.PIPEで出力をキャプチャする。これがないとターミナルにFFmpegの大量ログが流れる- FFmpegはログを
stderrに出力する(正常時も)。エラー情報を取り出したい場合はresult.stderr.decode()を参照する
ffmpeg-pythonライブラリを使う
subprocess で直接叩く方法はシンプルだが、オプションが増えるとコマンドリストが長くなって管理しにくくなる。そこで ffmpeg-python ライブラリを使う方法がある。
インストールはこれだけだ。
# pip install ffmpeg-python
使い方はメソッドチェーンでFFmpegのパイプラインを組み立てる形になる。
import ffmpeg
def transcode(input_path: str, output_path: str) -> None:
"""ffmpeg-pythonを使ったトランスコード。"""
(
ffmpeg
.input(input_path)
.output(
output_path,
vcodec="libx264",
crf=23,
preset="medium",
acodec="aac",
audio_bitrate="128k",
)
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
if __name__ == "__main__":
transcode("input.mov", "output.mp4")
エラーが発生した場合は ffmpeg.Error が投げられる。e.stderr にFFmpegの標準エラー出力が入っているので、デバッグに使える。
import ffmpeg
def transcode_safe(input_path: str, output_path: str) -> None:
try:
(
ffmpeg
.input(input_path)
.output(output_path, vcodec="libx264", crf=23)
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
except ffmpeg.Error as e:
print("FFmpegエラー:")
print(e.stderr.decode())
raise
フォルダ内の動画を一括変換する
ここからが本番だ。フォルダ内にある全MOVファイルをMP4に変換するスクリプトを組む。pathlib の glob() でファイルを列挙して、順番に変換する。数百本規模のバッチ処理をローカル PC で回すと数時間占有されることもあるので、さくらのVPS のようなリモートサーバーで実行する選択肢も覚えておくといい。
import subprocess
from pathlib import Path
from typing import List
INPUT_DIR = Path("./originals")
OUTPUT_DIR = Path("./converted")
INPUT_EXT = ".mov"
OUTPUT_EXT = ".mp4"
def convert_file(src: Path, dst: Path) -> bool:
"""1ファイルをMP4に変換する。"""
dst.parent.mkdir(parents=True, exist_ok=True)
cmd = [
"ffmpeg",
"-i", str(src),
"-c:v", "libx264",
"-crf", "23",
"-preset", "fast",
"-c:a", "aac",
"-b:a", "128k",
"-movflags", "+faststart", # Web再生向け最適化
"-y",
str(dst),
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return result.returncode == 0
def batch_convert(input_dir: Path, output_dir: Path) -> None:
"""input_dir内の全INPUT_EXTファイルをOUTPUT_EXTに変換する。"""
files = list(input_dir.glob(f"**/*{INPUT_EXT}"))
if not files:
print(f"{input_dir} に {INPUT_EXT} ファイルが見つかりません")
return
print(f"{len(files)} 件のファイルを変換します")
success_count = 0
fail_list: List[Path] = []
for i, src in enumerate(files, start=1):
# 入力ディレクトリからの相対パスを保ちながら出力先を決める
relative = src.relative_to(input_dir)
dst = output_dir / relative.with_suffix(OUTPUT_EXT)
print(f"[{i}/{len(files)}] {src.name} → {dst.name}", end=" ... ")
if convert_file(src, dst):
print("OK")
success_count += 1
else:
print("FAILED")
fail_list.append(src)
print(f"\n完了: {success_count}/{len(files)} 件成功")
if fail_list:
print("失敗したファイル:")
for f in fail_list:
print(f" {f}")
if __name__ == "__main__":
batch_convert(INPUT_DIR, OUTPUT_DIR)
glob の再帰パターンでサブフォルダも含めて処理する。出力側のフォルダ構造も元の構造を保ったまま作成されるので、ファイルが散在していても整理しながら変換できる。
進捗表示をつける
大量のファイルを変換するときは、あと何本残っているかがひと目でわかる進捗バーがほしくなる。tqdm を使えば1行で追加できる。
import subprocess
from pathlib import Path
from typing import List
from tqdm import tqdm
# pip install tqdm
def convert_file(src: Path, dst: Path) -> bool:
dst.parent.mkdir(parents=True, exist_ok=True)
cmd = [
"ffmpeg", "-i", str(src),
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
"-c:a", "aac", "-b:a", "128k",
"-y", str(dst),
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return result.returncode == 0
def batch_convert_with_progress(input_dir: Path, output_dir: Path) -> None:
files = list(input_dir.glob("**/*.mov"))
fail_list: List[Path] = []
with tqdm(total=len(files), unit="file", ncols=80) as pbar:
for src in files:
relative = src.relative_to(input_dir)
dst = output_dir / relative.with_suffix(".mp4")
pbar.set_description(src.name[:30]) # ファイル名を左に表示
if not convert_file(src, dst):
fail_list.append(src)
pbar.update(1)
if fail_list:
print(f"\n失敗: {len(fail_list)} 件")
for f in fail_list:
print(f" {f}")
if __name__ == "__main__":
batch_convert_with_progress(Path("./originals"), Path("./converted"))
tqdm の set_description() でファイル名をバーの左に表示できる。長いファイル名は [:30] でカットしておくと表示が崩れない。
エラーハンドリングとリトライ
FFmpegのエラーハンドリングで厄介なのは、exit code 0でも出力ファイルが壊れている場合がある ことだ。例えば、入力ファイルの途中にデータ欠損があると、FFmpegは処理を完了して終了コード0を返すが、出力動画は再生できない。
リトライロジックと出力ファイルの簡易検証を組み合わせたスクリプトを示す。
import subprocess
import time
from pathlib import Path
MAX_RETRIES = 3
RETRY_DELAY = 2 # 秒
def verify_output(path: Path) -> bool:
"""ffprobeで出力ファイルを検証する。"""
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", str(path)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode != 0:
return False
# duration が取れれば最低限の整合性はある
output = result.stdout.decode().strip()
return bool(output) and float(output) > 0
def convert_with_retry(src: Path, dst: Path, retries: int = MAX_RETRIES) -> bool:
"""最大retries回リトライしながら変換する。"""
for attempt in range(1, retries + 1):
dst.parent.mkdir(parents=True, exist_ok=True)
cmd = [
"ffmpeg", "-i", str(src),
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
"-c:a", "aac", "-b:a", "128k",
"-y", str(dst),
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
print(f" 試行 {attempt}/{retries}: FFmpegエラー (exit {result.returncode})")
if attempt < retries:
time.sleep(RETRY_DELAY)
continue
if not verify_output(dst):
print(f" 試行 {attempt}/{retries}: 出力ファイル検証失敗")
dst.unlink(missing_ok=True) # 壊れたファイルを削除
if attempt < retries:
time.sleep(RETRY_DELAY)
continue
return True
return False
def batch_convert_robust(input_dir: Path, output_dir: Path) -> None:
files = list(input_dir.glob("**/*.mov"))
success, failed = 0, []
for i, src in enumerate(files, start=1):
relative = src.relative_to(input_dir)
dst = output_dir / relative.with_suffix(".mp4")
print(f"[{i}/{len(files)}] {src.name}")
if convert_with_retry(src, dst):
print(f" 完了: {dst}")
success += 1
else:
print(f" スキップ({MAX_RETRIES}回失敗): {src}")
failed.append(src)
print(f"\n結果: 成功 {success} / 失敗 {len(failed)} / 合計 {len(files)}")
if failed:
log_path = output_dir / "failed.txt"
log_path.write_text("\n".join(str(f) for f in failed))
print(f"失敗リスト: {log_path}")
if __name__ == "__main__":
batch_convert_robust(Path("./originals"), Path("./converted"))
失敗したファイルは failed.txt として出力ディレクトリに保存する。後からまとめて確認・再実行できる。
FFmpegについての基本的な使い方は FFmpegの使い方チュートリアル もあわせて参照してほしい。
concurrent.futuresで並列エンコード
ここまでのスクリプトは全てファイルを1本ずつ順番に処理していた。マルチコアCPUを使っているなら、これはもったいない。Pythonの concurrent.futures モジュールを使えば、複数ファイルを同時にエンコードできる。
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
MAX_WORKERS = 4 # CPUコア数に合わせて調整
def convert_file(src: Path, dst: Path) -> tuple[Path, bool]:
"""1ファイルを変換する。(ソースパス, 成功フラグ) を返す。"""
dst.parent.mkdir(parents=True, exist_ok=True)
cmd = [
"ffmpeg", "-i", str(src),
"-c:v", "libx264", "-crf", "23", "-preset", "fast",
"-c:a", "aac", "-b:a", "128k",
"-y", str(dst),
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return src, result.returncode == 0
def batch_convert_parallel(input_dir: Path, output_dir: Path) -> None:
files = list(input_dir.glob("**/*.mov"))
tasks: dict = {}
success, failed = 0, []
with ProcessPoolExecutor(max_workers=MAX_WORKERS) as executor:
for src in files:
relative = src.relative_to(input_dir)
dst = output_dir / relative.with_suffix(".mp4")
future = executor.submit(convert_file, src, dst)
tasks[future] = src
for future in as_completed(tasks):
src_path, ok = future.result()
if ok:
success += 1
print(f" 完了: {src_path.name}")
else:
failed.append(src_path)
print(f" 失敗: {src_path.name}")
print(f"\n結果: 成功 {success} / 失敗 {len(failed)} / 合計 {len(files)}")
if __name__ == "__main__":
batch_convert_parallel(Path("./originals"), Path("./converted"))
並列エンコードで気をつけるべき点がいくつかある。
MAX_WORKERSはCPUコア数に合わせる。 FFmpegのx264エンコーダは1プロセスで複数スレッドを使うので、ワーカーを増やしすぎるとスレッド競合でかえって遅くなる。8コアなら2〜4ワーカーが妥当な出発点だ- メモリ使用量はワーカー数に比例する。 4K素材を扱うならRAMの使用状況を監視すること
- I/Oがボトルネックになる場合がある。 特にHDDでは並列読み書きが詰まりやすい。SSDなら問題になりにくい
たとえば1080pのスクリーンレコーディング50本(各2-5分)を4ワーカーで処理すれば、シングルスレッドと比べて3〜4倍の高速化が期待できる。I/Oオーバーヘッドがあるので完全なコア数倍にはならないが、十分な効果がある。
数百〜数千本の動画をバッチ処理する場合、ローカル PC のリソースを長時間占有するのは現実的ではない。VPS にエンコード専用サーバーを構築すれば、スクリプトを SSH 経由で走らせたまま、手元の PC で他の作業を続けられる。
よくある質問
PythonからFFmpegを呼び出す一番いい方法は?
subprocess.run() でコマンドをリストとして渡す方法が最も確実で、FFmpegの全オプションをそのまま使える。ffmpeg-python ライブラリは便利だが、依存が1つ増える点はトレードオフだ。
ffmpeg-pythonは2019年から更新されていないけど大丈夫?
問題ない。最新リリースはv0.2.0(2019年)だが、FFmpeg CLIをラップしているだけなのでAPIは安定している。FFmpegのCLIインターフェースが変わらない限り(滅多に変わらない)動き続ける。より活発にメンテナンスされている代替を使いたいなら、python-ffmpeg や typed-ffmpeg もある。
個別ファイルのエンコード進捗を表示するには?
この記事ではファイル単位の進捗(N本中M本目)を表示している。1ファイル内のフレームレベルの進捗を出したい場合は、FFmpegの stderr 出力を行ごとにパースして frame= や time= フィールドを読む。better-ffmpeg-progress パッケージを使えばこれを自動化できる。
FFmpegがexit code 0なのに壊れたファイルができることはある?
ある。入力データの途中が壊れている場合や、エンコード中にディスク容量が不足した場合が典型例だ。出力ファイルは必ず ffprobe で検証すること。durationが0より大きく、期待値と一致しているかを確認する。
並列FFmpegプロセスは何個が適切?
物理CPUコア数の半分から始めるのが目安。FFmpegのx264エンコーダは1プロセスで複数スレッドを使うため、並列数を増やしすぎるとスレッド競合が起きてかえって遅くなる。CPU使用率を見ながら調整するのがベストだ。
subprocess.run()とsubprocess.Popen()はどちらを使うべき?
大半のバッチ処理では subprocess.run() で十分。FFmpegが完了するまでブロックするので、エラーハンドリングがシンプルに書ける。Popen() はstderrをリアルタイムに読みたい場合(フレームレベルの進捗パース等)に使う。
必要なPythonバージョンは?
Python 3.8以降。pathlib、subprocess.run() のキーワード引数、型ヒントを使っている。並列処理の例では tuple[Path, bool] 構文を使っているため、そこだけPython 3.9以降が必要だ。
リモートサーバーでFFmpegエンコードを実行するには?
rsync か scp でファイルをアップロードし、SSH 経由でPythonスクリプトを実行する(nohup か tmux を使えば切断しても継続する)。処理後に結果をダウンロードする。専用CPUコアのある VPSでエンコードサーバーを構築するのが理想的だ。
まとめ
この記事で解説した内容をまとめる。
subprocess.run()を使えば、PythonからFFmpegをシンプルに呼び出せる。コマンドラインと同じオプションがそのまま使えるffmpeg-pythonはメソッドチェーンでパイプラインを構築できる。複雑なフィルターグラフを扱う場合に特に有効だ- バッチ変換 は
pathlib.glob()でファイルを列挙して、フォルダ構造を保ちながら変換できる tqdmで進捗バーを1行で追加できる。長時間のバッチ処理では必須だ- エラーハンドリング では exit code だけを信頼しない。
ffprobeで出力ファイルを検証してリトライする設計が堅牢だ - 並列エンコード は
concurrent.futuresでCPUコアに比例した高速化が得られる
スクリプトを育てていくなら、argparse でCLI引数を受け付けるようにしたり、logging モジュールでファイルにログを残したりするとさらに使いやすくなる。このパターンを土台にして用途に合わせて拡張していくのがおすすめだ。
関連記事: