From ce3346125951cb40028552442022e645ad65a270 Mon Sep 17 00:00:00 2001 From: Nitay Megides Date: Fri, 27 Jul 2018 04:31:32 +0300 Subject: [PATCH 1/5] Added an option to return immediately from run() so you could process the output of ffmpeg while it's processing --- README.md | 21 +++++++++++++++++++++ ffmpeg/_run.py | 22 +++++++++++++++------- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 56e378f..18ac9fc 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,27 @@ import ffmpeg ) ``` +You could also use pipes to direct the output of ffmpeg to a socket, a file or for processing in python +```python +import ffmpeg +out, err, subproc = ( + ffmpeg + .input('rtsp://%s:8554/default') + .output('-', format='h264') + # wait=False means run() returns immediately and does not wait for ffmpeg process to end + .run(capture_stdout=True, wait=False) +) + +packet_size = 4096 +while subproc.poll() is None: + out_packet = out.read(packet_size) + try: + tcp_socket.send(out_packet) + except socket.error as e: + return +``` + + ## Complex filter graphs FFmpeg is extremely powerful, but its command-line interface gets really complicated really quickly - especially when working with signal graphs and doing anything more than trivial. diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 87e44ff..8aabe5e 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -186,7 +186,7 @@ def compile(stream_spec, cmd='ffmpeg', overwrite_output=False): @output_operator() def run( stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None, - quiet=False, overwrite_output=False): + quiet=False, overwrite_output=False, wait=True): """Ivoke ffmpeg for the supplied node graph. Args: @@ -196,21 +196,29 @@ def run( quiet: shorthand for setting ``capture_stdout`` and ``capture_stderr``. input: text to be sent to stdin (to be used with ``pipe:`` ffmpeg inputs) + wait: wait until the ffmpeg process is done before returning. If you're using pipes, you might + want this False **kwargs: keyword-arguments passed to ``get_args()`` (e.g. ``overwrite_output=True``). - Returns: (out, err) tuple containing captured stdout and stderr data. + Returns: If wait is True (default), this function returns (out, err) tuple containing captured stdout and stderr data. + If wait is False, this function returns (out, err, popen) tuple containing the subprocess handle as well """ args = compile(stream_spec, cmd, overwrite_output=overwrite_output) stdin_stream = subprocess.PIPE if input else None stdout_stream = subprocess.PIPE if capture_stdout or quiet else None stderr_stream = subprocess.PIPE if capture_stderr or quiet else None p = subprocess.Popen(args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream) - out, err = p.communicate(input) - retcode = p.poll() - if retcode: - raise Error('ffmpeg', out, err) - return out, err + if wait: + out, err = p.communicate(input) + retcode = p.poll() + if retcode: + raise Error('ffmpeg', out, err) + return out, err + else: + out = p.stdout + err = p.stderr + return out, err, p __all__ = [ From 462e34bab3b519dd99005c155386faf783f43f26 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Sun, 25 Nov 2018 21:32:04 -0600 Subject: [PATCH 2/5] Add run_async operator --- examples/README.md | 46 ++++++++++---- examples/tensorflow_stream.py | 10 ++-- ffmpeg/_run.py | 110 +++++++++++++++++++++++++++++++--- ffmpeg/tests/test_ffmpeg.py | 23 ++++++- 4 files changed, 164 insertions(+), 25 deletions(-) diff --git a/examples/README.md b/examples/README.md index 5d919f7..463f866 100644 --- a/examples/README.md +++ b/examples/README.md @@ -132,44 +132,48 @@ out.run() - Encode output video with ffmpeg ```python -args1 = ( +process1 = ( ffmpeg .input(in_filename) .output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8) - .compile() + .run_async(pipe_stdout=True) ) -process1 = subprocess.Popen(args1, stdout=subprocess.PIPE) -args2 = ( +process2 = ( ffmpeg .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) .output(out_filename, pix_fmt='yuv420p') .overwrite_output() - .compile() + .run_async(pipe_stdin=True() ) -process2 = subprocess.Popen(args2, stdin=subprocess.PIPE) while True: in_bytes = process1.stdout.read(width * height * 3) - in_frame ( + if not in_bytes: + break + in_frame = ( np .frombuffer(in_bytes, np.uint8) .reshape([height, width, 3]) ) # See examples/tensorflow_stream.py: - frame = deep_dream.process_frame(frame) + out_frame = deep_dream.process_frame(in_frame) process2.stdin.write( - frame + in_frame .astype(np.uint8) .tobytes() ) + +process2.stdin.close() +process1.wait() +process2.wait() ``` deep dream streaming -## [FaceTime webcam input](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py) +## [FaceTime webcam input (OS X)](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py) ```python ( @@ -179,3 +183,25 @@ while True: .run() ) ``` + +## Stream from RTSP server to TCP socket + +```python +packet_size = 4096 + +process = ( + ffmpeg + .input('rtsp://%s:8554/default') + .output('-', format='h264') + .run_async(pipe_stdout=True) +) + +while process.poll() is None: + packet = process.stdout.read(packet_size) + try: + tcp_socket.send(packet) + except socket.error: + process.stdout.close() + process.wait() + break +``` diff --git a/examples/tensorflow_stream.py b/examples/tensorflow_stream.py index 066c765..6c9c9c9 100644 --- a/examples/tensorflow_stream.py +++ b/examples/tensorflow_stream.py @@ -58,7 +58,7 @@ def start_ffmpeg_process1(in_filename): args = ( ffmpeg .input(in_filename) - .output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8) + .output('pipe:', format='rawvideo', pix_fmt='rgb24') .compile() ) return subprocess.Popen(args, stdout=subprocess.PIPE) @@ -113,14 +113,14 @@ def run(in_filename, out_filename, process_frame): process1 = start_ffmpeg_process1(in_filename) process2 = start_ffmpeg_process2(out_filename, width, height) while True: - frame = read_frame(process1, width, height) - if frame is None: + in_frame = read_frame(process1, width, height) + if in_frame is None: logger.info('End of input stream') break logger.debug('Processing frame') - frame = process_frame(frame) - write_frame(process2, frame) + out_frame = process_frame(in_frame) + write_frame(process2, out_frame) logger.info('Waiting for ffmpeg process1') process1.wait() diff --git a/ffmpeg/_run.py b/ffmpeg/_run.py index 87e44ff..a0189be 100644 --- a/ffmpeg/_run.py +++ b/ffmpeg/_run.py @@ -183,11 +183,100 @@ def compile(stream_spec, cmd='ffmpeg', overwrite_output=False): return cmd + get_args(stream_spec, overwrite_output=overwrite_output) +@output_operator() +def run_async( + stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False, pipe_stderr=False, + quiet=False, overwrite_output=False): + """Asynchronously invoke ffmpeg for the supplied node graph. + + Args: + pipe_stdin: if True, connect pipe to subprocess stdin (to be + used with ``pipe:`` ffmpeg inputs). + pipe_stdout: if True, connect pipe to subprocess stdout (to be + used with ``pipe:`` ffmpeg outputs). + pipe_stderr: if True, connect pipe to subprocess stderr. + quiet: shorthand for setting ``capture_stdout`` and + ``capture_stderr``. + **kwargs: keyword-arguments passed to ``get_args()`` (e.g. + ``overwrite_output=True``). + + Returns: + A `subprocess Popen`_ object representing the child process. + + Examples: + Run and stream input:: + + process = ( + ffmpeg + .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) + .output(out_filename, pix_fmt='yuv420p') + .overwrite_output() + .run_async(pipe_stdin=True) + ) + process.communicate(input=input_data) + + Run and capture output:: + + process = ( + ffmpeg + .input(in_filename) + .output('pipe':, format='rawvideo', pix_fmt='rgb24') + .run_async(pipe_stdout=True, pipe_stderr=True) + ) + out, err = process.communicate() + + Process video frame-by-frame using numpy:: + + process1 = ( + ffmpeg + .input(in_filename) + .output('pipe:', format='rawvideo', pix_fmt='rgb24') + .run_async(pipe_stdout=True) + ) + + process2 = ( + ffmpeg + .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) + .output(out_filename, pix_fmt='yuv420p') + .overwrite_output() + .run_async(pipe_stdin=True) + ) + + while True: + in_bytes = process1.stdout.read(width * height * 3) + if not in_bytes: + break + in_frame = ( + np + .frombuffer(in_bytes, np.uint8) + .reshape([height, width, 3]) + ) + out_frame = in_frame * 0.3 + process2.stdin.write( + frame + .astype(np.uint8) + .tobytes() + ) + + process2.stdin.close() + process1.wait() + process2.wait() + + .. _subprocess Popen: https://docs.python.org/3/library/subprocess.html#popen-objects + """ + args = compile(stream_spec, cmd, overwrite_output=overwrite_output) + stdin_stream = subprocess.PIPE if pipe_stdin else None + stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None + stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None + return subprocess.Popen( + args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream) + + @output_operator() def run( stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None, quiet=False, overwrite_output=False): - """Ivoke ffmpeg for the supplied node graph. + """Invoke ffmpeg for the supplied node graph. Args: capture_stdout: if True, capture stdout (to be used with @@ -201,13 +290,17 @@ def run( Returns: (out, err) tuple containing captured stdout and stderr data. """ - args = compile(stream_spec, cmd, overwrite_output=overwrite_output) - stdin_stream = subprocess.PIPE if input else None - stdout_stream = subprocess.PIPE if capture_stdout or quiet else None - stderr_stream = subprocess.PIPE if capture_stderr or quiet else None - p = subprocess.Popen(args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream) - out, err = p.communicate(input) - retcode = p.poll() + process = run_async( + stream_spec, + cmd, + pipe_stdin=input is not None, + pipe_stdout=capture_stdout, + pipe_stderr=capture_stderr, + quiet=quiet, + overwrite_output=overwrite_output, + ) + out, err = process.communicate(input) + retcode = process.poll() if retcode: raise Error('ffmpeg', out, err) return out, err @@ -218,4 +311,5 @@ __all__ = [ 'Error', 'get_args', 'run', + 'run_async', ] diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index b62f2ef..4f1d62e 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -1,9 +1,9 @@ from __future__ import unicode_literals - -from builtins import str from builtins import bytes from builtins import range +from builtins import str import ffmpeg +import mock import os import pytest import random @@ -414,6 +414,25 @@ def test__compile(): assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4'] +@pytest.mark.parametrize('pipe_stdin', [True, False]) +@pytest.mark.parametrize('pipe_stdout', [True, False]) +@pytest.mark.parametrize('pipe_stderr', [True, False]) +def test__run_async(mocker, pipe_stdin, pipe_stdout, pipe_stderr): + process__mock = mock.Mock() + popen__mock = mocker.patch.object(subprocess, 'Popen', return_value=process__mock) + stream = _get_simple_example() + process = ffmpeg.run_async( + stream, pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr) + assert process is process__mock + + expected_stdin = subprocess.PIPE if pipe_stdin else None + expected_stdout = subprocess.PIPE if pipe_stdout else None + expected_stderr = subprocess.PIPE if pipe_stderr else None + (args,), kwargs = popen__mock.call_args + assert args == ffmpeg.compile(stream) + assert kwargs == dict(stdin=expected_stdin, stdout=expected_stdout, stderr=expected_stderr) + + def test__run(): stream = _get_complex_filter_example() out, err = ffmpeg.run(stream) From e5e293fca4711d15c6559ac2a4c70567d853d509 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Sun, 25 Nov 2018 21:42:43 -0600 Subject: [PATCH 3/5] Fix `mock` import for python3 --- ffmpeg/tests/test_ffmpeg.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ffmpeg/tests/test_ffmpeg.py b/ffmpeg/tests/test_ffmpeg.py index 4f1d62e..f665e8e 100644 --- a/ffmpeg/tests/test_ffmpeg.py +++ b/ffmpeg/tests/test_ffmpeg.py @@ -3,13 +3,17 @@ from builtins import bytes from builtins import range from builtins import str import ffmpeg -import mock import os import pytest import random import re import subprocess +try: + import mock # python 2 +except ImportError: + from unittest import mock # python 3 + TEST_DIR = os.path.dirname(__file__) SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data') From 413b71a4e8f52b59c873e86528a3e6fca258578c Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Sun, 25 Nov 2018 21:43:41 -0600 Subject: [PATCH 4/5] Fix RTSP/TCP socket example to use consistent indentation --- examples/README.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/README.md b/examples/README.md index 463f866..0f60371 100644 --- a/examples/README.md +++ b/examples/README.md @@ -190,17 +190,17 @@ process2.wait() packet_size = 4096 process = ( - ffmpeg - .input('rtsp://%s:8554/default') - .output('-', format='h264') - .run_async(pipe_stdout=True) + ffmpeg + .input('rtsp://%s:8554/default') + .output('-', format='h264') + .run_async(pipe_stdout=True) ) while process.poll() is None: - packet = process.stdout.read(packet_size) - try: - tcp_socket.send(packet) - except socket.error: + packet = process.stdout.read(packet_size) + try: + tcp_socket.send(packet) + except socket.error: process.stdout.close() process.wait() break From 7ed9adf483fa1713301c7f7c0106fda6abbb4158 Mon Sep 17 00:00:00 2001 From: Karl Kroening Date: Sun, 25 Nov 2018 21:50:01 -0600 Subject: [PATCH 5/5] Fix example readme typo --- examples/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/README.md b/examples/README.md index 0f60371..208c7b5 100644 --- a/examples/README.md +++ b/examples/README.md @@ -144,7 +144,7 @@ process2 = ( .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) .output(out_filename, pix_fmt='yuv420p') .overwrite_output() - .run_async(pipe_stdin=True() + .run_async(pipe_stdin=True) ) while True: