Add run_async operator

This commit is contained in:
Karl Kroening 2018-11-25 21:32:04 -06:00
parent 4276899cea
commit 462e34bab3
4 changed files with 164 additions and 25 deletions

View File

@ -132,44 +132,48 @@ out.run()
- Encode output video with ffmpeg - Encode output video with ffmpeg
```python ```python
args1 = ( process1 = (
ffmpeg ffmpeg
.input(in_filename) .input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8) .output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8)
.compile() .run_async(pipe_stdout=True)
) )
process1 = subprocess.Popen(args1, stdout=subprocess.PIPE)
args2 = ( process2 = (
ffmpeg ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p') .output(out_filename, pix_fmt='yuv420p')
.overwrite_output() .overwrite_output()
.compile() .run_async(pipe_stdin=True()
) )
process2 = subprocess.Popen(args2, stdin=subprocess.PIPE)
while True: while True:
in_bytes = process1.stdout.read(width * height * 3) in_bytes = process1.stdout.read(width * height * 3)
in_frame ( if not in_bytes:
break
in_frame = (
np np
.frombuffer(in_bytes, np.uint8) .frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3]) .reshape([height, width, 3])
) )
# See examples/tensorflow_stream.py: # See examples/tensorflow_stream.py:
frame = deep_dream.process_frame(frame) out_frame = deep_dream.process_frame(in_frame)
process2.stdin.write( process2.stdin.write(
frame in_frame
.astype(np.uint8) .astype(np.uint8)
.tobytes() .tobytes()
) )
process2.stdin.close()
process1.wait()
process2.wait()
``` ```
<img src="https://raw.githubusercontent.com/kkroening/ffmpeg-python/master/examples/graphs/dream.png" alt="deep dream streaming" width="40%" /> <img src="https://raw.githubusercontent.com/kkroening/ffmpeg-python/master/examples/graphs/dream.png" alt="deep dream streaming" width="40%" />
## [FaceTime webcam input](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py) ## [FaceTime webcam input (OS X)](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py)
```python ```python
( (
@ -179,3 +183,25 @@ while True:
.run() .run()
) )
``` ```
## Stream from RTSP server to TCP socket
```python
packet_size = 4096
process = (
ffmpeg
.input('rtsp://%s:8554/default')
.output('-', format='h264')
.run_async(pipe_stdout=True)
)
while process.poll() is None:
packet = process.stdout.read(packet_size)
try:
tcp_socket.send(packet)
except socket.error:
process.stdout.close()
process.wait()
break
```

View File

@ -58,7 +58,7 @@ def start_ffmpeg_process1(in_filename):
args = ( args = (
ffmpeg ffmpeg
.input(in_filename) .input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8) .output('pipe:', format='rawvideo', pix_fmt='rgb24')
.compile() .compile()
) )
return subprocess.Popen(args, stdout=subprocess.PIPE) return subprocess.Popen(args, stdout=subprocess.PIPE)
@ -113,14 +113,14 @@ def run(in_filename, out_filename, process_frame):
process1 = start_ffmpeg_process1(in_filename) process1 = start_ffmpeg_process1(in_filename)
process2 = start_ffmpeg_process2(out_filename, width, height) process2 = start_ffmpeg_process2(out_filename, width, height)
while True: while True:
frame = read_frame(process1, width, height) in_frame = read_frame(process1, width, height)
if frame is None: if in_frame is None:
logger.info('End of input stream') logger.info('End of input stream')
break break
logger.debug('Processing frame') logger.debug('Processing frame')
frame = process_frame(frame) out_frame = process_frame(in_frame)
write_frame(process2, frame) write_frame(process2, out_frame)
logger.info('Waiting for ffmpeg process1') logger.info('Waiting for ffmpeg process1')
process1.wait() process1.wait()

View File

