[f66b44]: pyke / knowledge_engine.py  Maximize  Restore  History

Download this file

372 lines (344 with data), 15.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# $Id$
# coding=utf-8
#
# Copyright © 2007 Bruce Frederiksen
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import types
import os
import os.path
import re
import itertools

# itertools.chain only gained from_iterable in Python 2.6.  Install the
# backport only where it is really missing: replacing the builtin type
# everywhere would swap a C iterator for a slower Python one and would break
# isinstance(x, itertools.chain) checks made by other code.
if not hasattr(itertools.chain, 'from_iterable'):
    class chain(object):
        ''' Stand-in for itertools.chain that adds from_iterable.

            Calling chain(*iterables) returns a real itertools.chain object.
        '''
        # The original itertools.chain type, captured before it is replaced.
        old_chain = itertools.chain

        def __new__(cls, *args):
            return cls.old_chain(*args)

        @staticmethod
        def from_iterable(i):
            ''' Generates the items of each iterable in i, in order.
            '''
            for iterable in i:
                for x in iterable: yield x

    itertools.chain = chain
import pyke
from pyke import (condensedPrint, contexts, pattern,
fact_base, rule_base, special)
class CanNotProve(StandardError):
    ''' Raised by engine.prove_1 when the goal has no proof at all.
    '''
# Matches a complete legal python identifier.  Anchored with \Z rather than $
# because $ also matches just before a trailing newline, which would let a
# name such as "compiled_krb\n" through.
Name_test = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*\Z')

