diff --git a/examples/README.md b/examples/README.md index 5d919f7..208c7b5 100644 --- a/examples/README.md +++ b/examples/README.md @@ -132,44 +132,48 @@ out.run() - Encode output video with ffmpeg ```python -args1 = ( +process1 = ( ffmpeg .input(in_filename) .output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8) - .compile() + .run_async(pipe_stdout=True) ) -process1 = subprocess.Popen(args1, stdout=subprocess.PIPE) -args2 = ( +process2 = ( ffmpeg .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) .output(out_filename, pix_fmt='yuv420p') .overwrite_output() - .compile() + .run_async(pipe_stdin=True) ) -process2 = subprocess.Popen(args2, stdin=subprocess.PIPE) while True: in_bytes = process1.stdout.read(width * height * 3) - in_frame ( + if not in_bytes: + break + in_frame = ( np .frombuffer(in_bytes, np.uint8) .reshape([height, width, 3]) ) # See examples/tensorflow_stream.py: - frame = deep_dream.process_frame(frame) + out_frame = deep_dream.process_frame(in_frame) process2.stdin.write( - frame + in_frame .astype(np.uint8) .tobytes() ) + +process2.stdin.close() +process1.wait() +process2.wait() ``` deep dream streaming -## [FaceTime webcam input](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py) +## [FaceTime webcam input (OS X)](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py) ```python ( @@ -179,3 +183,25 @@ while True: .run() ) ``` + +## Stream from RTSP server to TCP socket + +```python +packet_size = 4096 + +process = ( + ffmpeg + .input('rtsp://%s:8554/default') + .output('-', format='h264') + .run_async(pipe_stdout=True) +) + +while process.poll() is None: + packet = process.stdout.read(packet_size) + try: + tcp_socket.send(packet) + except socket.error: + process.stdout.close() + process.wait() + break +``` diff --git a/examples/tensorflow_stream.py b/examples/tensorflow_stream.py index 066c765..6c9c9c9 100644 --- a/examples/tensorflow_stream.py +++ b/examples/tensorflow_stream.py @@ -58,7 +58,7 @@ def start_ffmpeg_process1(in_filename): args = ( ffmpeg .input(in_filename) - .output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8) + .output('pipe:', format='rawvideo', pix_fmt='rgb24') .compile() ) return subprocess.Popen(args, stdout=subprocess.PIPE) @@ -113,14 +113,14 @@ def run(in_filename, out_filename, process_frame): process1 = start_ffmpeg_process1(in_filename) process2 = start_ffmpeg_process2(out_filename, width, height) while True: - frame = read_frame(process1, width, height) - if frame is None: + in_frame = read_frame(process1, width, height) + if in_frame is None: logger.info('End of input stream') break logger.debug('Processing frame') - frame = process_frame(frame) - write_frame(process2, frame) + out_frame = process_frame(in_frame) + write_frame(process2, out_frame) logger.info('Waiting for ffmpeg process1') process1.wait() diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 87e44ff..a0189be 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -183,11 +183,100 @@ def compile(stream_spec, cmd='ffmpeg', overwrite_output=False): return cmd + get_args(stream_spec, overwrite_output=overwrite_output) +@output_operator() +def run_async( + stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False, pipe_stderr=False, + quiet=False, overwrite_output=False): + """Asynchronously invoke ffmpeg for the supplied node graph. + + Args: + pipe_stdin: if True, connect pipe to subprocess stdin (to be + used with ``pipe:`` ffmpeg inputs). + pipe_stdout: if True, connect pipe to subprocess stdout (to be + used with ``pipe:`` ffmpeg outputs). + pipe_stderr: if True, connect pipe to subprocess stderr. + quiet: shorthand for setting ``capture_stdout`` and + ``capture_stderr``. + **kwargs: keyword-arguments passed to ``get_args()`` (e.g. + ``overwrite_output=True``). + + Returns: + A `subprocess Popen`_ object representing the child process. + + Examples: + Run and stream input:: + + process = ( + ffmpeg + .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) + .output(out_filename, pix_fmt='yuv420p') + .overwrite_output() + .run_async(pipe_stdin=True) + ) + process.communicate(input=input_data) + + Run and capture output:: + + process = ( + ffmpeg + .input(in_filename) + .output('pipe':, format='rawvideo', pix_fmt='rgb24') + .run_async(pipe_stdout=True, pipe_stderr=True) + ) + out, err = process.communicate() + + Process video frame-by-frame using numpy:: + + process1 = ( + ffmpeg + .input(in_filename) + .output('pipe:', format='rawvideo', pix_fmt='rgb24') + .run_async(pipe_stdout=True) + ) + + process2 = ( + ffmpeg + .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) + .output(out_filename, pix_fmt='yuv420p') + .overwrite_output() + .run_async(pipe_stdin=True) + ) + + while True: + in_bytes = process1.stdout.read(width * height * 3) + if not in_bytes: + break + in_frame = ( + np + .frombuffer(in_bytes, np.uint8) + .reshape([height, width, 3]) + ) + out_frame = in_frame * 0.3 + process2.stdin.write( + frame + .astype(np.uint8) + .tobytes() + ) + + process2.stdin.close() + process1.wait() + process2.wait() + + .. _subprocess Popen: https://docs.python.org/3/library/subprocess.html#popen-objects + """ + args = compile(stream_spec, cmd, overwrite_output=overwrite_output) + stdin_stream = subprocess.PIPE if pipe_stdin else None + stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None + stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None + return subprocess.Popen( + args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream) + + @output_operator() def run( stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None, quiet=False, overwrite_output=False): - """Ivoke ffmpeg for the supplied node graph. + """Invoke ffmpeg for the supplied node graph. Args: capture_stdout: if True, capture stdout (to be used with @@ -201,13 +290,17 @@ def run( Returns: (out, err) tuple containing captured stdout and stderr data. """ - args = compile(stream_spec, cmd, overwrite_output=overwrite_output) - stdin_stream = subprocess.PIPE if input else None - stdout_stream = subprocess.PIPE if capture_stdout or quiet else None - stderr_stream = subprocess.PIPE if capture_stderr or quiet else None - p = subprocess.Popen(args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream) - out, err = p.communicate(input) - retcode = p.poll() + process = run_async( + stream_spec, + cmd, + pipe_stdin=input is not None, + pipe_stdout=capture_stdout, + pipe_stderr=capture_stderr, + quiet=quiet, + overwrite_output=overwrite_output, + ) + out, err = process.communicate(input) + retcode = process.poll() if retcode: raise Error('ffmpeg', out, err) return out, err @@ -218,4 +311,5 @@ __all__ = [ 'Error', 'get_args', 'run', + 'run_async', ] diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index b62f2ef..f665e8e 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -1,8 +1,7 @@ from __future__ import unicode_literals - -from builtins import str from builtins import bytes from builtins import range +from builtins import str import ffmpeg import os import pytest @@ -10,6 +9,11 @@ import random import re import subprocess +try: + import mock # python 2 +except ImportError: + from unittest import mock # python 3 + TEST_DIR = os.path.dirname(__file__) SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data') @@ -414,6 +418,25 @@ def test__compile(): assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4'] +@pytest.mark.parametrize('pipe_stdin', [True, False]) +@pytest.mark.parametrize('pipe_stdout', [True, False]) +@pytest.mark.parametrize('pipe_stderr', [True, False]) +def test__run_async(mocker, pipe_stdin, pipe_stdout, pipe_stderr): + process__mock = mock.Mock() + popen__mock = mocker.patch.object(subprocess, 'Popen', return_value=process__mock) + stream = _get_simple_example() + process = ffmpeg.run_async( + stream, pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr) + assert process is process__mock + + expected_stdin = subprocess.PIPE if pipe_stdin else None + expected_stdout = subprocess.PIPE if pipe_stdout else None + expected_stderr = subprocess.PIPE if pipe_stderr else None + (args,), kwargs = popen__mock.call_args + assert args == ffmpeg.compile(stream) + assert kwargs == dict(stdin=expected_stdin, stdout=expected_stdout, stderr=expected_stderr) + + def test__run(): stream = _get_complex_filter_example() out, err = ffmpeg.run(stream)