Add a test launcher tool

This commit is contained in:
Thibault Saunier 2013-12-31 11:45:07 +01:00
parent e8db3c67b9
commit b4a2ca6286
5 changed files with 877 additions and 0 deletions

View file

@ -0,0 +1,353 @@
#!/usr//bin/python
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
import os
import time
from urllib import unquote
from urlparse import urlsplit
from utils import launch_command
from gi.repository import GES, Gst, GLib
DURATION_TOLERANCE = Gst.SECOND / 2
DEFAULT_GES_LAUNCH = "ges-launch-1.0"
class Combination(object):
def __str__(self):
return "%s and %s in %s" % (self.acodec, self.vcodec, self.container)
def __init__(self, container, acodec, vcodec):
self.container = container
self.acodec = acodec
self.vcodec = vcodec
FORMATS = {"aac": "audio/mpeg,mpegversion=4",
"ac3": "audio/x-ac3",
"vorbis": "audio/x-vorbis",
"mp3": "audio/mpeg,mpegversion=1,layer=3",
"h264": "video/x-h264",
"vp8": "video/x-vp8",
"theora": "video/x-theora",
"ogg": "application/ogg",
"mkv": "video/x-matroska",
"mp4": "video/quicktime,variant=iso;",
"webm": "video/x-matroska"}
COMBINATIONS = [
Combination("ogg", "vorbis", "theora"),
Combination("webm", "vorbis", "vp8"),
Combination("mp4", "mp3", "h264"),
Combination("mkv", "vorbis", "h264")]
SCENARIOS = ["none", "seek_forward", "seek_backward", "scrub_forward_seeking"]
def get_profile_full(muxer, venc, aenc, video_restriction=None,
audio_restriction=None,
audio_presence=0, video_presence=0):
ret = "\""
if muxer:
ret += muxer
ret += ":"
if venc:
if video_restriction is not None:
ret = ret + video_restriction + '->'
ret += venc
if video_presence:
ret = ret + '|' + str(video_presence)
if aenc:
ret += ":"
if audio_restriction is not None:
ret = ret + audio_restriction + '->'
ret += aenc
if audio_presence:
ret = ret + '|' + str(audio_presence)
ret += "\""
return ret.replace("::", ":")
def get_profile(combination):
return get_profile_full(FORMATS[combination.container],
FORMATS[combination.vcodec],
FORMATS[combination.acodec],
video_restriction="video/x-raw,format=I420")
def quote_uri(uri):
"""
Encode a URI/path according to RFC 2396, without touching the file:/// part.
"""
# Split off the "file:///" part, if present.
parts = urlsplit(uri, allow_fragments=False)
# Make absolutely sure the string is unquoted before quoting again!
raw_path = unquote(parts.path)
# For computing thumbnail md5 hashes in the media library, we must adhere to
# RFC 2396. It is quite tricky to handle all corner cases, leave it to Gst:
return Gst.filename_to_uri(raw_path)
class GESTest(Test):
def __init__(self, classname, options, reporter, project_uri, scenario,
combination=None):
super(GESTest, self).__init__(DEFAULT_GES_LAUNCH, classname, options, reporter)
self.scenario = scenario
self.project_uri = project_uri
self.combination = combination
proj = GES.Project.new(project_uri)
tl = proj.extract()
if tl is None:
self.duration = None
else:
self.duration = tl.get_meta("duration")
if self.duration is not None:
self.duration = self.duration / Gst.SECOND
else:
self.duration = 2 * 60
def set_rendering_info(self):
self.dest_file = os.path.join(self.options.dest,
os.path.basename(self.project_uri) +
'-' + self.combination.acodec +
self.combination.vcodec + '.' +
self.combination.container)
if not Gst.uri_is_valid(self.dest_file):
self.dest_file = GLib.filename_to_uri(self.dest_file, None)
profile = get_profile(self.combination)
self.add_arguments("-f", profile, "-o", self.dest_file)
def set_sample_paths(self):
if not self.options.paths:
if not self.options.recurse_paths:
return
paths = [os.path.dirname(Gst.uri_get_location(self.project_uri))]
else:
paths = self.options.paths
for path in paths:
if self.options.recurse_paths:
self.add_arguments("--sample-paths", quote_uri(path))
for root, dirs, files in os.walk(path):
for directory in dirs:
self.add_arguments("--sample-paths",
quote_uri(os.path.join(path,
root,
directory)
)
)
else:
self.add_arguments("--sample-paths", "file://" + path)
def build_arguments(self):
print "\OOO %s" % self.combination
if self.scenario is not None:
self.add_arguments("--set-scenario", self.scenario)
if self.combination is not None:
self.set_rendering_info()
if self.options.mute:
self.add_arguments(" --mute")
self.set_sample_paths()
self.add_arguments("-l", self.project_uri)
def check_results(self):
if self.process.returncode == 0:
if self.combination:
try:
asset = GES.UriClipAsset.request_sync(self.dest_file)
if self.duration - DURATION_TOLERANCE <= asset.get_duration() \
<= self.duration + DURATION_TOLERANCE:
self.set_result(Result.FAILURE, "Duration of encoded file is "
" wrong (%s instead of %s)" %
(Gst.TIME_ARGS(self.duration),
Gst.TIME_ARGS(asset.get_duration())),
"wrong-duration")
else:
self.set_result(Result.PASSED)
except GLib.Error as e:
self.set_result(Result.FAILURE, "Wrong rendered file", "failure", e)
else:
self.set_result(Result.PASSED)
else:
if self.combination and self.result == Result.TIMEOUT:
missing_eos = False
try:
asset = GES.UriClipAsset.request_sync(self.dest_file)
if asset.get_duration() == self.duration:
missing_eos = True
except Exception as e:
pass
if missing_eos is True:
self.set_result(Result.TIMEOUT, "The rendered file add right duration, MISSING EOS?\n",
"failure", e)
else:
if self.result == Result.TIMEOUT:
self.set_result(Result.TIMEOUT, "Application timed out", "timeout")
else:
if self.process.returncode == 139:
self.get_backtrace("SEGFAULT")
self.set_result("Application segfaulted")
else:
self.set_result(Result.FAILED,
"Application returned %d (issues: %s)" % (
self.process.returncode,
self.get_validate_criticals_errors()),
"error")
def wait_process(self):
last_val = 0
last_change_ts = time.time()
while True:
self.process.poll()
if self.process.returncode is not None:
self.check_results()
break
# Dirty way to avoid eating to much CPU... good enough for us anyway.
time.sleep(1)
if self.combination:
val = os.stat(GLib.filename_from_uri(self.dest_file)[0]).st_size
else:
val = self.get_last_position()
if val == last_val:
if time.time() - last_change_ts > 10:
self.result = Result.TIMEOUT
else:
last_change_ts = time.time()
last_val = val
def get_last_position(self):
self.reporter.out.seek(0)
m = None
for l in self.reporter.out.readlines():
if "<Position:" in l:
m = l
if m is None:
return ""
pos = ""
for j in m.split("\r"):
if j.startswith("<Position:") and j.endswith("/>"):
pos = j
return pos
class GESTestsManager(TestsManager):
def __init__(self):
super(GESTestsManager, self).__init__()
Gst.init(None)
GES.init()
default_opath = GLib.get_user_special_dir(
GLib.UserDirectory.DIRECTORY_VIDEOS)
if default_opath:
self.default_path = os.path.join(default_opath, "ges-projects")
else:
self.default_path = os.path.join(os.path.expanduser('~'), "Video",
"ges-projects")
def add_options(self, parser):
parser.add_option("-o", "--output-path", dest="dest",
default=os.path.join(self.default_path, "rendered"),
help="Set the path to which projects should be"
" renderd")
parser.add_option("-P", "--sample-path", dest="paths",
default=[],
help="Paths in which to look for moved assets")
parser.add_option("-r", "--recurse-paths", dest="recurse_paths",
default=False, action="store_true",
help="Whether to recurse into paths to find assets")
parser.add_option("-m", "--mute", dest="mute",
action="store_true", default=False,
help="Mute playback output, which mean that we use "
"a fakesink")
def set_settings(self, options, args, reporter):
TestsManager.set_settings(self, options, args, reporter)
if not args and not os.path.exists(self.default_path):
launch_command("git clone %s" % DEFAULT_ASSET_REPO,
"Getting assets")
if not Gst.uri_is_valid(options.dest):
options.dest = GLib.filename_to_uri(options.dest, None)
try:
os.makedirs(GLib.filename_from_uri(options.dest)[0])
print "Created directory: %s" % options.dest
except OSError:
pass
def list_tests(self):
projects = list()
if not self.args:
self.options.paths = [os.path.join(self.default_path, "assets")]
path = os.path.join(self.default_path, "projects")
for root, dirs, files in os.walk(path):
for f in files:
if not f.endswith(".xges"):
continue
projects.append(GLib.filename_to_uri(os.path.join(path,
root,
f),
None))
else:
for proj in self.args:
if Gst.uri_is_valid(proj):
projects.append(proj)
else:
projects.append(GLib.filename_to_uri(proj, None))
for proj in projects:
# First playback casses
for scenario in SCENARIOS:
classname = "ges.playback.%s.%s" % (scenario, os.path.basename(proj).replace(".xges", ""))
self.tests.append(GESTest(classname,
self.options,
self.reporter,
proj,
scenario)
)
# And now rendering casses
for comb in COMBINATIONS:
classname = "ges.render.%s.%s" % (str(comb).replace(' ', '_'),
os.path.basename(proj).replace(".xges", ""))
self.tests.append(GESTest(classname,
self.options,
self.reporter,
proj,
None,
comb)
)

View file

@ -0,0 +1,51 @@
#!/usr//bin/python
import os
from testdefinitions import _TestsLauncher
from optparse import OptionParser
def main():
parser = OptionParser()
parser.add_option("-g", "--gdb", dest="gdb",
action="store_true",
default=False,
help="Run applications into gdb")
parser.add_option("-f", "--forever", dest="forever",
action="store_true", default=False,
help="Keep running tests until one fails")
parser.add_option("-F", "--fatal-error", dest="fatal_error",
action="store_true", default=False,
help="Stop on first fail")
parser.add_option('--xunit-file', action='store',
dest='xunit_file', metavar="FILE",
default=None,
help=("Path to xml file to store the xunit report in. "
"Default is xunit.xml the logs-dir directory"))
parser.add_option("-t", "--wanted-tests", dest="wanted_tests",
default=None,
help="Define the tests to execute, it can be a regex")
parser.add_option("-L", "--list-tests",
dest="list_tests",
action="store_true",
default=False,
help="List tests and exit")
parser.add_option("-l", "--logs-dir", dest="logsdir",
action="store_true", default=os.path.expanduser("~/gst-validate/logs/"),
help="Directory where to store logs")
tests_launcher = _TestsLauncher()
tests_launcher.add_options(parser)
(options, args) = parser.parse_args()
if options.xunit_file is None:
options.xunit_file = os.path.join(options.logsdir, "xunit.xml")
tests_launcher.set_settings(options, args)
tests_launcher.list_tests()
if options.list_tests:
for test in tests_launcher.tests:
print test
return 0
tests_launcher.run_tests()
tests_launcher.final_report()
return 0
if "__main__" == __name__:
exit(main())

183
validate/tools/reporters.py Normal file
View file

@ -0,0 +1,183 @@
#!/usr/bin/python
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
""" Test Reporters implementation. """
import os
import re
import codecs
import testdefinitions
from xml.sax import saxutils
from utils import mkdir, Result
UNICODE_STRINGS = (type(unicode()) == type(str()))
class UnknownResult(Exception):
pass
CONTROL_CHARACTERS = re.compile(r"[\000-\010\013\014\016-\037]")
def xml_safe(value):
"""Replaces invalid XML characters with '?'."""
return CONTROL_CHARACTERS.sub('?', value)
def escape_cdata(cdata):
"""Escape a string for an XML CDATA section."""
return xml_safe(cdata).replace(']]>', ']]>]]&gt;<![CDATA[')
class Reporter(object):
name = 'simple'
def __init__(self, options):
self._current_test = None
self.out = None
self.options = options
self.stats = {'timeout': 0,
'failures': 0,
'passes': 0,
'skipped': 0
}
self.results = []
def before_test(self, test):
"""Initialize a timer before starting a test."""
path = os.path.join(self.options.logsdir,
test.classname.replace(".", os.sep))
mkdir(os.path.dirname(path))
self.out = open(path, 'w+')
self._current_test = test
def set_failed(self, test):
self.stats["failed"] += 1
def set_passed(self, test):
self.stats["passed"] += 1
def add_results(self, test):
if test.result == Result.PASSED:
self.set_passed(test)
elif test.result == Result.FAILED or \
test.result == Result.TIMEOUT:
self.set_failed(test)
else:
raise UnknownResult("%s" % test.result)
def after_test(self):
self.out.close()
self.out = None
self.results.append(self._current_test)
self.add_results(self._current_test)
self._current_test = None
def final_report(self):
pass
class XunitReporter(Reporter):
"""This reporter provides test results in the standard XUnit XML format."""
name = 'xunit'
encoding = 'UTF-8'
xml_file = None
def __init__(self, options):
super(XunitReporter, self).__init__(options)
self.errorlist = []
def final_report(self):
self.report()
def _get_captured(self):
if self.out:
self.out.seek(0)
value = self.out.read()
if value:
return '<system-out><![CDATA[%s]]></system-out>' % \
escape_cdata(value)
return ''
def _quoteattr(self, attr):
"""Escape an XML attribute. Value can be unicode."""
attr = xml_safe(attr)
if isinstance(attr, unicode) and not UNICODE_STRINGS:
attr = attr.encode(self.encoding)
return saxutils.quoteattr(attr)
def report(self):
"""Writes an Xunit-formatted XML file
The file includes a report of test errors and failures.
"""
print "Writing XML file to: %s" % self.options.xunit_file
self.xml_file = codecs.open(self.options.xunit_file, 'w',
self.encoding, 'replace')
self.stats['encoding'] = self.encoding
self.stats['total'] = (self.stats['timeout'] + self.stats['failures']
+ self.stats['passes'] + self.stats['skipped'])
print self.stats
self.xml_file.write( u'<?xml version="1.0" encoding="%(encoding)s"?>'
u'<testsuite name="gesprojectslauncher" tests="%(total)d" '
u'errors="%(timeout)d" failures="%(failures)d" '
u'skip="%(skipped)d">' % self.stats)
self.xml_file.write(u''.join([self._forceUnicode(e)
for e in self.errorlist]))
self.xml_file.write(u'</testsuite>')
self.xml_file.close()
def set_failed(self, test):
"""Add failure output to Xunit report.
"""
self.stats['failures'] += 1
self.results.insert(0, test)
self.errorlist.append(
'<testcase classname=%(cls)s name=%(name)s time="%(taken).3f">'
'<failure type=%(errtype)s message=%(message)s>'
'</failure>%(systemout)s</testcase>' %
{'cls': self._quoteattr(test.classname),
'name': self._quoteattr(test.classname.split('.')[-1]),
'taken': test.time_taken,
'errtype': self._quoteattr(test.result),
'message': self._quoteattr(test.message),
'systemout': self._get_captured(),
})
def set_passed(self, test):
"""Add success output to Xunit report.
"""
self.stats['passes'] += 1
self.results.append(test)
self.errorlist.append(
'<testcase classname=%(cls)s name=%(name)s '
'time="%(taken).3f">%(systemout)s</testcase>' %
{'cls': self._quoteattr(test.classname),
'name': self._quoteattr(test.classname.split('.')[-1]),
'taken': test.time_taken,
'systemout': self._get_captured(),
})
def _forceUnicode(self, s):
if not UNICODE_STRINGS:
if isinstance(s, str):
s = s.decode(self.encoding, 'replace')
return s

View file

@ -0,0 +1,227 @@
#!/usr/bin/python
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
""" Class representing tests and test managers. """
import os
import re
import time
import subprocess
import reporters
from utils import mkdir, Result
DEFAULT_TIMEOUT = 10
class Test(object):
""" A class representing a particular test. """
def __init__(self, application_name, classname, options, reporter):
self.timeout = DEFAULT_TIMEOUT
self.classname = classname
self.options = options
self.application = application_name
self.command = ""
self.reporter = reporter
self.process = None
self.message = None
self.error = None
self.time_taken = None
self._starting_time = None
self.result = Result.NOT_RUN
def __str__(self):
string = self.classname
if self.result:
string += ": " + self.result
if "FAILED" in self.result:
string += "\n You can reproduce with: " + self.command
return string
def add_arguments(self, *args):
for arg in args:
self.command += " " + arg
def build_arguments(self):
pass
def set_result(self, result, message=None, error=None):
print "SETTING TER"
self.result = result
self.message = message
self.error = error
def check_results(self):
if self.process.returncode == 0:
self.result = Result.PASSED
self.result = Result.FAILED
def wait_process(self):
last_change_ts = time.time()
while True:
self.process.poll()
if self.process.returncode is not None:
break
if time.time() - last_change_ts > self.timeout:
self.result = Result.TIMEOUT
# Dirty way to avoid eating to much CPU...
# good enough for us anyway.
time.sleep(1)
self.check_results()
def get_validate_criticals_errors(self):
self.reporter.out.seek(0)
ret = "["
for l in self.reporter.out.readlines():
if "critical : " in l:
if ret != "[":
ret += ", "
ret += l.split("critical : ")[1].replace("\n", '')
if ret == "[":
return "No critical"
else:
return ret + "]"
def run(self):
self.command = "%s " % (self.application)
self._starting_time = time.time()
self.build_arguments()
print "Launching %s" % self.command
try:
self.process = subprocess.Popen(self.command,
stderr=self.reporter.out,
stdout=self.reporter.out,
shell=True)
self.wait_process()
except KeyboardInterrupt:
self.process.kill()
raise
try:
self.process.terminate()
except OSError:
pass
self.time_taken = time.time() - self._starting_time
class TestsManager(object):
""" A class responsible for managing tests. """
def __init__(self):
self.tests = []
self.options = None
self.args = None
self.reporter = None
self.wanted_tests_patterns = []
def list_tests(self):
pass
def get_tests(self):
return self.tests
def add_options(self, parser):
""" Add more arguments. """
pass
def set_settings(self, options, args, reporter):
""" Set properties after options parsing. """
self.options = options
self.args = args
self.reporter = reporter
if options.wanted_tests:
for pattern in options.wanted_tests.split(','):
self.wanted_tests_patterns.append(re.compile(pattern))
def _is_test_wanted(self, test):
for pattern in self.wanted_tests_patterns:
if pattern.findall(test.classname):
return True
return False
def run_tests(self):
for test in self.tests:
if self._is_test_wanted(test):
self.reporter.before_test(test)
test.run()
self.reporter.after_test()
class _TestsLauncher(object):
def __init__(self):
self.testers = []
self.tests = []
self.reporter = None
self._list_testers()
self.wanted_tests_patterns = []
def _list_testers(self):
def get_subclasses(c, env):
subclasses = []
for symb in env.iteritems():
try:
if issubclass(symb[1], c):
subclasses.append(symb[1])
except TypeError:
pass
return subclasses
env = globals().copy()
d = os.path.dirname(__file__)
for f in os.listdir(os.path.join(d, "apps")):
execfile(os.path.join(d, "apps", f), env)
self.testers = [i() for i in get_subclasses(TestsManager, env)]
print self.testers
def add_options(self, parser):
for tester in self.testers:
tester.add_options(parser)
def set_settings(self, options, args):
self.reporter = reporters.XunitReporter(options)
mkdir(options.logsdir)
for tester in self.testers:
tester.set_settings(options, args, self.reporter)
def list_tests(self):
for tester in self.testers:
tester.list_tests()
self.tests.extend(tester.tests)
def run_tests(self):
for tester in self.testers:
tester.run_tests()
def final_report(self):
self.reporter.final_report()

63
validate/tools/utils.py Normal file
View file

@ -0,0 +1,63 @@
#!/usr/bin/python
#
# Copyright (c) 2013,Thibault Saunier <thibault.saunier@collabora.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.
""" Some utilies. """
import os
class Result(object):
NOT_RUN = "Not run"
FAILED = "Failed"
TIMEOUT = "Timeout"
PASSED = "Passed"
class Colors(object):
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
def desactivate_colors(self):
Colors.HEADER = ''
Colors.OKBLUE = ''
Colors.OKGREEN = ''
Colors.WARNING = ''
Colors.FAIL = ''
Colors.ENDC = ''
def mkdir(directory):
try:
os.makedirs(directory)
except os.error:
pass
def launch_command(command, name="", color=None):
if name != "":
if color is not None:
print "%s%s" % (color, len(name) * "=")
print name
if color is not None:
print "%s%s" % (len(name) * "=", Colors.ENDC)
os.system(command)