[Pymoul-svn] SF.net SVN: pymoul: [201] pymoul/trunk/src/moul/osdependent/processinfo.py
Status: Alpha
Brought to you by:
tiran
|
From: <ti...@us...> - 2007-02-27 11:46:40
|
Revision: 201
http://pymoul.svn.sourceforge.net/pymoul/?rev=201&view=rev
Author: tiran
Date: 2007-02-27 03:46:40 -0800 (Tue, 27 Feb 2007)
Log Message:
-----------
processinfo cleanup
Modified Paths:
--------------
pymoul/trunk/src/moul/osdependent/processinfo.py
Modified: pymoul/trunk/src/moul/osdependent/processinfo.py
===================================================================
--- pymoul/trunk/src/moul/osdependent/processinfo.py 2007-02-27 03:20:16 UTC (rev 200)
+++ pymoul/trunk/src/moul/osdependent/processinfo.py 2007-02-27 11:46:40 UTC (rev 201)
@@ -51,69 +51,87 @@
True
>>> getPidDetails(cur)['name'] == mapping[cur]
True
+
+>>> for impl in (WinEnumProcesses(), LinuxProcReader(), PsParser()):
+... if impl.supported():
+... isinstance(impl.getPids(), list) and None
+... isinstance(impl.getPidNames(), dict) and None
"""
__author__ = "Christian Heimes"
__version__ = "$Id$"
__revision__ = "$Revision$"
+__all__ = ['getPids', 'getPidNames', 'getPidDetails', 'supportedOS',
+ 'UnsupportedError']
+import logging
import os
+import os.path
import sys
-from logging import getLogger
-LOG = getLogger("processinfo")
+LOG = logging.getLogger("processinfo")
+NULL = "\x00"
+
_plat = sys.platform.startswith
if _plat('win') or _plat('cygwin'):
- LOG.debug("Using ctypes on Windows")
- IMPL = 'win'
from ctypes import windll, c_ulong, sizeof, c_buffer, byref
-
PSAPI = windll.psapi
KERNEL = windll.kernel32
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
PROCESS_FLAGS = PROCESS_QUERY_INFORMATION | PROCESS_VM_READ
else:
- import os
- PROC = '/proc'
- if os.path.isfile("%s/self/status" % PROC):
- LOG.debug("Using /proc on Linux")
- IMPL = 'proc'
- elif os.system('ps') == 0:
- LOG.debug("Using the 'ps' command on POSIX os")
- IMPL = 'ps'
- from subprocess import Popen
- from subprocess import PIPE
- else:
- LOG.warning("Unsupported OS. Neither /proc nor 'ps' works.")
- IMPL = "unsupported"
+ from subprocess import Popen
+ from subprocess import PIPE
-NULL = "\x00"
-
class UnsupportedError(OSError):
+ """OS or command not supported error
+ """
pass
class Unsupported(object):
- """Unsupported OS
+ """Unsupported OS implementation
+
+ Raises L{UnsupportedError}
"""
__slots__ = ()
- def supported(self):
+ @classmethod
+ def supported(cls):
+ """Supported flag property"""
return False
- supported = property(supported)
- def getPids(self):
+ @classmethod
+ def log(cls):
+ """Log information about the implementation"""
+ LOG.warning("Unsupported OS. Neither proc filesystem nor 'ps' works.")
+
+ @classmethod
+ def getPids(cls):
"""Get a list of pids
+
+ @return: a list of pid numbers
+ @rtype: list(int)
"""
raise UnsupportedError
- def getPidNames(self):
- """Get a list of pid -> name
+ @classmethod
+ def getPidNames(cls):
+ """Get a mapping of pid -> name
+
+ @return: mapping of pid -> name
+ @rtype: dict(int:str)
"""
raise UnsupportedError
- def getPidDetails(self, pid):
+ @classmethod
+ def getPidDetails(pid):
"""Get detailed informations about a process
+
+ @param pid: pid number or 'self'
+ @type pid: int or basestring
+ @return: detailed information about a process
+ @rtype: dict(str:data)
"""
raise UnsupportedError
@@ -126,14 +144,26 @@
PIDNAMES = "%s --format=pid,ucmd" % CMD
PIDS = "%s --format=pid" % CMD
- def supported(self):
- return False
- supported = property(supported)
+ @classmethod
+ def supported(cls):
+ """Supported flag property"""
+ try:
+ cls._exec(cls.PIDS)
+ except Exception: # catch all!
+ return False
+ else:
+ return True
- def getPids(self):
+ @classmethod
+ def log(cls):
+ """Log information about the implementation"""
+ LOG.debug("Using the 'ps' command on POSIX os")
+
+ @classmethod
+ def getPids(cls):
"""Get a list of pids
"""
- stdout = self._exec(self.PIDS)
+ stdout = cls._exec(cls.PIDS)
if stdout is None:
return None
pids = []
@@ -146,10 +176,11 @@
pids.append(pid)
return pids
- def getPidNames(self):
+ @classmethod
+ def getPidNames(cls):
"""Get a list of pid -> name
"""
- stdout = self._exec(self.PIDNAMES)
+ stdout = cls._exec(cls.PIDNAMES)
if stdout is None:
return None
mapping = {}
@@ -165,14 +196,18 @@
mapping[pid] = name
return mapping
- def getPidDetails(self, pid):
+ @classmethod
+ def getPidDetails(cls, pid):
"""Get detailed informations about a process
TODO
"""
- raise UnsupportedError
+ if pid == 'self':
+ pid = os.getpid()
+ raise UnsupportedError("not implemented yet")
- def _exec(self, cmd):
+ @staticmethod
+ def _exec(cmd):
"""Execute command cmd
The method waits until the command has finished. It returns None of
@@ -203,42 +238,53 @@
"""
__slots__ = ()
- def supported(self):
- return True
- supported = property(supported)
+ PROC = "/proc"
- @staticmethod
- def getPids():
+ @classmethod
+ def supported(cls):
+ return os.path.isfile("%s/self/status" % cls.PROC)
+
+ @classmethod
+ def log(cls):
+ """Log information about the implementation"""
+ LOG.debug("Using the proc filesystem on Linux")
+
+ @classmethod
+ def getPids(cls):
"""Get a list of pids
"""
pids = []
- for name in os.listdir(PROC):
- if os.path.isdir(PROC + '/' + name):
+ for name in os.listdir(cls.PROC):
+ if os.path.isdir(cls.PROC + '/' + name):
try:
pids.append(int(name))
except ValueError:
pass
return pids
- def getPidNames(self):
+ @classmethod
+ def getPidNames(cls):
"""Get a list of pid -> name
"""
mapping = {}
- for pid in self.getPids():
- name = self._readProcStatus(pid, searchkey='name')
+ for pid in cls.getPids():
+ name = cls._readProcStatus(pid, searchkey='name')
if name is not None:
mapping[pid] = name
return mapping
- def getPidDetails(self, pid):
+ def getPidDetails(cls, pid):
"""Get detailed informations about a process
"""
- details = self._readProcStatus(pid)
- details.update(self._readProcOther(pid))
- details['cmdline'] = self._readProcCmdline(pid)
+ details = cls._readProcStatus(pid)
+ if details is None:
+ return None
+ details.update(cls._readProcOther(pid))
+ details['cmdline'] = cls._readProcCmdline(pid)
return details
- def _readProcStatus(self, pid, searchkey=None):
+ @classmethod
+ def _readProcStatus(cls, pid, searchkey=None):
"""Read and parse status information for PID pid
pid - pid as long or int or 'self'
@@ -249,10 +295,12 @@
or returns None if the key can't be found.
"""
mapping = {}
+ status = '%s/%s/status' % (cls.PROC, pid)
try:
# read entire file to avoid race conditions
- lines = open('%s/%s/status' % (PROC, pid), 'r').readlines()
+ lines = open(status, 'r').readlines()
except IOError:
+ LOG.exception("%s not found" % status)
return None
for line in lines:
try:
@@ -270,25 +318,27 @@
return None
return mapping
- def _readProcCmdline(self, pid):
+ @classmethod
+ def _readProcCmdline(cls, pid):
"""Read cmdline information for pid and return a list similar to sys.argv
"""
try:
# read entire file to avoid race conditions
- data = open('%s/%s/cmdline' % (PROC, pid), 'r').read()
+ data = open('%s/%s/cmdline' % (cls.PROC, pid), 'r').read()
except IOError:
return None
return data.split(NULL)
- def _readProcOther(self, pid):
+ @classmethod
+ def _readProcOther(cls, pid):
"""Read other possible useful things
cwd -> current work directory (may not exist)
exe -> path to executable (may not exist)
"""
return {
- 'cwd' : os.path.realpath('%s/%s/cwd' % (PROC, pid)),
- 'exe' : os.path.realpath('%s/%s/exe' % (PROC, pid)),
+ 'cwd' : os.path.realpath('%s/%s/cwd' % (cls.PROC, pid)),
+ 'exe' : os.path.realpath('%s/%s/exe' % (cls.PROC, pid)),
}
class WinEnumProcesses(object):
@@ -299,11 +349,22 @@
"""
__slots__ = ()
- def supported(self):
- return True
- supported = property(supported)
+ @classmethod
+ def supported(cls):
+ try:
+ cls.getPids()
+ except Exception: # catch all!
+ return False
+ else:
+ return True
- def getPids(self):
+ @classmethod
+ def log(cls):
+ """Log information about the implementation"""
+ LOG.debug("Using ctypes on Windows")
+
+ @classmethod
+ def getPids(cls):
"""Get a list of pids
"""
arr = c_ulong * 256
@@ -316,14 +377,15 @@
nReturned = cbNeeded.value/sizeof(c_ulong()) # number of processes returned
return [pid for pid in lpidProcess][:nReturned]
- def getPidNames(self):
+ @classmethod
+ def getPidNames(cls):
"""Get a list of pid -> name
"""
hModule = c_ulong()
count = c_ulong()
modname = c_buffer(51)
mapping = {}
- for pid in self.getPids():
+ for pid in cls.getPids():
# Get handle to the process based on PID
hProcess = KERNEL.OpenProcess(PROCESS_FLAGS, False, pid)
if hProcess:
@@ -342,7 +404,8 @@
return mapping
- def getPidDetails(self, pid):
+ @classmethod
+ def getPidDetails(cls, pid):
"""Get detailed informations about a process
"""
if pid == 'self':
@@ -371,21 +434,32 @@
# Initialize global methods
-if IMPL == 'proc':
- _enumProcesses = LinuxProcReader()
-elif IMPL == 'ps':
- _enumProcesses = PsParser()
-elif IMPL == 'win':
- _enumProcesses = WinEnumProcesses()
-else:
+_enumProcesses = None
+for impl in (WinEnumProcesses(), LinuxProcReader(), PsParser()):
+ if impl.supported():
+ impl.log()
+ _enumProcesses = impl
+ break
+
+if _enumProcesses is None:
LOG.error("System %s is not supported" % sys.platform)
_enumProcesses = Unsupported()
getPids = _enumProcesses.getPids
getPidNames = _enumProcesses.getPidNames
getPidDetails = _enumProcesses.getPidDetails
-supportedOS = _enumProcesses.supported
+supportedOS = _enumProcesses.supported()
+def test_suite():
+ import unittest
+ from doctest import DocTestSuite
+ return unittest.TestSuite((
+ DocTestSuite()
+ ))
+
if __name__ == '__main__':
+ import unittest
+ logging.basicConfig()
+ unittest.main(defaultTest="test_suite")
+ print getPids()
print getPidNames()
- print getPidDetails('self')
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|