Merge pull request #144 from kkroening/run-async

Add `run_async` operator
This commit is contained in:
Karl Kroening 2018-11-25 21:50:50 -06:00 committed by GitHub
commit 629202806e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 168 additions and 25 deletions

View File

@ -132,44 +132,48 @@ out.run()
- Encode output video with ffmpeg
```python
args1 = (
process1 = (
ffmpeg
.input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8)
.compile()
.run_async(pipe_stdout=True)
)
process1 = subprocess.Popen(args1, stdout=subprocess.PIPE)
args2 = (
process2 = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p')
.overwrite_output()
.compile()
.run_async(pipe_stdin=True)
)
process2 = subprocess.Popen(args2, stdin=subprocess.PIPE)
while True:
in_bytes = process1.stdout.read(width * height * 3)
in_frame (
if not in_bytes:
break
in_frame = (
np
.frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3])
)
# See examples/tensorflow_stream.py:
frame = deep_dream.process_frame(frame)
out_frame = deep_dream.process_frame(in_frame)
process2.stdin.write(
frame
in_frame
.astype(np.uint8)
.tobytes()
)
process2.stdin.close()
process1.wait()
process2.wait()
```
<img src="https://raw.githubusercontent.com/kkroening/ffmpeg-python/master/examples/graphs/dream.png" alt="deep dream streaming" width="40%" />
## [FaceTime webcam input](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py)
## [FaceTime webcam input (OS X)](https://github.com/kkroening/ffmpeg-python/blob/master/examples/facetime.py)
```python
(
@ -179,3 +183,25 @@ while True:
.run()
)
```
## Stream from RTSP server to TCP socket
```python
packet_size = 4096
process = (
ffmpeg
.input('rtsp://%s:8554/default')
.output('-', format='h264')
.run_async(pipe_stdout=True)
)
while process.poll() is None:
packet = process.stdout.read(packet_size)
try:
tcp_socket.send(packet)
except socket.error:
process.stdout.close()
process.wait()
break
```

View File

@ -58,7 +58,7 @@ def start_ffmpeg_process1(in_filename):
args = (
ffmpeg
.input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24', vframes=8)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.compile()
)
return subprocess.Popen(args, stdout=subprocess.PIPE)
@ -113,14 +113,14 @@ def run(in_filename, out_filename, process_frame):
process1 = start_ffmpeg_process1(in_filename)
process2 = start_ffmpeg_process2(out_filename, width, height)
while True:
frame = read_frame(process1, width, height)
if frame is None:
in_frame = read_frame(process1, width, height)
if in_frame is None:
logger.info('End of input stream')
break
logger.debug('Processing frame')
frame = process_frame(frame)
write_frame(process2, frame)
out_frame = process_frame(in_frame)
write_frame(process2, out_frame)
logger.info('Waiting for ffmpeg process1')
process1.wait()

View File

