#!/usr/bin/python
# -*- coding: utf-8 -*-
# libavg - Media Playback Engine.
# Copyright (C) 2003-2014 Ulrich von Zadow
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Current versions can be found at www.libavg.de
#

# os and sys are used directly below (os.remove, sys.stderr); import them
# explicitly instead of relying on names leaked by the star import.
import os
import sys

from libavg import avg, player
from testcase import *


# NOTE: the test methods are documented with comments rather than docstrings
# on purpose: unittest uses a test method's docstring as its reported
# description, which would change the runner output.
class AVTestCase(AVGTestCase):
    # Tests for avg.VideoNode, avg.SoundNode and avg.VideoWriter.

    def __init__(self, testFuncName):
        AVGTestCase.__init__(self, testFuncName)

    def setUp(self):
        AVGTestCase.setUp(self)

    def testEOF(self, node):
        # Helper, not listed in AVTestSuite: called by testVideoEOF and
        # testSoundEOF with an unattached node. Plays the node in an empty
        # scene and checks that the END_OF_FILE subscription has fired by the
        # time the EOF callback stops the player.
        def onEOF():
            player.stop()

        def onNoEOF():
            self.fail("No EOF")

        def onSubscribeEOF():
            self.eofCalled = True

        self.eofCalled = False
        root = self.loadEmptyScene()
        root.appendChild(node)
        node.play()
        node.setEOFCallback(onEOF)
        node.subscribe(avg.Node.END_OF_FILE, onSubscribeEOF)
        # Watchdog: fail the test if no EOF arrives before the timeout.
        player.setTimeout(100000, onNoEOF)
        player.play()
        self.assert_(self.eofCalled)

    def testVideoInfo(self):
        # Checks the stream metadata getters of VideoNode, and that they
        # raise when the information is not available.
        def checkInfo():
            # Uses the 'node' created in the loop below.
            node.pause()
            self.assertEqual(node.getContainerFormat(), "avi")
            self.assertEqual(node.getCurFrame(), 0)
            self.assertEqual(node.getCurTime(), 0)
            self.assertEqual(node.getDuration(), 1000)
            self.assertEqual(node.getBitrate(), 224064)
            self.assertEqual(node.getVideoCodec(), "mpeg4")
            self.assertEqual(node.getStreamPixelFormat(), "yuv420p")
            self.assertEqual(node.getVideoDuration(), 1000)
            if isThreaded:
                # Audio information is only checked for threaded decoding.
                self.assertEqual(node.getAudioCodec(), "mp2")
                self.assertEqual(node.getAudioSampleRate(), 44100)
                self.assertEqual(node.getNumAudioChannels(), 2)
                self.assert_(node.getVideoDuration() >= 1000)

        def checkEnableSound():
            # With enablesound=False, audio queries are expected to raise.
            node = avg.VideoNode(href="mpeg1-48x48-sound.avi", threaded=isThreaded,
                    enablesound=False, parent=root)
            node.pause()
            self.assertEqual(node.getVideoCodec(), "mpeg4")
            self.assertException(node.getAudioCodec)

        def checkExceptions():
            # Before pause(): the info getters are expected to raise.
            node = avg.VideoNode(href="mpeg1-48x48.mov", threaded=isThreaded)
            self.assertException(node.getDuration)
            self.assertException(node.getBitrate)
            self.assertException(node.getVideoCodec)
            self.assertException(node.getStreamPixelFormat)
            node.pause()
            # After pause(): the audio getters are still expected to raise
            # for this file.
            self.assertException(node.getAudioCodec)
            self.assertException(node.getAudioSampleRate)
            self.assertException(node.getNumAudioChannels)
            root.appendChild(node)

        def checkAudioFile():
            # A pure audio file in a VideoNode is expected to fail on pause().
            node = avg.VideoNode(href="44.1kHz_16bit_stereo.wav",
                    threaded=isThreaded, parent=root)
            self.assertException(node.pause)

        sys.stderr.write("\n")
        for isThreaded in (False, True):
            sys.stderr.write(" Threaded: " + str(isThreaded) + "\n")
            root = self.loadEmptyScene()
            node = avg.VideoNode(href="mpeg1-48x48-sound.avi", threaded=isThreaded,
                    parent=root)
            # Run the checks once before playback starts and again from
            # inside the running player.
            checkInfo()
            checkEnableSound()
            checkExceptions()
            self.start(False,
                    (checkInfo,
                     checkExceptions,
                     checkAudioFile,
                    ))
        sys.stderr.write(" Nonstandard queue length\n")
        root = self.loadEmptyScene()
        node = avg.VideoNode(href="mpeg1-48x48-sound.avi", queuelength=23,
                parent=root)
        self.assertEqual(node.queuelength, 23)

    def testVideoFiles(self):
        # Plays each file in videoFiles, threaded and unthreaded, checking
        # volume handling, hasAudio()/hasAlpha() and (unthreaded only) the
        # rendered image.
        def testVideoFile(filename, isThreaded):
            def setVolume(volume):
                node.volume = volume

            def testGetVolume(volume):
                self.assertAlmostEqual(node.volume, volume)

            def checkImage(filename):
                # Image comparison is only done for unthreaded decoding.
                if not(isThreaded):
                    self.compareImage("testVideo-"+filename+"1")

            def testInfo():
                if filename == "mpeg1-48x48-sound.avi" and isThreaded:
                    self.assert_(node.hasAudio())
                else:
                    self.assert_(not(node.hasAudio()))
                self.assert_((filename == "rgba-48x48.mov" or
                        filename == "vp6a-yuva-48x48.flv") == node.hasAlpha())

            root = self.loadEmptyScene()
            node = avg.VideoNode(href=filename, volume=0.8, size=(96,96),
                    threaded=isThreaded)
            self.assertEqual(node.threaded, isThreaded)
            setVolume(0.6)
            root.appendChild(node)
            # hasAudio() is expected to raise at this point (before play()).
            self.assertException(node.hasAudio)
            self.start(False,
                    (lambda: setVolume(0.5),
                     lambda: testGetVolume(0.5),
                     lambda: node.play(),
                     lambda: checkImage(filename),
                     lambda: setVolume(0.3),
                     lambda: testGetVolume(0.3),
                     testInfo,
                     lambda: node.stop()
                    ))

        videoFiles = ["mjpeg-48x48.avi", "mpeg1-48x48.mov",
                #"mpeg1-48x48-sound.avi",
                "rgba-48x48.mov", "h264-48x48.h264", "vp6a-yuva-48x48.flv"]
        sys.stderr.write("\n")
        for filename in videoFiles:
            sys.stderr.write(" "+filename+"\n")
            for isThreaded in [False, True]:
                sys.stderr.write(" threaded: "+str(isThreaded)+"\n")
                testVideoFile(filename, isThreaded)

    def testPlayBeforeConnect(self):
        # play() is called before the node is inserted into a canvas.
        node = avg.VideoNode(href="media/mpeg1-48x48.mov", threaded=False)
        node.play()
        player.createMainCanvas(size=(160,120))
        root = player.getRootNode()
        root.insertChild(node, 0)
        player.setFakeFPS(25)
        self.start(False,
                (lambda: self.assertEqual(node.size, (48, 48)),
                 lambda: self.compareImage("testPlayBeforeConnect"),
                ))

    def testVideoState(self):
        # Steps through play/pause/stop transitions and compares the rendered
        # image after each one, with and without the 'accelerated' flag.
        for accelerated in [True, False]:
            root = self.loadEmptyScene()
            node = avg.VideoNode(href="mpeg1-48x48.mov", size=(96,96),
                    threaded=False, accelerated=accelerated, parent=root)
            player.setFakeFPS(25)
            self.start(False,
                    (lambda: node.play(),
                     lambda: self.compareImage("testVideoState1"),
                     lambda: node.pause(),
                     lambda: self.compareImage("testVideoState2"),
                     lambda: self.compareImage("testVideoState2"),
                     lambda: node.play(),
                     lambda: self.compareImage("testVideoState3"),
                     lambda: node.stop(),
                     lambda: self.compareImage("testVideoState4"),
                     lambda: node.pause(),
                     lambda: self.compareImage("testVideoState5"),
                     lambda: self.compareImage("testVideoState5"),
                     lambda: node.stop(),
                     lambda: self.compareImage("testVideoState4"),
                    ))

    def testVideoActive(self):
        # Toggles node.active while the video is playing.
        def deactivate():
            node.active=0

        def activate():
            node.active=1

        root = self.loadEmptyScene()
        node = avg.VideoNode(href="mpeg1-48x48.mov", size=(96,96), threaded=False,
                parent=root)
        player.setFakeFPS(25)
        self.start(False,
                (lambda: node.play(),
                 deactivate,
                 lambda: self.compareImage("testVideoActive1"),
                 activate,
                 lambda: self.compareImage("testVideoActive2")
                ))

    def testVideoHRef(self):
        # Changes href on loaded and unloaded nodes, and checks the error
        # handling for missing and non-video files.
        def testGetMediaSize():
            self.assertEqual(node.getMediaSize(), (48, 48))

        def setHRefLoaded():
            # Operates on the node that is part of the scene.
            node.href = "h264-48x48.h264"

        def setHRefUnloaded():
            # Operates on a fresh local node that is not in any scene.
            node = avg.VideoNode()
            node.href = "h264-48x48.h264"
            node.play()

        def testVideoNotFound():
            # Missing file, but no play() or pause(): Should just work.
            node = avg.VideoNode(href="MissingFile.mov")
            node.href = "SecondMissingFile.mov"
            # Now libavg notices the missing file.
            self.assertException(node.play)

        def testVideoBroken():
            # A png is not a video file: play() is expected to raise.
            node = avg.VideoNode(href="rgb24-64x64.png")
            self.assertException(node.play)

        root = self.loadEmptyScene()
        node = avg.VideoNode(href="mpeg1-48x48.mov", threaded=False, parent=root)
        player.setFakeFPS(25)
        testVideoNotFound()
        testVideoBroken()
        setHRefUnloaded()
        self.start(False,
                (lambda: node.play(),
                 testGetMediaSize,
                 setHRefLoaded,
                 lambda: self.compareImage("testVideoHRef1"),
                 testGetMediaSize,
                 testVideoNotFound,
                 testVideoBroken
                ))

    def testVideoOpacity(self):
        # Hides and re-shows a looping video via opacity and compares the
        # image before and after, for an RGBA and a YUV file.
        def testWithFile(filename, testImgName):
            def hide():
                self.videoNode.opacity=0

            def show():
                self.videoNode.opacity=1

            player.setFakeFPS(25)
            root = self.loadEmptyScene()
            self.videoNode = avg.VideoNode(href=filename, loop=True, threaded=False,
                    parent=root)
            self.start(False,
                    (lambda: self.videoNode.play(),
                     None,
                     lambda: self.compareImage(testImgName+"1"),
                     hide,
                     None,
                     None,
                     show,
                     lambda: self.compareImage(testImgName+"2")
                    ))

        testWithFile("rgba-48x48.mov", "testVideoOpacityRGBA")
        testWithFile("mpeg1-48x48.mov", "testVideoOpacityYUV")

    def testVideoSeek(self):
        # Seeks while playing and while paused, once with the file's own
        # frame rate and once with an explicit fps=25.
        def seek(frame):
            videoNode.seekToFrame(frame)

        def checkCurFrame():
            self.assertEqual(videoNode.getCurFrame(), 26)

        player.setFakeFPS(25)
        for useCustomFPS in [False, True]:
            root = self.loadEmptyScene()
            if useCustomFPS:
                videoNode = avg.VideoNode(parent=root, loop=True, size=(96,96),
                        fps=25, threaded=False, href="mjpeg-48x48.avi")
            else:
                videoNode = avg.VideoNode(parent=root, loop=True, size=(96,96),
                        threaded=False, href="mjpeg-48x48.avi")
            videoNode.play()
            seek(26)
            self.start(False,
                    (checkCurFrame,
                     lambda: self.compareImage("testVideoSeek0"),
                     lambda: seek(100),
                     lambda: self.compareImage("testVideoSeek1"),
                     lambda: videoNode.pause(),
                     lambda: seek(26),
                     None,
                     lambda: self.compareImage("testVideoSeek2"),
                     lambda: videoNode.play(),
                     None,
                     lambda: self.compareImage("testVideoSeek3")
                    ))

    def testVideoFPS(self):
        # Plays a video with an explicit fps=250 under a fake player
        # framerate of 25 and compares the resulting image.
        player.setFakeFPS(25)
        root = self.loadEmptyScene()
        videoNode = avg.VideoNode(size=(80,80), loop=True, threaded=False,
                href="mjpeg-48x48.avi", fps=250, parent=root)
        self.start(False,
                (lambda: videoNode.play(),
                 None,
                 lambda: self.compareImage("testVideoFPS")
                ))

    def testVideoLoop(self):
        # Plays a looping video until END_OF_FILE has been seen, then stops
        # the player on the next frame.
        def onEOF():
            self.eof = True

        def onFrame():
            if self.eof:
                # The image is only compared for unthreaded decoding; the
                # player is stopped in both modes.
                if not(threaded):
                    self.compareImage("testVideoLoop")
                player.stop()

        for threaded in [False, True]:
            self.eof = False
            player.setFakeFPS(25)
            root = self.loadEmptyScene()
            videoNode = avg.VideoNode(parent=root, loop=True, fps=25, size=(96,96),
                    threaded=threaded, href="mpeg1-48x48.mov")
            videoNode.subscribe(avg.Node.END_OF_FILE, onEOF)
            videoNode.play()
            player.subscribe(player.ON_FRAME, onFrame)
            player.play()

    def testVideoMask(self):
        # Applies, removes and swaps a mask image on a playing video, then
        # changes the opacity; compares the image after each step.
        def testWithFile(filename, testImgName):
            def setMask(href):
                video.maskhref = href

            def setOpacity():
                video.opacity = 0.5

            player.setFakeFPS(25)
            root = self.loadEmptyScene()
            video = avg.VideoNode(href=filename, threaded=False,
                    parent=root)
            video.play()
            self.start(False,
                    (lambda: setMask("mask.png"),
                     lambda: self.compareImage(testImgName+"1"),
                     lambda: video.seekToFrame(10),
                     lambda: setMask(""),
                     lambda: self.compareImage(testImgName+"2"),
                     lambda: setMask("mask2.png"),
                     lambda: self.compareImage(testImgName+"3"),
                     setOpacity,
                     lambda: self.compareImage(testImgName+"4"),
                    ))

        testWithFile("mpeg1-48x48.mov", "testVideoMaskYUV")
        testWithFile("mjpeg-48x48.avi", "testVideoMaskYUVJ")
        testWithFile("rgba-48x48.mov", "testVideoMaskRGBA")

    def testException(self):
        # An exception raised inside an END_OF_FILE handler must propagate
        # out of self.start().
        class TestException(Exception):
            pass

        def throwException():
            raise TestException

        player.setFakeFPS(0.1)
        videoNode = avg.VideoNode(threaded = False)
        videoNode.href = "../testmediadir/mjpeg-48x48.avi"
        videoNode.subscribe(avg.Node.END_OF_FILE, throwException)
        root = self.loadEmptyScene()
        root.appendChild(videoNode)
        self.__exceptionThrown = False
        try:
            # Seek to the frame count so that playback reaches EOF right away.
            self.start(False,
                    (videoNode.pause,
                     lambda: videoNode.seekToFrame(videoNode.getNumFrames()),
                     videoNode.play,
                     lambda: None
                    ))
        except TestException:
            self.__exceptionThrown = True
        self.assert_(self.__exceptionThrown)

    def testVideoEOF(self):
        # EOF must be delivered for files with and without sound and for an
        # invisible node; an unsubscribed handler must never be called.
        player.setFakeFPS(25)
        for filename in ["mpeg1-48x48.mov", "mpeg1-48x48-sound.avi"]:
            node = avg.VideoNode(href=filename)
            self.testEOF(node)
        node = avg.VideoNode(href="mpeg1-48x48.mov", opacity=0)
        self.testEOF(node)

        root = self.loadEmptyScene()
        video = avg.VideoNode(href="mpeg1-48x48.mov", threaded=False,
                parent=root)
        player.setFakeFPS(0.1)
        # Should never be called
        eofID = video.subscribe(avg.Node.END_OF_FILE, lambda: self.assert_(False))
        self.start(False,
                (lambda: video.unsubscribe(avg.Node.END_OF_FILE, eofID),
                 video.play,
                 None
                ))

    def testVideoSeekAfterEOF(self):
        # On EOF, seeks back to time 0 and compares the image once the
        # current time is below 100 again.
        def onEOF():
            node.seekToTime(0)
            player.subscribe(avg.Player.ON_FRAME, onFrame)

        def onFrame():
            if node.getCurTime() < 100:
                self.compareImage("testSeekAfterEOF")
                player.stop()

        def onNoEOF():
            self.fail("No EOF")

        player.setFakeFPS(25)
        root = self.loadEmptyScene()
        node = avg.VideoNode(href="mpeg1-48x48.mov", parent=root)
        node.play()
        node.subscribe(avg.VideoNode.END_OF_FILE, onEOF)
        # Watchdog: fail the test if no EOF arrives before the timeout.
        player.setTimeout(100000, onNoEOF)
        player.play()

    def testSound(self):
        # Runs a play/pause/stop/volume sequence on SoundNodes for several
        # audio file formats.
        def testSoundFile(filename):
            def setVolume(volume):
                node.volume = volume

            def testGetVolume(volume):
                self.assertAlmostEqual(node.volume, volume)

            root = self.loadEmptyScene()
            node = avg.SoundNode(href=filename, parent=root)
            self.start(False,
                    (lambda: setVolume(0.5),
                     lambda: testGetVolume(0.5),
                     lambda: node.play(),
                     None,
                     lambda: node.stop(),
                     lambda: node.play(),
                     lambda: node.pause(),
                     lambda: node.play(),
                     lambda: setVolume(0.5),
                     lambda: testGetVolume(0.5),
                     lambda: node.pause(),
                     lambda: node.stop(),
                     lambda: setVolume(0.3),
                     lambda: testGetVolume(0.3),
                     lambda: node.pause()
                    ))

        player.setFakeFPS(-1)
        player.volume = 0
        # "44.1kHz_mono.ogg" not tested for now - broken under Windows.
        # Assuming buggy ffmpeg version.
        for filename in ["22.050Hz_16bit_mono.wav", "44.1kHz_16bit_stereo.aif",
                "44.1kHz_16bit_stereo.wav", "44.1kHz_stereo.mp3",
                "48kHz_24bit_stereo.wav"]:
            testSoundFile(filename)

    def testSoundInfo(self):
        # Checks the audio metadata getters of SoundNode, and that they raise
        # when the information is not available.
        def checkInfo():
            node.pause()
            self.assertEqual(node.getAudioCodec(), "pcm_s16le")
            self.assertEqual(node.getAudioSampleRate(), 44100)
            self.assertEqual(node.getNumAudioChannels(), 2)

        def checkExceptions():
            # Fresh node that was never played or paused: getters must raise.
            node = avg.SoundNode(href="44.1kHz_16bit_stereo.wav")
            self.assertException(node.getAudioCodec)
            self.assertException(node.getAudioSampleRate)
            self.assertException(node.getNumAudioChannels)

        def checkVideoFile():
            # This video file in a SoundNode is expected to fail on pause().
            node = avg.SoundNode(href="mpeg1-48x48.mov", parent=root)
            self.assertException(node.pause)

        root = self.loadEmptyScene()
        node = avg.SoundNode(href="44.1kHz_16bit_stereo.wav", parent=root)
        checkInfo()
        checkExceptions()
        self.start(False,
                (checkInfo,
                 checkExceptions,
                 checkVideoFile,
                ))

    def testSoundSeek(self):
        # Seeks in a SoundNode both before and after the player has started.
        player.setFakeFPS(-1)
        player.volume = 0
        root = self.loadEmptyScene()
        soundNode = avg.SoundNode(parent=root, href="44.1kHz_16bit_stereo.wav")
        soundNode.play()
        soundNode.seekToTime(500)
        self.start(False,
                (None,
                 lambda: soundNode.seekToTime(200),
                ))

    def testBrokenSound(self):
        # play() on this file is expected to raise.
        def openSound():
            node = avg.SoundNode(href="44.1kHz_16bit_6Chan.ogg", parent=root)
            self.assertException(node.play)

        root = self.loadEmptyScene()
        self.start(False, [openSound])

    def testSoundEOF(self):
        # EOF must be delivered for a SoundNode as well.
        player.setFakeFPS(-1)
        player.volume = 0
        node = avg.SoundNode(href="44.1kHz_16bit_mono.wav")
        self.testEOF(node)

    def testVideoWriter(self):
        # Records the main canvas and an offscreen canvas to "test.mov" with
        # avg.VideoWriter and checks codec, frame count and pixel format of
        # the written file.
        def startWriter(fps, syncToPlayback):
            # 'canvas' is set per loop iteration below.
            self.videoWriter = avg.VideoWriter(canvas, "test.mov", fps, 3, 5,
                    syncToPlayback)

        def stopWriter():
            self.videoWriter.stop()

        def killWriter():
            # Drop the reference to the writer.
            self.videoWriter = None

        def pauseWriter():
            self.videoWriter.pause()

        def playWriter():
            self.videoWriter.play()

        def hideVideo():
            videoNode.opacity = 0

        def showVideo():
            videoNode.opacity = 1

        def checkVideo(numFrames):
            # Load the file that was just written and inspect it.
            savedVideoNode = avg.VideoNode(href="../test.mov", pos=(48,0),
                    threaded=False, parent=root)
            savedVideoNode.pause()
            self.assertEqual(savedVideoNode.getVideoCodec(), "mjpeg")
            self.assertEqual(savedVideoNode.getNumFrames(), numFrames)
            self.assertEqual(savedVideoNode.getStreamPixelFormat(), "yuvj420p")

        def testCreateException():
            # Writing into a directory that does not exist must raise.
            self.assertException(lambda: avg.VideoWriter(player.getMainCanvas(),
                    "nonexistentdir/test.mov", 30))

        if not(self._isCurrentDirWriteable()):
            self.skip("Current dir not writeable.")
            return
        if player.isUsingGLES():
            self.skip("VideoWriter not supported under GLES.")
            return

        # No scene has been loaded yet at this point; creating a writer is
        # expected to raise.
        self.assertException(lambda: avg.VideoWriter(player.getMainCanvas(),
                "test.mov", 30, 3, 5, False))

        for useCanvas in (False, True):
            player.setFakeFPS(30)
            root = self.loadEmptyScene()
            videoNode = avg.VideoNode(href="mpeg1-48x48.mov", threaded=False)
            if useCanvas:
                canvas = player.createCanvas(id="canvas", size=(48,48),
                        mediadir="media")
                canvas.getRootNode().appendChild(videoNode)
                avg.ImageNode(parent=root, href="canvas:canvas")
                testImageName = "testVideoWriterCanvas"
            else:
                root.appendChild(videoNode)
                canvas = player.getMainCanvas()
                testImageName = "testVideoWriter"

            self.start(False,
                    (videoNode.play,
                     lambda: startWriter(30, True),
                     lambda: self.delay(100),
                     stopWriter,
                     killWriter,
                     lambda: checkVideo(4),
                     hideVideo,
                     lambda: self.compareImage(testImageName+"1"),
                     showVideo,
                     testCreateException,
                     lambda: startWriter(15, False),
                     lambda: self.delay(150),
                     stopWriter,
                     killWriter,
                     lambda: checkVideo(2),
                     lambda: startWriter(30, False),
                     pauseWriter,
                     lambda: self.delay(200),
                     playWriter,
                     stopWriter,
                     killWriter,
                     lambda: checkVideo(1),
                     lambda: startWriter(30, False),
                     killWriter,
                     lambda: checkVideo(1),
                    ))
            # Remove the file written during this pass.
            os.remove("test.mov")

    def test2VideosAtOnce(self):
        # Two video nodes playing side by side in the same scene.
        player.setFakeFPS(25)
        self.loadEmptyScene()
        root = player.getRootNode()
        for pos in ((0,0), (80,0)):
            video = avg.VideoNode(pos=pos, threaded=False,
                    href="mpeg1-48x48.mov", parent=root)
            video.play()
        self.start(False,
                [lambda: self.compareImage("test2VideosAtOnce1"),])

    def testVideoAccel(self):
        # The 'accelerated' attribute must be False when acceleration was not
        # requested, and otherwise reflect whether any acceleration is
        # configured.
        accelConfig = avg.VideoNode.getVideoAccelConfig()
        video = avg.VideoNode(accelerated=False, href="media/mpeg1-48x48.mov")
        video.play()
        self.assertEqual(video.accelerated, False)
        video = avg.VideoNode(accelerated=True, href="media/mpeg1-48x48.mov")
        video.play()
        self.assertEqual(video.accelerated, (accelConfig != avg.NO_ACCELERATION))


def AVTestSuite(tests):
    # Builds the suite of audio/video tests. 'tests' is passed through to
    # createAVGTestSuite together with the list of available test names.
    # testEOF is intentionally not listed: it is a helper that needs a node
    # argument.
    availableTests = [
            "testSound",
            "testSoundInfo",
            "testSoundSeek",
            "testBrokenSound",
            "testSoundEOF",
            "testVideoInfo",
            "testVideoFiles",
            "testPlayBeforeConnect",
            "testVideoState",
            "testVideoActive",
            "testVideoHRef",
            "testVideoOpacity",
            "testVideoSeek",
            "testVideoFPS",
            "testVideoLoop",
            "testVideoMask",
            "testVideoEOF",
            "testVideoSeekAfterEOF",
            "testException",
            "testVideoWriter",
            "test2VideosAtOnce",
            "testVideoAccel",
            ]
    return createAVGTestSuite(availableTests, AVTestCase, tests)