|
From: Fred L. D. <fd...@us...> - 2003-07-14 16:30:04
|
Update of /cvsroot/cvs-syncmail/syncmail
In directory sc8-pr-cvs1:/tmp/cvs-serv7352
Added Files:
branchctl
Log Message:
Script which controls the use of branches within a CVS repository.
For each branch (including the trunk), two controls are available:
whether the branch is open for commits, and whether commits involving
that branch and any other should be permitted. Both default to true
since that's the normal CVS behavior.
Much of this code is based on the new-config-branch version of
syncmail, and allowed portions of that to be tested in a simpler
environment.
--- NEW FILE: branchctl ---
#! /usr/bin/env python
#
# Script to control whether CVS branches are open or closed for
# changes.
#
# This is intended to be called from the commitinfo hook.
"""\
Script which controls the use of branches within a CVS repository.
Usage:
%(PROGRAM)s [options] [<%%S> [email-addr ...]]
Where options are:
--config=file
Use file as the configuration file. By default, a file named
%(DEFAULT_CONFIGURATION_FILE) is used if it exists. If the name of the
configuration file is relative, it is interpreted as relative
to the CVSROOT administrative directory of the repository.
--config is incompatible with --no-config.
--no-config
Do not use a configuration file. This can be used to disable
the use of the default configuration file. --no-config is
incompatible with --config.
--cvsroot=<path>
Use <path> as the environment variable CVSROOT. Otherwise this
variable must exist in the environment.
--quiet / -q
Don't print as much status to stdout.
--help / -h
Print this text.
"""
# The configuration data can co-exist in the same configuration file
# with the configuration for the "new-world" version of syncmail.
import getopt
import os
import re
import string
import sys
DEFAULT_CONFIGURATION_FILE = "branchctl.conf"
DEFAULT_CONFIGURATION = {
"verbose": "true",
"open": "true",
}
def usage(code, msg=''):
print __doc__ % globals()
if msg:
print msg
sys.exit(code)
class CVSEntry:
def __init__(self, name, revision, timestamp, conflict, options, tagdate):
self.name = name
self.revision = revision
self.timestamp = timestamp
self.conflict = conflict
self.options = options
self.tagdate = tagdate
def get_entry(prefix, mapping, line, filename):
line = string.strip(line)
parts = string.split(line, "/")
_, name, revision, timestamp, options, tagdate = parts
tagdate = string.rstrip(tagdate) or None
key = namekey(prefix, name)
try:
entry = mapping[key]
except KeyError:
if revision == "0":
revision = None
if "+" in timestamp:
timestamp, conflict = tuple(string.split(timestamp, "+"))
else:
conflict = None
entry = CVSEntry(key, revision, timestamp, conflict,
options, tagdate)
mapping[key] = entry
return entry
def namekey(prefix, name):
if prefix:
return os.path.join(prefix, name)
else:
return name
def load_change_info(prefix=None):
if prefix is not None:
entries_fn = os.path.join(prefix, "CVS", "Entries")
else:
entries_fn = os.path.join("CVS", "Entries")
entries_log_fn = entries_fn + ".Log"
mapping = {}
f = open(entries_fn)
while 1:
line = f.readline()
if not line:
break
## if string.strip(line) == "D":
## continue
# we could recurse down subdirs, except the Entries.Log files
# we need haven't been written to the subdirs yet, so it
# doesn't do us any good
## if line[0] == "D":
## name = string.split(line, "/")[1]
## dirname = namekey(prefix, name)
## if os.path.isdir(dirname):
## m = load_change_info(dirname)
## mapping.update(m)
if line[0] == "/":
# normal file
get_entry(prefix, mapping, line, entries_fn)
# else: bogus Entries line
f.close()
if os.path.isfile(entries_log_fn):
f = open(entries_log_fn)
while 1:
line = f.readline()
if not line:
break
if line[1:2] != ' ':
# really old version of CVS
break
entry = get_entry(prefix, mapping, line[2:], entries_log_fn)
parts = string.split(line, "/")[1:]
if line[0] == "A":
# adding a file
entry.new_revision = parts[1]
elif line[0] == "R":
# removing a file
entry.new_revision = None
f.close()
for entry in mapping.values():
if not hasattr(entry, "new_revision"):
print 'confused about file', entry.name, '-- ignoring'
del mapping[entry.name]
return mapping
def load_branch_name():
tag_fn = os.path.join("CVS", "Tag")
if os.path.isfile(tag_fn):
f = open(tag_fn)
line = string.strip(f.readline())
f.close()
if line[:1] == "T":
return line[1:]
return None
TRUE_VALUES = ('true', 'on', 'enabled')
FALSE_VALUES = ('false', 'off', 'disabled')
class OptionLookup:
def __init__(self, dicts, branch=None):
self._dicts = dicts
self._replace = Replacer({
"BRANCH": branch or "",
"CVSROOT": os.environ.get("CVSROOT", ""),
"HOSTNAME": os.environ.get("HOSTNAME") or getfqdn(),
})
def get(self, option):
for dict in self._dicts:
v = dict.get(option)
if v is not None:
return self._replace(v)
return None
def getbool(self, option):
v = self.get(option)
if v is not None:
v = string.lower(v)
if v in TRUE_VALUES:
v = 1
elif v in FALSE_VALUES:
v = 0
else:
raise ValueError("illegal boolean value: %s" % `v`)
return v
def getint(self, option):
v = self.get(option)
if v is not None:
v = int(v)
return v
def getaddress(self, option):
"""Return (host, port) for a host:port or host string.
The port, if ommitted, will be None.
"""
v = self.get(option)
if v is None:
return MAILHOST, MAILPORT
elif ":" in v:
h, p = tuple(string.split(v, ":"))
p = int(p)
return h or MAILHOST, p
else:
return v, MAILPORT
# Support for $VARIABLE replacement.
class Replacer:
def __init__(self, vars):
self._vars = vars
rx = re.compile(r"\$([a-zA-Z][a-zA-Z_]*\b|\{[a-zA-Z][a-zA-Z_]*\})")
self._search = rx.search
def __call__(self, v):
v, name, suffix = self._split(v)
while name:
v = v + self._vars.get(name, "")
prefix, name, suffix = self._split(suffix)
v = v + prefix
return v
def _split(self, s):
m = self._search(s)
if m is not None:
name = m.group(1)
if name[0] == "{":
name = name[1:-1]
return s[:m.start()], name, s[m.end():]
else:
return s, None, ''
def get_section_as_dict(config, section):
d = {}
if config.has_section(section):
for opt in config.options(section):
d[opt] = config.get(section, opt, raw=1)
return d
from ConfigParser import ConfigParser
class ConfigParser(ConfigParser):
# Regular expressions for parsing section headers and options,
# from the Python 2.3 version of ConfigParser.
SECTCRE = re.compile(
r'\[' # [
r'(?P<header>[^]]+)' # very permissive!
r'\]' # ]
)
# For compatibility with older versions:
__SECTCRE = SECTCRE
class ConfigurationDatabase:
def __init__(self, filename, cmdline, args):
self.args = args
self.cmdline = cmdline
self.config = None
if filename and os.path.isfile(filename):
self.config = ConfigParser()
if hasattr(cp, "readfp"):
self.config.readfp(open(filename), filename)
else:
# We have to use this old method for compatibility with
# ancient versions of Python.
self.config.read([filename])
def get_config(self, branch):
dicts = []
if self.config:
if branch:
dicts.append(get_section_as_dict(cp, "branch " + branch))
dicts.append(get_section_as_dict(cp, "branch"))
dicts.append(self.cmdline)
dicts.append(get_section_as_dict(cp, "general"))
else:
dicts.append(self.cmdline)
dicts = filter(None, dicts)
dicts.append(DEFAULT_CONFIGURATION.copy())
return Options(OptionLookup(dicts, branch), self.args)
class OptionsBase:
def __init__(self, config, args):
self.args = args
self.verbose = config.getbool('verbose')
fn = os.path.join("CVS", "Repository")
if os.path.isfile(fn):
f = open(fn)
self.repodir = string.strip(f.readline())
f.close()
else:
self.repodir = os.curdir
def get_admin_file(name):
if os.environ.has_key("CVSROOT"):
p = os.path.join(os.environ["CVSROOT"], "CVSROOT", name)
return os.path.abspath(p)
else:
return os.path.join("CVSROOT", name)
def load_configuration(args):
cmdline, args = load_cmdline(args)
if cmdline.has_key('config-file'):
cfgfile = get_admin_file(cmdline['config-file'])
del cmdline['config-file']
else:
defconfig = get_admin_file(DEFAULT_CONFIGURATION_FILE)
if os.path.isfile(defconfig):
cfgfile = defconfig
else:
cfgfile = None
return ConfigurationDatabase(cfgfile, cmdline, args)
class Options(OptionsBase):
def __init__(self, config, args):
OptionsBase.__init__(self, config, args)
self.open = config.getbool("open")
def load_cmdline(args):
try:
opts, args = getopt.getopt(args, 'hq',
['cvsroot=', 'config=', 'no-config',
'help', 'quiet'])
except getopt.error, msg:
usage(2, msg)
cmdline = {}
def set(option, value, cmdline=cmdline):
if cmdline.has_key(option):
usage(2, "can't set option more than once")
cmdline[option] = value
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt == '--cvsroot':
os.environ["CVSROOT"] = arg
elif opt == "--config":
set('config-file', arg)
elif opt == '--no-config':
set('config-file', '')
elif opt in ('-q', '--quiet'):
set('verbose', 'false')
return cmdline, args
def main():
branch = load_branch_name()
configdb = load_configuration(sys.argv[1:], branch)
changes = load_change_info().values()
d = {}
for change in changes:
tag = changes.tagdate or branch
d[tag] = 1
closed = 0
for tag in d.keys():
config = configdb.get_config(tag)
if not config.open:
closed = 1
if tag:
print "branch %s is closed" % `tag`
else:
print "trunk is closed"
if closed:
sys.exit(1)
if __name__ == '__main__':
main()
|