@ -183,11 +183,100 @@ def compile(stream_spec, cmd='ffmpeg', overwrite_output=False):
return cmd + get_args(stream_spec, overwrite_output=overwrite_output)
@output_operator()
def run_async(
stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False, pipe_stderr=False,
quiet=False, overwrite_output=False):
"""Asynchronously invoke ffmpeg for the supplied node graph.
Args:
pipe_stdin: if True, connect pipe to subprocess stdin (to be
used with ``pipe:`` ffmpeg inputs).
pipe_stdout: if True, connect pipe to subprocess stdout (to be
used with ``pipe:`` ffmpeg outputs).
pipe_stderr: if True, connect pipe to subprocess stderr.
quiet: shorthand for setting ``capture_stdout`` and
``capture_stderr``.
**kwargs: keyword-arguments passed to ``get_args()`` (e.g.
``overwrite_output=True``).
Returns:
A `subprocess Popen`_ object representing the child process.
Examples:
Run and stream input::
process = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p')
.overwrite_output()
.run_async(pipe_stdin=True)
)
process.communicate(input=input_data)
Run and capture output::
process = (
ffmpeg
.input(in_filename)
.output('pipe':, format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdout=True, pipe_stderr=True)
)
out, err = process.communicate()
Process video frame-by-frame using numpy::
process1 = (
ffmpeg
.input(in_filename)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdout=True)
)
process2 = (
ffmpeg
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
.output(out_filename, pix_fmt='yuv420p')
.overwrite_output()
.run_async(pipe_stdin=True)
)
while True:
in_bytes = process1.stdout.read(width * height * 3)
if not in_bytes:
break
in_frame = (
np
.frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3])
)
out_frame = in_frame * 0.3
process2.stdin.write(
frame
.astype(np.uint8)
.tobytes()
)
process2.stdin.close()
process1.wait()
process2.wait()
.. _subprocess Popen: https://docs.python.org/3/library/subprocess.html#popen-objects
"""
args = compile(stream_spec, cmd, overwrite_output=overwrite_output)
stdin_stream = subprocess.PIPE if pipe_stdin else None
stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None
stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None
return subprocess.Popen(
args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream)
@output_operator()
def run(
stream_spec, cmd='ffmpeg', capture_stdout=False, capture_stderr=False, input=None,
quiet=False, overwrite_output=False):
"""Ivoke ffmpeg for the supplied node graph.
"""Invoke ffmpeg for the supplied node graph.
Args:
capture_stdout: if True, capture stdout (to be used with
@ -201,13 +290,17 @@ def run(
Returns: (out, err) tuple containing captured stdout and stderr data.
"""
args = compile(stream_spec, cmd, overwrite_output=overwrite_output)
stdin_stream = subprocess.PIPE if input else None
stdout_stream = subprocess.PIPE if capture_stdout or quiet else None
stderr_stream = subprocess.PIPE if capture_stderr or quiet else None
p = subprocess.Popen(args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream)
out, err = p.communicate(input)
retcode = p.poll()
process = run_async(
stream_spec,
cmd,
pipe_stdin=input is not None,
pipe_stdout=capture_stdout,
pipe_stderr=capture_stderr,
quiet=quiet,
overwrite_output=overwrite_output,
)
out, err = process.communicate(input)
retcode = process.poll()
if retcode:
raise Error('ffmpeg', out, err)
return out, err
@ -218,4 +311,5 @@ __all__ = [
'Error',
'get_args',
'run',
'run_async',
]

View File

@ -1,8 +1,7 @@
from __future__ import unicode_literals
from builtins import str
from builtins import bytes
from builtins import range
from builtins import str
import ffmpeg
import os
import pytest
@ -10,6 +9,11 @@ import random
import re
import subprocess
try:
import mock # python 2
except ImportError:
from unittest import mock # python 3
TEST_DIR = os.path.dirname(__file__)
SAMPLE_DATA_DIR = os.path.join(TEST_DIR, 'sample_data')
@ -414,6 +418,25 @@ def test__compile():
assert out_file.compile(cmd='ffmpeg.old') == ['ffmpeg.old', '-i', 'dummy.mp4', 'dummy2.mp4']
@pytest.mark.parametrize('pipe_stdin', [True, False])
@pytest.mark.parametrize('pipe_stdout', [True, False])
@pytest.mark.parametrize('pipe_stderr', [True, False])
def test__run_async(mocker, pipe_stdin, pipe_stdout, pipe_stderr):
process__mock = mock.Mock()
popen__mock = mocker.patch.object(subprocess, 'Popen', return_value=process__mock)
stream = _get_simple_example()
process = ffmpeg.run_async(
stream, pipe_stdin=pipe_stdin, pipe_stdout=pipe_stdout, pipe_stderr=pipe_stderr)
assert process is process__mock
expected_stdin = subprocess.PIPE if pipe_stdin else None
expected_stdout = subprocess.PIPE if pipe_stdout else None
expected_stderr = subprocess.PIPE if pipe_stderr else None
(args,), kwargs = popen__mock.call_args
assert args == ffmpeg.compile(stream)
assert kwargs == dict(stdin=expected_stdin, stdout=expected_stdout, stderr=expected_stderr)
def test__run():
stream = _get_complex_filter_example()
out, err = ffmpeg.run(stream)