# Matches any single character that may not appear in a python identifier.
Bad_name_char = re.compile('[^a-zA-Z0-9_]')
class engine(object):
    ''' Owns the knowledge bases and rule bases, and exposes the
        fact-assertion, lookup and proof entry points on top of them.
    '''

    # Pre-built pattern variables named 'ans_0' .. 'ans_99'.  prove_n and
    # prove_1 slice off the first num_returns of them to stand in for the
    # values that are to be returned.
    _Variables = tuple(contexts.variable('ans_%d' % i) for i in range(100))

    def __init__(self, paths = ('.',),
                 gen_dir = '.', gen_root_dir = 'compiled_krb',
                 load_fc = True, load_bc = True):
        ''' Compiles (where out of date) and loads the .krb files under paths.

            paths        -- a directory name, a sequence of directory names,
                            a module offering populate(engine), or the
                            string '*test*' to skip loading altogether.
            gen_dir      -- directory that holds the generated package.
            gen_root_dir -- name of the generated package; must be a legal
                            python identifier, else ValueError is raised.
            load_fc      -- load the generated "_fc" modules.
            load_bc      -- load the generated "_plans" and "_bc" modules.

            Raises AssertionError if a file still fails to load after it has
            been recompiled.
        '''
        if not Name_test.match(gen_root_dir):
            raise ValueError(
                "engine.__init__: gen_root_dir (%s) must be a legal python "
                "identifier" % (gen_root_dir,))
        self.knowledge_bases = {}
        self.rule_bases = {}
        special.create_for(self)
        if paths != '*test*':
            if isinstance(paths, types.ModuleType):
                # secret hook for the compiler to initialize itself (so the
                # compiled python module can be in an egg).
                paths.populate(self)
            else:
                if isinstance(paths, types.StringTypes): paths = (paths,)

                # First pass: recompile every source that is newer than its
                # generated modules.
                compile_list = _get_compile_list(paths, gen_dir, gen_root_dir)
                if compile_list:
                    from pyke import krb_compiler
                    krb_compiler.compile(gen_dir, gen_root_dir, compile_list)
                    _check_list(compile_list, gen_dir, gen_root_dir)

                # Second pass: load everything.  Whatever _load_paths hands
                # back failed to load and gets recompiled and loaded once
                # more.
                compile_list2 = _load_paths(self, paths, gen_dir, gen_root_dir,
                                            load_fc, load_bc, compile_list)
                if compile_list2:
                    from pyke import krb_compiler
                    krb_compiler.compile(gen_dir, gen_root_dir, compile_list2)
                    _check_list(compile_list2, gen_dir, gen_root_dir)
                    for full_filename in compile_list2:
                        if not _load_file(self, full_filename,
                                          gen_dir, gen_root_dir,
                                          load_fc, load_bc, compile_list2):
                            raise AssertionError("version recompile failed")
        # NOTE(review): this copy of the file had lost its indentation; the
        # two init2 loops are assumed to run for every engine, including
        # '*test*' ones -- confirm against the repository.
        for kb in self.knowledge_bases.itervalues(): kb.init2()
        for rb in self.rule_bases.itervalues(): rb.init2()

    def reset(self):
        ''' Calls reset() on every rule base, then on every knowledge base.
        '''
        for rb in self.rule_bases.itervalues(): rb.reset()
        for kb in self.knowledge_bases.itervalues(): kb.reset()

    def get_kb(self, kb_name, _new_class = None):
        ''' Returns the knowledge base registered as kb_name.

            If there is none, _new_class(self, kb_name) is returned when
            _new_class is given; otherwise KeyError is raised.
        '''
        ans = self.knowledge_bases.get(kb_name)
        if ans is None:
            # presumably _new_class registers the new object in
            # self.knowledge_bases itself; nothing here does -- TODO confirm
            if _new_class: ans = _new_class(self, kb_name)
            else: raise KeyError("knowledge_base %s not found" % kb_name)
        return ans

    def get_rb(self, rb_name):
        ''' Returns the rule base registered as rb_name, or raises KeyError.
        '''
        ans = self.rule_bases.get(rb_name)
        if ans is None: raise KeyError("rule_base %s not found" % rb_name)
        return ans

    def get_create(self, rb_name, parent = None, exclude_list = ()):
        ''' Returns the rule base called rb_name, creating it if necessary.

            Raises AssertionError if a rule base of that name already exists
            with a different parent or a different set of exclusions.
        '''
        ans = self.rule_bases.get(rb_name)
        if ans is None:
            ans = rule_base.rule_base(self, rb_name, parent, exclude_list)
        elif ans.parent != parent or ans.exclude_set != frozenset(exclude_list):
            raise AssertionError("duplicate rule_base: %s" % rb_name)
        return ans

    def add_universal_fact(self, kb_name, fact_name, args):
        ''' Forwards to add_universal_fact on fact base kb_name, which is
            obtained through get_kb with fact_base.fact_base as fallback
            class.

            args may be any iterable except a string (TypeError).
        '''
        if isinstance(args, types.StringTypes):
            raise TypeError("engine.add_universal_fact: "
                            "illegal args type, %s" % type(args))
        args = tuple(args)
        return self.get_kb(kb_name, fact_base.fact_base) \
                   .add_universal_fact(fact_name, args)

    def add_case_specific_fact(self, kb_name, fact_name, args):
        ''' Forwards to add_case_specific_fact on fact base kb_name, which is
            obtained through get_kb with fact_base.fact_base as fallback
            class.

            args may be any iterable except a string (TypeError).
        '''
        if isinstance(args, types.StringTypes):
            raise TypeError("engine.add_case_specific_fact: "
                            "illegal args type, %s" % type(args))
        args = tuple(args)
        return self.get_kb(kb_name, fact_base.fact_base) \
                   .add_case_specific_fact(fact_name, args)

    def assert_(self, kb_name, entity_name, args):
        ''' Forwards to assert_ on knowledge base kb_name, which is obtained
            through get_kb with fact_base.fact_base as fallback class.

            args may be any iterable except a string (TypeError).
        '''
        if isinstance(args, types.StringTypes):
            raise TypeError("engine.assert_: "
                            "illegal args type, %s" % type(args))
        args = tuple(args)
        return self.get_kb(kb_name, fact_base.fact_base) \
                   .assert_(entity_name, args)

    def activate(self, *rb_names):
        ''' Activates each of the named rule bases, in the order given.
        '''
        for rb_name in rb_names: self.get_rb(rb_name).activate()

    def lookup(self, kb_name, entity_name, pat_context, patterns):
        ''' Delegates to lookup on knowledge base kb_name; pat_context is
            passed as both of its context arguments.
        '''
        return self.get_kb(kb_name).lookup(pat_context, pat_context,
                                           entity_name, patterns)

    def prove(self, kb_name, entity_name, pat_context, patterns):
        ''' Delegates to prove on knowledge base kb_name; pat_context is
            passed as both of its context arguments.
        '''
        return self.get_kb(kb_name).prove(pat_context, pat_context,
                                          entity_name, patterns)

    def prove_n(self, kb_name, entity_name, fixed_args, num_returns):
        ''' Generates: a tuple of len == num_returns, and a plan (or None).

            fixed_args is an iterable of literal leading arguments; a string
            is rejected with TypeError.  As this is a generator, that error
            only surfaces when the first result is requested.  No more than
            len(_Variables) values can be returned.
        '''
        if isinstance(fixed_args, types.StringTypes):
            raise TypeError("engine.prove_n: fixed_args must not be a string, "
                            "did you forget a , (%(arg)s) => (%(arg)s,)?" %
                            {'arg': repr(fixed_args)})
        context = contexts.simple_context()
        ans_vars = self._Variables[:num_returns]
        try:
            for plan in self.prove(kb_name, entity_name, context,
                                   tuple(pattern.pattern_literal(arg)
                                         for arg in fixed_args) + ans_vars):
                final = {}
                ans = tuple(context.lookup_data(var.name, final = final)
                            for var in ans_vars)
                if plan: plan = plan.create_plan(final)
                yield ans, plan
        finally:
            # runs as well when the caller abandons the generator early
            context.done()

    def prove_1(self, kb_name, entity_name, fixed_args, num_returns):
        ''' Returns a tuple of len == num_returns, and a plan (or None).

            Raises CanNotProve if there is no proof.
        '''
        try:
            # All we need is the first one!
            return self.prove_n(kb_name, entity_name, fixed_args, num_returns) \
                       .next()
        except StopIteration:
            # tuple(): prove_n takes any iterable, but list + tuple would
            # raise TypeError right here and hide the CanNotProve.
            raise CanNotProve("Can not prove %s.%s%s" %
                              (kb_name, entity_name,
                               condensedPrint.cprint(
                                   tuple(fixed_args)
                                   + self._Variables[:num_returns])))

    def print_stats(self, f = sys.stdout):
        ''' Has each knowledge base, ordered by name, print its statistics
            to f.
        '''
        for kb \
         in sorted(self.knowledge_bases.itervalues(), key=lambda kb: kb.name):
            kb.print_stats(f)

    def trace(self, rb_name, rule_name):
        ''' Turns tracing on for rule rule_name of rule base rb_name.
        '''
        self.get_rb(rb_name).trace(rule_name)

    def untrace(self, rb_name, rule_name):
        ''' Turns tracing off for rule rule_name of rule base rb_name.
        '''
        self.get_rb(rb_name).untrace(rule_name)
