!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache. PHP/5.6.40 

uname -a: Linux cpanel06wh.bkk1.cloud.z.com 2.6.32-954.3.5.lve1.4.80.el6.x86_64 #1 SMP Thu Sep 24
01:42:00 EDT 2020 x86_64
 

uid=851(cp949260) gid=853(cp949260) groups=853(cp949260) 

Safe-mode: OFF (not secure)

/usr/lib/python2.6/site-packages/supervisor/   drwxr-xr-x
Free 201.86 GB of 981.82 GB (20.56%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     test.py (114.79 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |

"""Test suite for supervisord.py."""

import os
import stat
import sys
import time
import signal
import tempfile
import unittest
import socket
import pickle
import pwd
import errno
from StringIO import StringIO
from datetime import datetime

import supervisord
import datatypes
import xmlrpc
import http
from supervisord import ProcessStates
from supervisord import SupervisorStates
try:
    __file__
except:
    __file__ = sys.argv[0]


DEBUG = 0


import unittest

class ServerOptionsTests(unittest.TestCase):
    def _getTargetClass(self):
        from options import ServerOptions
        return ServerOptions

    def _makeOne(self):
        return self._getTargetClass()()
        
    def test_options(self):
        s = """[supervisord]
http_port=127.0.0.1:8999 ; (default is to run no xmlrpc server)
http_username=chrism     ; (default is no username (open system))
http_password=foo        ; (default is no password (open system))
directory=%(tempdir)s     ; (default is not to cd during daemonization)
backofflimit=10            ; (default 3)
user=root                  ; (default is current user, required if root)
umask=022                  ; (default 022)
logfile=supervisord.log    ; (default supervisord.log)
logfile_maxbytes=1000MB    ; (default 50MB)
logfile_backups=5          ; (default 10)
loglevel=error             ; (default info)
pidfile=supervisord.pid    ; (default supervisord.pid)
nodaemon=true              ; (default false)
identifier=fleeb           ; (default supervisor)
childlogdir=%(tempdir)s           ; (default tempfile.gettempdir())
nocleanup=true             ; (default false)
minfds=2048                ; (default 1024)
minprocs=300               ; (default 200)

[program:cat]
command=/bin/cat
priority=1
autostart=true
autorestart=true
user=root
logfile=/tmp/cat.log
stopsignal=KILL
stopwaitsecs=5
startsecs=5
startretries=10

[program:cat2]
command=/bin/cat
autostart=true
autorestart=false
logfile_maxbytes = 1024
logfile_backups = 2
logfile = /tmp/cat2.log

[program:cat3]
command=/bin/cat
exitcodes=0,1,127
""" % {'tempdir':tempfile.gettempdir()}

        from StringIO import StringIO
        fp = StringIO(s)
        instance = self._makeOne()
        instance.configfile = fp
        instance.realize()
        options = instance.configroot.supervisord
        import socket
        self.assertEqual(options.directory, tempfile.gettempdir())
        self.assertEqual(options.umask, 022)
        self.assertEqual(options.logfile, 'supervisord.log')
        self.assertEqual(options.logfile_maxbytes, 1000 * 1024 * 1024)
        self.assertEqual(options.logfile_backups, 5)
        self.assertEqual(options.loglevel, 40)
        self.assertEqual(options.pidfile, 'supervisord.pid')
        self.assertEqual(options.nodaemon, True)
        self.assertEqual(options.identifier, 'fleeb')
        self.assertEqual(options.childlogdir, tempfile.gettempdir())
        self.assertEqual(options.http_port.family, socket.AF_INET)
        self.assertEqual(options.http_port.address, ('127.0.0.1', 8999))
        self.assertEqual(options.http_username, 'chrism')
        self.assertEqual(options.http_password, 'foo')
        self.assertEqual(options.nocleanup, True)
        self.assertEqual(options.minfds, 2048)
        self.assertEqual(options.minprocs, 300)
        self.assertEqual(options.nocleanup, True)
        self.assertEqual(len(options.programs), 3)

        cat = options.programs[0]
        self.assertEqual(cat.name, 'cat')
        self.assertEqual(cat.command, '/bin/cat')
        self.assertEqual(cat.priority, 1)
        self.assertEqual(cat.autostart, True)
        self.assertEqual(cat.autorestart, True)
        self.assertEqual(cat.startsecs, 5)
        self.assertEqual(cat.startretries, 10)
        self.assertEqual(cat.uid, 0)
        self.assertEqual(cat.logfile, '/tmp/cat.log')
        self.assertEqual(cat.stopsignal, signal.SIGKILL)
        self.assertEqual(cat.stopwaitsecs, 5)
        self.assertEqual(cat.logfile_maxbytes, datatypes.byte_size('50MB'))
        self.assertEqual(cat.logfile_backups, 10)
        self.assertEqual(cat.exitcodes, [0,2])

        cat2 = options.programs[1]
        self.assertEqual(cat2.name, 'cat2')
        self.assertEqual(cat2.command, '/bin/cat')
        self.assertEqual(cat2.priority, 999)
        self.assertEqual(cat2.autostart, True)
        self.assertEqual(cat2.autorestart, False)
        self.assertEqual(cat2.uid, None)
        self.assertEqual(cat2.logfile, '/tmp/cat2.log')
        self.assertEqual(cat2.stopsignal, signal.SIGTERM)
        self.assertEqual(cat2.logfile_maxbytes, 1024)
        self.assertEqual(cat2.logfile_backups, 2)
        self.assertEqual(cat2.exitcodes, [0,2])

        cat3 = options.programs[2]
        self.assertEqual(cat3.name, 'cat3')
        self.assertEqual(cat3.command, '/bin/cat')
        self.assertEqual(cat3.priority, 999)
        self.assertEqual(cat3.autostart, True)
        self.assertEqual(cat3.autorestart, True)
        self.assertEqual(cat3.uid, None)
        self.assertEqual(cat3.logfile, instance.AUTOMATIC)
        self.assertEqual(cat3.logfile_maxbytes, datatypes.byte_size('50MB'))
        self.assertEqual(cat3.logfile_backups, 10)
        self.assertEqual(cat3.exitcodes, [0,1,127])
        
        self.assertEqual(cat2.stopsignal, signal.SIGTERM)

        here = os.path.abspath(os.getcwd())
        self.assertEqual(instance.uid, 0)
        self.assertEqual(instance.gid, 0)
        self.assertEqual(instance.directory, '/tmp')
        self.assertEqual(instance.umask, 022)
        self.assertEqual(instance.logfile, os.path.join(here,'supervisord.log'))
        self.assertEqual(instance.logfile_maxbytes, 1000 * 1024 * 1024)
        self.assertEqual(instance.logfile_backups, 5)
        self.assertEqual(instance.loglevel, 40)
        self.assertEqual(instance.pidfile, os.path.join(here,'supervisord.pid'))
        self.assertEqual(instance.nodaemon, True)
        self.assertEqual(instance.passwdfile, None)
        self.assertEqual(instance.identifier, 'fleeb')
        self.assertEqual(instance.childlogdir, tempfile.gettempdir())
        self.assertEqual(instance.http_port.family, socket.AF_INET)
        self.assertEqual(instance.http_port.address, ('127.0.0.1', 8999))
        self.assertEqual(instance.http_username, 'chrism')
        self.assertEqual(instance.http_password, 'foo')
        self.assertEqual(instance.nocleanup, True)
        self.assertEqual(instance.minfds, 2048)
        self.assertEqual(instance.minprocs, 300)

    def test_readFile_failed(self):
        from options import readFile
        try:
            readFile('/notthere', 0, 10)
        except ValueError, inst:
            self.assertEqual(inst.args[0], 'FAILED')
        else:
            raise AssertionError("Didn't raise")

    def test_check_execv_args_cant_find_command(self):
        instance = self._makeOne()
        result = instance.check_execv_args('/not/there', None, None)
        self.assertEqual(result, "can't find command '/not/there'")

    def test_check_execv_args_notexecutable(self):
        instance = self._makeOne()
        result = instance.check_execv_args('/etc/passwd', None,
                                           os.stat('/etc/passwd'))
        self.assertEqual(result, "command at '/etc/passwd' is not executable")

    def test_check_execv_args_isdir(self):
        instance = self._makeOne()
        result = instance.check_execv_args('/', None, os.stat('/'))
        self.assertEqual(result, "command at '/' is a directory")

    def test_cleanup_afunix_unlink(self):
        import tempfile
        fn = tempfile.mktemp()
        f = open(fn, 'w')
        f.write('foo')
        f.close()
        instance = self._makeOne()
        import socket
        class Port:
            family = socket.AF_UNIX
            address = fn
        class Server:
            pass
        instance.http_port = Port()
        instance.httpserver = Server()
        instance.pidfile = ''
        instance.cleanup()
        self.failIf(os.path.exists(fn))

    def test_cleanup_afunix_nounlink(self):
        import tempfile
        fn = tempfile.mktemp()
        try:
            f = open(fn, 'w')
            f.write('foo')
            f.close()
            instance = self._makeOne()
            import socket
            class Port:
                family = socket.AF_UNIX
                address = fn
            class Server:
                pass
            instance.http_port = Port()
            instance.httpserver = Server()
            instance.pidfile = ''
            instance.unlink_socketfile = False
            instance.cleanup()
            self.failUnless(os.path.exists(fn))
        finally:
            try:
                os.unlink(fn)
            except os.error:
                pass

    def test_write_pidfile_ok(self):
        import tempfile
        fn = tempfile.mktemp()
        try:
            instance = self._makeOne()
            instance.logger = DummyLogger()
            instance.pidfile = fn
            instance.write_pidfile()
            self.failUnless(os.path.exists(fn))
            pid = int(open(fn, 'r').read()[:-1])
            self.assertEqual(pid, os.getpid())
            msg = instance.logger.data[0]
            self.failUnless(msg.startswith('supervisord started with pid'))
        finally:
            try:
                os.unlink(fn)
            except os.error:
                pass

    def test_write_pidfile_fail(self):
        import tempfile
        fn = '/cannot/possibly/exist'
        instance = self._makeOne()
        instance.logger = DummyLogger()
        instance.pidfile = fn
        instance.write_pidfile()
        msg = instance.logger.data[0]
        self.failUnless(msg.startswith('could not write pidfile'))

    def test_close_fd(self):
        instance = self._makeOne()
        innie, outie = os.pipe()
        os.read(innie, 0) # we can read it while its open
        os.write(outie, 'foo') # we can write to it while its open
        instance.close_fd(innie)
        self.assertRaises(os.error, os.read, innie, 0)
        instance.close_fd(outie)
        self.assertRaises(os.error, os.write, outie, 'foo')
        

class TestBase(unittest.TestCase):
    def setUp(self):
        pass

    def tearDown(self):
        pass

    def _assertRPCError(self, code, callable, *args, **kw):
        try:
            callable(*args, **kw)
        except xmlrpc.RPCError, inst:
            self.assertEqual(inst.code, code)
        else:
            raise AssertionError("Didnt raise")

class MainXMLRPCInterfaceTests(TestBase):

    def _getTargetClass(self):
        return xmlrpc.RPCInterface

    def _makeOne(self, *args, **kw):
        return self._getTargetClass()(*args, **kw)

    def test_ctor(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        self.assertEqual(interface.supervisor.supervisord, supervisord)
        self.failUnless(interface.system)

    def test_traverse(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        from xmlrpc import traverse
        self._assertRPCError(xmlrpc.Faults.UNKNOWN_METHOD,
                             traverse, interface, 'notthere.hello', [])
        self._assertRPCError(xmlrpc.Faults.UNKNOWN_METHOD,
                             traverse, interface, 'supervisor._readFile', [])
        self._assertRPCError(xmlrpc.Faults.INCORRECT_PARAMETERS,
                             traverse, interface,
                             'supervisor.getIdentification', [1])
        self.assertEqual(
            traverse(interface, 'supervisor.getIdentification', []),
            'supervisor')
            
def makeExecutable(file, substitutions=None):
    if substitutions is None:
        substitutions = {}
    data = open(file).read()
    last = os.path.split(file)[1]

    substitutions['PYTHON'] = sys.executable
    for key in substitutions.keys():
        data = data.replace('<<%s>>' % key.upper(), substitutions[key])
    
    import tempfile
    tmpnam = tempfile.mktemp(prefix=last)
    f = open(tmpnam, 'w')
    f.write(data)
    f.close()
    os.chmod(tmpnam, 0755)
    return tmpnam

def makeSpew(unkillable=False):
    here = os.path.dirname(__file__)
    if not unkillable:
        return makeExecutable(os.path.join(here, 'fixtures/spew.py'))
    return makeExecutable(os.path.join(here, 'fixtures/unkillable_spew.py'))

class SupervisorNamespaceXMLRPCInterfaceTests(TestBase):
    def _getTargetClass(self):
        return xmlrpc.SupervisorNamespaceRPCInterface

    def _makeOne(self, *args, **kw):
        return self._getTargetClass()(*args, **kw)

    def test_update(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        interface._update('foo')
        self.assertEqual(interface.update_text, 'foo')
        supervisord.state = SupervisorStates.SHUTDOWN
        self._assertRPCError(xmlrpc.Faults.SHUTDOWN_STATE, interface._update,
                             'foo')

    def test_getVersion(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        version = interface.getVersion()
        self.assertEqual(version, xmlrpc.RPC_VERSION)
        self.assertEqual(interface.update_text, 'getVersion')

    def test_getIdentification(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        identifier = interface.getIdentification()
        self.assertEqual(identifier, supervisord.options.identifier)
        self.assertEqual(interface.update_text, 'getIdentification')

    def test_getState(self):
        from supervisord import getSupervisorStateDescription
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        stateinfo = interface.getState()
        statecode = supervisord.get_state()
        statename = getSupervisorStateDescription(statecode)
        self.assertEqual(stateinfo['statecode'], statecode)
        self.assertEqual(stateinfo['statename'], statename)
        self.assertEqual(interface.update_text, 'getState')

    def test_readLog_unreadable(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.readLog,
                             offset=0, length=1)

    def test_readLog_badargs(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        try:
            logfile = supervisord.options.logfile
            f = open(logfile, 'w+')
            f.write('x' * 2048)
            f.close()
            self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS,
                                 interface.readLog, offset=-1, length=1)
            self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS,
                                 interface.readLog, offset=-1,
                                 length=-1)
        finally:
            os.remove(logfile)

    def test_readLog(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        logfile = supervisord.options.logfile
        try:
            f = open(logfile, 'w+')
            f.write('x' * 2048)
            f.write('y' * 2048)
            f.close()
            data = interface.readLog(offset=0, length=0)
            self.assertEqual(interface.update_text, 'readLog')
            self.assertEqual(data, ('x' * 2048) + ('y' * 2048))
            data = interface.readLog(offset=2048, length=0)
            self.assertEqual(data, 'y' * 2048)
            data = interface.readLog(offset=0, length=2048)
            self.assertEqual(data, 'x' * 2048)
            data = interface.readLog(offset=-4, length=0)
            self.assertEqual(data, 'y' * 4)
        finally:
            os.remove(logfile)

    def test_clearLog_unreadable(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        self._assertRPCError(xmlrpc.Faults.NO_FILE, interface.clearLog)

    def test_clearLog(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        logfile = supervisord.options.logfile
        try:
            f = open(logfile, 'w+')
            f.write('x')
            f.close()
            result = interface.clearLog()
            self.assertEqual(interface.update_text, 'clearLog')
            self.assertEqual(result, True)
            self.failIf(os.path.exists(logfile))
        finally:
            try:
                os.remove(logfile)
            except:
                pass

        self.assertEqual(supervisord.options.logger.handlers[0].reopened, True)

    def test_shutdown(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        value = interface.shutdown()
        self.assertEqual(value, True)
        self.assertEqual(supervisord.mood, -1)

    def test_restart(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        value = interface.restart()
        self.assertEqual(value, True)
        self.assertEqual(supervisord.mood, 0)

    def test_startProcess_already_started(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', autostart=False)
        process = DummyProcess(options, config)
        process.pid = 10
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        callback = interface.startProcess('foo')
        self._assertRPCError(xmlrpc.Faults.ALREADY_STARTED,
                             callback)

    def test_startProcess_badname(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        self._assertRPCError(xmlrpc.Faults.BAD_NAME,
                             interface.startProcess,
                             'foo')

    def test_startProcess_spawnerr(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', autostart=False)
        process = DummyProcess(options, config, ProcessStates.STOPPED)
        process.spawnerr = 'abc'
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        callback = interface.startProcess('foo')
        self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR, callback)

    def test_startProcess(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', autostart=False, startsecs=.01)
        process = DummyProcess(options, config, state=ProcessStates.STOPPED)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        callback = interface.startProcess('foo')
        self.assertEqual(callback(), http.NOT_DONE_YET)
        self.assertEqual(process.spawned, True)
        self.assertEqual(interface.update_text, 'startProcess')
        process.state = ProcessStates.RUNNING
        time.sleep(.02)
        result = callback()
        self.assertEqual(result, True)

    def test_startProcess_nowait(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', autostart=False)
        process = DummyProcess(options, config, state=ProcessStates.STOPPED)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        callback = interface.startProcess('foo', wait=False)
        self.assertEqual(callback(), True)
        self.assertEqual(process.spawned, True)
        self.assertEqual(interface.update_text, 'startProcess')

    def test_startProcess_nostartsecs(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', autostart=False, startsecs=0)
        process = DummyProcess(options, config, state=ProcessStates.STOPPED)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        callback = interface.startProcess('foo', wait=True)
        self.assertEqual(callback(), True)
        self.assertEqual(process.spawned, True)
        self.assertEqual(interface.update_text, 'startProcess')

    def test_startProcess_abnormal_term(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', autostart=False)
        process = DummyProcess(options, config, ProcessStates.STOPPED)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        callback = interface.startProcess('foo', 100) # milliseconds
        result = callback()
        time.sleep(.1)
        self.assertEqual(process.spawned, True)
        self.assertEqual(interface.update_text, 'startProcess')
        process.state = ProcessStates.BACKOFF
        self._assertRPCError(xmlrpc.Faults.ABNORMAL_TERMINATION,
                             callback)
    
    def test_startAllProcesses(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', priority=1,
                               startsecs=.01)
        config2 = DummyPConfig('foo2', '/bin/foo2', priority=2,
                               startsecs=.01)
        process = DummyProcess(options, config, ProcessStates.STOPPED)
        process2 = DummyProcess(options, config2, ProcessStates.STOPPED)
        supervisord = DummySupervisor({'foo':process, 'foo2':process2})
        interface = self._makeOne(supervisord)
        callback = interface.startAllProcesses()
        from http import NOT_DONE_YET
        from xmlrpc import Faults

        # create callbacks in startall()
        self.assertEqual(callback(), NOT_DONE_YET)
        # start first process
        self.assertEqual(callback(), NOT_DONE_YET)
        # start second process
        self.assertEqual(callback(), NOT_DONE_YET)

        # wait for timeout 1
        time.sleep(.02)
        result = callback()
        # wait for timeout 2
        time.sleep(.02)

        result = callback()

        self.assertEqual(len(result), 2)
        self.assertEqual(result[0]['name'], 'foo2')
        self.assertEqual(result[0]['status'],  Faults.SUCCESS)
        self.assertEqual(result[0]['description'], 'OK')

        self.assertEqual(result[1]['name'], 'foo')
        self.assertEqual(result[1]['status'],  Faults.SUCCESS)
        self.assertEqual(result[1]['description'], 'OK')

        self.assertEqual(interface.update_text, 'startProcess')

        self.assertEqual(process.spawned, True)
        self.assertEqual(process2.spawned, True)

    def test_startAllProcesses_nowait(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', priority=1,
                               startsecs=.01)
        config2 = DummyPConfig('foo2', '/bin/foo2', priority=2,
                               startsecs=.01)
        process = DummyProcess(options, config, ProcessStates.STOPPED)
        process2 = DummyProcess(options, config2, ProcessStates.STOPPED)
        supervisord = DummySupervisor({'foo':process, 'foo2':process2})
        interface = self._makeOne(supervisord)
        callback = interface.startAllProcesses(wait=False)
        from http import NOT_DONE_YET
        from xmlrpc import Faults

        # create callbacks in startall()
        self.assertEqual(callback(), NOT_DONE_YET)

        result = callback()

        self.assertEqual(len(result), 2)
        self.assertEqual(result[0]['name'], 'foo')
        self.assertEqual(result[0]['status'],  Faults.SUCCESS)
        self.assertEqual(result[0]['description'], 'OK')

        self.assertEqual(result[1]['name'], 'foo2')
        self.assertEqual(result[1]['status'],  Faults.SUCCESS)
        self.assertEqual(result[1]['description'], 'OK')

    def test_stopProcess_badname(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        self._assertRPCError(xmlrpc.Faults.BAD_NAME,
                             interface.stopProcess, 'foo')

    def test_stopProcess(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo')
        process = DummyProcess(options, config, ProcessStates.RUNNING)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        callback = interface.stopProcess('foo')
        self.assertEqual(interface.update_text, 'stopProcess')
        process = supervisord.processes.get('foo')
        self.assertEqual(process.backoff, 0)
        self.assertEqual(process.delay, 0)
        self.assertEqual(process.killing, 0)
        self.assertEqual(callback(), http.NOT_DONE_YET)
        self.assertEqual(process.state, ProcessStates.STOPPED)
        self.assertEqual(callback(), True)
        self.assertEqual(len(supervisord.processes), 1)
        self.assertEqual(interface.update_text, 'stopProcess')


    def test_stopAllProcesses(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo')
        config2 = DummyPConfig('foo2', '/bin/foo2')
        process = DummyProcess(options, config, ProcessStates.RUNNING)
        process2 = DummyProcess(options, config2, ProcessStates.RUNNING)
        supervisord = DummySupervisor({'foo':process, 'foo2':process2})
        interface = self._makeOne(supervisord)
        callback = interface.stopAllProcesses()
        self.assertEqual(interface.update_text, 'stopAllProcesses')
        value = http.NOT_DONE_YET
        while 1:
            value = callback()
            if value is not http.NOT_DONE_YET:
                break

        processes = supervisord.processes
        self.assertEqual(value, [
            {'status': 80, 'name': 'foo2', 'description': 'OK'},
            {'status': 80, 'name': 'foo', 'description': 'OK'}
            ] )
        self.assertEqual(len(processes), 2)
        self.assertEqual(process.stop_called, True)
        self.assertEqual(process2.stop_called, True)

    def test__interpretProcessInfo(self):
        supervisord = DummySupervisor({})
        interface = self._makeOne(supervisord)
        start = _NOW -100
        stop  = _NOW -1
        from supervisord import ProcessStates
        running = {'name':'running',
                   'pid':1,
                   'state':ProcessStates.RUNNING,
                   'start':start,
                   'stop':stop,
                   'now':_NOW}
        
        description = interface._interpretProcessInfo(running)
        self.assertEqual(description, 'pid 1, uptime 0:01:40')

        fatal = {'name':'fatal',
                 'pid':2,
                 'state':ProcessStates.FATAL,
                 'start':start,
                 'stop':stop,
                 'now':_NOW,
                 'spawnerr':'Hosed'}
                 
        description = interface._interpretProcessInfo(fatal)
        self.assertEqual(description, 'Hosed')

        fatal2 = {'name':'fatal',
                  'pid':2,
                  'state':ProcessStates.FATAL,
                  'start':start,
                  'stop':stop,
                  'now':_NOW,
                  'spawnerr':'',}
                 
        description = interface._interpretProcessInfo(fatal2)
        self.assertEqual(description, 'unknown error (try "tail fatal")')
        
        stopped = {'name':'stopped',
                   'pid':3,
                   'state':ProcessStates.STOPPED,
                   'start':start,
                   'stop':stop,
                   'now':_NOW,
                   'spawnerr':'',}

        description = interface._interpretProcessInfo(stopped)
        stoptime = datetime(*time.localtime(stop)[:7])
        self.assertEqual(description, stoptime.strftime(_TIMEFORMAT))
        
        stopped2 = {'name':'stopped',
                   'pid':3,
                   'state':ProcessStates.STOPPED,
                   'start':0,
                   'stop':stop,
                   'now':_NOW,
                   'spawnerr':'',}

        description = interface._interpretProcessInfo(stopped2)
        self.assertEqual(description, 'Not started')
                   

    def test_getProcessInfo(self):
        from supervisord import ProcessStates

        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', logfile='/tmp/fleeb.bar')
        process = DummyProcess(options, config)
        process.pid = 111
        process.laststart = 10
        process.laststop = 11
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        data = interface.getProcessInfo('foo')

        self.assertEqual(interface.update_text, 'getProcessInfo')
        self.assertEqual(data['logfile'], '/tmp/fleeb.bar')
        self.assertEqual(data['name'], 'foo')
        self.assertEqual(data['pid'], 111)
        self.assertEqual(data['start'], 10)
        self.assertEqual(data['stop'], 11)
        self.assertEqual(data['state'], ProcessStates.RUNNING)
        self.assertEqual(data['statename'], 'RUNNING')
        self.assertEqual(data['exitstatus'], 0)
        self.assertEqual(data['spawnerr'], '')
        self.failUnless(data['description'].startswith('pid 111'))

    def test_getAllProcessInfo(self):
        from supervisord import ProcessStates
        options = DummyOptions()

        p1config = DummyPConfig('process1', '/bin/process1', priority=1,
                                logfile='/tmp/process1.log')
        p2config = DummyPConfig('process2', '/bin/process2', priority=2,
                                logfile='/tmp/process2.log')
        process1 = DummyProcess(options, p1config, ProcessStates.RUNNING)
        process1.pid = 111
        process1.laststart = 10
        process1.laststop = 11
        process2 = DummyProcess(options, p2config, ProcessStates.STOPPED)
        process2.pid = 0
        process2.laststart = 20
        process2.laststop = 11
        supervisord = DummySupervisor({'process1':process1,
                                       'process2':process2})
        interface = self._makeOne(supervisord)

        info = interface.getAllProcessInfo()

        self.assertEqual(interface.update_text, 'getProcessInfo')
        self.assertEqual(len(info), 2)

        p1info = info[0]
        self.assertEqual(p1info['logfile'], '/tmp/process1.log')
        self.assertEqual(p1info['name'], 'process1')
        self.assertEqual(p1info['pid'], 111)
        self.assertEqual(p1info['start'], 10)
        self.assertEqual(p1info['stop'], 11)
        self.assertEqual(p1info['state'], ProcessStates.RUNNING)
        self.assertEqual(p1info['statename'], 'RUNNING')
        self.assertEqual(p1info['exitstatus'], 0)
        self.assertEqual(p1info['spawnerr'], '')
        self.failUnless(p1info['description'].startswith('pid 111'))

        p2info = info[1]
        self.assertEqual(p2info['logfile'], '/tmp/process2.log')
        self.assertEqual(p2info['name'], 'process2')
        self.assertEqual(p2info['pid'], 0)
        self.assertEqual(p2info['start'], process2.laststart)
        self.assertEqual(p2info['stop'], 11)
        self.assertEqual(p2info['state'], ProcessStates.STOPPED)
        self.assertEqual(p2info['statename'], 'STOPPED')
        self.assertEqual(p2info['exitstatus'], 0)
        self.assertEqual(p2info['spawnerr'], '')
        
        starttime = datetime(*time.localtime(process2.laststart)[:7])
        self.assertEqual(p2info['description'], 
                            starttime.strftime(_TIMEFORMAT))

    def test_readProcessLog_unreadable(self):
        options = DummyOptions()
        config = DummyPConfig('process1', '/bin/process1', priority=1,
                              logfile='/tmp/process1.log')
        process = DummyProcess(options, config)
        supervisord = DummySupervisor({'process1':process})
        interface = self._makeOne(supervisord)
        self._assertRPCError(xmlrpc.Faults.NO_FILE,
                             interface.readProcessLog,
                             'process1', offset=0, length=1)

    def test_readProcessLog_badargs(self):
        options = DummyOptions()
        config = DummyPConfig('process1', '/bin/process1', priority=1,
                              logfile='/tmp/process1.log')
        process = DummyProcess(options, config)
        supervisord = DummySupervisor({'process1':process})
        interface = self._makeOne(supervisord)

        try:
            logfile = process.config.logfile
            f = open(logfile, 'w+')
            f.write('x' * 2048)
            f.close()
            self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS,
                                 interface.readProcessLog,
                                 'process1', offset=-1, length=1)
            self._assertRPCError(xmlrpc.Faults.BAD_ARGUMENTS,
                                 interface.readProcessLog, 'process1',
                                 offset=-1, length=-1)
        finally:
            os.remove(logfile)

    def test_readProcessLog(self):
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', logfile='/tmp/fooooooo')
        process = DummyProcess(options, config)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        process = supervisord.processes.get('foo')
        logfile = process.config.logfile
        try:
            f = open(logfile, 'w+')
            f.write('x' * 2048)
            f.write('y' * 2048)
            f.close()
            data = interface.readProcessLog('foo', offset=0, length=0)
            self.assertEqual(interface.update_text, 'readProcessLog')
            self.assertEqual(data, ('x' * 2048) + ('y' * 2048))
            data = interface.readProcessLog('foo', offset=2048, length=0)
            self.assertEqual(data, 'y' * 2048)
            data = interface.readProcessLog('foo', offset=0, length=2048)
            self.assertEqual(data, 'x' * 2048)
            data = interface.readProcessLog('foo', offset=-4, length=0)
            self.assertEqual(data, 'y' * 4)
        finally:
            os.remove(logfile)

    def test_tailProcessLog_bad_name(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        self._assertRPCError(xmlrpc.Faults.BAD_NAME, 
                             interface.tailProcessLog, 'BAD_NAME', 0, 10)

    def test_tailProcessLog_all(self):
        """entire log is returned when offset==0 and logsize < length"""
        from string import letters
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', logfile='/tmp/fooooooo')
        process = DummyProcess(options, config)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        process = supervisord.processes.get('foo')
        logfile = process.config.logfile
        try:
            f = open(logfile, 'w+')
            f.write(letters)
            f.close()
            
            data, offset, overflow = interface.tailProcessLog('foo', 
                                                        offset=0, 
                                                        length=len(letters))
            self.assertEqual(interface.update_text, 'tailProcessLog')
            self.assertEqual(overflow, False)
            self.assertEqual(offset, len(letters))
            self.assertEqual(data, letters)
        finally:
            os.remove(logfile)

    def test_tailProcessLog_none(self):
        """nothing is returned when offset <= logsize"""
        from string import letters
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', logfile='/tmp/fooooooo')
        process = DummyProcess(options, config)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        process = supervisord.processes.get('foo')
        logfile = process.config.logfile
        try:
            f = open(logfile, 'w+')
            f.write(letters)
            f.close()

            # offset==logsize
            data, offset, overflow = interface.tailProcessLog('foo', 
                                                        offset=len(letters), 
                                                        length=100)
            self.assertEqual(interface.update_text, 'tailProcessLog')
            self.assertEqual(overflow, False)
            self.assertEqual(offset, len(letters))
            self.assertEqual(data, '')

            # offset > logsize
            data, offset, overflow = interface.tailProcessLog('foo', 
                                                        offset=len(letters)+5, 
                                                        length=100)
            self.assertEqual(interface.update_text, 'tailProcessLog')
            self.assertEqual(overflow, False)
            self.assertEqual(offset, len(letters))
            self.assertEqual(data, '')
        finally:
            os.remove(logfile)

    def test_tailProcessLog_overflow(self):
        """buffer overflow occurs when logsize > offset+length"""
        from string import letters
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', logfile='/tmp/fooooooo')
        process = DummyProcess(options, config)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        process = supervisord.processes.get('foo')
        logfile = process.config.logfile
        try:
            f = open(logfile, 'w+')
            f.write(letters)
            f.close()

            data, offset, overflow = interface.tailProcessLog('foo', 
                                                        offset=0, length=5)
            self.assertEqual(interface.update_text, 'tailProcessLog')
            self.assertEqual(overflow, True)
            self.assertEqual(offset, len(letters))
            self.assertEqual(data, letters[-5:])
        finally:
            os.remove(logfile)
    
    def test_tailProcessLog_unreadable(self):
        """nothing is returned if the log doesn't exist yet"""
        from string import letters
        options = DummyOptions()
        config = DummyPConfig('foo', '/bin/foo', logfile='/tmp/fooooooo')
        process = DummyProcess(options, config)
        supervisord = DummySupervisor({'foo':process})
        interface = self._makeOne(supervisord)
        process = supervisord.processes.get('foo')
        logfile = process.config.logfile
                
        data, offset, overflow = interface.tailProcessLog('foo', 
                                                    offset=0, length=100)
        self.assertEqual(interface.update_text, 'tailProcessLog')
        self.assertEqual(overflow, False)
        self.assertEqual(offset, 0)
        self.assertEqual(data, '')

    def test_clearProcessLog_bad_name(self):
        supervisord = DummySupervisor()
        interface = self._makeOne(supervisord)
        self._assertRPCError(xmlrpc.Faults.BAD_NAME,
                             interface.clearProcessLog,
                             'spew')

    def test_clearProcessLog(self):
        pconfig = DummyPConfig('foo', 'foo')
        options = DummyOptions()
        process = DummyProcess(options, pconfig)
        processes = {'foo': process}
        supervisord = DummySupervisor(processes)
        interface = self._makeOne(supervisord)
        interface.clearProcessLog('foo')
        self.assertEqual(process.logsremoved, True)

    def test_clearProcessLog_failed(self):
        pconfig = DummyPConfig('foo', 'foo')
        options = DummyOptions()
        process = DummyProcess(options, pconfig)
        process.error_at_clear = True
        processes = {'foo': process}
        supervisord = DummySupervisor(processes)
        interface = self._makeOne(supervisord)
        self.assertRaises(xmlrpc.RPCError, interface.clearProcessLog, 'foo')
        

    def test_clearAllProcessLogs(self):
        pconfig = DummyPConfig('foo', 'foo')
        pconfig2 = DummyPConfig('bar', 'bar')
        options = DummyOptions()
        process = DummyProcess(options, pconfig)
        process2= DummyProcess(options, pconfig2)
        processes = {'foo': process, 'bar':process2}
        supervisord = DummySupervisor(processes)
        interface = self._makeOne(supervisord)
        callback = interface.clearAllProcessLogs()
        callback()
        callback()
        self.assertEqual(process.logsremoved, True)
        self.assertEqual(process2.logsremoved, True)

    def test_clearAllProcessLogs_onefails(self):
        pconfig = DummyPConfig('foo', 'foo')
        pconfig2 = DummyPConfig('bar', 'bar')
        options = DummyOptions()
        process = DummyProcess(options, pconfig)
        process2= DummyProcess(options, pconfig2)
        process2.error_at_clear = True
        processes = {'foo': process, 'bar':process2}
        supervisord = DummySupervisor(processes)
        interface = self._makeOne(supervisord)
        callback = interface.clearAllProcessLogs()
        callback()
        results = callback()
        self.assertEqual(process.logsremoved, True)
        self.assertEqual(process2.logsremoved, False)
        self.assertEqual(len(results), 2)
        self.assertEqual(results[0], {'name':'bar',
                                      'status':xmlrpc.Faults.FAILED,
                                      'description':'FAILED: bar'})
        self.assertEqual(results[1], {'name':'foo',
                                      'status':xmlrpc.Faults.SUCCESS,
                                      'description':'OK'})

class SystemNamespaceXMLRPCInterfaceTests(TestBase):
    def _getTargetClass(self):
        return xmlrpc.SystemNamespaceRPCInterface

    def _makeOne(self):
        supervisord = DummySupervisor()
        supervisor = xmlrpc.SupervisorNamespaceRPCInterface(supervisord)
        return self._getTargetClass()(
            [('supervisor', supervisor),
             ]
            )

    def test_ctor(self):
        interface = self._makeOne()
        self.failUnless(interface.namespaces['supervisor'])
        self.failUnless(interface.namespaces['system'])

    def test_listMethods(self):
        interface = self._makeOne()
        methods = interface.listMethods()
        methods.sort()
        keys = interface._listMethods().keys()
        keys.sort()
        self.assertEqual(methods, keys)

    def test_methodSignature(self):
        interface = self._makeOne()
        self._assertRPCError(xmlrpc.Faults.SIGNATURE_UNSUPPORTED,
                             interface.methodSignature,
                             ['foo.bar'])
        result = interface.methodSignature('system.methodSignature')
        self.assertEqual(result, ['array', 'string'])

    def test_allMethodDocs(self):
        # belt-and-suspenders test for docstring-as-typing parsing correctness
        # and documentation validity vs. implementation
        import options
        _RPCTYPES = ['int', 'double', 'string', 'boolean', 'dateTime.iso8601',
                     'base64', 'binary', 'array', 'struct']
        interface = self._makeOne()
        methods = interface._listMethods()
        for k in methods.keys():
            # if a method doesn't have a @return value, an RPCError is raised.
            # Detect that here.
            try:
                interface.methodSignature(k)
            except xmlrpc.RPCError:
                raise AssertionError, ('methodSignature for %s raises '
                                       'RPCError (missing @return doc?)' % k)

            # we want to test that the number of arguments implemented in
            # the function is the same as the number of arguments implied by
            # the doc @params, and that they show up in the same order.
            ns_name, method_name = k.split('.', 1)
            namespace = interface.namespaces[ns_name]
            meth = getattr(namespace, method_name)
            code = meth.func_code
            argnames = code.co_varnames[1:code.co_argcount]
            parsed = options.gettags(str(meth.__doc__))

            plines = []
            ptypes = []
            pnames = []
            ptexts = []

            rlines = []
            rtypes = []
            rnames = []
            rtexts = []

            for thing in parsed:
                if thing[1] == 'param': # tag name
                    plines.append(thing[0]) # doc line number
                    ptypes.append(thing[2]) # data type
                    pnames.append(thing[3]) # function name
                    ptexts.append(thing[4])  # description
                elif thing[1] == 'return': # tag name
                    rlines.append(thing[0]) # doc line number
                    rtypes.append(thing[2]) # data type
                    rnames.append(thing[3]) # function name
                    rtexts.append(thing[4])  # description
                elif thing[1] is not None:
                    raise AssertionError(
                        'unknown tag type %s for %s, parsed %s' % (thing[1],
                                                                   k,
                                                                   parsed))
            # param tokens

            if len(argnames) != len(pnames):
                raise AssertionError, ('Incorrect documentation '
                                       '(%s args, %s doc params) in %s'
                                       % (len(argnames), len(pnames), k))
            for docline in plines:
                self.failUnless(type(docline) == int, (docline,
                                                       type(docline),
                                                       k,
                                                       parsed))
            for doctype in ptypes:
                self.failUnless(doctype in _RPCTYPES, doctype)
            for x in range(len(pnames)):
                if pnames[x] != argnames[x]:
                    msg = 'Name wrong: (%s vs. %s in %s)\n%s' % (pnames[x],
                                                                 argnames[x],
                                                                 k,
                                                                 parsed)
                    raise AssertionError, msg
            for doctext in ptexts:
                self.failUnless(type(doctext) == type(''), doctext)

            # result tokens
            
            if len(rlines) > 1:
                raise AssertionError(
                    'Duplicate @return values in docs for %s' % k)
            for docline in rlines:
                self.failUnless(type(docline) == int, (docline,
                                                       type(docline), k))
            for doctype in rtypes:
                self.failUnless(doctype in _RPCTYPES, doctype)
            for docname in rnames:
                self.failUnless(type(docname) == type(''), (docname,
                                                            type(docname),
                                                            k))
            for doctext in rtexts:
                self.failUnless(type(doctext) == type(''), (doctext,
                                                            type(doctext), k))

    def test_multicall_simplevals(self):
        interface = self._makeOne()
        callback = interface.multicall([
            {'methodName':'system.methodHelp', 'params':['system.methodHelp']},
            {'methodName':'system.listMethods', 'params':[]},
            ])
        result = http.NOT_DONE_YET
        while result is http.NOT_DONE_YET:
            result = callback()
        self.assertEqual(result[0], interface.methodHelp('system.methodHelp'))
        self.assertEqual(result[1], interface.listMethods())

    def test_multicall_nested_callback(self):
        interface = self._makeOne()
        callback = interface.multicall([
            {'methodName':'supervisor.stopAllProcesses'}])
        result = http.NOT_DONE_YET
        while result is http.NOT_DONE_YET:
            result = callback()
        self.assertEqual(result[0], [])

    def test_methodHelp(self):
        interface = self._makeOne()
        self._assertRPCError(xmlrpc.Faults.SIGNATURE_UNSUPPORTED,
                             interface.methodHelp,
                             ['foo.bar'])
        help = interface.methodHelp('system.methodHelp')
        self.assertEqual(help, interface.methodHelp.__doc__)

class SubprocessTests(unittest.TestCase):
    def _getTargetClass(self):
        return supervisord.Subprocess

    def _makeOne(self, *arg, **kw):
        return supervisord.Subprocess(*arg, **kw)

    def test_ctor(self):
        options = DummyOptions()
        config = DummyPConfig('cat', 'bin/cat', logfile='/tmp/temp123.log')
        instance = self._makeOne(options, config)
        self.assertEqual(instance.options, options)
        self.assertEqual(instance.config, config)
        self.assertEqual(instance.laststart, 0)
        self.assertEqual(instance.childlog.args, (
            ('/tmp/temp123.log', 20, '%(message)s'),
            {'rotating': False, 'backups': 0, 'maxbytes': 0}))
        self.assertEqual(instance.pid, 0)
        self.assertEqual(instance.laststart, 0)
        self.assertEqual(instance.laststop, 0)
        self.assertEqual(instance.delay, 0)
        self.assertEqual(instance.administrative_stop, 0)
        self.assertEqual(instance.killing, 0)
        self.assertEqual(instance.backoff, 0)
        self.assertEqual(instance.pipes, {})
        self.assertEqual(instance.spawnerr, None)
        self.assertEqual(instance.logbuffer, '')

    def test_removelogs(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo')
        instance = self._makeOne(options, config)
        instance.removelogs()
        self.assertEqual(instance.childlog.handlers[0].reopened, True)
        self.assertEqual(instance.childlog.handlers[0].removed, True)

    def test_reopenlogs(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo')
        instance = self._makeOne(options, config)
        instance.reopenlogs()
        self.assertEqual(instance.childlog.handlers[0].reopened, True)

    def test_log_output(self):
        # stdout goes to the process log and the main log
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo')
        instance = self._makeOne(options, config)
        instance.logbuffer = 'foo'
        instance.log_output()
        self.assertEqual(instance.childlog.data, ['foo'])
        self.assertEqual(options.logger.data, [5, 'notthere output:\nfoo'])

    def test_drain_stdout(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.pipes['stdout'] = 'abc'
        instance.drain_stdout()
        self.assertEqual(instance.logbuffer, 'abc')

    def test_drain_stderr(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.pipes['stderr'] = 'abc'
        instance.drain_stderr()
        self.assertEqual(instance.logbuffer, '')

        instance.config.log_stderr = True
        instance.drain_stderr()
        self.assertEqual(instance.logbuffer, 'abc')

    def test_drain(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.config.log_stderr = True
        instance.pipes['stdout'] = 'abc'
        instance.pipes['stderr'] = 'def'
        instance.drain()
        self.assertEqual(instance.logbuffer, 'abcdef')

        instance.logbuffer = ''
        instance.config.log_stderr = False
        instance.drain()
        self.assertEqual(instance.logbuffer, 'abc')
        
    def test_get_pipe_drains(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.config.log_stderr = True
        instance.pipes['stdout'] = 'abc'
        instance.pipes['stderr'] = 'def'

        drains = instance.get_pipe_drains()
        self.assertEqual(len(drains), 2)
        self.assertEqual(drains[0], ['abc', instance.drain_stdout])
        self.assertEqual(drains[1], ['def', instance.drain_stderr])

        instance.pipes = {}
        drains = instance.get_pipe_drains()
        self.assertEqual(drains, [])
        

    def test_get_execv_args_abs_missing(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere')
        instance = self._makeOne(options, config)
        args = instance.get_execv_args()
        self.assertEqual(args, ('/notthere', ['/notthere'], None))

    def test_get_execv_args_abs_withquotes_missing(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere "an argument"')
        instance = self._makeOne(options, config)
        args = instance.get_execv_args()
        self.assertEqual(args, ('/notthere', ['/notthere', 'an argument'],
                                None))

    def test_get_execv_args_rel_missing(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', 'notthere')
        instance = self._makeOne(options, config)
        args = instance.get_execv_args()
        self.assertEqual(args, (None, ['notthere'], None))

    def test_get_execv_args_rel_withquotes_missing(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', 'notthere "an argument"')
        instance = self._makeOne(options, config)
        args = instance.get_execv_args()
        self.assertEqual(args, (None, ['notthere', 'an argument'], None))

    def test_get_execv_args_abs(self):
        executable = '/bin/sh foo'
        options = DummyOptions()
        config = DummyPConfig('sh', executable)
        instance = self._makeOne(options, config)
        args = instance.get_execv_args()
        self.assertEqual(args[0], '/bin/sh')
        self.assertEqual(args[1], ['/bin/sh', 'foo'])
        self.assertEqual(len(args[2]), 10)

    def test_get_execv_args_rel(self):
        executable = 'sh foo'
        options = DummyOptions()
        config = DummyPConfig('sh', executable)
        instance = self._makeOne(options, config)
        args = instance.get_execv_args()
        self.assertEqual(args[0], '/bin/sh')
        self.assertEqual(args[1], ['sh', 'foo'])
        self.assertEqual(len(args[2]), 10)

    def test_record_spawnerr(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.record_spawnerr('foo')
        self.assertEqual(instance.spawnerr, 'foo')
        self.assertEqual(options.logger.data[0], 'spawnerr: foo')
        self.assertEqual(instance.backoff, 1)
        self.failUnless(instance.delay)

    def test_spawn_already_running(self):
        options = DummyOptions()
        config = DummyPConfig('sh', '/bin/sh')
        instance = self._makeOne(options, config)
        instance.pid = True
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(options.logger.data[0], "process 'sh' already running")

    def test_spawn_fail_check_execv_args(self):
        options = DummyOptions()
        config = DummyPConfig('bad', '/bad/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(instance.spawnerr, 'bad filename')
        self.assertEqual(options.logger.data[0], "spawnerr: bad filename")
        self.failUnless(instance.delay)
        self.failUnless(instance.backoff)

    def test_spawn_fail_make_pipes_emfile(self):
        options = DummyOptions()
        options.make_pipes_error = errno.EMFILE
        config = DummyPConfig('good', '/good/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(instance.spawnerr,
                         "too many open files to spawn 'good'")
        self.assertEqual(options.logger.data[0],
                         "spawnerr: too many open files to spawn 'good'")
        self.failUnless(instance.delay)
        self.failUnless(instance.backoff)

    def test_spawn_fail_make_pipes_other(self):
        options = DummyOptions()
        options.make_pipes_error = 1
        config = DummyPConfig('good', '/good/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(instance.spawnerr, 'unknown error: EPERM')
        self.assertEqual(options.logger.data[0],
                         "spawnerr: unknown error: EPERM")
        self.failUnless(instance.delay)
        self.failUnless(instance.backoff)

    def test_spawn_fork_fail_eagain(self):
        options = DummyOptions()
        options.fork_error = errno.EAGAIN
        config = DummyPConfig('good', '/good/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(instance.spawnerr,
                         "Too many processes in process table to spawn 'good'")
        self.assertEqual(options.logger.data[0],
             "spawnerr: Too many processes in process table to spawn 'good'")
        self.assertEqual(len(options.parent_pipes_closed), 6)
        self.assertEqual(len(options.child_pipes_closed), 6)
        self.failUnless(instance.delay)
        self.failUnless(instance.backoff)

    def test_spawn_fork_fail_other(self):
        options = DummyOptions()
        options.fork_error = 1
        config = DummyPConfig('good', '/good/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(instance.spawnerr, 'unknown error: EPERM')
        self.assertEqual(options.logger.data[0],
                         "spawnerr: unknown error: EPERM")
        self.assertEqual(len(options.parent_pipes_closed), 6)
        self.assertEqual(len(options.child_pipes_closed), 6)
        self.failUnless(instance.delay)
        self.failUnless(instance.backoff)

    def test_spawn_as_child_setuid_ok(self):
        options = DummyOptions()
        options.forkpid = 0
        config = DummyPConfig('good', '/good/filename', uid=1)
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(options.parent_pipes_closed, None)
        self.assertEqual(options.child_pipes_closed, None)
        self.assertEqual(options.pgrp_set, True)
        self.assertEqual(len(options.duped), 3)
        self.assertEqual(len(options.fds_closed), options.minfds - 3)
        self.assertEqual(options.written, {})
        self.assertEqual(options.privsdropped, 1)
        self.assertEqual(options.execv_args,
                         ('/good/filename', ['/good/filename']) )
        self.assertEqual(options._exitcode, 127)

    def test_spawn_as_child_setuid_fail(self):
        options = DummyOptions()
        options.forkpid = 0
        options.setuid_msg = 'screwed'
        config = DummyPConfig('good', '/good/filename', uid=1)
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(options.parent_pipes_closed, None)
        self.assertEqual(options.child_pipes_closed, None)
        self.assertEqual(options.pgrp_set, True)
        self.assertEqual(len(options.duped), 3)
        self.assertEqual(len(options.fds_closed), options.minfds - 3)
        self.assertEqual(options.written,
             {1: ['good: error trying to setuid to 1!\n', 'good: screwed\n']})
        self.assertEqual(options.privsdropped, None)
        self.assertEqual(options.execv_args,
                         ('/good/filename', ['/good/filename']) )
        self.assertEqual(options._exitcode, 127)

    def test_spawn_as_child_execv_fail_oserror(self):
        options = DummyOptions()
        options.forkpid = 0
        options.execv_error = 1
        config = DummyPConfig('good', '/good/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(options.parent_pipes_closed, None)
        self.assertEqual(options.child_pipes_closed, None)
        self.assertEqual(options.pgrp_set, True)
        self.assertEqual(len(options.duped), 3)
        self.assertEqual(len(options.fds_closed), options.minfds - 3)
        self.assertEqual(options.written,
                         {1: ["couldn't exec /good/filename: EPERM\n"]})
        self.assertEqual(options.privsdropped, None)
        self.assertEqual(options._exitcode, 127)

    def test_spawn_as_child_execv_fail_runtime_error(self):
        options = DummyOptions()
        options.forkpid = 0
        options.execv_error = 2
        config = DummyPConfig('good', '/good/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, None)
        self.assertEqual(options.parent_pipes_closed, None)
        self.assertEqual(options.child_pipes_closed, None)
        self.assertEqual(options.pgrp_set, True)
        self.assertEqual(len(options.duped), 3)
        self.assertEqual(len(options.fds_closed), options.minfds - 3)
        self.assertEqual(len(options.written), 1)
        msg = options.written[1][0]
        self.failUnless(msg.startswith("couldn't exec /good/filename:"))
        self.failUnless("exceptions.RuntimeError" in msg)
        self.assertEqual(options.privsdropped, None)
        self.assertEqual(options._exitcode, 127)

    def test_spawn_as_parent(self):
        options = DummyOptions()
        options.forkpid = 10
        config = DummyPConfig('good', '/good/filename')
        instance = self._makeOne(options, config)
        result = instance.spawn()
        self.assertEqual(result, 10)
        self.assertEqual(options.parent_pipes_closed, None)
        self.assertEqual(len(options.child_pipes_closed), 6)
        self.assertEqual(options.logger.data[0], "spawned: 'good' with pid 10")
        self.assertEqual(instance.spawnerr, None)
        self.failUnless(instance.delay)
        self.assertEqual(instance.options.pidhistory[10], instance)

    def dont_test_spawn_and_kill(self):
        # this is a functional test
        try:
            called = 0
            def foo(*args):
                called = 1
            signal.signal(signal.SIGCHLD, foo)
            executable = makeSpew()
            options = DummyOptions()
            config = DummyPConfig('spew', executable)
            instance = self._makeOne(options, config)
            result = instance.spawn()
            msg = options.logger.data[0]
            self.failUnless(msg.startswith("spawned: 'spew' with pid"))
            self.assertEqual(len(instance.pipes), 6)
            self.failUnless(instance.pid)
            self.failUnlessEqual(instance.pid, result)
            origpid = instance.pid
            while 1:
                try:
                    data = os.popen('ps').read()
                    break
                except IOError, why:
                    if why[0] != errno.EINTR:
                        raise
                        # try again ;-)
            time.sleep(0.1) # arbitrary, race condition possible
            self.failUnless(data.find(`origpid`) != -1 )
            msg = instance.kill(signal.SIGTERM)
            time.sleep(0.1) # arbitrary, race condition possible
            self.assertEqual(msg, None)
            pid, sts = os.waitpid(-1, os.WNOHANG)
            data = os.popen('ps').read()
            self.assertEqual(data.find(`origpid`), -1) # dubious
        finally:
            try:
                os.remove(executable)
            except:
                pass
            signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def test_stop(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.pid = 11
        instance.stop()
        self.assertEqual(instance.administrative_stop, 1)
        self.failUnless(instance.delay)
        self.assertEqual(options.logger.data[0], 'killing test (pid 11) with '
                         'signal SIGTERM')
        self.assertEqual(instance.killing, 1)
        self.assertEqual(options.kills[11], signal.SIGTERM)

    def test_kill_nopid(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.kill(signal.SIGTERM)
        self.assertEqual(options.logger.data[0],
              'attempted to kill test with sig SIGTERM but it wasn\'t running')
        self.assertEqual(instance.killing, 0)

    def test_kill_error(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        options.kill_error = 1
        instance = self._makeOne(options, config)
        instance.pid = 11
        instance.kill(signal.SIGTERM)
        self.assertEqual(options.logger.data[0], 'killing test (pid 11) with '
                         'signal SIGTERM')
        self.failUnless(options.logger.data[1].startswith(
            'unknown problem killing test'))
        self.assertEqual(instance.killing, 0)

    def test_kill(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.pid = 11
        instance.kill(signal.SIGTERM)
        self.assertEqual(options.logger.data[0], 'killing test (pid 11) with '
                         'signal SIGTERM')
        self.assertEqual(instance.killing, 1)
        self.assertEqual(options.kills[11], signal.SIGTERM)

    def test_finish(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo')
        instance = self._makeOne(options, config)
        instance.waitstatus = (123, 1) # pid, waitstatus
        instance.options.pidhistory[123] = instance
        instance.killing = 1
        pipes = {'stdout':'','stderr':''}
        instance.pipes = pipes
        instance.finish(123, 1)
        self.assertEqual(instance.killing, 0)
        self.assertEqual(instance.pid, 0)
        self.assertEqual(options.parent_pipes_closed, pipes)
        self.assertEqual(instance.pipes, {})
        self.assertEqual(options.logger.data[0], 'stopped: notthere '
                         '(terminated by SIGHUP)')
        self.assertEqual(instance.exitstatus, -1)

    def test_set_uid_no_uid(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test')
        instance = self._makeOne(options, config)
        instance.set_uid()
        self.assertEqual(options.privsdropped, None)

    def test_set_uid(self):
        options = DummyOptions()
        config = DummyPConfig('test', '/test', uid=1)
        instance = self._makeOne(options, config)
        msg = instance.set_uid()
        self.assertEqual(options.privsdropped, 1)
        self.assertEqual(msg, None)

    def test_cmp_bypriority(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo',
                              priority=1)
        instance = self._makeOne(options, config)

        config = DummyPConfig('notthere1', '/notthere', logfile='/tmp/foo',
                              priority=2)
        instance1 = self._makeOne(options, config)

        config = DummyPConfig('notthere2', '/notthere', logfile='/tmp/foo',
                              priority=3)
        instance2 = self._makeOne(options, config)

        L = [instance2, instance, instance1]
        L.sort()

        self.assertEqual(L, [instance, instance1, instance2])

    def test_get_state(self):
        options = DummyOptions()
        config = DummyPConfig('notthere', '/notthere', logfile='/tmp/foo')
        from supervisord import ProcessStates

        instance = self._makeOne(options, config)
        instance.killing = True
        instance.laststart = 100
        self.assertEqual(instance.get_state(), ProcessStates.STOPPING)

        instance = self._makeOne(options, config)
        instance.laststart = 1
        instance.delay = 1
        instance.pid = 1
        self.assertEqual(instance.get_state(), ProcessStates.STARTING)

        instance = self._makeOne(options, config)
        instance.laststart = 1
        instance.pid = 11
        self.assertEqual(instance.get_state(), ProcessStates.RUNNING)
        
        instance = self._makeOne(options, config)
        instance.system_stop = True
        instance.laststart = 100
        self.assertEqual(instance.get_state(), ProcessStates.FATAL)

        instance = self._makeOne(options, config)
        instance.administrative_stop = True
        self.assertEqual(instance.get_state(), ProcessStates.STOPPED)
        
        instance = self._makeOne(options, config)
        instance.laststart = 1
        instance.exitstatus = 1
        self.assertEqual(instance.get_state(), ProcessStates.EXITED)

        instance = self._makeOne(options, config)
        instance.laststart = 1
        instance.delay = 1
        self.assertEqual(instance.get_state(), ProcessStates.BACKOFF)

        instance = self._makeOne(options, config)
        instance.laststart = 1
        self.assertEqual(instance.get_state(), ProcessStates.UNKNOWN)

class XMLRPCMarshallingTests(unittest.TestCase):
    def test_xmlrpc_marshal(self):
        import xmlrpclib
        data = xmlrpc.xmlrpc_marshal(1)
        self.assertEqual(data, xmlrpclib.dumps((1,), methodresponse=True))
        fault = xmlrpclib.Fault(1, 'foo')
        data = xmlrpc.xmlrpc_marshal(fault)
        self.assertEqual(data, xmlrpclib.dumps(fault))

class XMLRPCHandlerTests(unittest.TestCase):
    def _getTargetClass(self):
        from xmlrpc import supervisor_xmlrpc_handler
        return supervisor_xmlrpc_handler
    
    def _makeOne(self, supervisord):
        return self._getTargetClass()(supervisord)

    def test_ctor(self):
        supervisor = DummySupervisor()
        handler = self._makeOne(supervisor)
        self.assertEqual(handler.supervisord, supervisor)
        from xmlrpc import RPCInterface
        self.assertEqual(handler.rpcinterface.__class__, RPCInterface)

    def test_continue_request_nosuchmethod(self):
        supervisor = DummySupervisor()
        handler = self._makeOne(supervisor)
        handler.rpcinterface = DummyRPCServer()
        import xmlrpclib
        data = xmlrpclib.dumps(('a', 'b'), 'supervisor.noSuchMethod')
        request = DummyRequest('/what/ever', None, None, None)
        handler.continue_request(data, request)
        logdata = supervisor.options.logger.data
        self.assertEqual(len(logdata), 2)
        self.assertEqual(logdata[0],
                         u'XML-RPC method called: supervisor.noSuchMethod()')
        self.assertEqual(logdata[1],
           (u'XML-RPC method supervisor.noSuchMethod() returned fault: '
            '[1] UNKNOWN_METHOD'))
        self.assertEqual(len(request.producers), 1)
        xml_response = request.producers[0]
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, xml_response)

    def test_continue_request_methodsuccess(self):
        supervisor = DummySupervisor()
        handler = self._makeOne(supervisor)
        handler.rpcinterface = DummyRPCServer()
        import xmlrpclib
        data = xmlrpclib.dumps((), 'supervisor.getVersion')
        request = DummyRequest('/what/ever', None, None, None)
        handler.continue_request(data, request)
        logdata = supervisor.options.logger.data
        self.assertEqual(len(logdata), 2)
        self.assertEqual(logdata[0],
               u'XML-RPC method called: supervisor.getVersion()')
        self.assertEqual(logdata[1],
            u'XML-RPC method supervisor.getVersion() returned successfully')
        self.assertEqual(len(request.producers), 1)
        xml_response = request.producers[0]
        response = xmlrpclib.loads(xml_response)
        self.assertEqual(response[0][0], '1.0')
        self.assertEqual(request._done, True)
        self.assertEqual(request.headers['Content-Type'], 'text/xml')
        self.assertEqual(request.headers['Content-Length'], len(xml_response))

    def test_continue_request_500(self):
        supervisor = DummySupervisor()
        handler = self._makeOne(supervisor)
        handler.rpcinterface = DummyRPCServer()
        import xmlrpclib
        data = xmlrpclib.dumps((), 'supervisor.raiseError')
        request = DummyRequest('/what/ever', None, None, None)
        handler.continue_request(data, request)
        logdata = supervisor.options.logger.data
        self.assertEqual(len(logdata), 2)
        self.assertEqual(logdata[0],
               u'XML-RPC method called: supervisor.raiseError()')
        self.failUnless(logdata[1].startswith('Traceback'))
        self.failUnless(logdata[1].endswith('ValueError: error\n'))
        self.assertEqual(len(request.producers), 0)
        self.assertEqual(request._error, 500)

class LogtailHandlerTests(unittest.TestCase):
    def _getTargetClass(self):
        from http import logtail_handler
        return logtail_handler

    def _makeOne(self, supervisord):
        return self._getTargetClass()(supervisord)

    def test_handle_request_logfile_none(self):
        supervisor = DummySupervisor()
        pconfig = DummyPConfig('foo', 'foo', None)
        options = DummyOptions()
        supervisor.processes = {'foo':DummyProcess(options, pconfig)}
        handler = self._makeOne(supervisor)
        request = DummyRequest('/logtail/foo', None, None, None)
        handler.handle_request(request)
        self.assertEqual(request._error, 410)

    def test_handle_request_logfile_missing(self):
        supervisor = DummySupervisor()
        pconfig = DummyPConfig('foo', 'foo', 'it/is/missing')
        options = DummyOptions()
        supervisor.processes = {'foo':DummyProcess(options, pconfig)}
        handler = self._makeOne(supervisor)
        request = DummyRequest('/logtail/foo', None, None, None)
        handler.handle_request(request)
        self.assertEqual(request._error, 410)

    def test_handle_request(self):
        supervisor = DummySupervisor()
        f = tempfile.NamedTemporaryFile()
        t = f.name
        options = DummyOptions()
        pconfig = DummyPConfig('foo', 'foo', logfile=t)
        supervisor.processes = {'foo':DummyProcess(options, pconfig)}
        handler = self._makeOne(supervisor)
        request = DummyRequest('/logtail/foo', None, None, None)
        handler.handle_request(request)
        self.assertEqual(request._error, None)
        from medusa import http_date
        self.assertEqual(request.headers['Last-Modified'],
                         http_date.build_http_date(os.stat(t)[stat.ST_MTIME]))
        self.assertEqual(request.headers['Content-Type'], 'text/plain')
        self.assertEqual(len(request.producers), 1)
        self.assertEqual(request._done, True)

class SupervisordTests(unittest.TestCase):
    def _getTargetClass(self):
        from supervisord import Supervisor
        return Supervisor

    def _makeOne(self, options):
        return self._getTargetClass()(options)

    def test_main(self):
        options = DummyOptions()
        supervisord = self._makeOne(options)
        pconfig = DummyPConfig('foo', 'foo', '/bin/foo')
        options.programs = [pconfig]
        supervisord.main(args='abc', test=True, first=True)
        self.assertEqual(options.realizeargs, 'abc')
        self.assertEqual(options.environment_processed, True)
        self.assertEqual(options.fds_cleaned_up, True)
        self.assertEqual(options.rlimits_set, True)
        self.assertEqual(options.make_logger_messages,
                         (['setuid_called'], ['rlimits_set']))
        self.assertEqual(options.autochildlogdir_cleared, True)
        self.assertEqual(options.autochildlogs_created, True)
        self.assertEqual(len(supervisord.processes), 1)
        self.assertEqual(supervisord.processes['foo'].options, options)
        self.assertEqual(options.pidfile_written, True)
        self.assertEqual(options.httpserver_opened, True)
        self.assertEqual(options.signals_set, True)
        self.assertEqual(options.daemonized, True)
        self.assertEqual(options.cleaned_up, True)

    def test_get_state(self):
        from supervisord import SupervisorStates
        options = DummyOptions()
        supervisord = self._makeOne(options)
        self.assertEqual(supervisord.get_state(), SupervisorStates.ACTIVE)
        supervisord.mood = -1
        self.assertEqual(supervisord.get_state(), SupervisorStates.SHUTDOWN)

    def test_start_necessary(self):
        from supervisord import ProcessStates
        options = DummyOptions()
        pconfig1 = DummyPConfig('killed', 'killed', '/bin/killed')
        process1 = DummyProcess(options, pconfig1, ProcessStates.EXITED)
        pconfig2 = DummyPConfig('error', 'error', '/bin/error')
        process2 = DummyProcess(options, pconfig2, ProcessStates.FATAL)
        pconfig3 = DummyPConfig('notstarted', 'notstarted', '/bin/notstarted',
                                autostart=True)
        process3 = DummyProcess(options, pconfig3, ProcessStates.STOPPED)
        pconfig4 = DummyPConfig('wontstart', 'wonstart', '/bin/wontstart',
                                autostart=True)
        process4 = DummyProcess(options, pconfig4, ProcessStates.BACKOFF)
        pconfig5 = DummyPConfig('backingoff', 'backingoff', '/bin/backingoff',
                                autostart=True)
        process5 = DummyProcess(options, pconfig5, ProcessStates.BACKOFF)
        now = time.time()
        process5.delay = now + 1000

        supervisord = self._makeOne(options)
        supervisord.processes = {'killed': process1, 'error': process2,
                                 'notstarted':process3, 'wontstart':process4,
                                 'backingoff':process5}
        supervisord.start_necessary()
        self.assertEqual(process1.spawned, True)
        self.assertEqual(process2.spawned, False)
        self.assertEqual(process3.spawned, True)
        self.assertEqual(process4.spawned, True)
        self.assertEqual(process5.spawned, False)

    def test_stop_all(self):
        options = DummyOptions()
        pconfig1 = DummyPConfig('process1', 'process1', '/bin/process1')
        process1 = DummyProcess(options, pconfig1, state=ProcessStates.STOPPED)
        pconfig2 = DummyPConfig('process2', 'process2', '/bin/process2')
        process2 = DummyProcess(options, pconfig2, state=ProcessStates.RUNNING)
        pconfig3 = DummyPConfig('process3', 'process3', '/bin/process3')
        process3 = DummyProcess(options, pconfig3, state=ProcessStates.STARTING)
        pconfig4 = DummyPConfig('process4', 'process4', '/bin/process4')
        process4 = DummyProcess(options, pconfig4, state=ProcessStates.BACKOFF)
        process4.delay = 1000
        process4.backoff = 10
        supervisord = self._makeOne(options)
        supervisord.processes = {'process1': process1, 'process2': process2,
                                 'process3':process3, 'process4':process4}

        supervisord.stop_all()
        self.assertEqual(process1.stop_called, False)
        self.assertEqual(process2.stop_called, True)
        self.assertEqual(process3.stop_called, True)
        self.assertEqual(process4.stop_called, False)

        self.assertEqual(process4.delay, 0)
        self.assertEqual(process4.backoff, 0)
        self.assertEqual(process4.system_stop, 1)

        
    def test_transition(self):
        options = DummyOptions()

        # this should go to FATAL via transition()
        pconfig1 = DummyPConfig('process1', 'process1', '/bin/process1')
        process1 = DummyProcess(options, pconfig1, state=ProcessStates.BACKOFF)
        process1.backoff = 10000
        process1.delay = 1
        process1.system_stop = 0

        # this should go to RUNNING via transition()
        pconfig2 = DummyPConfig('process2', 'process2', '/bin/process2')
        process2 = DummyProcess(options, pconfig2, state=ProcessStates.STARTING)
        process2.backoff = 1
        process2.delay = 1
        process2.system_stop = 0
        process2.laststart = 1

        supervisord = self._makeOne(options)
        supervisord.processes = { 'process1': process1, 'process2': process2 }

        supervisord.transition()

        # this implies FATAL
        self.assertEqual(process1.backoff, 0)
        self.assertEqual(process1.delay, 0)
        self.assertEqual(process1.system_stop, 1)

        # this implies RUNNING
        self.assertEqual(process2.backoff, 0)
        self.assertEqual(process2.delay, 0)
        self.assertEqual(process2.system_stop, 0)

    def test_get_undead(self):
        options = DummyOptions()

        pconfig1 = DummyPConfig('process1', 'process1', '/bin/process1')
        process1 = DummyProcess(options, pconfig1, state=ProcessStates.STOPPING)
        process1.delay = time.time() - 1

        pconfig2 = DummyPConfig('process2', 'process2', '/bin/process2')
        process2 = DummyProcess(options, pconfig2, state=ProcessStates.STOPPING)
        process2.delay = time.time() + 1000

        pconfig3 = DummyPConfig('process3', 'process3', '/bin/process3')
        process3 = DummyProcess(options, pconfig3, state=ProcessStates.RUNNING)

        supervisord = self._makeOne(options)
        supervisord.processes = { 'process1': process1, 'process2': process2,
                                  'process3':process3 }

        undead = supervisord.get_undead()
        self.assertEqual(undead, [process1])

    def test_kill_undead(self):
        options = DummyOptions()

        pconfig1 = DummyPConfig('process1', 'process1', '/bin/process1')
        process1 = DummyProcess(options, pconfig1, state=ProcessStates.STOPPING)
        process1.delay = time.time() - 1

        pconfig2 = DummyPConfig('process2', 'process2', '/bin/process2')
        process2 = DummyProcess(options, pconfig2, state=ProcessStates.STOPPING)
        process2.delay = time.time() + 1000

        supervisord = self._makeOne(options)
        supervisord.processes = { 'process1': process1, 'process2': process2}

        supervisord.kill_undead()
        self.assertEqual(process1.killed_with, signal.SIGKILL)

    def test_reap(self):
        options = DummyOptions()
        options.waitpid_return = 1, 1
        pconfig = DummyPConfig('process', 'process', '/bin/process1')
        process = DummyProcess(options, pconfig)
        process.drained = False
        process.killing = 1
        process.laststop = None
        process.waitstatus = None, None
        options.pidhistory = {1:process}
        supervisord = self._makeOne(options)
        
        supervisord.reap(once=True)
        self.assertEqual(process.finished, (1,1))

    def test_handle_sigterm(self):
        options = DummyOptions()
        options.signal = signal.SIGTERM
        supervisord = self._makeOne(options)
        supervisord.handle_signal()
        self.assertEqual(supervisord.mood, -1)
        self.assertEqual(options.logger.data[0],
                         'received SIGTERM indicating exit request')

    def test_handle_sigint(self):
        options = DummyOptions()
        options.signal = signal.SIGINT
        supervisord = self._makeOne(options)
        supervisord.handle_signal()
        self.assertEqual(supervisord.mood, -1)
        self.assertEqual(options.logger.data[0],
                         'received SIGINT indicating exit request')

    def test_handle_sigquit(self):
        options = DummyOptions()
        options.signal = signal.SIGQUIT
        supervisord = self._makeOne(options)
        supervisord.handle_signal()
        self.assertEqual(supervisord.mood, -1)
        self.assertEqual(options.logger.data[0],
                         'received SIGQUIT indicating exit request')

    def test_handle_sighup(self):
        options = DummyOptions()
        options.signal = signal.SIGHUP
        supervisord = self._makeOne(options)
        supervisord.handle_signal()
        self.assertEqual(supervisord.mood, 0)
        self.assertEqual(options.logger.data[0],
                         'received SIGHUP indicating restart request')

    def test_handle_sigusr2(self):
        options = DummyOptions()
        options.signal = signal.SIGUSR2
        pconfig1 = DummyPConfig('process1', 'process1', '/bin/process1')
        process1 = DummyProcess(options, pconfig1, state=ProcessStates.STOPPING)
        process1.delay = time.time() - 1
        supervisord = self._makeOne(options)
        supervisord.processes = {'process1':process1}
        supervisord.handle_signal()
        self.assertEqual(supervisord.mood, 1)
        self.assertEqual(options.logs_reopened, True)
        self.assertEqual(process1.logs_reopened, True)
        self.assertEqual(options.logger.data[0],
                         'received SIGUSR2 indicating log reopen request')

    def test_handle_unknown_signal(self):
        options = DummyOptions()
        options.signal = signal.SIGUSR1
        supervisord = self._makeOne(options)
        supervisord.handle_signal()
        self.assertEqual(supervisord.mood, 1)
        self.assertEqual(options.logger.data[0],
                         'received SIGUSR1 indicating nothing')
        

class ControllerTests(unittest.TestCase):
    def _getTargetClass(self):
        from supervisorctl import Controller
        return Controller

    def _makeOne(self, options):
        return self._getTargetClass()(options)

    def test_ctor(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        self.assertEqual(controller.prompt, options.prompt + '> ')

    def test__upcheck(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        result = controller._upcheck()
        self.assertEqual(result, True)

    def test_onecmd(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.onecmd('help')
        self.assertEqual(result, None)
        self.failUnless(
            controller.stdout.getvalue().find('Documented commands') != -1
            )

    def test_tail_noname(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_tail('')
        self.assertEqual(result, None)
        lines = controller.stdout.getvalue().split('\n')
        self.assertEqual(lines[0], 'Error: too few arguments')

    def test_tail_toomanyargs(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_tail('one two three')
        self.assertEqual(result, None)
        lines = controller.stdout.getvalue().split('\n')
        self.assertEqual(lines[0], 'Error: too many arguments')

    def test_tail_onearg(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_tail('foo')
        self.assertEqual(result, None)
        lines = controller.stdout.getvalue().split('\n')
        self.assertEqual(len(lines), 12)
        self.assertEqual(lines[0], 'output line')

    def test_tail_no_file(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_tail('NO_FILE')
        lines = controller.stdout.getvalue().split('\n')
        self.assertEqual(len(lines), 2)
        self.assertEqual(lines[0], 'NO_FILE: ERROR (no log file)')

    def test_tail_failed(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_tail('FAILED')
        lines = controller.stdout.getvalue().split('\n')
        self.assertEqual(len(lines), 2)
        self.assertEqual(lines[0], 'FAILED: ERROR (unknown error reading log)')

    def test_tail_bad_name(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_tail('BAD_NAME')
        lines = controller.stdout.getvalue().split('\n')
        self.assertEqual(len(lines), 2)
        self.assertEqual(lines[0], 'BAD_NAME: ERROR (no such process name)')

    def test_tail_twoargs(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_tail('-10 foo')
        self.assertEqual(result, None)
        lines = controller.stdout.getvalue().split('\n')
        self.assertEqual(len(lines), 3)
        self.assertEqual(lines[0], 'tput line')

    def test_status_oneprocess(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_status('foo')
        self.assertEqual(result, None)
        expected = "foo            RUNNING    foo description\n"
        self.assertEqual(controller.stdout.getvalue(), expected)

    def test_status_allprocesses(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_status('')
        self.assertEqual(result, None)
        expected = """\
foo            RUNNING    foo description
bar            FATAL      bar description
baz            STOPPED    baz description
"""
        self.assertEqual(controller.stdout.getvalue(), expected)

    def test_start_fail(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_start('')
        self.assertEqual(result, None)
        expected = "Error: start requires a process name"
        self.assertEqual(controller.stdout.getvalue().split('\n')[0], expected)

    def test_start_badname(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_start('BAD_NAME')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'BAD_NAME: ERROR (no such process)\n')

    def test_start_alreadystarted(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_start('ALREADY_STARTED')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'ALREADY_STARTED: ERROR (already started)\n')

    def test_start_spawnerror(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_start('SPAWN_ERROR')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'SPAWN_ERROR: ERROR (spawn error)\n')

    def test_start_one_success(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_start('foo')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(), 'foo: started\n')

    def test_start_many(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_start('foo bar')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'foo: started\nbar: started\n')

    def test_start_all(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_start('all')
        self.assertEqual(result, None)

        self.assertEqual(controller.stdout.getvalue(),
                'foo: started\nfoo2: started\nfailed: ERROR (spawn error)\n')


    def test_stop_fail(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_stop('')
        self.assertEqual(result, None)
        expected = "Error: stop requires a process name"
        self.assertEqual(controller.stdout.getvalue().split('\n')[0], expected)

    def test_stop_badname(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_stop('BAD_NAME')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'BAD_NAME: ERROR (no such process)\n')

    def test_stop_notrunning(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_stop('NOT_RUNNING')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'NOT_RUNNING: ERROR (not running)\n')

    def test_stop_failed(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_stop('FAILED')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(), 'FAILED\n')

    def test_stop_one_success(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_stop('foo')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(), 'foo: stopped\n')

    def test_stop_many(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_stop('foo bar')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'foo: stopped\nbar: stopped\n')

    def test_stop_all(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_stop('all')
        self.assertEqual(result, None)

        self.assertEqual(controller.stdout.getvalue(),
         'foo: stopped\nfoo2: stopped\nfailed: ERROR (no such process)\n')

    def test_restart_fail(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_restart('')
        self.assertEqual(result, None)

        self.assertEqual(controller.stdout.getvalue().split('\n')[0],
         'Error: restart requires a process name')

    def test_restart_one(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_restart('foo')
        self.assertEqual(result, None)

        self.assertEqual(controller.stdout.getvalue(),
                         'foo: stopped\nfoo: started\n')

    def test_restart_all(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_restart('all')
        self.assertEqual(result, None)

        self.assertEqual(controller.stdout.getvalue(),
                         ('foo: stopped\nfoo2: stopped\n'
                          'failed: ERROR (no such process)\n'
                          'foo: started\nfoo2: started\n'
                          'failed: ERROR (spawn error)\n'))

    def test_reload_fail(self):
        options = DummyClientOptions()
        options.interactive = False
        options._server.supervisor._restartable = False
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_reload('')
        self.assertEqual(result, None)
        self.assertEqual(options._server.supervisor._restarted, False)
        
    def test_reload(self):
        options = DummyClientOptions()
        options.interactive = False
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_reload('')
        self.assertEqual(result, None)
        self.assertEqual(options._server.supervisor._restarted, True)
        
    def test_shutdown_fail(self):
        options = DummyClientOptions()
        options.interactive = False
        options._server.supervisor._restartable = False
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_shutdown('')
        self.assertEqual(result, None)
        self.assertEqual(options._server.supervisor._shutdown, False)

    def test_shutdown(self):
        options = DummyClientOptions()
        options.interactive = False
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_shutdown('')
        self.assertEqual(result, None)
        self.assertEqual(options._server.supervisor._shutdown, True)





    def test_clear_fail(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_clear('')
        self.assertEqual(result, None)
        expected = "Error: clear requires a process name"
        self.assertEqual(controller.stdout.getvalue().split('\n')[0], expected)

    def test_clear_badname(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_clear('BAD_NAME')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'BAD_NAME: ERROR (no such process)\n')

    def test_clear_one_success(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_clear('foo')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(), 'foo: cleared\n')

    def test_clear_many(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_clear('foo bar')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'foo: cleared\nbar: cleared\n')

    def test_clear_all(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_clear('all')
        self.assertEqual(result, None)

        self.assertEqual(controller.stdout.getvalue(),
         'foo: cleared\nfoo2: cleared\nfailed: ERROR (failed)\n')

    def test_open_fail(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_open('badname')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(),
                         'ERROR: url must be http:// or unix://\n')

    def test_open_succeed(self):
        options = DummyClientOptions()
        controller = self._makeOne(options)
        controller.stdout = StringIO()
        result = controller.do_open('http://localhost:9002')
        self.assertEqual(result, None)
        self.assertEqual(controller.stdout.getvalue(), """\
foo            RUNNING    foo description
bar            FATAL      bar description
baz            STOPPED    baz description
""")
        
class TailFProducerTests(unittest.TestCase):
    def _getTargetClass(self):
        from http import tail_f_producer
        return tail_f_producer

    def _makeOne(self, request, filename, head):
        return self._getTargetClass()(request, filename, head)

    def test_handle_more(self):
        request = DummyRequest('/logtail/foo', None, None, None)
        f = tempfile.NamedTemporaryFile()
        f.write('a' * 80)
        f.flush()
        t = f.name
        producer = self._makeOne(request, t, 80)
        result = producer.more()
        self.assertEqual(result, 'a' * 80)
        f.write('w' * 100)
        f.flush()
        result = producer.more()
        self.assertEqual(result, 'w' * 100)
        result = producer.more()
        self.assertEqual(result, http.NOT_DONE_YET)
        f.truncate(0)
        f.flush()
        result = producer.more()
        self.assertEqual(result, '==> File truncated <==\n')

class BasicAuthTransportTests(unittest.TestCase):
    def _getTargetClass(self):
        from options import BasicAuthTransport
        return BasicAuthTransport

    def _makeOne(self, username=None, password=None, serverurl=None):
        klass = self._getTargetClass()
        return klass(username, password, serverurl)

    def test_ctor(self):
        instance = self._makeOne('username', 'password', 'serverurl')
        self.assertEqual(instance.username, 'username')
        self.assertEqual(instance.password, 'password')
        self.assertEqual(instance.serverurl, 'serverurl')
        self.assertEqual(instance.verbose, False)

    def test_works_with_py25(self):
        instance = self._makeOne('username', 'password', 'serverurl')
        # the test is just to insure that this method can be called; failure
        # would be an AttributeError for _use_datetime under Python 2.5
        parser, unmarshaller = instance.getparser() # this uses _use_datetime

class DummyProcess:
    # Initial state; overridden by instance variables
    pid = 0 # Subprocess pid; 0 when not running
    laststart = 0 # Last time the subprocess was started; 0 if never
    laststop = 0  # Last time the subprocess was stopped; 0 if never
    delay = 0 # If nonzero, delay starting or killing until this time
    administrative_stop = 0 # true if the process has been stopped by an admin
    system_stop = 0 # true if the process has been stopped by the system
    killing = 0 # flag determining whether we are trying to kill this proc
    backoff = 0 # backoff counter (to backofflimit)
    waitstatus = None
    exitstatus = None
    pipes = None
    childlog = None # the current logger 
    spawnerr = None
    logbuffer = '' # buffer of characters to send to child process' stdin
    
    def __init__(self, options, config, state=ProcessStates.RUNNING):
        self.options = options
        self.config = config
        self.childlog = DummyLogger()
        self.logsremoved = False
        self.stop_called = False
        self.backoff_secs = None
        self.spawned = False
        self.state = state
        self.error_at_clear = False
        self.killed_with = None
        self.drained = False
        self.logbuffer = ''
        self.logged = ''
        self.pipes = {}
        self.finished = None
        self.logs_reopened = False

    def reopenlogs(self):
        self.logs_reopened = True

    def removelogs(self):
        if self.error_at_clear:
            raise IOError('whatever')
        self.logsremoved = True

    def get_state(self):
        return self.state

    def stop(self):
        self.stop_called = True
        self.killing = False
        self.state = ProcessStates.STOPPED

    def kill(self, signal):
        self.killed_with = signal

    def spawn(self):
        self.spawned = True
        self.state = ProcessStates.RUNNING

    def drain(self):
        self.drained = True

    def get_pipe_drains(self):
        return []

    def __cmp__(self, other):
        return cmp(self.config.priority, other.config.priority)

    def readable_fds(self):
        return []

    def log_output(self):
        self.logged = self.logged + self.logbuffer
        self.logbuffer = ''

    def finish(self, pid, sts):
        self.finished = pid, sts

class DummyPConfig:
    def __init__(self, name, command, priority=999, autostart=True,
                 autorestart=True, startsecs=10, startretries=999,
                 uid=None, logfile=None, logfile_backups=0,
                 logfile_maxbytes=0, log_stdout=True, log_stderr=False,
                 stopsignal=signal.SIGTERM, stopwaitsecs=10,
                 exitcodes=[0,2]):
        self.name = name
        self.command = command
        self.priority = priority
        self.autostart = autostart
        self.autorestart = autorestart
        self.startsecs = startsecs
        self.startretries = startretries
        self.uid = uid
        self.logfile = logfile
        self.logfile_backups = logfile_backups
        self.logfile_maxbytes = logfile_maxbytes
        self.log_stdout = log_stdout
        self.log_stderr = log_stderr
        self.stopsignal = stopsignal
        self.stopwaitsecs = stopwaitsecs
        self.exitcodes = exitcodes
        

class DummyLogger:
    def __init__(self):
        self.reopened = False
        self.removed = False
        self.closed = False
        self.data = []

    def info(self, *args):
        for arg in args:
            self.data.append(arg)
    warn = log = debug = critical = trace = info
    def reopen(self):
        self.reopened = True
    def close(self):
        self.closed = True
    def remove(self):
        self.removed = True

class DummyOptions:

    TRACE = 5
    make_pipes_error = None
    fork_error = None
    execv_error = None
    kill_error = None
    minfds = 5

    def __init__(self):
        self.identifier = 'supervisor'
        self.childlogdir = '/tmp'
        self.uid = 999
        self.logger = self.getLogger()
        self.backofflimit = 10
        self.logfile = '/tmp/logfile'
        self.nocleanup = False
        self.pidhistory = {}
        self.programs = []
        self.nodaemon = False
        self.socket_map = {}
        self.mood = 1
        self.mustreopen = False
        self.realizeargs = None
        self.fds_cleaned_up = False
        self.rlimit_set = False
        self.setuid_called = False
        self.httpserver_opened = False
        self.signals_set = False
        self.daemonized = False
        self.make_logger_messages = None
        self.autochildlogs_created = False
        self.autochildlogdir_cleared = False
        self.cleaned_up = False
        self.pidfile_written = False
        self.directory = None
        self.waitpid_return = None, None
        self.kills = {}
        self.signal = None
        self.parent_pipes_closed = None
        self.child_pipes_closed = None
        self.forkpid = 0
        self.pgrp_set = None
        self.duped = {}
        self.written = {}
        self.fds_closed = []
        self._exitcode = None
        self.execv_args = None
        self.setuid_msg = None
        self.privsdropped = None
        self.logs_reopened = False
        self.environment_processed = False

    def getLogger(self, *args, **kw):
        logger = DummyLogger()
        logger.handlers = [DummyLogger()]
        logger.args = args, kw
        return logger

    def realize(self, args):
        self.realizeargs = args

    def cleanup_fds(self):
        self.fds_cleaned_up = True

    def set_rlimits(self):
        self.rlimits_set = True
        return ['rlimits_set']

    def set_uid(self):
        self.setuid_called = True
        return 'setuid_called'

    def openhttpserver(self, supervisord):
        self.httpserver_opened = True

    def daemonize(self):
        self.daemonized = True

    def setsignals(self):
        self.signals_set = True

    def get_socket_map(self):
        return self.socket_map

    def make_logger(self, critical_msgs, info_msgs):
        self.make_logger_messages = critical_msgs, info_msgs

    def create_autochildlogs(self):
        self.autochildlogs_created = True

    def clear_autochildlogdir(self):
        self.autochildlogdir_cleared = True

    def cleanup(self):
        self.cleaned_up = True

    def write_pidfile(self):
        self.pidfile_written = True

    def waitpid(self):
        return self.waitpid_return

    def make_process(self, config):
        return DummyProcess(self, config)

    def kill(self, pid, sig):
        if self.kill_error:
            raise OSError(self.kill_error)
        self.kills[pid] = sig

    def stat(self, filename):
        return os.stat(filename)

    def get_path(self):
        return ["/bin", "/usr/bin", "/usr/local/bin"]

    def check_execv_args(self, filename, argv, st):
        if filename == '/bad/filename':
            return 'bad filename'
        return None

    def make_pipes(self):
        if self.make_pipes_error:
            raise OSError(self.make_pipes_error)
        pipes = {}
        pipes['child_stdin'], pipes['stdin'] = (3, 4)
        pipes['stdout'], pipes['child_stdout'] = (5, 6)
        pipes['stderr'], pipes['child_stderr'] = (7, 8)
        return pipes

    def fork(self):
        if self.fork_error:
            raise OSError(self.fork_error)
        return self.forkpid

    def close_fd(self, fd):
        self.fds_closed.append(fd)

    def close_parent_pipes(self, pipes):
        self.parent_pipes_closed = pipes

    def close_child_pipes(self, pipes):
        self.child_pipes_closed = pipes

    def setpgrp(self):
        self.pgrp_set = True

    def dup2(self, frm, to):
        self.duped[frm] = to

    def write(self, fd, data):
        old_data = self.written.setdefault(fd, [])
        old_data.append(data)

    def _exit(self, code):
        self._exitcode = code

    def execv(self, filename, argv):
        if self.execv_error:
            if self.execv_error == 1:
                raise OSError(self.execv_error)
            else:
                raise RuntimeError(self.execv_error)
        self.execv_args = (filename, argv)

    def dropPrivileges(self, uid):
        if self.setuid_msg:
            return self.setuid_msg
        self.privsdropped = uid

    def readfd(self, fd):
        return fd

    def reopenlogs(self):
        self.logs_reopened = True

    def process_environment(self):
        self.environment_processed = True

class DummyClientOptions:
    def __init__(self):
        self.prompt = 'supervisor'
        self.serverurl = 'http://localhost:9001'
        self.username = 'chrism'
        self.password = '123'
        self._server = DummyRPCServer()

    def getServerProxy(self):
        return self._server

_NOW = 1151365354
_TIMEFORMAT = '%b %d %I:%M %p'

class DummySupervisorRPCNamespace:
    _restartable = True
    _restarted = False
    _shutdown = False

    def getVersion(self):
        return '1.0'

    def readProcessLog(self, name, offset, length):
        import xmlrpclib
        if name == 'BAD_NAME':
            raise xmlrpclib.Fault(xmlrpc.Faults.BAD_NAME, 'BAD_NAME')
        elif name == 'FAILED':
            raise xmlrpclib.Fault(xmlrpc.Faults.FAILED, 'FAILED')
        elif name == 'NO_FILE':
            raise xmlrpclib.Fault(xmlrpc.Faults.NO_FILE, 'NO_FILE')
        a = 'output line\n' * 10
        return a[offset:]

    def getAllProcessInfo(self):
        return [
            {
            'name':'foo',
            'pid':11,
            'state':ProcessStates.RUNNING,
            'statename':'RUNNING',
            'start':_NOW - 100,
            'stop':0,
            'spawnerr':'',
            'now':_NOW,
            'description':'foo description',
             },
            {
            'name':'bar',
            'pid':12,
            'state':ProcessStates.FATAL,
            'statename':'FATAL',
            'start':_NOW - 100,
            'stop':_NOW - 50,
            'spawnerr':'screwed',
            'now':_NOW,
            'description':'bar description',
             },
            {
            'name':'baz',
            'pid':12,
            'state':ProcessStates.STOPPED,
            'statename':'STOPPED',
            'start':_NOW - 100,
            'stop':_NOW - 25,
            'spawnerr':'',
            'now':_NOW,
            'description':'baz description',
             },
            ]
                

    def getProcessInfo(self, name):
        return {
            'name':'foo',
            'pid':11,
            'state':ProcessStates.RUNNING,
            'statename':'RUNNING',
            'start':_NOW - 100,
            'stop':0,
            'spawnerr':'',
            'now':_NOW,
            'description':'foo description',
             }

    def startProcess(self, name):
        from xmlrpclib import Fault
        if name == 'BAD_NAME':
            raise Fault(xmlrpc.Faults.BAD_NAME, 'BAD_NAME')
        if name == 'ALREADY_STARTED':
            raise Fault(xmlrpc.Faults.ALREADY_STARTED, 'ALREADY_STARTED')
        if name == 'SPAWN_ERROR':
            raise Fault(xmlrpc.Faults.SPAWN_ERROR, 'SPAWN_ERROR')
        return True

    def startAllProcesses(self):
        return [
            {'name':'foo', 'status': xmlrpc.Faults.SUCCESS,'description': 'OK'},
            {'name':'foo2', 'status':xmlrpc.Faults.SUCCESS,'description': 'OK'},
            {'name':'failed', 'status':xmlrpc.Faults.SPAWN_ERROR,
             'description':'SPAWN_ERROR'}
            ]

    def stopProcess(self, name):
        from xmlrpclib import Fault
        if name == 'BAD_NAME':
            raise Fault(xmlrpc.Faults.BAD_NAME, 'BAD_NAME')
        if name == 'NOT_RUNNING':
            raise Fault(xmlrpc.Faults.NOT_RUNNING, 'NOT_RUNNING')
        if name == 'FAILED':
            raise Fault(xmlrpc.Faults.FAILED, 'FAILED')
        
        return True
    
    def stopAllProcesses(self):
        return [
            {'name':'foo','status': xmlrpc.Faults.SUCCESS,'description': 'OK'},
            {'name':'foo2', 'status':xmlrpc.Faults.SUCCESS,'description': 'OK'},
            {'name':'failed', 'status':xmlrpc.Faults.BAD_NAME,
             'description':'FAILED'}
            ]

    def restart(self):
        if self._restartable:
            self._restarted = True
            return
        from xmlrpclib import Fault
        raise Fault(xmlrpc.Faults.SHUTDOWN_STATE, '')

    def shutdown(self):
        if self._restartable:
            self._shutdown = True
            return
        from xmlrpclib import Fault
        raise Fault(xmlrpc.Faults.SHUTDOWN_STATE, '')

    def clearProcessLog(self, name):
        from xmlrpclib import Fault
        if name == 'BAD_NAME':
            raise Fault(xmlrpc.Faults.BAD_NAME, 'BAD_NAME')
        return True

    def clearAllProcessLogs(self):
        return [
            {'name':'foo', 'status':xmlrpc.Faults.SUCCESS,'description': 'OK'},
            {'name':'foo2', 'status':xmlrpc.Faults.SUCCESS,'description': 'OK'},
            {'name':'failed','status':xmlrpc.Faults.FAILED,
             'description':'FAILED'}
            ]

    def raiseError(self):
        raise ValueError('error')
        

class DummySystemRPCNamespace:
    pass

class DummyRPCServer:
    def __init__(self):
        self.supervisor = DummySupervisorRPCNamespace()
        self.system = DummySystemRPCNamespace()

class DummySupervisor:
    def __init__(self, processes=None, state=SupervisorStates.ACTIVE):
        if processes is None:
            processes = {}
        self.processes = processes
        self.options = DummyOptions()
        self.state = state

    def get_state(self):
        return self.state

class DummyRequest:
    command = 'GET'
    _error = None
    _done = False
    def __init__(self, path, params, query, fragment):
        self.args = path, params, query, fragment
        self.producers = []
        self.headers = {}

    def split_uri(self):
        return self.args

    def error(self, code):
        self._error = code

    def push(self, producer):
        self.producers.append(producer)

    def __setitem__(self, header, value):
        self.headers[header] = value

    def done(self):
        self._done = True
        
def test_suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(SupervisordTests))
    suite.addTest(unittest.makeSuite(ControllerTests))
    suite.addTest(unittest.makeSuite(ServerOptionsTests))
    suite.addTest(unittest.makeSuite(SupervisorNamespaceXMLRPCInterfaceTests))
    suite.addTest(unittest.makeSuite(MainXMLRPCInterfaceTests))
    suite.addTest(unittest.makeSuite(SystemNamespaceXMLRPCInterfaceTests))
    suite.addTest(unittest.makeSuite(SubprocessTests))
    suite.addTest(unittest.makeSuite(XMLRPCMarshallingTests))
    suite.addTest(unittest.makeSuite(XMLRPCHandlerTests))
    suite.addTest(unittest.makeSuite(LogtailHandlerTests))
    suite.addTest(unittest.makeSuite(TailFProducerTests))
    suite.addTest(unittest.makeSuite(BasicAuthTransportTests))
    return suite

if __name__ == '__main__':
    __file__ = sys.argv[0]
    unittest.main(defaultTest='test_suite')

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.1361 ]--