[Pydev-cvs] org.python.pydev.debug/pysrc jpydaemon.py,NONE,1.1 inspector.py,NONE,1.1
Brought to you by:
fabioz
From: Aleksandar T. <at...@us...> - 2004-03-29 17:17:19
|
Update of /cvsroot/pydev/org.python.pydev.debug/pysrc In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv32273/pysrc Added Files: jpydaemon.py inspector.py Log Message: jpydbg v0.5 --- NEW FILE: jpydaemon.py --- #! /usr/bin/env python """ this daemon may be used by 'external' python sollicitors interfaces""" import sys import bdb import socket import string import traceback import os import inspect import types from inspector import * HOST = '' PORT = 29000 # default listening port OK = "OK" COMMAND = 0 SET_BP = 2 DEBUG = 31 STEP = 4 NEXT = 5 RUN = 6 FREEZE = 7 # remain on current line CLEAR_BP = 8 STACK = 9 QUIT = 10 LOCALS = 11 GLOBALS = 12 SETARGS = 13 READSRC = 14 UNKNOWN = -1 # instanciate a jpyutil object utils = jpyutils() class JPyDbg(bdb.Bdb) : def __init__(self): bdb.Bdb.__init__(self) # store debugger script name to avoid debugging it's own frame self.debuggerFName = os.path.normcase(sys.argv[0]) print self.debuggerFName # client debugger connection self.connection = None # frame debuggee contexts self.globalContext = None self.localContext = None self.verbose = 0 # hide is used to prevent debug inside debugger self.hide = 0 # debugger active information self.debuggee = None self.cmd = UNKNOWN # net buffer content self.lastBuffer = "" # EXCEPTION raised flag self.exceptionRaised = 0 # debuggee current 'command line arguments' self.debuggeeArgs = None # last executed line exception or None self.lastLineException = None def populateToClient( self , bufferList ) : buffer = '<JPY>' for element in bufferList: buffer = buffer + ' ' + str(element) buffer = buffer + '</JPY>\n' #print buffer self.connection.send( buffer ) # bdb overwitten to capture call debug event def user_call(self, frame, args): name = frame.f_code.co_name if not name: name = '???' fn = self.canonic(frame.f_code.co_filename) if not fn: fn = '???' # discard debugger frame if fn == self.debuggerFName or self.hide: self.hide = self.hide + 1 if self.hide: return None self.populateToClient( [ '<CALL', 'cmd="'+str(self.cmd)+'"' , 'fn="'+ utils.removeForXml(fn) +'"' , 'name="'+name+'"', 'args="'+str(args)+'"' , '/>' ] ) def checkDbgAction( self , frame ): if ( self.cmd == DEBUG ) or ( self.cmd == STEP ) or ( self.cmd == NEXT ) or ( self.cmd == RUN ): # DEBUG STARTING event # Debuggin starts stop on first line wait for NEXT , STEP , RUN , STOP .... while ( self.parseSubCommand( self.receiveCommand() , frame ) == FREEZE ): pass # bdb overwitten to capture line debug event def user_line(self, frame): if self.hide: return None import linecache name = frame.f_code.co_name if not name: name = '???' fn = self.canonic(frame.f_code.co_filename) if not fn: fn = '???' # populate info to client side line = linecache.getline(fn, frame.f_lineno) self.populateToClient( [ '<LINE', 'cmd="'+str(self.cmd)+'"' , 'fn="'+ utils.removeForXml(fn)+'"' , 'lineno="'+str(frame.f_lineno)+'"' , 'name="' + name + '"' , 'line="' + utils.removeForXml(line.strip())+'"', '/>'] ) # what's on next self.checkDbgAction( frame ) # bdb overwitten to capture return debug event def user_return(self, frame, retval): fn = self.canonic(frame.f_code.co_filename) if not fn: fn = '???' if self.hide: self.hide = self.hide - 1 return None self.populateToClient( [ '<RETURN', 'cmd="'+str(self.cmd)+'"' , 'fn="'+utils.removeForXml(fn)+'"' , 'retval="'+str(retval)+'"' , '/>'] ) def send_client_exception( self , cmd , content ): self.populateToClient( ['<EXCEPTION', 'cmd="'+cmd+'"' , 'content="'+content+'"' , '/>'] ) def populate_exception( self , exc_stuff): if ( self.exceptionRaised == 0 ): # exception not yet processed extype = exc_stuff[0] details = exc_stuff[1] ex = exc_stuff # Deal With SystemExit in specific way to reflect debuggee's return if issubclass( extype , SystemExit): content = 'System Exit REQUESTED BY DEBUGGEE with code =' + ex.code elif issubclass(extype, SyntaxError): content = str(details) error = details[0] compd = details[1] content = 'SOURCE:SYNTAXERROR:"'+str(compd[0])+'":('+str(compd[1])+','+str(compd[2])+')'+':'+error elif issubclass(extype,NameError): content = 'SOURCE:NAMEERROR:'+str(details) elif issubclass(extype,ImportError): content = 'SOURCE::IMPORTERROR:'+str(details) else: content = str(details) # keep track of received exception self.lastLineException = ['<EXCEPTION', 'cmd="'+str(self.cmd)+'"' , 'content="'+utils.removeForXml(content)+'"' , '/>'] self.send_client_exception( str(self.cmd) , utils.removeForXml(content) ) self.exceptionRaised = 1 # set ExceptionFlag On # bdb overwitten to capture Exception events def user_exception(self, frame, exc_stuff): # capture next / step go ahead when exception is around # current steatement while steping if self.cmd==NEXT or self.cmd==STEP: # self.populate_exception( exc_stuff ) self.set_step() sys.settrace(self.trace_dispatch) else: self.populate_exception( exc_stuff ) self.set_continue() def parsedReturned( self , command = 'COMMAND' , argument = None , message = None , details = None ): parsedCommand = [] parsedCommand.append(command) parsedCommand.append(argument) parsedCommand.append(message) parsedCommand.append(details) return parsedCommand # acting as stdio => redirect to client side def write( self , toPrint ): # transform eol pattern if ( toPrint == "\n" ): toPrint = "/EOL/" self.populateToClient( ['<STDOUT' , 'content="'+utils.removeForXml(toPrint)+'"' , '/>' ] ) def buildEvalArguments( self , arg ): posEqual = arg.find('=') if posEqual == -1: return None,None # Syntax error on provided expession couple return arg[:posEqual].strip() , arg[posEqual+1:].strip() # # parse & execute buffer command # def dealWithCmd( self , verb , arg , myGlobals = globals() , myLocals = locals() ): cmd = COMMAND msgOK = OK cmdType = "single" silent , silentarg = self.commandSyntax( arg ) if silent == 'silent': arg = silentarg # consume # "exec" is the magic way which makes # used debuggees dictionaries updatable while # stopped in debugging hooks cmdType = "exec" msgOK = silent # we use ';' as a python multiline syntaxic separator arg = string.replace(arg,';','\n') # execute requested dynamic command on this side try: oldstd = sys.stdout sys.stdout=self code = compile( arg ,"<string>" , cmdType) exec code in myGlobals , myLocals sys.stdout=oldstd return utils.parsedReturned( argument = arg , message = msgOK ) except: try: return utils.populateCMDException(arg,oldstd) except: tb , exctype , value = sys.exc_info() excTrace = traceback.format_exception( tb , exctype , value ) print excTrace # # build an xml CDATA structure # usage of plus is for jpysource.py xml CDATA encapsulation of itself # def CDATAForXml( self , data ): return '<'+'![CDATA['+ data + ']'+']>' # # parse & execute buffer command # def dealWithRead( self , verb , arg ): cmd = READSRC # check python code and send back any found syntax error if arg == None: return utils.parsedReturned( message = "JPyDaemon ReadSrc Argument missing") try: arg , lineno = self.nextArg(arg) candidate = file(arg) myBuffer = utils.parsedReturned( argument = arg , message=OK ) # # append the python source in <FILEREAD> TAG myBuffer.append( ['<FILEREAD' , 'fn="'+arg+'"' , 'lineno="'+str(lineno)+'">' + self.CDATAForXml(candidate.read()) + '</FILEREAD>' ] ) return myBuffer except IOError, e: return utils.parsedReturned( argument = arg , message = e.strerror ) # # parse & execute buffer command # def dealWithSetArgs( self , arg ): cmd = SETARGS # populate given command line argument before debugging start if arg == None: self.debuggeeArgs = [] # nor args provided else: self.debuggeeArgs = string.split(arg) self.debuggeeArgs.insert(0,'') # keep program name empty for the moment sys.argv = self.debuggeeArgs # store new argument list ins sys argv return utils.parsedReturned( argument = arg , message = OK ) # load the candidate source to debug # Run under debugger control def dealWithDebug( self , verb , arg ): self.cmd = DEBUG if self.debuggee == None: result = "source not found : " + arg for dirname in sys.path: fullname = os.path.join(dirname,arg) if os.path.exists(fullname): # Insert script directory in front of module search path # and make it current path (#sourceforge REQID 88108 fix) debugPath = os.path.dirname(fullname) sys.path.insert(0, debugPath) os.chdir(debugPath) oldstd = sys.stdout sys.stdout=self self.debuggee = fullname sys.argv[0] = fullname # keep sys.argv in sync try: self.run('execfile(' + `fullname` + ')') # send a dedicated message for syntax error in order for the # frontend debugger to handle a specific message and display the involved line # inside the frontend editor except: tb , exctype , value = sys.exc_info() excTrace = traceback.format_exception( tb , exctype , value ) # self.populateException(excTrace) self.send_client_exception(str(self.cmd) , utils.removeForXml(str(excTrace))) #print excTrace pass sys.stdout=oldstd result ="OK" self.debuggee = None break else: result = "debug already in progress on : " + self.debuggee return utils.parsedReturned( command = 'DEBUG' , argument = arg , message = result ) def formatStackElement( self , element ): curCode = element[0].f_code fName = curCode.co_filename line = element[1] if ( fName == '<string>' ): return ("program entry point") return utils.removeForXml(fName + ' (' + str(line) + ') ') # populate current stack info to client side def dealWithStack( self , frame ): stackList , size = self.get_stack ( frame , None ) stackList.reverse() xmlStack = ['<STACKLIST>' ] for stackElement in stackList: xmlStack.append('<STACK') xmlStack.append('content="'+ self.formatStackElement(stackElement) +'"') xmlStack.append( '/>') xmlStack.append('</STACKLIST>') self.populateToClient( xmlStack ) # populate requested disctionary to client side def dealWithVariables( self , frame , type , stackIndex ): # get the stack frame first stackList , size = self.get_stack ( frame , None ) stackList.reverse() stackElement = stackList[int(stackIndex)] if ( type == 'GLOBALS' ): variables = stackElement[0].f_globals else: variables = stackElement[0].f_locals xmlVariables = ['<VARIABLES type="'+type+'">' ] for mapElement in variables.iteritems(): xmlVariables.append('<VARIABLE ') xmlVariables.append('name="'+utils.removeForXml(mapElement[0])+'" ') xmlVariables.append('content="'+utils.removeForXml(str(mapElement[1]))+'" ') xmlVariables.append( '/>') xmlVariables.append('</VARIABLES>') self.populateToClient( xmlVariables ) def variablesSubCommand( self , frame , verb , arg , cmd ): self.cmd = cmd if ( arg == None ): arg = "0" else: arg , optarg = self.nextArg(arg) # split BP arguments self.dealWithVariables( frame , verb , arg ) self.cmd = FREEZE def nextArg( self , toParse ): nextSpace = string.strip(toParse).find(" ") if ( nextSpace == -1 ): return string.strip(toParse) , None else: return string.strip(toParse[:nextSpace]) , string.strip(toParse[nextSpace+1:]) # rough command/subcommand syntax analyzer def commandSyntax( self , command ): self.cmd = UNKNOWN verb , arg = self.nextArg(command) return verb , arg def quiting( self ): self.populateToClient( ['<TERMINATE/>'] ) self.set_quit() def parseSingleCommand( self , command ): verb , arg = self.commandSyntax( command ) if ( string.upper(verb) == "CMD" ): return self.dealWithCmd( verb , arg ) if ( string.upper(verb) == "READSRC" ): return self.dealWithRead( verb , arg ) if ( string.upper(verb) == "SETARGS" ): return self.dealWithSetArgs( arg ) elif ( string.upper(verb) == "DBG" ): return self.dealWithDebug( verb, arg ) elif ( string.upper(verb) == "STOP" ): return None else: return utils.parsedReturned( message = "JPyDaemon SYNTAX ERROR : " + command ) # receive a command when in debugging state using debuggee's frame local and global # contexts def parseSubCommand( self , command , frame ): if ( command == None ): # in case of IP socket Failures return UNKNOWN verb , arg = self.commandSyntax( command ) if ( string.upper(verb) == "CMD" ): self.populateCommandToClient( command , self.dealWithCmd( verb , arg , myGlobals= frame.f_globals , myLocals = frame.f_locals ) ) self.cmd = FREEZE elif ( string.upper(verb) == "READSRC" ): self.populateCommandToClient( command , self.dealWithRead( verb , arg ) ) self.cmd = FREEZE elif ( string.upper(verb) == "NEXT" ): self.cmd = NEXT self.set_next(frame) elif ( string.upper(verb) == "STEP" ): self.cmd = STEP self.set_step() elif ( string.upper(verb) == "RUN" ): self.cmd = RUN self.set_continue() elif ( string.upper(verb) == "STOP"): self.cmd = QUIT self.quiting() elif ( string.upper(verb) == "BP+"): self.cmd = SET_BP arg , optarg = self.nextArg(arg) # split BP arguments self.set_break( arg , int(optarg) ) self.cmd = FREEZE elif ( string.upper(verb) == "STACK"): self.cmd = STACK self.dealWithStack(frame) self.cmd = FREEZE elif ( string.upper(verb) == "LOCALS"): self.variablesSubCommand( frame , verb , arg , LOCALS ) elif ( string.upper(verb) == "GLOBALS"): self.variablesSubCommand( frame , verb , arg , GLOBALS ) elif ( string.upper(verb) == "BP-"): self.cmd = CLEAR_BP arg , optarg = self.nextArg(arg) # split BP arguments self.clear_break( arg , int(optarg) ) self.cmd = FREEZE return self.cmd # send command result back def populateCommandToClient( self , command , result ): self.populateToClient( [ '<' + result[0] , 'cmd="' +command+'"' , 'operation="' +utils.removeForXml(str(result[1]))+'"' , 'result="' +str(result[2])+'"' , '/>' ] ) if ( result[3] != None ): for element in result[3]: # print strElement self.populateToClient( [ '<COMMANDDETAIL ' , 'content="'+utils.removeForXml(element)+'"', ' />' ] ) # complementary TAG may be provided starting at position 4 if len(result) > 4 and (result[4]!=None): self.populateToClient( result[4] ) # check and execute a received command def parseCommand( self , command ): # IP exception populating None object if ( command == None ): return 0 # => stop daemon if ( self.verbose ): print command result = self.parseSingleCommand(command) if ( result == None ): self.populateToClient( ['<TERMINATE/>'] ) return 0 # stop requested self.populateCommandToClient( command , result ) return 1 # reading on network def readNetBuffer( self ): try: if ( self.lastBuffer.find('\n') != -1 ): return self.lastBuffer ; # buffer stills contains commands networkData = self.connection.recv(1024) if not networkData: # capture network interuptions if any return None data = self.lastBuffer + networkData return data except socket.error, (errno,strerror): print "recv interupted errno(%s) : %s" % ( errno , strerror ) return None # receive a command from the net def receiveCommand( self ): data = self.readNetBuffer() ; # data reception from Ip while ( data != None and data): eocPos = data.find('\n') nextPos = eocPos ; while ( nextPos < len(data) and \ ( data[nextPos] == '\n' or data[nextPos] == '\r') ): # ignore consecutive \n\r nextPos = nextPos+1 if ( eocPos != -1 ): # full command received in buffer self.lastBuffer = data[nextPos:] # cleanup received command from buffer returned = data[:eocPos] if (returned[-1] == '\r'): return returned[:-1] return returned data = self.readNetBuffer() ; # returning None on Ip Exception return None # start the deamon def start( self , port = PORT , host = None , debuggee = None ,debuggeeArgs = None ): if ( host == None ): # start in listen mode waiting for incoming sollicitors print "JPyDbg listening on " , port s = socket.socket( socket.AF_INET , socket.SOCK_STREAM ) s.bind( (HOST , port) ) s.listen(1) self.connection , addr = s.accept() print "connected by " , addr else: # connect back provided listening host print "JPyDbg connecting " , host , " on port " , port try: self.connection = socket.socket( socket.AF_INET , socket.SOCK_STREAM ) self.connection.connect( (host , port) ) print "JPyDbgI0001 : connected to " , host except socket.error, (errno,strerror): print "ERROR:JPyDbg connection failed errno(%s) : %s" % ( errno , strerror ) return None welcome = [ '<WELCOME/>' ] # populate debuggee's name for remote debugging bootstrap if debuggee != None: welcome = [ '<WELCOME' , 'debuggee="'+utils.removeForXml(debuggee)] if debuggeeArgs != None: welcome.append(string.join(debuggeeArgs)) # populate arguments after program Name # finally append XML closure welcome.append('" />') self.populateToClient( welcome ) while ( self.parseCommand( self.receiveCommand() ) ): pass print "'+++ JPy/sessionended/" self.connection.close() # start a listening instance when invoked as main program # without arguments # when [host [port]] are provided as argv jpydamon will try to # connect back host port instead of listening if __name__ == "__main__": instance = JPyDbg() port = PORT host = None localDebuggee = None args = None print "args = " , sys.argv if ( len(sys.argv) > 1 ): host=sys.argv[1] if ( len(sys.argv) > 2 ): port=int(sys.argv[2]) if ( len(sys.argv) > 3 ): localDebuggee=sys.argv[3] if ( len(sys.argv) > 4 ): args=sys.argv[4:] instance.start( host=host , port=port , debuggee=localDebuggee , debuggeeArgs=args ) print "deamon ended\n" --- NEW FILE: inspector.py --- #! /usr/bin/env python import sys import traceback import inspect import os import parser import symbol import token import types import string from types import ListType, TupleType def get_docs( source , basename ): """Retrieve information from the parse tree of a source file. source source code to parse. """ ast = parser.suite(source) return ModuleInfo(ast.totuple(1), basename) class SuiteInfoBase: _docstring = '' _name = '' def __init__(self, tree = None): self._class_info = {} self._function_info = {} if tree: self._extract_info(tree) def _extract_info(self, tree): # extract docstring if len(tree) == 2: found, vars = match(DOCSTRING_STMT_PATTERN[1], tree[1]) else: found, vars = match(DOCSTRING_STMT_PATTERN, tree[3]) if found: self._docstring = eval(vars['docstring']) # discover inner definitions for node in tree[1:]: found, vars = match(COMPOUND_STMT_PATTERN, node) if found: cstmt = vars['compound'] if cstmt[0] == symbol.funcdef: name = cstmt[2][1] self._function_info[name] = FunctionInfo(cstmt) elif cstmt[0] == symbol.classdef: name = cstmt[2][1] self._class_info[name] = ClassInfo(cstmt) def get_docstring(self): return self._docstring def get_name(self): return self._name def get_class_names(self): return self._class_info.keys() def get_class_info(self, name): return self._class_info[name] def __getitem__(self, name): try: return self._class_info[name] except KeyError: return self._function_info[name] class SuiteFuncInfo: # Mixin class providing access to function names and info. def get_function_names(self): return self._function_info.keys() def get_function_info(self, name): return self._function_info[name] class FunctionInfo(SuiteInfoBase, SuiteFuncInfo): def __init__(self, tree = None): self._name = tree[2][1] self._lineNo = tree[2][2] self._args = [] self.argListInfos( tree[3] ) SuiteInfoBase.__init__(self, tree and tree[-1] or None) def get_DeclareLineNo( self ): return self._lineNo def argListInfos( self , tree ): argType = tree[0] if argType == symbol.parameters: self.argListInfos(tree[2]) elif argType == symbol.varargslist: for arg in tree[1:]: self.argListInfos(arg) elif argType == symbol.fpdef: self._args.append(tree[1]) class ClassInfo(SuiteInfoBase): def __init__(self, tree = None): self._name = tree[2][1] self._lineNo = tree[2][2] SuiteInfoBase.__init__(self, tree and tree[-1] or None) def get_method_names(self): return self._function_info.keys() def get_method_info(self, name): return self._function_info[name] def get_DeclareLineNo(): return self._lineNo class ModuleInfo(SuiteInfoBase, SuiteFuncInfo): def __init__(self, tree = None, name = "<string>"): self._name = name SuiteInfoBase.__init__(self, tree) if tree: found, vars = match(DOCSTRING_STMT_PATTERN, tree[1]) if found: self._docstring = vars["docstring"] def match(pattern, data, vars=None): """Match `data' to `pattern', with variable extraction. pattern Pattern to match against, possibly containing variables. data Data to be checked and against which variables are extracted. vars Dictionary of variables which have already been found. If not provided, an empty dictionary is created. The `pattern' value may contain variables of the form ['varname'] which are allowed to match anything. The value that is matched is returned as part of a dictionary which maps 'varname' to the matched value. 'varname' is not required to be a string object, but using strings makes patterns and the code which uses them more readable. This function returns two values: a boolean indicating whether a match was found and a dictionary mapping variable names to their associated values. """ if vars is None: vars = {} if type(pattern) is ListType: # 'variables' are ['varname'] vars[pattern[0]] = data return 1, vars if type(pattern) is not TupleType: return (pattern == data), vars if len(data) != len(pattern): return 0, vars for pattern, data in map(None, pattern, data): same, vars = match(pattern, data, vars) if not same: break return same, vars # This pattern identifies compound statements, allowing them to be readily # differentiated from simple statements. # COMPOUND_STMT_PATTERN = ( symbol.stmt, (symbol.compound_stmt, ['compound']) ) # This pattern will match a 'stmt' node which *might* represent a docstring; # docstrings require that the statement which provides the docstring be the # first statement in the class or function, which this pattern does not check. # DOCSTRING_STMT_PATTERN = ( symbol.stmt, (symbol.simple_stmt, (symbol.small_stmt, (symbol.expr_stmt, (symbol.testlist, (symbol.test, (symbol.and_test, (symbol.not_test, (symbol.comparison, (symbol.expr, (symbol.xor_expr, (symbol.and_expr, (symbol.shift_expr, (symbol.arith_expr, (symbol.term, (symbol.factor, (symbol.power, (symbol.atom, (token.STRING, ['docstring']) )))))))))))))))), (token.NEWLINE, '') )) class jpyutils : def parsedReturned( self , command = 'COMMAND' , argument = None , message = None , details = None ): parsedCommand = [] parsedCommand.append(command) parsedCommand.append(argument) parsedCommand.append(message) parsedCommand.append(details) return parsedCommand def populateCMDException( self , arg , oldstd ): "global utility exception reporter for all pydbg classes" sys.stdout=oldstd tb , exctype , value = sys.exc_info() excTrace = traceback.format_exception( tb , exctype , value ) tb = None # release return self.parsedReturned( argument = arg , message = "Error on CMD" , details = excTrace ) def removeForXml( self , strElem , keepLinefeed = 0 ): "replace unsuported xml encoding characters" if (not keepLinefeed ): strElem = string.replace(strElem,'\n','') strElem = string.replace(strElem,'"',""") strElem = string.replace(strElem,'<','<') strElem = string.replace(strElem,'>','>') return strElem class xmlizer: "xmlize a provided syntax error or parsed module infos" def __init__(self , infos , baseName , fName , destFName , error = None ): self.Infos = infos self.Error = error self.Utils = jpyutils() self.FileName = fName self.DestFName = destFName self.BaseName = baseName def populate_error( self ): LINESTR = 'line ' reason=self.Error[len(self.Error)-1] lower =self.Error[2].find(LINESTR)+len(LINESTR) higher=self.Error[2].find(',',lower) lineNo=self.Error[2][lower:] self.Dest.write('<error fileid="'+ \ self.Utils.removeForXml(self.FileName)+ \ '" reason="' + \ reason + \ '" lineno="'+lineNo+'" />' ) def populate_class_infos( self , className ): classInfo = self.Infos.get_class_info(className) self.Dest.write('<class name="'+ \ self.Utils.removeForXml(className)+ \ '" doc="' + \ self.Utils.removeForXml(classInfo.get_docstring()) + \ '" lineno="'+ str(classInfo._lineNo) +'" >\n' ) # gather infos about class methods methods = classInfo.get_method_names() for methodName in methods: method = classInfo.get_method_info(methodName) self.populate_method_infos(method,methodName) self.Dest.write('</class>\n' ) def populate_function_arguments_infos( self , args ): for arg in args: self.Dest.write(' <argument name="' + \ arg[1] + '" lineno="' +\ str(arg[2]) + '"/>\n') def populate_method_infos( self , method , methodName): self.Dest.write('<function name="'+ \ self.Utils.removeForXml(methodName)+ \ '" doc="' + \ self.Utils.removeForXml(method.get_docstring()) + \ '" lineno="'+ str(method._lineNo) +'" >\n' ) if (len(method._args) != 0): self.populate_function_arguments_infos(method._args) self.Dest.write("</function>\n") def populate_function_infos( self , functionName ): functionInfo = self.Infos.get_function_info(functionName) self.populate_method_infos(functionInfo,functionName) def populate_tree( self ): self.Dest.write('<module name="'+self.BaseName+'">\n') classes = self.Infos.get_class_names() for curClass in classes: self.populate_class_infos(curClass) functions = self.Infos.get_function_names() for curFunction in functions: self.populate_function_infos(curFunction) self.Dest.write("</module>\n") def populate( self ): self.Dest = file( self.DestFName , 'w+' ) self.Dest.write( '<?xml version="1.0"?>\n') if self.Error != None: self.populate_error() else: self.populate_tree() self.Dest.close() class commander : "python source file inspector/introspector" def __init__(self , source , dest ): "constructor providing python source to inspect" self.SourceFile = source self.DestFile = dest self.Infos = None self.BaseName = None self.Errors = None def check(self): "make a syntaxic control of provided source " # execute requested dynamic command on this side fp = open(self.SourceFile,"r") sourceStr = fp.read() + "\n" try: # extract python module name self.BaseName = os.path.basename(os.path.splitext(self.SourceFile)[0]) self.Code = compile( sourceStr , self.SourceFile , "exec" ) self.Infos = get_docs( sourceStr , self.BaseName ) except: tb , exctype , value = sys.exc_info() self.Errors = traceback.format_exception(tb,exctype,value) pass xml = xmlizer(self.Infos , self.BaseName , self.SourceFile , self.DestFile , self.Errors ) xml.populate() # # inspector launcher entry point # if __name__ == "__main__": # inspect jpydaemon itself print "args = " , sys.argv if ( len(sys.argv) > 1 ): source = sys.argv[1] if ( len(sys.argv) > 2 ): dest = sys.argv[2] if ( len(sys.argv) < 3 ): sys.exit(12) # missing arguments instance = commander( source , dest) instance.check() pass |