def _raise_exc(exc): raise exc
def _make_package_dirs(base_dir, package_path):
''' Creates directories in package_path, relative to base_dir, and makes
sure that each one has an __init__.py file in it.
Package_path is a sequence of path components.
Returns the full path (base_dir/package_path)
'''
if len(package_path) > 1: _make_package_dirs(base_dir, package_path[:-1])
full_package_path = os.path.join(base_dir, os.path.join(*package_path))
if not os.path.exists(full_package_path): os.mkdir(full_package_path)
init_file_path = os.path.join(full_package_path, '__init__.py')
if not os.path.exists(init_file_path): open(init_file_path, 'w').close()
return full_package_path
def _doctor_names(path):
''' Convert all path components into legal path names.
Return converted result.
'''
def fix_component(c):
if c[0] in '0123456789': c = '_' + c
return Bad_name_char.sub('_', c)
if not path: return []
return [fix_component(component) for component in path.split(os.path.sep)]
def _bogus_path(dirname):
    ''' Returns the absolute path of dirname with any drive folded into the
        path as a leading "<drive>_drive" component (e.g. "c:\\a\\b" becomes
        "\\c:_drive\\a\\b"), so that paths on different drives still have a
        common root.
    '''
    drive, path = os.path.splitdrive(os.path.abspath(dirname))
    if drive:
        return '%s%s_drive%s' % (os.path.sep, drive, path)
    return path

def _get_base_path(filename, gen_dir, gen_root_dir, makedirs = False):
    ''' Works out where the modules generated for filename belong.

        The package path is gen_root_dir followed by that part of filename's
        directory which it does not share with gen_dir, with every component
        made a legal identifier by _doctor_names.

        If makedirs is true, gen_dir and the package directories are created
        as needed (by _make_package_dirs).

        Returns a 2-tuple: the path of the generated module without its
        suffix and extension, and the package path as a list of components.
    '''
    if makedirs and not os.path.exists(gen_dir): os.makedirs(gen_dir)
    fn_dirname, fn_name = os.path.split(filename)
    fn_components = _bogus_path(fn_dirname).split(os.path.sep)
    gen_components = _bogus_path(gen_dir).split(os.path.sep)

    # Count the leading path components that the two directories share.
    # os.path.commonprefix is no good here: it compares character by
    # character, so "/a/b" and "/a/bcd" would share "/a/b" and leave "cd"
    # behind as the unique part.
    num_common = 0
    for fn_component, gen_component in zip(fn_components, gen_components):
        if fn_component != gen_component: break
        num_common += 1
    # both paths are absolute, so they share at least the empty component
    # in front of the leading separator
    assert num_common > 0

    # empty components only stem from the root directory or from doubled
    # separators; they carry no name and are dropped
    fn_unique_tail = os.path.sep.join(
                         component
                         for component in fn_components[num_common:]
                         if component)

    package_list = [gen_root_dir] + _doctor_names(fn_unique_tail)
    if makedirs:
        path = _make_package_dirs(gen_dir, package_list)
    else:
        path = os.path.join(gen_dir, *package_list)
    # fn_name[:-4] drops the 4 character ".krb" extension
    return os.path.join(path, fn_name[:-4]), package_list
