[Pymoul-svn] SF.net SVN: pymoul: [80] pymoul/trunk/src/moul
Status: Alpha
Brought to you by:
tiran
|
From: <ti...@us...> - 2007-01-26 14:17:52
|
Revision: 80
http://pymoul.svn.sourceforge.net/pymoul/?rev=80&view=rev
Author: tiran
Date: 2007-01-26 06:17:50 -0800 (Fri, 26 Jan 2007)
Log Message:
-----------
More cleanups
Use WMI (f.... slow) to get process informations under Linux. Memo to me: put it inside a thread
Moved os specific stuff around
Modified Paths:
--------------
pymoul/trunk/src/moul/config/__init__.py
pymoul/trunk/src/moul/log.py
pymoul/trunk/src/moul/osdependent/__init__.py
pymoul/trunk/src/moul/osdependent/linux/__init__.py
pymoul/trunk/src/moul/osdependent/processinfo.py
pymoul/trunk/src/moul/osdependent/win32/__init__.py
pymoul/trunk/src/moul/qt/ui/mainwindow.py
Added Paths:
-----------
pymoul/trunk/src/moul/osdependent/tests/test_osdependent.py
pymoul/trunk/src/moul/osdependent/win32/wmi.py
Modified: pymoul/trunk/src/moul/config/__init__.py
===================================================================
--- pymoul/trunk/src/moul/config/__init__.py 2007-01-26 02:16:36 UTC (rev 79)
+++ pymoul/trunk/src/moul/config/__init__.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -18,70 +18,13 @@
"""Configuration package
"""
import os
-import sys
-from logging import getLogger
-LOG = getLogger('pyMoul')
+from moul.osdependent import getMoulUserDataDir
+from moul.osdependent import getPyMoulDataDir
-# a program under py2exe is sys.frozen
-__FROZEN__ = bool(getattr(sys, 'frozen', False))
-# OS stuff
-__WIN32__ = sys.platform.startswith('win32') # win64, cygwin?
-__LINUX__ = sys.platform.startswith('linux2')
-__MACOSX__ = sys.platform.startswith('darwin')
-__UNIX__ = __LINUX__ or __MACOSX__
-
-LOG.debug("sys.frozen status: %s" % __FROZEN__)
-LOG.debug("OS detected: win32: %s, Linux: %s, Mac: %s, Un*x: %s" %
- (__WIN32__, __LINUX__, __MACOSX__, __UNIX__))
-
_marker=object()
-# XXX: what about cygwin, bsd and others?
-if __WIN32__:
- from moul.osdependent.win32 import (
- getMoulUserDataDir,
- getPyMoulDataDir as _getPyMoulDataDir,
- _startMOUL,
- EXEC_NAME
- )
-elif __LINUX__:
- from moul.osdependent.linux import (
- getMoulUserDataDir,
- getPyMoulDataDir as _getPyMoulDataDir,
- _startMOUL,
- EXEC_NAME
- )
-elif __MACOSX__:
- from moul.osdependent.darwin import (
- getMoulUserDataDir,
- getPyMoulDataDir as _getPyMoulDataDir,
- _startMOUL,
- EXEC_NAME
- )
-else:
- raise RuntimeError('platform %s not supported' % sys.platform)
-def getPyMoulDataDir(check=False):
- """Get pyMoul data directory
-
- The directory contains log files, ini files and other local stuff
- """
- datadir = _getPyMoulDataDir()
- if check:
- if os.path.abspath(datadir) != datadir:
- raise ValueError("Datadir is not absolute %s" % datadir)
- if not os.path.isdir(datadir):
- parent = os.path.abspath(os.path.join(datadir, os.pardir))
- if not os.path.isdir(parent):
- raise ValueError("Datadir's parent dir does not exist: %s"
- % par)
- else:
- LOG.debug("Creating pyMoul data dir %s" % datadir)
- os.mkdir(datadir, 0750)
- LOG.debug("Using pyMoul data dir %s" % datadir)
- return datadir
-
-## configuration
+# configuration
class Configuration(dict):
def __new__(cls, *args, **kwargs):
self = dict.__new__(cls, *args, **kwargs)
Modified: pymoul/trunk/src/moul/log.py
===================================================================
--- pymoul/trunk/src/moul/log.py 2007-01-26 02:16:36 UTC (rev 79)
+++ pymoul/trunk/src/moul/log.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -33,9 +33,7 @@
getLogger = logging.getLogger
-# copied from moul.config to prevent circular imports
__FROZEN__ = bool(getattr(sys, 'frozen', False))
-__LOG_SIGNALS__ = not __FROZEN__
format = Formatter('%(asctime)s %(name)-8s %(levelname)-7s %(message)s',
#'%a, %d %b %Y %H:%M:%S'
@@ -92,6 +90,7 @@
fhdlr.doRollover()
def _systemInfo():
+ from moul.osdependent import __INFO__
LOG.debug("pyMoul version: %s" % moul_version)
LOG.debug("Python: %s" % repr(sys.version_info))
LOG.debug("Python: %s" % sys.version.replace('\n', ' '))
@@ -99,6 +98,9 @@
LOG.debug("system %r, node %r, release %r, version %r, machine %r, processor %r"
% platform.uname())
LOG.debug("platform name: %s" % platform.platform(True))
+ LOG.debug("sys.frozen status: %s" % __FROZEN__)
+ LOG.debug("OS detected: win32: %r, cygwin: %r, Linux: %r, Mac: %r, BSD: %r, "
+ "posix/Un*x: %r, NT: %r" % __INFO__)
# no setup the logging stuff!
@@ -108,6 +110,7 @@
shdlr = logging.StreamHandler(sys.stderr)
shdlr.setFormatter(format)
root.addHandler(shdlr)
+ #_systemInfo()
else:
_installMemoryHdlr()
# Redirect stdout and stderr to logger when running as frozen app
@@ -117,6 +120,7 @@
_installFileHdlr()
_removeMemoryHdlr()
+__LOG_SIGNALS__ = not __FROZEN__
def signalLogDecorator(__logger__):
"""Decorator to log signals
Modified: pymoul/trunk/src/moul/osdependent/__init__.py
===================================================================
--- pymoul/trunk/src/moul/osdependent/__init__.py 2007-01-26 02:16:36 UTC (rev 79)
+++ pymoul/trunk/src/moul/osdependent/__init__.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -15,8 +15,118 @@
# with this program; if not, write to the Free Software Foundation, Inc., 59
# Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
+"""OS dependent code for linux, mac and win32
+
+>>> pids = getCurrentPids()
+>>> len(pids) > 1
+True
+
+>>> pids = getCurrentPidNames()
+>>> found = False
+>>> for pid, name in pids.items():
+... if name.lower().startswith('python'):
+... found = True
+>>> found
+True
"""
-"""
__author__ = "Christian Heimes"
__version__ = "$Id$"
__revision__ = "$Revision$"
+
+import os
+import sys
+
+# a program under py2exe is sys.frozen
+__FROZEN__ = bool(getattr(sys, 'frozen', False))
+# OS stuff
+_plat = sys.platform.startswith
+__WIN32__ = _plat('win32') # win64, cygwin?
+__CYGWIN__ = _plat('cygwin')
+__LINUX__ = _plat('linux2')
+__MACOSX__ = _plat('darwin')
+__BSD__ = _plat('freebsd') or _plat('netbsd') or _plat('openbsd')
+__POSIX__ = os.name.lower() == 'posix'
+__NT__ = os.name.lower() == 'nt'
+__INFO__ = (__WIN32__, __CYGWIN__, __LINUX__, __MACOSX__, __BSD__,
+ __POSIX__, __NT__)
+
+# names to import: from moul.osdependent.ID import NAME (as NAME)
+NAMES = ('getMoulUserDataDir',
+ ('getPyMoulDataDir', '_getPyMoulDataDir'), # as
+ '_startMOUL', 'EXEC_NAME',
+ 'getCurrentPids', 'getCurrentPidNames',
+ )
+
+_marker = object()
+
+def _importHelper(modname, names=None, target=None):
+ """Import a list of variables from a module
+
+ >>> mod = _importHelper('moul.osdependent')
+ >>> mod == _thismodule or mod
+ True
+ >>> vars = _importHelper('moul.osdependent', ('_marker', ))
+ >>> vars[0] is _marker
+ True
+ >>> class Target(object):
+ ... pass
+ >>> target = Target()
+ >>> vars = _importHelper('moul.osdependent', ('_marker', ), target=target)
+ >>> target._marker is _marker
+ True
+ >>> vars[0] is _marker
+ True
+
+ >>> vars = _importHelper('moul.osdependent', (('_marker', 'another'), ),
+ ... target=target)
+ >>> target.another is _marker
+ True
+ >>> vars[0] is _marker
+ True
+ """
+ mod = __import__(modname, globals(), locals(), [''])
+ if names is None:
+ return mod
+ else:
+ vars = []
+ for name in names:
+ if isinstance(name, (tuple, list)):
+ name, nameas = name
+ else:
+ nameas = name
+ var = getattr(mod, name)
+ vars.append(var)
+ if target is not None:
+ setattr(target, nameas, var)
+ return vars
+
+# XXX: what about cygwin, bsd and others?
+_thismodule = sys.modules[__name__]
+if __WIN32__:
+ _importHelper('moul.osdependent.win32', NAMES, target=_thismodule)
+elif __LINUX__:
+ _importHelper('moul.osdependent.linux', NAMES, target=_thismodule)
+elif __MACOSX__:
+ _importHelper('moul.osdependent.darwin', NAMES, target=_thismodule)
+else:
+ raise RuntimeError('platform %s not supported' % sys.platform)
+
+def getPyMoulDataDir(check=False):
+ """Get pyMoul data directory
+
+ The directory contains log files, ini files and other local stuff
+ """
+ datadir = _getPyMoulDataDir()
+ if check:
+ if os.path.abspath(datadir) != datadir:
+ raise ValueError("Datadir is not absolute %s" % datadir)
+ if not os.path.isdir(datadir):
+ parent = os.path.abspath(os.path.join(datadir, os.pardir))
+ if not os.path.isdir(parent):
+ raise ValueError("Datadir's parent dir does not exist: %s"
+ % par)
+ else:
+ LOG.debug("Creating pyMoul data dir %s" % datadir)
+ os.mkdir(datadir, 0750)
+ LOG.debug("Using pyMoul data dir %s" % datadir)
+ return datadir
Modified: pymoul/trunk/src/moul/osdependent/linux/__init__.py
===================================================================
--- pymoul/trunk/src/moul/osdependent/linux/__init__.py 2007-01-26 02:16:36 UTC (rev 79)
+++ pymoul/trunk/src/moul/osdependent/linux/__init__.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -53,3 +53,52 @@
path = os.path.join(installdir, EXEC_NAME)
args = (EXEC_NAME,) + args
return os.spawnv(mode, path, args)
+
+# process info
+# based on http://gelb.bcom.at/trac/misc/browser/processinfo
+
+def getCurrentPids():
+ """Returns current process ids
+ """
+ pids = []
+ for fname in os.listdir("/proc"):
+ if os.path.isdir(os.path.join("/proc", fname)):
+ try:
+ pids.append(int(fname))
+ except ValueError:
+ continue
+ return pids
+
+def getCurrentPidDetails():
+ """Returns mapping pid -> detailed informations
+ """
+ mapping = {}
+ for pid in getCurrentPids():
+ try:
+ try:
+ # read entiry file to avoid race condition bugs
+ fd = open('/proc/%i/status' % pid, 'rb')
+ status = fd.read().split('\n')
+ finally:
+ if fd:
+ fd.close()
+ except IoError:
+ continue
+ details = {}
+ for line in status:
+ try:
+ key, value = line.split(':\t')
+ except ValueError:
+ continue
+ details[key.lower()] = value.strip()
+ mapping[pid] = details
+
+ return mapping
+
+def getCurrentPidNames():
+ """Returns mapping pid -> name
+ """
+ mapping = {}
+ for pid, details in getCurrentPidDetails().items():
+ mapping[pid] = details.get('name', None)
+ return mapping
Modified: pymoul/trunk/src/moul/osdependent/processinfo.py
===================================================================
--- pymoul/trunk/src/moul/osdependent/processinfo.py 2007-01-26 02:16:36 UTC (rev 79)
+++ pymoul/trunk/src/moul/osdependent/processinfo.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -1,219 +1,219 @@
-#!/usr/bin/env python
-# -*- coding: iso-8859-1 -*-
-"""Process info
-
-Based on http://gelb.bcom.at/trac/misc/browser/processinfo
-
-Modified and optimized by Christian Heimes
-"""
-
-import sys
-import os
-
-__all__ = ['PLATFORM', 'UnsupportedOsError', 'getCurrentPids',
- 'getCurrentPidNames', 'getCurrentPidDetails']
-
-class UnsupportedOsError(NotImplementedError):
- pass
-
-PLATFORM = None
-
-_plat = sys.platform.startswith
-if _plat('win'):
- PLATFORM = 'win'
- win32process = None
- ctypes = None
- import csv
- try:
- import win32process
- except ImportError:
- try:
- import ctypes
- except ImportError:
- pass
-elif _plat('linux2') or _plat('cygwin'):
- PLATFORM = 'linux' # Linux or Cygwin
-elif _plat('darwin'):
- # XXX: unsupported
- PLATFORM = 'mac' # Mac OS X
-elif _plat('freebsd') or _plat('netbsd') or _plat('openbsd'):
- # XXX: unsupported
- PLATFORM = 'bsd' # *BSD
-else:
- PLATFORM = 'unknown'
- raise UnsupportedOsError(sys.platform)
-
-# ****************************************************************************
-# Linux / cygwin implementation
-
-def _getCurrentPids_linux():
- """
- Returns current process-id's.
-
- :return: list with process-id's.
- """
- pids = []
- for fname in os.listdir("/proc"):
- if os.path.isdir(os.path.join("/proc", fname)):
- try:
- pids.append(int(fname))
- except ValueError:
- continue
- return pids
-
-def _getCurrentPidDetails_linux():
- """Returns mapping pid -> detailed informations
- """
- mapping = {}
- for pid in getCurrentPids():
- try:
- try:
- # read entiry file to avoid race condition bugs
- fd = open('/proc/%i/status' % pid, 'rb')
- status = fd.read().split('\n')
- finally:
- if fd:
- fd.close()
- except IoError:
- continue
- details = {}
- for line in status:
- try:
- key, value = line.split(':\t')
- except ValueError:
- continue
- details[key.lower()] = value.strip()
- mapping[pid] = details
-
- return mapping
-
-# ****************************************************************************
-# Windows / win32 implementaton
-
-def _getCurrentPids_win():
- """
- Returns current process-id's.
-
- :return: List with process-id's.
- """
- if win32process is not None:
- return list(win32process.EnumProcesses())
- elif ctypes is not None:
- # ctypes is installed --> try psapi.dll
- psapi = ct.windll.psapi
- arr = ct.c_long * 1024
- process_ids = arr()
- cb = ct.sizeof(process_ids)
- bytes_returned = ct.c_ulong()
- psapi.EnumProcesses(ct.byref(process_ids), cb, ct.byref(bytes_returned))
- return sorted(list(set(process_ids)))
- else:
- csvlines = []
- current_pids = []
- for line in os.popen("tasklist.exe /fo csv /nh"):
- line = line.strip()
- if line:
- csvlines.append(line)
- for line in csv.reader(csvlines):
- current_pids.append(int(line[1]))
- if not csvlines:
- raise NotImplementedError("tasklist.exe not found (>WinXP)")
- return current_pids
-
-
-def _getCurrentPidDetails_win():
- """
- Returns processinfos. (pid, name and size_kb)
-
- On Windows-Systems, it uses ``tasklist.exe`` or WMI to list the processes.
- On other platforms, it reads the "/proc"-directory.
-
- :return: Dictionary with PID's as keys and the infos as values in an
- inner dictionary. (It is possible, that "size_kb" doesn't exist.):
- {
- 123: {"name": "bash", "pid": 123},
- 234: {"name": "mc", "pid": 234, "size_kb": 1000},
- }
- """
-
- retdict = {}
-
- # tasklist.exe runs on Windows XP and higher. (To parse the ouput of
- # tasklist.exe is faster than WMI.)
- import csv
- csvlines = []
- for line in os.popen("tasklist.exe /fo csv /nh"):
- line = line.strip()
- if line:
- csvlines.append(line)
- for line in csv.reader(csvlines):
- pid = int(line[1])
- details = {
- "name": line[0].decode("cp850"), # to unicode
- "pid": pid,
- }
- value = "".join(
- char for char in line[4]
- if char.isdigit()
- )
- details["size_kb"] = int(value)
- retdict[pid] = details
- if not csvlines:
- try:
- from win32com.client import GetObject
- # pywin32 is installed --> use WMI
- wmi = GetObject('winmgmts:')
- processes = wmi.InstancesOf('Win32_Process')
- for process in processes:
- pid = int(process.Properties_("ProcessId").value)
- details = {
- "name": process.Properties_("Name").value,
- "pid": pid,
- "size_kb": int(process.Properties_("WorkingSetSize").value) / 1000
- }
- retdict[pid] = details
- except ImportError:
- raise NotImplementedError("No tasklist.exe and no WMI.")
- return retdict
-
-# ****************************************************************************
-# general
-
-def unsupported():
- """Platform not supported
- """
- raise UnsupportedOsError(PLATFORM)
-
-def getCurrentPidNames():
- """Returns mapping pid -> name
- """
- mapping = {}
- for pid, details in getCurrentPidDetails().items():
- mapping[pid] = details.get('name', None)
- return mapping
-
-def _initialize():
- mod = sys.modules[__name__]
- for name in ('getCurrentPids','getCurrentPidDetails'):
- func = getattr(mod, "_%s_%s" % (name, PLATFORM), unsupported)
- #func.__name__ = name
- setattr(mod, name, func)
-_initialize()
-
-def main():
- """Testing"""
-
- from pprint import pformat
- # Current PIDs
- current_pids = getCurrentPids()
- print "Current PIDs: %s" % pformat(current_pids)
- print
- print "Current PID Count: %s" % len(current_pids)
- print
-
- print "Current PIDs with names: %s" % pformat(getCurrentPidNames())
- print
-
-if __name__ == "__main__":
- main()
-
+#!/usr/bin/env python
+# -*- coding: iso-8859-1 -*-
+"""Process info
+
+Based on http://gelb.bcom.at/trac/misc/browser/processinfo
+
+Modified and optimized by Christian Heimes
+"""
+
+import sys
+import os
+
+__all__ = ['PLATFORM', 'UnsupportedOsError', 'getCurrentPids',
+ 'getCurrentPidNames', 'getCurrentPidDetails']
+
+class UnsupportedOsError(NotImplementedError):
+ pass
+
+PLATFORM = None
+
+_plat = sys.platform.startswith
+if _plat('win'):
+ PLATFORM = 'win'
+ win32process = None
+ ctypes = None
+ import csv
+ try:
+ import win32process
+ except ImportError:
+ try:
+ import ctypes
+ except ImportError:
+ pass
+elif _plat('linux2') or _plat('cygwin'):
+ PLATFORM = 'linux' # Linux or Cygwin
+elif _plat('darwin'):
+ # XXX: unsupported
+ PLATFORM = 'mac' # Mac OS X
+elif _plat('freebsd') or _plat('netbsd') or _plat('openbsd'):
+ # XXX: unsupported
+ PLATFORM = 'bsd' # *BSD
+else:
+ PLATFORM = 'unknown'
+ raise UnsupportedOsError(sys.platform)
+
+# ****************************************************************************
+# Linux / cygwin implementation
+
+def _getCurrentPids_linux():
+ """
+ Returns current process-id's.
+
+ :return: list with process-id's.
+ """
+ pids = []
+ for fname in os.listdir("/proc"):
+ if os.path.isdir(os.path.join("/proc", fname)):
+ try:
+ pids.append(int(fname))
+ except ValueError:
+ continue
+ return pids
+
+def _getCurrentPidDetails_linux():
+ """Returns mapping pid -> detailed informations
+ """
+ mapping = {}
+ for pid in getCurrentPids():
+ try:
+ try:
+ # read entiry file to avoid race condition bugs
+ fd = open('/proc/%i/status' % pid, 'rb')
+ status = fd.read().split('\n')
+ finally:
+ if fd:
+ fd.close()
+ except IoError:
+ continue
+ details = {}
+ for line in status:
+ try:
+ key, value = line.split(':\t')
+ except ValueError:
+ continue
+ details[key.lower()] = value.strip()
+ mapping[pid] = details
+
+ return mapping
+
+def _getCurrentPidNames_linux():
+ """Returns mapping pid -> name
+ """
+ mapping = {}
+ for pid, details in getCurrentPidDetails().items():
+ mapping[pid] = details.get('name', None)
+ return mapping
+
+
+# ****************************************************************************
+# Windows / win32 implementaton
+
+def _getCurrentPids_win():
+ """
+ Returns current process-id's.
+
+ :return: List with process-id's.
+ """
+ if win32process is not None:
+ return list(win32process.EnumProcesses())
+ elif ctypes is not None:
+ # ctypes is installed --> try psapi.dll
+ psapi = ct.windll.psapi
+ arr = ct.c_long * 1024
+ process_ids = arr()
+ cb = ct.sizeof(process_ids)
+ bytes_returned = ct.c_ulong()
+ psapi.EnumProcesses(ct.byref(process_ids), cb, ct.byref(bytes_returned))
+ return sorted(list(set(process_ids)))
+ else:
+ csvlines = []
+ current_pids = []
+ for line in os.popen("tasklist.exe /fo csv /nh"):
+ line = line.strip()
+ if line:
+ csvlines.append(line)
+ for line in csv.reader(csvlines):
+ current_pids.append(int(line[1]))
+ if not csvlines:
+ raise NotImplementedError("tasklist.exe not found (>WinXP)")
+ return current_pids
+
+def _getCurrentPidNames_win():
+ """Returns mapping pid -> name
+ """
+ mapping = {}
+ for pid, details in getCurrentPidDetails().items():
+ mapping[pid] = details.get('name', None)
+ return mapping
+
+def _getCurrentPidDetails_win():
+ """
+ Returns processinfos. (pid, name and size_kb)
+ """
+ result = {}
+
+ # tasklist.exe runs on Windows XP and higher. (To parse the ouput of
+ # tasklist.exe is faster than WMI.)
+ csvlines = []
+ for line in os.popen("tasklist.exe /fo csv /nh"):
+ line = line.strip()
+ if line:
+ csvlines.append(line)
+ for line in csv.reader(csvlines):
+ pid = int(line[1])
+ details = {
+ "name": line[0].decode("cp850"), # to unicode
+ "pid": pid,
+ }
+ value = "".join(
+ char for char in line[4]
+ if char.isdigit()
+ )
+ details["size_kb"] = int(value)
+ retdict[pid] = details
+ if not csvlines:
+ try:
+ from win32com.client import GetObject
+ # pywin32 is installed --> use WMI
+ wmi = GetObject('winmgmts:')
+ processes = wmi.InstancesOf('Win32_Process')
+ for process in processes:
+ pid = int(process.Properties_("ProcessId").value)
+ details = {
+ "name": process.Properties_("Name").value,
+ "pid": pid,
+ "size_kb": int(process.Properties_("WorkingSetSize").value) / 1000
+ }
+ retdict[pid] = details
+ except ImportError:
+ raise NotImplementedError("No tasklist.exe and no WMI.")
+ return retdict
+
+# ****************************************************************************
+# general
+
+def unsupported():
+ """Platform not supported
+ """
+ raise UnsupportedOsError(PLATFORM)
+unsupported.__unsupported__ = True
+
+def _initialize():
+ mod = sys.modules[__name__]
+ for name in ('getCurrentPids','getCurrentPidDetails', 'getCurrentPidNames'):
+ func = getattr(mod, "_%s_%s" % (name, PLATFORM), None)
+ if func is None:
+ func = unsupported
+ else:
+ func.__name__ = name
+ setattr(mod, name, func)
+_initialize()
+
+def main():
+ """Testing"""
+
+ from pprint import pformat
+ # Current PIDs
+ current_pids = getCurrentPids()
+ print "Current PIDs: %s" % pformat(current_pids)
+ print
+ print "Current PID Count: %s" % len(current_pids)
+ print
+
+ print "Current PIDs with names: %s" % pformat(getCurrentPidNames())
+ print
+
+if __name__ == "__main__":
+ main()
+
Added: pymoul/trunk/src/moul/osdependent/tests/test_osdependent.py
===================================================================
--- pymoul/trunk/src/moul/osdependent/tests/test_osdependent.py (rev 0)
+++ pymoul/trunk/src/moul/osdependent/tests/test_osdependent.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -0,0 +1,36 @@
+# pyMoul - Python interface to Myst Online URU Live
+# Copyright (C) 2007 Christian Heimes <christian (at) cheimes (dot) de>
+
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc., 59
+# Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+"""moul.osdependent.singleapp
+"""
+__author__ = "Christian Heimes"
+__version__ = "$Id$"
+__revision__ = "$Revision$"
+
+import unittest
+from doctest import DocTestSuite
+
+import moul.osdependent
+
+def test_suite():
+ return unittest.TestSuite((
+ DocTestSuite('moul.osdependent'),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest="test_suite")
+
Property changes on: pymoul/trunk/src/moul/osdependent/tests/test_osdependent.py
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: pymoul/trunk/src/moul/osdependent/win32/__init__.py
===================================================================
--- pymoul/trunk/src/moul/osdependent/win32/__init__.py 2007-01-26 02:16:36 UTC (rev 79)
+++ pymoul/trunk/src/moul/osdependent/win32/__init__.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -22,29 +22,32 @@
__revision__ = "$Revision$"
import os
-#from moul.osdependent.win32.miniwinshell import my_documents
-#from moul.osdependent.win32.miniwinshell import application_data
-from moul.osdependent.win32.winpath import get_homedir
-from moul.osdependent.win32.winpath import get_appdata
+from moul.osdependent.win32.miniwinshell import my_documents as getMyDocuments
+from moul.osdependent.win32.miniwinshell import application_data as getAppdata
+#from moul.osdependent.win32.winpath import get_homedir as getMyDocuments
+#from moul.osdependent.win32.winpath import get_appdata as getAppdata
+from moul.osdependent.win32 import wmi
MOUL_DIR = "Uru Live"
EXEC_NAME = "UruLauncher.exe"
+mycomputer = None
+mydocs = getMyDocuments()
+myappdata = getAppdata()
+
def getMoulUserDataDir():
"""Get path of MOUL data directory
The MOUL data directory contains log files, chatlogs, KI images and many
more things.
"""
- mydoc = get_homedir()
- moul_data = os.path.join(mydoc, MOUL_DIR)
+ moul_data = os.path.join(mydocs, MOUL_DIR)
return moul_data
def getPyMoulDataDir():
"""Get path to the pyMoul ini file
"""
- app_data = get_appdata()
- inidir = os.path.join(app_data, 'pyMoul')
+ inidir = os.path.join(myappdata , 'pyMoul')
return inidir
def _startMOUL(installdir, *args):
@@ -57,3 +60,22 @@
path = os.path.join(installdir, EXEC_NAME)
args = (EXEC_NAME,) + args
return os.spawnv(mode, path, args)
+
+def getCurrentPids():
+ """Returns mapping pid -> name
+ """
+ global mycomputer
+ if mycomputer is None:
+ # XXX: extremely slow, also see getCurrentPidNames
+ mycomputer = wmi.WMI()
+ return [process.ProcessId for process in mycomputer.Win32_Process()]
+
+def getCurrentPidNames():
+ """Returns mapping pid -> name
+ """
+ global mycomputer
+ if mycomputer is None:
+ mycomputer = wmi.WMI()
+ return dict([(process.ProcessId, process.Name)
+ for process in mycomputer.Win32_Process()
+ ])
Added: pymoul/trunk/src/moul/osdependent/win32/wmi.py
===================================================================
--- pymoul/trunk/src/moul/osdependent/win32/wmi.py (rev 0)
+++ pymoul/trunk/src/moul/osdependent/win32/wmi.py 2007-01-26 14:17:50 UTC (rev 80)
@@ -0,0 +1,1246 @@
+##
+# wmi - a lightweight Python wrapper around Microsoft's WMI interface
+#
+# Windows Management Instrumentation (WMI) is Microsoft's answer to
+# the DMTF's Common Information Model. It allows you to query just
+# about any conceivable piece of information from any computer which
+# is running the necessary agent and over which have you the
+# necessary authority.
+#
+# The implementation is by means of COM/DCOM and most of the examples
+# assume you're running one of Microsoft's scripting technologies.
+# Fortunately, Mark Hammond's pywin32 has pretty much all you need
+# for a workable Python adaptation. I haven't tried any of the fancier
+# stuff like Async calls and so on, so I don't know if they'd work.
+#
+# Since the COM implementation doesn't give much away to Python
+# programmers, I've wrapped it in some lightweight classes with
+# some getattr / setattr magic to ease the way. In particular:
+#
+# <ul>
+# <li>
+# The _wmi_namespace object itself will determine its classes
+# and allow you to return all instances of any of them by
+# using its name as an attribute. As an additional shortcut,
+# you needn't specify the Win32_; if the first lookup fails
+# it will try again with a Win32_ on the front:
+#
+# <pre class="code">
+# disks = wmi.WMI ().Win32_LogicalDisk ()
+# </pre>
+#
+# In addition, you can specify what would become the WHERE clause
+# as keyword parameters:
+#
+# <pre class="code">
+# fixed_disks = wmi.WMI ().Win32_LogicalDisk (DriveType = 3)
+# </pre>
+# </li>
+#
+# <li>
+# The objects returned by a WMI lookup are wrapped in a Python
+# class which determines their methods and classes and allows
+# you to access them as though they were Python classes. The
+# methods only allow named parameters.
+#
+# <pre class="code">
+# for p in wmi.WMI ().Win32_Process ():
+# if p.Name.lower () == 'notepad.exe':
+# p.Terminate (Result=1)
+# </pre>
+# </li>
+#
+# <li>
+# Doing a print on one of the WMI objects will result in its
+# GetObjectText_ method being called, which usually produces
+# a meaningful printout of current values.
+# The repr of the object will include its full WMI path,
+# which lets you get directly to it if you need to.
+# </li>
+#
+# <li>
+# You can get the associators and references of an object as
+# a list of python objects by calling the associators () and
+# references () methods on a WMI Python object.
+# NB Don't do this on a Win32_ComputerSystem object; it will
+# take all day and kill your machine!
+#
+# <pre class="code">
+# for p in wmi.WMI ().Win32_Process ():
+# if p.Name.lower () == 'notepad.exe':
+# for r in p.references ():
+# print r.Name
+# </pre>
+# </li>
+#
+# <li>
+# WMI classes (as opposed to instances) are first-class
+# objects, so you can get hold of a class, and call
+# its methods or set up a watch against it.
+#
+# <pre class="code">
+# process = wmi.WMI ().Win32_Process
+# process.Create (CommandLine="notepad.exe")
+# </pre>
+#
+# </li>
+#
+# <li>
+# To make it easier to use in embedded systems and py2exe-style
+# executable wrappers, the module will not force early Dispatch.
+# To do this, it uses a handy hack by Thomas Heller for easy access
+# to constants.
+# </li>
+#
+# <li>
+# Typical usage will be:
+#
+# <pre class="code">
+# import wmi
+#
+# vodev1 = wmi.WMI ("vodev1")
+# for disk in vodev1.Win32_LogicalDisk ():
+# if disk.DriveType == 3:
+# space = 100 * long (disk.FreeSpace) / long (disk.Size)
+# print "%s has %d%% free" % (disk.Name, space)
+# </pre>
+# </li>
+#
+# </ul>
+#
+# Many thanks, obviously to Mark Hammond for creating the win32all
+# extensions, but also to Alex Martelli and Roger Upole, whose
+# c.l.py postings pointed me in the right direction.
+# Thanks especially in release 1.2 to Paul Tiemann for his code
+# contributions and robust testing.
+#
+# (c) Tim Golden - mail at timgolden.me.uk 5th June 2003
+# Licensed under the (GPL-compatible) MIT License:
+# http://www.opensource.org/licenses/mit-license.php
+#
+# For change history see CHANGELOG.TXT
+##
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
+
+try:
+ object
+except NameError:
+ class object: pass
+
+__VERSION__ = "1.2.1"
+
+_DEBUG = False
+
+import sys
+import re
+from win32com.client import GetObject, Dispatch
+import pywintypes
+
+class ProvideConstants (object):
+ """
+ A class which, when called on a win32com.client.Dispatch object,
+ provides lazy access to constants defined in the typelib.
+
+ They can be accessed as attributes of the _constants property.
+ From Thomas Heller on c.l.py
+ """
+ def __init__(self, comobj):
+ "@param comobj A COM object whose typelib constants are to be exposed"
+ comobj.__dict__["_constants"] = self
+ # Get the typelibrary's typecomp interface
+ self.__typecomp = \
+ comobj._oleobj_.GetTypeInfo().GetContainingTypeLib()[0].GetTypeComp()
+
+ def __getattr__(self, name):
+ if name.startswith("__") and name.endswith("__"):
+ raise AttributeError, name
+ result = self.__typecomp.Bind(name)
+ # Bind returns a 2-tuple, first item is TYPEKIND,
+ # the second item has the value
+ if not result[0]:
+ raise AttributeError, name
+ return result[1].value
+
+obj = GetObject ("winmgmts:")
+ProvideConstants (obj)
+
+wbemErrInvalidQuery = obj._constants.wbemErrInvalidQuery
+wbemErrTimedout = obj._constants.wbemErrTimedout
+wbemFlagReturnImmediately = obj._constants.wbemFlagReturnImmediately
+wbemFlagForwardOnly = obj._constants.wbemFlagForwardOnly
+
+def handle_com_error (error_info):
+ """Convenience wrapper for displaying all manner of COM errors.
+ Raises a x_wmi exception with more useful information attached
+
+ @param error_info The structure attached to a pywintypes.com_error
+ """
+ hresult_code, hresult_name, additional_info, parameter_in_error = error_info
+ exception_string = ["%s - %s" % (hex (hresult_code), hresult_name)]
+ if additional_info:
+ wcode, source_of_error, error_description, whlp_file, whlp_context, scode = additional_info
+ exception_string.append (" Error in: %s" % source_of_error)
+ exception_string.append (" %s - %s" % (hex (scode), error_description.strip ()))
+ raise x_wmi, "\n".join (exception_string)
+
+def from_time (year=None, month=None, day=None, hours=None, minutes=None, seconds=None, microseconds=None, timezone=None):
+ """
+ Convenience wrapper to take a series of date/time elements and return a WMI time
+ of the form yyyymmddHHMMSS.mmmmmm+UUU. All elements may be int, string or
+ omitted altogether. If omitted, they will be replaced in the output string
+ by a series of stars of the appropriate length.
+
+ @param year The year element of the date/time
+ @param month The month element of the date/time
+ @param day The day element of the date/time
+ @param hours The hours element of the date/time
+ @param minutes The minutes element of the date/time
+ @param seconds The seconds element of the date/time
+ @param microseconds The microseconds element of the date/time
+ @param timezone The timeezone element of the date/time
+
+ @return A WMI datetime string of the form: yyyymmddHHMMSS.mmmmmm+UUU
+ """
+ def str_or_stars (i, length):
+ if i is None:
+ return "*" * length
+ else:
+ return str (i).rjust (length, "0")
+
+ wmi_time = ""
+ wmi_time += str_or_stars (year, 4)
+ wmi_time += str_or_stars (month, 2)
+ wmi_time += str_or_stars (day, 2)
+ wmi_time += str_or_stars (hours, 2)
+ wmi_time += str_or_stars (minutes, 2)
+ wmi_time += str_or_stars (seconds, 2)
+ wmi_time += "."
+ wmi_time += str_or_stars (microseconds, 6)
+ wmi_time += str_or_stars (timezone, 4)
+
+ return wmi_time
+
+def to_time (wmi_time):
+ """
+ Convenience wrapper to take a WMI datetime string of the form
+ yyyymmddHHMMSS.mmmmmm+UUU and return a 9-tuple containing the
+ individual elements, or None where string contains placeholder
+ stars.
+
+ @param wmi_time The WMI datetime string in yyyymmddHHMMSS.mmmmmm+UUU format
+
+ @return A 9-tuple of (year, month, day, hours, minutes, seconds, microseconds, timezone)
+ """
+ def int_or_none (s, start, end):
+ try:
+ return int (s[start:end])
+ except ValueError:
+ return None
+
+ year = int_or_none (wmi_time, 0, 4)
+ month = int_or_none (wmi_time, 4, 6)
+ day = int_or_none (wmi_time, 6, 8)
+ hours = int_or_none (wmi_time, 8, 10)
+ minutes = int_or_none (wmi_time, 10, 12)
+ seconds = int_or_none (wmi_time, 12, 14)
+ microseconds = int_or_none (wmi_time, 15, 21)
+ timezone = wmi_time[21:]
+
+ return year, month, day, hours, minutes, seconds, microseconds, timezone
+
+#
+# Exceptions
+#
+class x_wmi (Exception):
+ pass
+
+class x_wmi_invalid_query (x_wmi):
+ pass
+
+class x_wmi_timed_out (x_wmi):
+ pass
+
+class x_wmi_no_namespace (x_wmi):
+ pass
+
+WMI_EXCEPTIONS = {
+ wbemErrInvalidQuery : x_wmi_invalid_query,
+ wbemErrTimedout : x_wmi_timed_out
+}
+
+def _set (obj, attribute, value):
+ """
+ Helper function to add an attribute directly into the instance
+ dictionary, bypassing possible __getattr__ calls
+
+ @param obj Any python object
+ @param attribute String containing attribute name
+ @param value Any python object
+ """
+ obj.__dict__[attribute] = value
+
+class _wmi_method:
+ """
+ A currying sort of wrapper around a WMI method name. It
+ abstract's the method's parameters and can be called like
+ a normal Python object passing in the parameter values.
+
+ Output parameters are returned from the call as a tuple.
+ In addition, the docstring is set up as the method's
+ signature, including an indication as to whether any
+ given parameter is expecting an array, and what
+ special privileges are required to call the method.
+ """
+
+ def __init__ (self, ole_object, method_name):
+ """
+ @param ole_object The WMI class/instance whose method is to be called
+ @param method_name The name of the method to be called
+ """
+ try:
+ self.ole_object = Dispatch (ole_object)
+ self.method = ole_object.Methods_ (method_name)
+ self.qualifiers = {}
+ for q in self.method.Qualifiers_:
+ self.qualifiers[q.Name] = q.Value
+ self.provenance = "\n".join (self.qualifiers.get ("MappingStrings", []))
+
+ self.in_parameters = self.method.InParameters
+ self.out_parameters = self.method.OutParameters
+ if self.in_parameters is None:
+ self.in_parameter_names = []
+ else:
+ self.in_parameter_names = [(i.Name, i.IsArray) for i in self.in_parameters.Properties_]
+ if self.out_parameters is None:
+ self.out_parameter_names = []
+ else:
+ self.out_parameter_names = [(i.Name, i.IsArray) for i in self.out_parameters.Properties_]
+
+ doc = "%s (%s) => (%s)" % (
+ method_name,
+ ", ".join ([name + ("", "[]")[is_array] for (name, is_array) in self.in_parameter_names]),
+ ", ".join ([name + ("", "[]")[is_array] for (name, is_array) in self.out_parameter_names])
+ )
+ privileges = self.qualifiers.get ("Privileges", [])
+ if privileges:
+ doc += " | Needs: " + ", ".join (privileges)
+ self.__doc__ = doc
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def __call__ (self, **kwargs):
+ """
+ Execute the call to a WMI method, returning
+ a tuple (even if is of only one value) containing
+ the out and return parameters.
+ """
+ try:
+ if self.in_parameters:
+ parameter_names = {}
+ for name, is_array in self.in_parameter_names:
+ parameter_names[name] = is_array
+
+ parameters = self.in_parameters
+ for k, v in kwargs.items ():
+ is_array = parameter_names.get (k)
+ if is_array is None:
+ raise AttributeError, "%s is not a valid parameter for %s" % (k, self.__doc__)
+ else:
+ if is_array:
+ try: list (v)
+ except TypeError: raise TypeError, "%s must be iterable" % k
+
+ parameters.Properties_ (k).Value = v
+
+ result = self.ole_object.ExecMethod_ (self.method.Name, self.in_parameters)
+ else:
+ result = self.ole_object.ExecMethod_ (self.method.Name)
+
+ results = []
+ for name, is_array in self.out_parameter_names:
+ value = result.Properties_ (name).Value
+ if is_array:
+ #
+ # Thanks to Jonas Bjering for bug report and path
+ #
+ results.append (list (value or []))
+ else:
+ results.append (value)
+ return tuple (results)
+
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def __repr__ (self):
+ return "<function %s>" % self.__doc__
+
+#
+# class _wmi_object
+#
+class _wmi_object:
+ "A lightweight wrapper round an OLE WMI object"
+
+ def __init__ (self, ole_object, instance_of=None, fields=[]):
+ try:
+ _set (self, "ole_object", ole_object)
+ _set (self, "_instance_of", instance_of)
+ _set (self, "properties", {})
+ _set (self, "methods", {})
+
+ if self._instance_of:
+ if fields:
+ for field in fields:
+ self.properties[field] = None
+ else:
+ _set (self, "properties", self._instance_of.properties.copy ())
+ _set (self, "methods", self._instance_of.methods)
+ else:
+ for p in ole_object.Properties_:
+ self.properties[p.Name] = None
+ for m in ole_object.Methods_:
+ self.methods[m.Name] = None
+
+ _set (self, "_properties", self.properties.keys ())
+ _set (self, "_methods", self.methods.keys ())
+
+ _set (self, "qualifiers", {})
+ for q in self.ole_object.Qualifiers_:
+ self.qualifiers[q.Name] = q.Value
+ _set (self, "is_association", self.qualifiers.has_key ("Association"))
+
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def __str__ (self):
+ """
+ For a call to print [object] return the OLE description
+ of the properties / values of the object
+ """
+ try:
+ return self.ole_object.GetObjectText_ ()
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def __repr__ (self):
+ """
+ Indicate both the fact that this is a wrapped WMI object
+ and the WMI object's own identifying class.
+ """
+ try:
+ return "<%s: %s>" % (self.__class__.__name__, str (self.Path_.Path))
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def _cached_properties (self, attribute):
+ if self.properties[attribute] is None:
+ self.properties[attribute] = self.ole_object.Properties_ (attribute)
+ return self.properties[attribute]
+
+ def _cached_methods (self, attribute):
+ if self.methods[attribute] is None:
+ self.methods[attribute] = _wmi_method (self.ole_object, attribute)
+ return self.methods[attribute]
+
+ def __getattr__ (self, attribute):
+ """
+ Attempt to pass attribute calls to the proxied COM object.
+ If the attribute is recognised as a property, return its value;
+ if it is recognised as a method, return a method wrapper which
+ can then be called with parameters; otherwise pass the lookup
+ on to the underlying object.
+ """
+ try:
+ if self.properties.has_key (attribute):
+ value = self._cached_properties (attribute).Value
+ #
+ # If this is an association, its properties are
+ # actually the paths to the two aspects of the
+ # association, so translate them automatically
+ # into WMI objects.
+ #
+ if self.is_association:
+ return WMI (moniker=value)
+ else:
+ return value
+ elif self.methods.has_key (attribute):
+ return self._cached_methods (attribute)
+ else:
+ return getattr (self.ole_object, attribute)
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def __setattr__ (self, attribute, value):
+ """
+ If the attribute to be set is valid for the proxied
+ COM object, set that objects's parameter value; if not,
+ raise an exception.
+ """
+ try:
+ if self.properties.has_key (attribute):
+ self._cached_properties (attribute).Value = value
+ if self.ole_object.Path_.Path:
+ self.ole_object.Put_ ()
+ else:
+ raise AttributeError, attribute
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def __eq__ (self, other):
+ """
+ Use WMI's CompareTo_ to compare this object with
+ another. Don't try to do anything if the other
+ object is not a wmi object. It might be possible
+ to compare this object's unique key with a string
+ or something, but this doesn't seem to be universal
+ enough to merit a special case.
+ """
+ if isinstance (other, self.__class__):
+ return self.ole_object.CompareTo_ (other.ole_object)
+ else:
+ raise x_wmi, "Can't compare a WMI object with something else"
+
+ def put (self):
+ self.ole_object.Put_ ()
+
+ def set (self, **kwargs):
+ """
+ Set several properties of the underlying object
+ at one go. This is particularly useful in combination
+ with the new () method below. However, an instance
+ which has been spawned in this way won't have enough
+ information to write pack, so only try if the
+ instance has a path.
+ """
+ if kwargs:
+ try:
+ for attribute, value in kwargs.items ():
+ if self.properties.has_key (attribute):
+ self._cached_properties (attribute).Value = value
+ else:
+ raise AttributeError, attribute
+ #
+ # Only try to write the attributes
+ # back if the object exists.
+ #
+ if self.ole_object.Path_.Path:
+ self.ole_object.Put_ ()
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def path (self):
+ """
+ Return the WMI URI to this object. Can be used to
+ determine the path relative to the parent namespace. eg,
+
+ <pre class="code">
+ pp0 = wmi.WMI ().Win32_ParallelPort ()[0]
+ print pp0.path ().RelPath
+ </pre>
+ """
+ try:
+ return self.ole_object.Path_
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def derivation (self):
+ """Return a tuple representing the object derivation for
+ this object, with the most specific object first. eg,
+
+ pp0 = wmi.WMI ().Win32_ParallelPort ()[0]
+ print ' <- '.join (pp0.derivation ())
+ """
+ try:
+ return self.ole_object.Derivation_
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def associators (self, wmi_association_class="", wmi_result_class=""):
+ """Return a list of objects related to this one, optionally limited
+ either by association class (ie the name of the class which relates
+ them) or by result class (ie the name of the class which would be
+ retrieved)
+
+ <pre class="code">
+c = wmi.WMI ()
+pp = c.Win32_ParallelPort ()[0]
+
+for i in pp.associators (wmi_association_class="Win32_PortResource"):
+ print i
+
+for i in pp.associators (wmi_result_class="Win32_PnPEntity"):
+ print i
+ </pre>
+ """
+ try:
+ return [
+ _wmi_object (i) for i in \
+ self.ole_object.Associators_ (
+ strAssocClass=wmi_association_class,
+ strResultClass=wmi_result_class
+ )
+ ]
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def references (self, wmi_class=""):
+ """Return a list of associations involving this object, optionally
+ limited by the result class (the name of the association class).
+
+ NB Associations are treated specially; although WMI only returns
+ the string corresponding to the instance of each associated object,
+ this module will automatically convert that to the object itself.
+
+ <pre class="code">
+ c = wmi.WMI ()
+ sp = c.Win32_SerialPort ()[0]
+
+ for i in sp.references ():
+ print i
+
+ for i in sp.references (wmi_class="Win32_SerialPortSetting"):
+ print i
+ </pre>
+ """
+ try:
+ return [_wmi_object (i) for i in self.ole_object.References_ (strResultClass=wmi_class)]
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+#
+# class _wmi_class
+#
+class _wmi_class (_wmi_object):
+ """Currying class to assist in issuing queries against
+ a WMI namespace. The idea is that when someone issues
+ an otherwise unknown method against the WMI object, if
+ it matches a known WMI class a query object will be
+ returned which may then be called with one or more params
+ which will form the WHERE clause. eg,
+
+ <pre class="code">
+ c = wmi.WMI ()
+ c_drive = c.Win32_LogicalDisk (Name='C:')
+ </pre>
+ """
+ def __init__ (self, namespace, wmi_class):
+ _wmi_object.__init__ (self, wmi_class)
+ _set (self, "_namespace", namespace)
+ _set (self, "_class_name", wmi_class.Path_.Class)
+
+ def query (self, fields=[], **where_clause):
+ """Make it slightly easier to query against the class,
+ by calling the namespace's query with the class preset.
+ Won't work if the class has been instantiated directly.
+ """
+ if self._namespace is None:
+ raise x_wmi_no_namespace, "You cannot query directly from a WMI class"
+
+ try:
+ field_list = ", ".join (fields) or "*"
+ wql = "SELECT " + field_list + " FROM " + self._class_name
+ if where_clause:
+ wql += " WHERE " + " AND ". join (["%s = '%s'" % (k, v) for k, v in where_clause.items ()])
+ return self._namespace.query (wql, self, fields)
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ __call__ = query
+
+ def watch_for (
+ self,
+ notification_type=None,
+ delay_secs=1,
+ **where_clause
+ ):
+ if self._namespace is None:
+ raise x_wmi_no_namespace, "You cannot watch directly from a WMI class"
+
+ return self._namespace.watch_for (
+ notification_type=notification_type,
+ wmi_class=self._class_name,
+ delay_secs=delay_secs,
+ **where_clause
+ )
+
+ def instances (self):
+ """Return a list of instances of the WMI class
+ """
+ try:
+ return [_wmi_object (instance, self) for instance in self.Instances_ ()]
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def new (self, **kwargs):
+ """This is the equivalent to the raw-WMI SpawnInstance_
+ method. Note that there are relatively few uses for
+ this, certainly fewer than you might imagine. Most
+ classes which need to create a new *real* instance
+ of themselves, eg Win32_Process, offer a .Create
+ method. SpawnInstance_ is generally reserved for
+ instances which are passed as parameters to such
+ .Create methods, a common example being the
+ Win32_SecurityDescriptor, passed to Win32_Share.Create
+ and other instances which need security.
+
+ The example here is Win32_ProcessStartup, which
+ controls the shown/hidden state etc. of a new
+ Win32_Process instance.
+
+ <pre class="code">
+ import win32con
+ import wmi
+ c = wmi.WMI ()
+ startup = c.Win32_ProcessStartup.new (ShowWindow=win32con.SW_SHOWMINIMIZED)
+ pid, retval = c.Win32_Process.Create (
+ CommandLine="notepad.exe",
+ ProcessStartupInformation=startup
+ )
+ </pre>
+
+ NB previous versions of this module, used this function
+ to create new process. This is *not* a good example
+ of its use; it is better handled with something like
+ the example above.
+ """
+ try:
+ obj = _wmi_object (self.SpawnInstance_ (), self)
+ obj.set (**kwargs)
+ return obj
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+#
+# class _wmi_result
+#
+class _wmi_result:
+ """Simple, data only result for targeted WMI queries which request
+ data only result classes via fetch_as_classes.
+ """
+ def __init__(self, obj, attributes):
+ if attributes:
+ for attr in attributes:
+ self.__dict__[attr] = obj.Properties_ (attr).Value
+ else:
+ for p in obj.Properties_:
+ attr = p.Name
+ self.__dict__[attr] = obj.Properties_(attr).Value
+
+#
+# class WMI
+#
+class _wmi_namespace:
+ """A WMI root of a computer system. The classes attribute holds a list
+ of the classes on offer. This means you can explore a bit with
+ things like this:
+
+ <pre class="code">
+ c = wmi.WMI ()
+ for i in c.classes:
+ if "user" in i.lower ():
+ print i
+ </pre>
+ """
+ def __init__ (self, namespace, find_classes):
+ _set (self, "_namespace", namespace)
+ #
+ # wmi attribute preserved for backwards compatibility
+ #
+ _set (self, "wmi", namespace)
+
+ # Initialise the "classes" attribute, to avoid infinite recursion in the
+ # __getattr__ method (which uses it).
+ self.classes = {}
+ #
+ # Pick up the list of classes under this namespace
+ # so that they can be queried, and used as though
+ # properties of the namespace by means of the __getattr__
+ # hook below.
+ # If the namespace does not support SubclassesOf, carry on
+ # regardless
+ #
+ if find_classes:
+ try:
+ self.classes.update (self.subclasses_of ())
+ except AttributeError:
+ pass
+
+ def __repr__ (self):
+ return "<_wmi_namespace: %s>" % self.wmi
+
+ def __str__ (self):
+ return repr (self)
+
+ def get (self, moniker):
+ try:
+ return _wmi_object (self.wmi.Get (moniker))
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def handle (self):
+ """The raw OLE object representing the WMI namespace"""
+ return self._namespace
+
+ def subclasses_of (self, root="", regex=r".*"):
+ classes = {}
+ for c in self._namespace.SubclassesOf (root):
+ klass = c.Path_.Class
+ if re.match (regex, klass):
+ classes[klass] = None
+ return classes
+
+ def instances (self, class_name):
+ """Return a list of instances of the WMI class. This is
+ (probably) equivalent to querying with no qualifiers.
+
+ <pre class="code">
+ system.instances ("Win32_LogicalDisk")
+ # should be the same as
+ system.Win32_LogicalDisk ()
+ </pre>
+ """
+ try:
+ return [_wmi_object (obj) for obj in self._namespace.InstancesOf (class_name)]
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def new (self, wmi_class, **kwargs):
+ """This is now implemented by a call to _wmi_namespace.new (qv)"""
+ return getattr (self, wmi_class).new (**kwargs)
+
+ new_instance_of = new
+
+ def _raw_query (self, wql):
+ """Execute a WQL query and return its raw results. Use the flags
+ recommended by Microsoft to achieve a read-only, semi-synchronous
+ query where the time is taken while looping through. Should really
+ be a generator, but ...
+ NB Backslashes need to be doubled up.
+ """
+ flags = wbemFlagReturnImmediately | wbemFlagForwardOnly
+ wql = wql.replace ("\\", "\\\\")
+ if _DEBUG: print "_raw_query(wql):", wql
+ try:
+ return self._namespace.ExecQuery (strQuery=wql, iFlags=flags)
+ except pywintypes.com_error, (hresult, hresult_text, additional, param_in_error):
+ raise WMI_EXCEPTIONS.get (hresult, x_wmi (hresult))
+
+ def query (self, wql, instance_of=None, fields=[]):
+ """Perform an arbitrary query against a WMI object, and return
+ a list of _wmi_object representations of the results.
+ """
+ return [ _wmi_object (obj, instance_of, fields) for obj in self._raw_query(wql) ]
+
+ def fetch_as_classes (self, wmi_classname, fields=(), **where_clause):
+ """Build and execute a wql query to fetch the specified list of fields from
+ the specified wmi_classname + where_clause, then return the results as
+ a list of simple class instances with attributes matching fields_list.
+
+ If fields is left empty, select * and pre-load all class attributes for
+ each class returned.
+ """
+ wql = "SELECT %s FROM %s" % (fields and ", ".join (fields) or "*", wmi_classname)
+ if where_clause:
+ wql += " WHERE " + " AND ".join (["%s = '%s'" % (k, v) for k, v in where_clause.items()])
+ return [_wmi_result (obj, fields) for obj in self._raw_query(wql)]
+
+ def fetch_as_lists (self, wmi_classname, fields, **where_clause):
+ """Build and execute a wql query to fetch the specified list of fields from
+ the specified wmi_classname + where_clause, then return the results as
+ a list of lists whose values correspond fields_list.
+ """
+ wql = "SELECT %s FROM %s" % (", ".join (fields), wmi_classname)
+ if where_clause:
+ wql += " WHERE " + " AND ".join (["%s = '%s'" % (k, v) for k, v in where_clause.items()])
+ results = []
+ for obj in self._raw_query(wql):
+ results.append ([obj.Properties_ (field).Value for field in fields])
+ return results
+
+ def watch_for (
+ self,
+ raw_wql=None,
+ notification_type=None,
+ wmi_class=None,
+ delay_secs=1,
+ **where_clause
+ ):
+ """Set up an event tracker on a WMI event. This function
+ returns an wmi_watcher which can be called to get the
+ next event. eg,
+
+ <pre class="code">
+ c = wmi.WMI ()
+
+ raw_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Process'"
+ watcher = c.watch_for (raw_wql=raw_wql)
+ while 1:
+ process_created = watcher ()
+ print process_created.Name
+
+ # or
+
+ watcher = c.watch_for (
+ notification_type="Creation",
+ wmi_class="Win32_Process",
+ delay_secs=2,
+ Name='calc.exe'
+ )
+ calc_created = watcher ()
+ </pre>
+
+ Now supports timeout on the call to watcher, eg:
+
+ <pre class="code">
+ import pythoncom
+ import wmi
+ c = wmi.WMI (privileges=["Security"])
+ watcher1 = c.watch_for (
+ notification_type="Creation",
+ wmi_class="Win32_NTLogEvent",
+ Type="error"
+ )
+ watcher2 = c.watch_for (
+ notification_type="Creation",
+ wmi_class="Win32_NTLogEvent",
+ Type="warning"
+ )
+
+ while 1:
+ try:
+ error_log = watcher1 (500)
+ except wmi.x_wmi_timed_out:
+ pythoncom.PumpWaitingMessages ()
+ else:
+ print error_log
+
+ try:
+ warning_log = watcher2 (500)
+ except wmi.x_wmi_timed_out:
+ pythoncom.PumpWaitingMessages ()
+ else:
+ print warning_log
+ </pre>
+ """
+ class_name = wmi_class
+ if raw_wql:
+ wql = raw_wql
+ else:
+ if where_clause:
+ where = " AND " + " AND ".join (["TargetInstance.%s = '%s'" % (k, v) for k, v in where_clause.items ()])
+ else:
+ where = ""
+ wql = \
+ "SELECT * FROM __Instance%sEvent WITHIN %d WHERE TargetInstance ISA '%s' %s" % \
+ (notification_type, delay_secs, class_name, where)
+
+ if _DEBUG: print wql
+
+ try:
+ return _wmi_watcher (self._namespace.ExecNotificationQuery (wql))
+ except pywintypes.com_error, error_info:
+ handle_com_error (error_info)
+
+ def __getattr__ (self, attribute):
+ """Offer WMI classes as simple attributes. Pass through any untrapped
+ unattribute to the underlying OLE object. This means that new or
+ unmapped functionality is still available to the module user.
+ """
+ #
+ # Don't try to match against known classes as was previously
+ # done since the list may not have been requested
+ # (find_classes=False).
+ #
+ try:
+ return self._cached_classes (attribute)
+ except pywintypes.com_error, error_i...
[truncated message content] |