From: Fred L. D. <fd...@us...> - 2003-07-14 16:30:04
|
Update of /cvsroot/cvs-syncmail/syncmail In directory sc8-pr-cvs1:/tmp/cvs-serv7352 Added Files: branchctl Log Message: Script which controls the use of branches within a CVS repository. For each branch (including the trunk), two controls are available: whether the branch is open for commits, and whether commits involving that branch and any other should be permitted. Both default to true since that's the normal CVS behavior. Much of this code is based on the new-config-branch version of syncmail, and allowed portions of that to be tested in a simpler environment. --- NEW FILE: branchctl --- #! /usr/bin/env python # # Script to control whether CVS branches are open or closed for # changes. # # This is intended to be called from the commitinfo hook. """\ Script which controls the use of branches within a CVS repository. Usage: %(PROGRAM)s [options] [<%%S> [email-addr ...]] Where options are: --config=file Use file as the configuration file. By default, a file named %(DEFAULT_CONFIGURATION_FILE) is used if it exists. If the name of the configuration file is relative, it is interpreted as relative to the CVSROOT administrative directory of the repository. --config is incompatible with --no-config. --no-config Do not use a configuration file. This can be used to disable the use of the default configuration file. --no-config is incompatible with --config. --cvsroot=<path> Use <path> as the environment variable CVSROOT. Otherwise this variable must exist in the environment. --quiet / -q Don't print as much status to stdout. --help / -h Print this text. """ # The configuration data can co-exist in the same configuration file # with the configuration for the "new-world" version of syncmail. import getopt import os import re import string import sys DEFAULT_CONFIGURATION_FILE = "branchctl.conf" DEFAULT_CONFIGURATION = { "verbose": "true", "open": "true", } def usage(code, msg=''): print __doc__ % globals() if msg: print msg sys.exit(code) class CVSEntry: def __init__(self, name, revision, timestamp, conflict, options, tagdate): self.name = name self.revision = revision self.timestamp = timestamp self.conflict = conflict self.options = options self.tagdate = tagdate def get_entry(prefix, mapping, line, filename): line = string.strip(line) parts = string.split(line, "/") _, name, revision, timestamp, options, tagdate = parts tagdate = string.rstrip(tagdate) or None key = namekey(prefix, name) try: entry = mapping[key] except KeyError: if revision == "0": revision = None if "+" in timestamp: timestamp, conflict = tuple(string.split(timestamp, "+")) else: conflict = None entry = CVSEntry(key, revision, timestamp, conflict, options, tagdate) mapping[key] = entry return entry def namekey(prefix, name): if prefix: return os.path.join(prefix, name) else: return name def load_change_info(prefix=None): if prefix is not None: entries_fn = os.path.join(prefix, "CVS", "Entries") else: entries_fn = os.path.join("CVS", "Entries") entries_log_fn = entries_fn + ".Log" mapping = {} f = open(entries_fn) while 1: line = f.readline() if not line: break ## if string.strip(line) == "D": ## continue # we could recurse down subdirs, except the Entries.Log files # we need haven't been written to the subdirs yet, so it # doesn't do us any good ## if line[0] == "D": ## name = string.split(line, "/")[1] ## dirname = namekey(prefix, name) ## if os.path.isdir(dirname): ## m = load_change_info(dirname) ## mapping.update(m) if line[0] == "/": # normal file get_entry(prefix, mapping, line, entries_fn) # else: bogus Entries line f.close() if os.path.isfile(entries_log_fn): f = open(entries_log_fn) while 1: line = f.readline() if not line: break if line[1:2] != ' ': # really old version of CVS break entry = get_entry(prefix, mapping, line[2:], entries_log_fn) parts = string.split(line, "/")[1:] if line[0] == "A": # adding a file entry.new_revision = parts[1] elif line[0] == "R": # removing a file entry.new_revision = None f.close() for entry in mapping.values(): if not hasattr(entry, "new_revision"): print 'confused about file', entry.name, '-- ignoring' del mapping[entry.name] return mapping def load_branch_name(): tag_fn = os.path.join("CVS", "Tag") if os.path.isfile(tag_fn): f = open(tag_fn) line = string.strip(f.readline()) f.close() if line[:1] == "T": return line[1:] return None TRUE_VALUES = ('true', 'on', 'enabled') FALSE_VALUES = ('false', 'off', 'disabled') class OptionLookup: def __init__(self, dicts, branch=None): self._dicts = dicts self._replace = Replacer({ "BRANCH": branch or "", "CVSROOT": os.environ.get("CVSROOT", ""), "HOSTNAME": os.environ.get("HOSTNAME") or getfqdn(), }) def get(self, option): for dict in self._dicts: v = dict.get(option) if v is not None: return self._replace(v) return None def getbool(self, option): v = self.get(option) if v is not None: v = string.lower(v) if v in TRUE_VALUES: v = 1 elif v in FALSE_VALUES: v = 0 else: raise ValueError("illegal boolean value: %s" % `v`) return v def getint(self, option): v = self.get(option) if v is not None: v = int(v) return v def getaddress(self, option): """Return (host, port) for a host:port or host string. The port, if ommitted, will be None. """ v = self.get(option) if v is None: return MAILHOST, MAILPORT elif ":" in v: h, p = tuple(string.split(v, ":")) p = int(p) return h or MAILHOST, p else: return v, MAILPORT # Support for $VARIABLE replacement. class Replacer: def __init__(self, vars): self._vars = vars rx = re.compile(r"\$([a-zA-Z][a-zA-Z_]*\b|\{[a-zA-Z][a-zA-Z_]*\})") self._search = rx.search def __call__(self, v): v, name, suffix = self._split(v) while name: v = v + self._vars.get(name, "") prefix, name, suffix = self._split(suffix) v = v + prefix return v def _split(self, s): m = self._search(s) if m is not None: name = m.group(1) if name[0] == "{": name = name[1:-1] return s[:m.start()], name, s[m.end():] else: return s, None, '' def get_section_as_dict(config, section): d = {} if config.has_section(section): for opt in config.options(section): d[opt] = config.get(section, opt, raw=1) return d from ConfigParser import ConfigParser class ConfigParser(ConfigParser): # Regular expressions for parsing section headers and options, # from the Python 2.3 version of ConfigParser. SECTCRE = re.compile( r'\[' # [ r'(?P<header>[^]]+)' # very permissive! r'\]' # ] ) # For compatibility with older versions: __SECTCRE = SECTCRE class ConfigurationDatabase: def __init__(self, filename, cmdline, args): self.args = args self.cmdline = cmdline self.config = None if filename and os.path.isfile(filename): self.config = ConfigParser() if hasattr(cp, "readfp"): self.config.readfp(open(filename), filename) else: # We have to use this old method for compatibility with # ancient versions of Python. self.config.read([filename]) def get_config(self, branch): dicts = [] if self.config: if branch: dicts.append(get_section_as_dict(cp, "branch " + branch)) dicts.append(get_section_as_dict(cp, "branch")) dicts.append(self.cmdline) dicts.append(get_section_as_dict(cp, "general")) else: dicts.append(self.cmdline) dicts = filter(None, dicts) dicts.append(DEFAULT_CONFIGURATION.copy()) return Options(OptionLookup(dicts, branch), self.args) class OptionsBase: def __init__(self, config, args): self.args = args self.verbose = config.getbool('verbose') fn = os.path.join("CVS", "Repository") if os.path.isfile(fn): f = open(fn) self.repodir = string.strip(f.readline()) f.close() else: self.repodir = os.curdir def get_admin_file(name): if os.environ.has_key("CVSROOT"): p = os.path.join(os.environ["CVSROOT"], "CVSROOT", name) return os.path.abspath(p) else: return os.path.join("CVSROOT", name) def load_configuration(args): cmdline, args = load_cmdline(args) if cmdline.has_key('config-file'): cfgfile = get_admin_file(cmdline['config-file']) del cmdline['config-file'] else: defconfig = get_admin_file(DEFAULT_CONFIGURATION_FILE) if os.path.isfile(defconfig): cfgfile = defconfig else: cfgfile = None return ConfigurationDatabase(cfgfile, cmdline, args) class Options(OptionsBase): def __init__(self, config, args): OptionsBase.__init__(self, config, args) self.open = config.getbool("open") def load_cmdline(args): try: opts, args = getopt.getopt(args, 'hq', ['cvsroot=', 'config=', 'no-config', 'help', 'quiet']) except getopt.error, msg: usage(2, msg) cmdline = {} def set(option, value, cmdline=cmdline): if cmdline.has_key(option): usage(2, "can't set option more than once") cmdline[option] = value for opt, arg in opts: if opt in ('-h', '--help'): usage(0) elif opt == '--cvsroot': os.environ["CVSROOT"] = arg elif opt == "--config": set('config-file', arg) elif opt == '--no-config': set('config-file', '') elif opt in ('-q', '--quiet'): set('verbose', 'false') return cmdline, args def main(): branch = load_branch_name() configdb = load_configuration(sys.argv[1:], branch) changes = load_change_info().values() d = {} for change in changes: tag = changes.tagdate or branch d[tag] = 1 closed = 0 for tag in d.keys(): config = configdb.get_config(tag) if not config.open: closed = 1 if tag: print "branch %s is closed" % `tag` else: print "trunk is closed" if closed: sys.exit(1) if __name__ == '__main__': main() |