def _needs_compiling(filename, gen_dir, gen_root_dir):
source_mtime = os.stat(filename).st_mtime
base, ignore = _get_base_path(filename, gen_dir, gen_root_dir)
try:
ok = os.stat(base + '_fc.py').st_mtime > source_mtime
except OSError:
ok = None
if ok is None or ok:
try:
ok = os.stat(base + '_bc.py').st_mtime > source_mtime
except OSError:
if ok is None: ok = False
return not ok
def _get_compile_list(paths, gen_dir, gen_root_dir):
ans = []
for path in paths:
for dirpath, dirnames, filenames in os.walk(path, onerror=_raise_exc):
for filename in filenames:
if filename.endswith('.krb') and \
_needs_compiling(os.path.join(dirpath, filename),
gen_dir, gen_root_dir):
ans.append(os.path.join(dirpath, filename))
return ans
def _load_paths(engine, paths, gen_dir, gen_root_dir, load_fc, load_bc,
compile_list):
if gen_dir == '.':
if '' not in sys.path and os.path.abspath(gen_dir) not in sys.path:
sys.path.insert(0, '')
else:
if os.path.abspath(gen_dir) not in sys.path:
sys.path.insert(0, os.path.abspath(gen_dir))
ans = []
for path in paths:
for dirpath, dirnames, filenames in os.walk(path, onerror=_raise_exc):
for filename in filenames:
if filename.endswith('.krb'):
full_filename = os.path.join(dirpath, filename)
if not _load_file(engine, full_filename,
gen_dir, gen_root_dir, load_fc, load_bc,
compile_list):
ans.append(full_filename)
#print "_load_paths =>", ans
return ans
def _load_file(engine, filename, gen_dir, gen_root_dir, load_fc, load_bc,
               compile_list):
    ''' Loads the modules generated from filename into engine.

        The "_fc" module is loaded if load_fc is true; the "_plans" and
        "_bc" modules if load_bc is true.  A module whose .py file does not
        exist is silently skipped.  The "_plans" module is never imported or
        populated here; it only has its version checked (and is reloaded)
        if it has already been imported.

        Returns False as soon as a module is found that has no version
        attribute or whose version differs from pyke.version; else True.
    '''
    base, package_list = _get_base_path(filename, gen_dir, gen_root_dir)
    base_modulename = os.path.basename(base)

    def load_module(suffix, do_import=True):
        try:
            os.stat(base + suffix + '.py')
        except OSError:
            # no such generated module: nothing to load, nothing wrong
            return True
        module_path = package_list + [base_modulename + suffix]
        full_module_name = '.'.join(module_path)
        if full_module_name in sys.modules:
            module = sys.modules[full_module_name]
            if filename in compile_list:
                # the source was just recompiled, so the module in memory
                # is out of date
                module = reload(module)
        elif do_import:
            module = _import(module_path)
        else:
            module = None
        if module is not None:
            if not hasattr(module, 'version') or \
               module.version != pyke.version:
                return False
        if do_import: module.populate(engine)
        return True

    wanted = []
    if load_fc:
        wanted.append(('_fc', True))
    if load_bc:
        wanted.append(('_plans', False))
        wanted.append(('_bc', True))
    for suffix, do_import in wanted:
        if not load_module(suffix, do_import):
            return False
    return True
""" ******* for testing:
def trace_import(*args, **kws):
if args[0].endswith('_plans'):
sys.stderr.write("import: ")
if kws: sys.stderr.write("kws.keys(): %s " % (str(kws.keys()),))
for arg in args:
if isinstance(arg, dict):
if '__name__' in arg:
sys.stderr.write('%s[%d] ' % (arg['__name__'], len(arg)))
else:
sys.stderr.write(str(len(arg)) + ' ')
else:
sys.stderr.write(str(arg) + ' ')
sys.stderr.write('\n')
return old_import(*args)
import __builtin__
old_import = __builtin__.__import__
#print "__builtin__:", __builtin__, "old_import:", old_import
__builtin__.__import__ = trace_import
********* end testing """
def _import(modulepath):
''' modulepath does not include .py
'''
#print "_import:", modulepath
mod = __import__('.'.join(modulepath))
for comp in modulepath[1:]:
mod = getattr(mod, comp)
return mod
def _check_list(compile_list, gen_dir, gen_root_dir):
for filename in compile_list:
if _needs_compiling(filename, gen_dir, gen_root_dir):
raise AssertionError("%s didn't compile correctly" % filename)
def test():
import doctest
sys.exit(doctest.testmod()[0])
if __name__ == "__main__":
test()

Get latest updates about Open Source Projects, Conferences and News.

Sign up for the SourceForge newsletter:





No, thanks