validate:tools: Implement the logic of validate ouput parsing in the baseclass

+ Add some logic to check that we are mot playing outside wanted segment
This commit is contained in:
Thibault Saunier 2014-01-30 12:42:25 +01:00
parent 2c52d6374c
commit cd098eb28c
4 changed files with 108 additions and 79 deletions

View file

@ -106,7 +106,7 @@ class GESPlaybackTest(GESTest):
project_uri, scenario=scenario)
def get_current_value(self):
return utils.get_current_position(self)
return self.get_current_position()
class GESRenderTest(GESTest):
@ -151,7 +151,7 @@ class GESRenderTest(GESTest):
GstValidateTest.check_results(self)
def get_current_value(self):
return utils.get_current_size(self)
return self.get_current_size()
class GESTestsManager(TestsManager):

View file

@ -24,8 +24,7 @@ from loggable import Loggable
from baseclasses import GstValidateTest, TestsManager, Test, Scenario, NamedDic
from utils import MediaFormatCombination, get_profile,\
path2url, get_current_position, get_current_size, \
DEFAULT_TIMEOUT, which, GST_SECOND, Result, \
path2url, DEFAULT_TIMEOUT, which, GST_SECOND, Result, \
compare_rendered_with_original
@ -97,7 +96,7 @@ class GstValidateLaunchTest(GstValidateTest):
self.add_arguments(self.pipeline_desc)
def get_current_value(self):
return get_current_position(self)
return self.get_current_position()
class GstValidateMediaCheckTest(Test):
@ -158,7 +157,7 @@ class GstValidateTranscodingTest(GstValidateTest):
self.add_arguments(self.uri, self.dest_file)
def get_current_value(self):
return get_current_size(self)
return self.get_current_size()
def check_results(self):
if self.process.returncode == 0:

View file

@ -23,6 +23,7 @@ import os
import re
import time
import utils
import urlparse
import subprocess
import reporters
from loggable import Loggable
@ -202,11 +203,15 @@ class GstValidateTest(Test):
def __init__(self, application_name, classname,
options, reporter, timeout=DEFAULT_TIMEOUT,
scenario=None, hard_timeout=None):
scenario=None, hard_timeout=None, max_outside_segment=5):
super(GstValidateTest, self).__init__(application_name, classname, options,
reporter, timeout=timeout, hard_timeout=hard_timeout)
# defines how much the process can be outside of the configured
# segment / seek
self.max_outside_segment = max_outside_segment
if scenario is None or scenario.name.lower() == "none":
self.scenario = None
else:
@ -256,6 +261,96 @@ class GstValidateTest(Test):
self.process.returncode,
self.get_validate_criticals_errors()
))
def _parse_position(self, p):
self.log("Parsing %s" % p)
start_stop = p.replace("<position: ", '').replace("/>", "").split(" duration: ")
if len(start_stop) < 2:
self.warning("Got a unparsable value: %s" % p)
return 0, 0
if "speed:"in start_stop[1]:
start_stop[1] = start_stop[1].split("speed:")[0].rstrip().lstrip()
return utils.parse_gsttimeargs(start_stop[0]), utils.parse_gsttimeargs(start_stop[1])
def _parse_buffering(self, b):
return b.split("buffering... ")[1].split("%")[0], 100
def _get_position(self):
position = duration = -1
self.debug("Getting position")
self.reporter.out.seek(0)
m = None
for l in reversed(self.reporter.out.readlines()):
l = l.lower()
if "<position:" in l or "buffering" in l:
m = l
break
if m is None:
self.debug("Could not fine any positionning info")
return position, duration
for j in m.split("\r"):
j = j.lstrip().rstrip()
if j.startswith("<position:") and j.endswith("/>"):
position, duration = self._parse_position(j)
elif j.startswith("buffering") and j.endswith("%"):
position, duration = self._parse_buffering(j)
else:
self.debug("No info in %s" % j)
return position, duration
def _get_last_seek_values(self):
m = None
rate = start = stop = None
for l in reversed(self.reporter.out.readlines()):
l = l.lower()
if "seeking to: " in l:
m = l
break
if m is None:
self.debug("Could not fine any seeking info")
return start, stop, rate
tmp = m.split("seeking to: ")[1].split(" stop: ")
start = tmp[0]
stop_rate = tmp[1].split(" Rate")
return utils.parse_gsttimeargs(start), \
utils.parse_gsttimeargs(stop_rate[0]), float(stop_rate[1].replace(":",""))
def get_current_position(self):
position, duration = self._get_position()
start, stop, rate = self._get_last_seek_values()
if start and not (start - self.max_outside_segment * GST_SECOND < position < stop +
self.max_outside_segment):
self.set_result(Result.FAILED,
"Position not in expected 'segment' (with %d second tolerance)"
"seek.start %d < position %d < seek.stop %d is FALSE"
% (self.max_outside_segment,
start - self.max_outside_segment, position,
stop + self.max_outside_segment)
)
return position
def get_current_size(self):
position = self.get_current_position()
size = os.stat(urlparse.urlparse(self.dest_file).path).st_size
self.debug("Size: %s" % size)
return size
class TestsManager(Loggable):

View file

@ -193,81 +193,16 @@ def get_profile(combination):
##################################################
# Some utilities to parse gst-validate output #
##################################################
def _parse_position(p):
def parse_gsttimeargs(time):
return int(time.split(":")[0]) * 3600 + int(time.split(":")[1]) * 60 + int(time.split(":")[2].split(".")[0]) * 60
start_stop = p.replace("<position: ", '').replace("/>", "").split(" duration: ")
if len(start_stop) < 2:
loggable.warning("utils", "Got a unparsable value: %s" % p)
return 0, 0
if " speed: "in start_stop[1]:
start_stop[1] = start_stop[1].split("speed: ")[0]
return parse_gsttimeargs(start_stop[0]), parse_gsttimeargs(start_stop[1])
def _parse_buffering(b):
return b.split("buffering... ")[1].split("%")[0], 100
def _get_position(test):
position = duration = -1
test.reporter.out.seek(0)
m = None
for l in reversed(test.reporter.out.readlines()):
l = l.lower()
if "<position:" in l or "buffering" in l:
m = l
break
if m is None:
loggable.debug("utils", "Could not fine any positionning info")
return position, duration
for j in m.split("\r"):
if j.startswith("<position:") and j.endswith("/>"):
position, duration = _parse_position(j)
elif j.startswith("buffering") and j.endswith("%"):
position, duration = _parse_buffering(j)
return position, duration
def get_current_position(test, max_passed_stop=0.5):
position, duration = _get_position(test)
if position > duration + max_passed_stop:
loggable.warning("utils", "Position > duration -> Returning -1")
return -1
return position
def get_current_size(test):
position = get_current_position(test)
if position is -1:
return -1
size = os.stat(urlparse.urlparse(test.dest_file).path).st_size
loggable.debug("utils", "Size: %s" % size)
return size
def parse_gsttimeargs(time):
stime = time.split(":")
sns = stime[2].split(".")
stime[2] = sns[0]
stime.append(sns[1])
return long((int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2]) * 60) * GST_SECOND + int(stime[3]))
def get_duration(media_file):
duration = 0
def parse_gsttimeargs(time):
stime = time.split(":")
sns = stime[2].split(".")
stime[2] = sns[0]
stime.append(sns[1])
return (int(stime[0]) * 3600 + int(stime[1]) * 60 + int(stime[2]) * 60) * GST_SECOND + int(stime[3])
duration = 0
try:
res = subprocess.check_output([DISCOVERER_COMMAND, media_file])
except subprocess.CalledProcessError: