diff options
Diffstat (limited to 'src/test/AVTest.py')
-rw-r--r-- | src/test/AVTest.py | 674 |
1 files changed, 674 insertions, 0 deletions
diff --git a/src/test/AVTest.py b/src/test/AVTest.py new file mode 100644 index 0000000..04de91e --- /dev/null +++ b/src/test/AVTest.py @@ -0,0 +1,674 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# libavg - Media Playback Engine. +# Copyright (C) 2003-2014 Ulrich von Zadow +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Current versions can be found at www.libavg.de +# + +from libavg import avg, player +from testcase import * + +class AVTestCase(AVGTestCase): + def __init__(self, testFuncName): + AVGTestCase.__init__(self, testFuncName) + + def setUp(self): + AVGTestCase.setUp(self) + + def testEOF(self, node): + def onEOF(): + player.stop() + + def onNoEOF(): + self.fail("No EOF") + + def onSubscribeEOF(): + self.eofCalled = True + + self.eofCalled = False + root = self.loadEmptyScene() + root.appendChild(node) + node.play() + node.setEOFCallback(onEOF) + node.subscribe(avg.Node.END_OF_FILE, onSubscribeEOF) + player.setTimeout(100000, onNoEOF) + player.play() + self.assert_(self.eofCalled) + + def testVideoInfo(self): + def checkInfo(): + node.pause() + self.assertEqual(node.getContainerFormat(), "avi") + self.assertEqual(node.getCurFrame(), 0) + self.assertEqual(node.getCurTime(), 0) + self.assertEqual(node.getDuration(), 1000) + self.assertEqual(node.getBitrate(), 224064) + self.assertEqual(node.getVideoCodec(), "mpeg4") + self.assertEqual(node.getStreamPixelFormat(), "yuv420p") + self.assertEqual(node.getVideoDuration(), 1000) + if isThreaded: + self.assertEqual(node.getAudioCodec(), "mp2") + self.assertEqual(node.getAudioSampleRate(), 44100) + self.assertEqual(node.getNumAudioChannels(), 2) + self.assert_(node.getVideoDuration() >= 1000) + + def checkEnableSound(): + node = avg.VideoNode(href="mpeg1-48x48-sound.avi", threaded=isThreaded, + enablesound=False, parent=root) + node.pause() + self.assertEqual(node.getVideoCodec(), "mpeg4") + self.assertException(node.getAudioCodec) + + def checkExceptions(): + node = avg.VideoNode(href="mpeg1-48x48.mov", threaded=isThreaded) + self.assertException(node.getDuration) + self.assertException(node.getBitrate) + self.assertException(node.getVideoCodec) + self.assertException(node.getStreamPixelFormat) + node.pause() + self.assertException(node.getAudioCodec) + self.assertException(node.getAudioSampleRate) + self.assertException(node.getNumAudioChannels) + root.appendChild(node) + + def checkAudioFile(): + node = avg.VideoNode(href="44.1kHz_16bit_stereo.wav", threaded=isThreaded, + parent=root) + self.assertException(node.pause) + + sys.stderr.write("\n") + for isThreaded in (False, True): + sys.stderr.write(" Threaded: " + str(isThreaded) + "\n") + root = self.loadEmptyScene() + node = avg.VideoNode(href="mpeg1-48x48-sound.avi", threaded=isThreaded, + parent=root) + checkInfo() + checkEnableSound() + checkExceptions() + self.start(False, + (checkInfo, + checkExceptions, + checkAudioFile, + )) + sys.stderr.write(" Nonstandard queue length\n") + root = self.loadEmptyScene() + node = avg.VideoNode(href="mpeg1-48x48-sound.avi", queuelength=23, parent=root) + self.assertEqual(node.queuelength, 23) + + def testVideoFiles(self): + def testVideoFile(filename, isThreaded): + def setVolume(volume): + node.volume = volume + + def testGetVolume(volume): + self.assertAlmostEqual(node.volume, volume) + + def checkImage(filename): + if not(isThreaded): + self.compareImage("testVideo-"+filename+"1") + + def testInfo(): + if filename == "mpeg1-48x48-sound.avi" and isThreaded: + self.assert_(node.hasAudio()) + else: + self.assert_(not(node.hasAudio())) + self.assert_((filename == "rgba-48x48.mov" or + filename == "vp6a-yuva-48x48.flv") == node.hasAlpha()) + + root = self.loadEmptyScene() + node = avg.VideoNode(href=filename, volume=0.8, size=(96,96), + threaded=isThreaded) + self.assertEqual(node.threaded, isThreaded) + setVolume(0.6) + root.appendChild(node) + self.assertException(node.hasAudio) + self.start(False, + (lambda: setVolume(0.5), + lambda: testGetVolume(0.5), + lambda: node.play(), + lambda: checkImage(filename), + lambda: setVolume(0.3), + lambda: testGetVolume(0.3), + testInfo, + lambda: node.stop() + )) + videoFiles = ["mjpeg-48x48.avi", "mpeg1-48x48.mov", #"mpeg1-48x48-sound.avi", + "rgba-48x48.mov", "h264-48x48.h264", "vp6a-yuva-48x48.flv"] + sys.stderr.write("\n") + for filename in videoFiles: + sys.stderr.write(" "+filename+"\n") + for isThreaded in [False, True]: + sys.stderr.write(" threaded: "+str(isThreaded)+"\n") + testVideoFile(filename, isThreaded) + + def testPlayBeforeConnect(self): + node = avg.VideoNode(href="media/mpeg1-48x48.mov", threaded=False) + node.play() + player.createMainCanvas(size=(160,120)) + root = player.getRootNode() + root.insertChild(node, 0) + player.setFakeFPS(25) + self.start(False, + (lambda: self.assertEqual(node.size, (48, 48)), + lambda: self.compareImage("testPlayBeforeConnect"), + )) + + def testVideoState(self): + for accelerated in [True, False]: + root = self.loadEmptyScene() + node = avg.VideoNode(href="mpeg1-48x48.mov", size=(96,96), threaded=False, + accelerated=accelerated, parent=root) + player.setFakeFPS(25) + self.start(False, + (lambda: node.play(), + lambda: self.compareImage("testVideoState1"), + lambda: node.pause(), + lambda: self.compareImage("testVideoState2"), + lambda: self.compareImage("testVideoState2"), + lambda: node.play(), + lambda: self.compareImage("testVideoState3"), + lambda: node.stop(), + lambda: self.compareImage("testVideoState4"), + lambda: node.pause(), + lambda: self.compareImage("testVideoState5"), + lambda: self.compareImage("testVideoState5"), + lambda: node.stop(), + lambda: self.compareImage("testVideoState4"), + )) + + def testVideoActive(self): + def deactivate(): + node.active=0 + + def activate(): + node.active=1 + + root = self.loadEmptyScene() + node = avg.VideoNode(href="mpeg1-48x48.mov", size=(96,96), threaded=False, + parent=root) + player.setFakeFPS(25) + self.start(False, + (lambda: node.play(), + deactivate, + lambda: self.compareImage("testVideoActive1"), + activate, + lambda: self.compareImage("testVideoActive2") + )) + + def testVideoHRef(self): + def testGetMediaSize(): + self.assertEqual(node.getMediaSize(), (48, 48)) + + def setHRefLoaded(): + node.href = "h264-48x48.h264" + + def setHRefUnloaded(): + node = avg.VideoNode() + node.href = "h264-48x48.h264" + node.play() + + def testVideoNotFound(): + # Missing file, but no play() or pause(): Should just work. + node = avg.VideoNode(href="MissingFile.mov") + node.href = "SecondMissingFile.mov" + # Now libavg notices the missing file. + self.assertException(node.play) + + def testVideoBroken(): + node = avg.VideoNode(href="rgb24-64x64.png") + self.assertException(node.play) + + root = self.loadEmptyScene() + node = avg.VideoNode(href="mpeg1-48x48.mov", threaded=False, parent=root) + player.setFakeFPS(25) + testVideoNotFound() + testVideoBroken() + setHRefUnloaded() + self.start(False, + (lambda: node.play(), + testGetMediaSize, + setHRefLoaded, + lambda: self.compareImage("testVideoHRef1"), + testGetMediaSize, + testVideoNotFound, + testVideoBroken + )) + + def testVideoOpacity(self): + def testWithFile(filename, testImgName): + def hide(): + self.videoNode.opacity=0 + + def show(): + self.videoNode.opacity=1 + + player.setFakeFPS(25) + root = self.loadEmptyScene() + self.videoNode = avg.VideoNode(href=filename, loop=True, threaded=False, + parent=root) + self.start(False, + (lambda: self.videoNode.play(), + None, + lambda: self.compareImage(testImgName+"1"), + hide, + None, + None, + show, + lambda: self.compareImage(testImgName+"2") + )) + testWithFile("rgba-48x48.mov", "testVideoOpacityRGBA") + testWithFile("mpeg1-48x48.mov", "testVideoOpacityYUV") + + def testVideoSeek(self): + def seek(frame): + videoNode.seekToFrame(frame) + + def checkCurFrame(): + self.assertEqual(videoNode.getCurFrame(), 26) + + player.setFakeFPS(25) + for useCustomFPS in [False, True]: + root = self.loadEmptyScene() + if useCustomFPS: + videoNode = avg.VideoNode(parent=root, loop=True, size=(96,96), fps=25, + threaded=False, href="mjpeg-48x48.avi") + else: + videoNode = avg.VideoNode(parent=root, loop=True, size=(96,96), + threaded=False, href="mjpeg-48x48.avi") + + videoNode.play() + seek(26) + self.start(False, + (checkCurFrame, + lambda: self.compareImage("testVideoSeek0"), + lambda: seek(100), + lambda: self.compareImage("testVideoSeek1"), + lambda: videoNode.pause(), + lambda: seek(26), + None, + lambda: self.compareImage("testVideoSeek2"), + lambda: videoNode.play(), + None, + lambda: self.compareImage("testVideoSeek3") + )) + + def checkSeek(): + seek(26) + self.assertNotEqual(videoNode.getCurFrame(), 0) + + def testVideoFPS(self): + player.setFakeFPS(25) + root = self.loadEmptyScene() + root = root + videoNode = avg.VideoNode(size=(80,80), loop=True, threaded=False, + href="mjpeg-48x48.avi", fps=250, parent=root) + self.start(False, + (lambda: videoNode.play(), + None, + lambda: self.compareImage("testVideoFPS") + )) + + def testVideoLoop(self): + def onEOF(): + self.eof = True + + def onFrame(): + if self.eof: + if not(threaded): + self.compareImage("testVideoLoop") + player.stop() + + for threaded in [False, True]: + self.eof = False + player.setFakeFPS(25) + root = self.loadEmptyScene() + videoNode = avg.VideoNode(parent=root, loop=True, fps=25, size=(96,96), + threaded=threaded, href="mpeg1-48x48.mov") + videoNode.subscribe(avg.Node.END_OF_FILE, onEOF) + videoNode.play() + player.subscribe(player.ON_FRAME, onFrame) + player.play() + + def testVideoMask(self): + def testWithFile(filename, testImgName): + def setMask(href): + video.maskhref = href + + def setOpacity(): + video.opacity = 0.5 + + player.setFakeFPS(25) + root = self.loadEmptyScene() + video = avg.VideoNode(href=filename, threaded=False, + parent=root) + video.play() + self.start(False, + (lambda: setMask("mask.png"), + lambda: self.compareImage(testImgName+"1"), + lambda: video.seekToFrame(10), + lambda: setMask(""), + lambda: self.compareImage(testImgName+"2"), + lambda: setMask("mask2.png"), + lambda: self.compareImage(testImgName+"3"), + setOpacity, + lambda: self.compareImage(testImgName+"4"), + )) + + testWithFile("mpeg1-48x48.mov", "testVideoMaskYUV") + testWithFile("mjpeg-48x48.avi", "testVideoMaskYUVJ") + testWithFile("rgba-48x48.mov", "testVideoMaskRGBA") + + def testException(self): + class TestException(Exception): + pass + + def throwException(): + raise TestException + + player.setFakeFPS(0.1) + videoNode = avg.VideoNode(threaded = False) + videoNode.href = "../testmediadir/mjpeg-48x48.avi" + videoNode.subscribe(avg.Node.END_OF_FILE, throwException) + + root = self.loadEmptyScene() + root.appendChild(videoNode) + + self.__exceptionThrown = False + try: + self.start(False, + (videoNode.pause, + lambda: videoNode.seekToFrame(videoNode.getNumFrames()), + videoNode.play, + lambda: None + )) + except TestException: + self.__exceptionThrown = True + + self.assert_(self.__exceptionThrown) + + def testVideoEOF(self): + player.setFakeFPS(25) + for filename in ["mpeg1-48x48.mov", "mpeg1-48x48-sound.avi"]: + node = avg.VideoNode(href=filename) + self.testEOF(node) + node = avg.VideoNode(href="mpeg1-48x48.mov", opacity=0) + self.testEOF(node) + + root = self.loadEmptyScene() + video = avg.VideoNode(href="mpeg1-48x48.mov", threaded=False, + parent=root) + player.setFakeFPS(0.1) + + # Should never be called + eofID = video.subscribe(avg.Node.END_OF_FILE, lambda: self.assert_(False)) + self.start(False, + (lambda: video.unsubscribe(avg.Node.END_OF_FILE, eofID), + video.play, + None + )) + + def testVideoSeekAfterEOF(self): + def onEOF(): + node.seekToTime(0) + player.subscribe(avg.Player.ON_FRAME, onFrame) + + def onFrame(): + if node.getCurTime() < 100: + self.compareImage("testSeekAfterEOF") + player.stop() + + def onNoEOF(): + self.fail("No EOF") + + player.setFakeFPS(25) + root = self.loadEmptyScene() + node = avg.VideoNode(href="mpeg1-48x48.mov", parent=root) + node.play() + node.subscribe(avg.VideoNode.END_OF_FILE, onEOF) + player.setTimeout(100000, onNoEOF) + player.play() + + def testSound(self): + def testSoundFile(filename): + def setVolume(volume): + node.volume = volume + + def testGetVolume(volume): + self.assertAlmostEqual(node.volume, volume) + + root = self.loadEmptyScene() + node = avg.SoundNode(href=filename, parent=root) + self.start(False, + (lambda: setVolume(0.5), + lambda: testGetVolume(0.5), + lambda: node.play(), + None, + lambda: node.stop(), + lambda: node.play(), + lambda: node.pause(), + lambda: node.play(), + lambda: setVolume(0.5), + lambda: testGetVolume(0.5), + lambda: node.pause(), + lambda: node.stop(), + lambda: setVolume(0.3), + lambda: testGetVolume(0.3), + lambda: node.pause() + )) + player.setFakeFPS(-1) + player.volume = 0 + # "44.1kHz_mono.ogg" not tested for now - broken under Windows. + # Assuming buggy ffmpeg version. + for filename in ["22.050Hz_16bit_mono.wav", "44.1kHz_16bit_stereo.aif", + "44.1kHz_16bit_stereo.wav", "44.1kHz_stereo.mp3", + "48kHz_24bit_stereo.wav"]: + testSoundFile(filename) + + def testSoundInfo(self): + def checkInfo(): + node.pause() + self.assertEqual(node.getAudioCodec(), "pcm_s16le") + self.assertEqual(node.getAudioSampleRate(), 44100) + self.assertEqual(node.getNumAudioChannels(), 2) + + def checkExceptions(): + node = avg.SoundNode(href="44.1kHz_16bit_stereo.wav") + self.assertException(node.getAudioCodec) + self.assertException(node.getAudioSampleRate) + self.assertException(node.getNumAudioChannels) + + def checkVideoFile(): + node = avg.SoundNode(href="mpeg1-48x48.mov", parent=root) + self.assertException(node.pause) + + root = self.loadEmptyScene() + node = avg.SoundNode(href="44.1kHz_16bit_stereo.wav", parent=root) + checkInfo() + checkExceptions() + self.start(False, + (checkInfo, + checkExceptions, + checkVideoFile, + )) + + def testSoundSeek(self): + player.setFakeFPS(-1) + player.volume = 0 + root = self.loadEmptyScene() + soundNode = avg.SoundNode(parent=root, href="44.1kHz_16bit_stereo.wav") + soundNode.play() + soundNode.seekToTime(500) + self.start(False, + (None, + lambda: soundNode.seekToTime(200), + )) + + + def testBrokenSound(self): + def openSound(): + node = avg.SoundNode(href="44.1kHz_16bit_6Chan.ogg", parent=root) + self.assertException(node.play) + + root = self.loadEmptyScene() + self.start(False, [openSound]) + + def testSoundEOF(self): + player.setFakeFPS(-1) + player.volume = 0 + node = avg.SoundNode(href="44.1kHz_16bit_mono.wav") + self.testEOF(node) + + def testVideoWriter(self): + + def startWriter(fps, syncToPlayback): + self.videoWriter = avg.VideoWriter(canvas, "test.mov", fps, 3, 5, + syncToPlayback) + + def stopWriter(): + self.videoWriter.stop() + + def killWriter(): + self.videoWriter = None + + def pauseWriter(): + self.videoWriter.pause() + + def playWriter(): + self.videoWriter.play() + + def hideVideo(): + videoNode.opacity = 0 + + def showVideo(): + videoNode.opacity = 1 + + def checkVideo(numFrames): + savedVideoNode = avg.VideoNode(href="../test.mov", pos=(48,0), + threaded=False, parent=root) + savedVideoNode.pause() + self.assertEqual(savedVideoNode.getVideoCodec(), "mjpeg") + self.assertEqual(savedVideoNode.getNumFrames(), numFrames) + self.assertEqual(savedVideoNode.getStreamPixelFormat(), "yuvj420p") + + def testCreateException(): + self.assertException(lambda: avg.VideoWriter(player.getMainCanvas(), + "nonexistentdir/test.mov", 30)) + + if not(self._isCurrentDirWriteable()): + self.skip("Current dir not writeable.") + return + if player.isUsingGLES(): + self.skip("VideoWriter not supported under GLES.") + return + + self.assertException(lambda: + avg.VideoWriter(player.getMainCanvas(), "test.mov", 30, 3, 5, False)) + + for useCanvas in (False, True): + player.setFakeFPS(30) + + root = self.loadEmptyScene() + videoNode = avg.VideoNode(href="mpeg1-48x48.mov", threaded=False) + if useCanvas: + canvas = player.createCanvas(id="canvas", size=(48,48), + mediadir="media") + canvas.getRootNode().appendChild(videoNode) + avg.ImageNode(parent=root, href="canvas:canvas") + testImageName = "testVideoWriterCanvas" + else: + root.appendChild(videoNode) + canvas = player.getMainCanvas() + testImageName = "testVideoWriter" + + self.start(False, + (videoNode.play, + lambda: startWriter(30, True), + lambda: self.delay(100), + stopWriter, + killWriter, + lambda: checkVideo(4), + hideVideo, + lambda: self.compareImage(testImageName+"1"), + showVideo, + testCreateException, + lambda: startWriter(15, False), + lambda: self.delay(150), + stopWriter, + killWriter, + lambda: checkVideo(2), + lambda: startWriter(30, False), + pauseWriter, + lambda: self.delay(200), + playWriter, + stopWriter, + killWriter, + lambda: checkVideo(1), + lambda: startWriter(30, False), + killWriter, + lambda: checkVideo(1), + )) + os.remove("test.mov") + + def test2VideosAtOnce(self): + player.setFakeFPS(25) + self.loadEmptyScene() + root = player.getRootNode() + for pos in ((0,0), (80,0)): + video = avg.VideoNode(pos=pos, threaded=False, href="mpeg1-48x48.mov", + parent=root) + video.play() + self.start(False, + [lambda: self.compareImage("test2VideosAtOnce1"),]) + + def testVideoAccel(self): + accelConfig = avg.VideoNode.getVideoAccelConfig() + video = avg.VideoNode(accelerated=False, href="media/mpeg1-48x48.mov") + video.play() + self.assertEqual(video.accelerated, False) + video = avg.VideoNode(accelerated=True, href="media/mpeg1-48x48.mov") + video.play() + self.assertEqual(video.accelerated, (accelConfig != avg.NO_ACCELERATION)) + + +def AVTestSuite(tests): + availableTests = [ + "testSound", + "testSoundInfo", + "testSoundSeek", + "testBrokenSound", + "testSoundEOF", + "testVideoInfo", + "testVideoFiles", + "testPlayBeforeConnect", + "testVideoState", + "testVideoActive", + "testVideoHRef", + "testVideoOpacity", + "testVideoSeek", + "testVideoFPS", + "testVideoLoop", + "testVideoMask", + "testVideoEOF", + "testVideoSeekAfterEOF", + "testException", + "testVideoWriter", + "test2VideosAtOnce", + "testVideoAccel", + ] + return createAVGTestSuite(availableTests, AVTestCase, tests) + |