@ -183,11 +183,100 @@ def compile(stream_spec, cmd='ffmpeg', overwrite_output=False):
return cmd + get_args(stream_spec, overwrite_output=overwrite_output) return cmd + get_args(stream_spec, overwrite_output=overwrite_output)
@output_operator()
def run_async(
stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False, pipe_stderr=False,
quiet=False, overwrite_output=False):
"""Asynchronously invoke ffmpeg for the supplied node graph.
Args:
pipe_stdin: if True, connect pipe to subprocess stdin (to be
used with ``pipe:`` ffmpeg inputs).
pipe_stdout: if True, connect pipe to subprocess stdout (to be
used with ``pipe:`` ffmpeg outputs).
pipe_stderr: if True, connect pipe to subprocess stderr.
quiet: shorthand for setting ``capture_stdout`` and
``capture_stderr``.
**kwargs: keyword-arguments passed to ``get_args()`` (e.g.
``overwrite_output=True``).
Returns:
A `subprocess Popen`_ object representing the child process.
Examples:
Run and stream input::
process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p')
.overwrite_output()
.run_async(pipe_stdin=True)
)
process.communicate(input=input_data)
Run and capture output::
process = (
ffmpeg
.input(in_filename)
.output('pipe':, format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdout=True, pipe_stderr=True)
)
out, err = process.communicate()
Process video frame-by-frame using numpy::
process1 = (
ffmpeg
.input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdout=True)
)
process2 = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p')
.overwrite_output()
.run_async(pipe_stdin=True)
)
while True:
in_bytes = process1.stdout.read(width * height * 3)
if not in_bytes:
break
in_frame = (
np
.frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3])
)
out_frame = in_frame * 0.3
process2.stdin.write(
frame
.astype(np.uint8)
.tobytes()
)
process2.stdin.close()
process1.wait()
process2.wait()
.. _subprocess Popen: https://docs.python.org/3/library/subprocess.html#popen-objects
"""
args = compile(stream_spec, cmd, overwrite_output=overwrite_output)
stdin_stream = subprocess.PIPE if pipe_stdin else None
stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None
stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None
return subprocess.Popen(
args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream)
@output_operator() @output_operator()
def run( def run(
stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None, stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None,
quiet=False, overwrite_output=False): quiet=False, overwrite_output=False):
"""Ivoke ffmpeg for the supplied node graph. """Invoke ffmpeg for the supplied node graph.
Args: Args:
capture_stdout: if True, capture stdout (to be used with capture_stdout: if True, capture stdout (to be used with
@ -201,13 +290,17 @@ def run(
Returns: (out, err) tuple containing captured stdout and stderr data. Returns: (out, err) tuple containing captured stdout and stderr data.
""" """
args = compile(stream_spec, cmd, overwrite_output=overwrite_output) process = run_async(
stdin_stream = subprocess.PIPE if input else None stream_spec,
stdout_stream = subprocess.PIPE if capture_stdout or quiet else None cmd,
stderr_stream = subprocess.PIPE if capture_stderr or quiet else None pipe_stdin=input is not None,
p = subprocess.Popen(args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream) pipe_stdout=capture_stdout,
out, err = p.communicate(input) pipe_stderr=capture_stderr,
retcode = p.poll() quiet=quiet,
overwrite_output=overwrite_output,
)
out, err = process.communicate(input)
retcode = process.poll()
if retcode: if retcode:
raise Error('ffmpeg', out, err) raise Error('ffmpeg', out, err)
return out, err return out, err
@ -218,4 +311,5 @@ __all__ = [
'Error', 'Error',
'get_args', 'get_args',
'run', 'run',
'run_async',
] ]

View File

@ -1,9 +1,9 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from builtins import str
from builtins import bytes from builtins import bytes
from builtins import range from builtins import range
from builtins import str
import ffmpeg import ffmpeg
import mock
import os import os
import pytest import pytest
import random import random
@ -414,6 +414,25 @@ def test__compile():
assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4'] assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4']
@pytest.mark.parametrize('pipe_stdin', [True, False])
@pytest.mark.parametrize('pipe_stdout', [True, False])
@pytest.mark.parametrize('pipe_stderr', [True, False])
def test__run_async(mocker, pipe_stdin, pipe_stdout, pipe_stderr):
process__mock = mock.Mock()
popen__mock = mocker.patch.object(subprocess, 'Popen', return_value=process__mock)
stream = _get_simple_example()
process = ffmpeg.run_async(
stream, pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr)
assert process is process__mock
expected_stdin = subprocess.PIPE if pipe_stdin else None
expected_stdout = subprocess.PIPE if pipe_stdout else None
expected_stderr = subprocess.PIPE if pipe_stderr else None
(args,), kwargs = popen__mock.call_args
assert args == ffmpeg.compile(stream)
assert kwargs == dict(stdin=expected_stdin, stdout=expected_stdout, stderr=expected_stderr)
def test__run(): def test__run():
stream = _get_complex_filter_example() stream = _get_complex_filter_example()
out, err = ffmpeg.run(stream) out, err = ffmpeg.run(stream)