[CompStrm Wiki] wdb and tks2
Status: Beta
Brought to you by:
blaforge
From: Bill la F. <laf...@ya...> - 2006-01-13 00:08:12
|
from time import time
from weakref import WeakValueDictionary
from heapq import heappush,heappop
from tks.randompot import UUID
from os.path import exists
from threading import Lock

# Reserved separator characters of the on-disk format.
_RECSEP = chr(255)    # terminates every record inside a block
_KVSEP = chr(254)     # separates key from value inside a record
_INDIRECT = chr(253)  # prefix of a value stored out of line by tranfile.save


class block:
    """A cachable block buffer.

    The content is held as the raw string (self.s), as the list obtained by
    splitting that string on chr(255) (self.l), or both; the missing form is
    derived on demand.  Every modification first calls bimage(self) so the
    owner can record a before-image of the old content.
    """

    def __init__(self, recnbr, blksize, bimage):
        """
        recnbr is the block number inside the file
        blksize is the maximum size of the string form
        bimage is called with this block before each modification
        """
        self.blksize = blksize
        self.recnbr = recnbr
        self.bimage = bimage
        self.s = None
        self.l = None
        self.inrheap = False   # True while queued in cachefile.rheap
        self.inwheap = False   # True while a write is pending in cachefile.wheap

    def gets(self):
        """Return the content as a string.

        Raises Exception when the buffer is empty or when the joined list
        does not fit into blksize.
        """
        if self.s is None:
            if self.l is None:
                raise Exception('empty buffer')
            s = _RECSEP.join(self.l)
            # Validate before caching so an oversize string is never kept.
            if len(s) > self.blksize:
                raise Exception('block gets: string greater than blksize')
            self.s = s
        return self.s

    def puts(self, s):
        """Replace the content with string s.

        Everything after the last chr(255) (padding) is dropped.  Raises
        Exception when s has no terminator or is longer than blksize.
        """
        self.bimage(self)
        i = s.rfind(_RECSEP)
        if i < 0:
            raise Exception('record has no terminator')
        s = s[:i+1]
        if len(s) > self.blksize:
            raise Exception('block puts: string greater than blksize')
        self.s = s
        self.l = None

    def getl(self):
        """Return the content as a list of records (split on chr(255)).

        The list is the block's own; callers that want to edit it should
        work on a copy and hand the result to putl().
        """
        if self.l is None:
            if self.s is None:
                raise Exception('empty buffer')
            self.l = self.s.split(_RECSEP)
        return self.l

    def putl(self, l):
        "Replace the content with the list of records l."
        self.bimage(self)
        self.l = l
        self.s = None

    def empty(self):
        "True when the buffer holds neither a string nor a list."
        return self.s is None and self.l is None

    def len(self):
        """Return the size of the content.

        For the list form every record is counted with one separator.
        Raises Exception when the buffer is empty.
        """
        if self.s is not None:
            return len(self.s)
        if self.l is not None:
            ln = 0
            for kv in self.l:
                ln += len(kv)+1
            return ln
        raise Exception('empty buffer')


class cachefile:
    """Keeps the last accessed blocks in RAM.

    self.wd maps block numbers to live block objects through weak
    references.  self.rheap keeps up to maxr recently used blocks alive and
    self.wheap holds up to maxw blocks whose write is still pending.  Both
    heaps are ordered by access time; an entry whose time stamp no longer
    matches the block's timeoflastaccess is stale and is re-queued instead
    of evicted.
    """

    def __init__(self, file, bimage, maxrcache=1000, maxwcache=1000, blksize=8*1024):
        """
        file is the open block file
        bimage is handed to every block created by this cache
        maxrcache is the number of blocks kept in RAM
        maxwcache is the number of dirty blocks kept in RAM
        blksize is the size of one block
        """
        self.file = file
        self.bimage = bimage
        self.maxr = maxrcache
        self.maxw = maxwcache
        self.blksize = blksize
        self.wd = WeakValueDictionary()
        self.rheap = []
        self.wheap = []

    def _write(self, b):
        "write block b, padded to blksize, at its own position in the file"
        s = b.gets().ljust(self.blksize)
        self.file.seek(self.blksize*b.recnbr)
        self.file.write(s)

    def flush(self):
        "flush all pending writes"
        pending = {}
        for ts, b in self.wheap:
            if b.inwheap:
                pending[b.recnbr] = b
        # Write in ascending block order.
        for recnbr in sorted(pending):
            self._write(pending[recnbr])
        # Only forget the pending writes once all of them succeeded.
        for recnbr in pending:
            pending[recnbr].inwheap = False
        del self.wheap[:]
        self.file.flush()

    def buffer(self, recnbr):
        "locate a buffer or create a new one"
        try:
            b = self.wd[recnbr]
        except KeyError:
            b = block(recnbr, self.blksize, self.bimage)
        return b

    def put(self, buffer, force=False):
        """write a block

        With force the block is written immediately.  Otherwise the write
        is queued and the least recently used dirty blocks are written out
        while more than maxw writes are pending.
        """
        recnbr = buffer.recnbr
        self.add(recnbr, buffer)
        ts = time()
        buffer.timeoflastaccess = ts
        if force:
            # A queued entry for this block, if any, is skipped later
            # because inwheap is cleared here.
            buffer.inwheap = False
            self._write(buffer)
            return
        if buffer.inwheap:
            return
        buffer.inwheap = True
        heappush(self.wheap, (ts, buffer))
        while len(self.wheap) > self.maxw:
            t, b = heappop(self.wheap)
            if not b.inwheap:
                continue            # already written by a forced put
            ts = b.timeoflastaccess
            if t == ts:
                b.inwheap = False
                self._write(b)      # at b's own position, not the caller's
                continue
            # Touched since it was queued: keep it pending under the new time.
            heappush(self.wheap, (ts, b))

    def get(self, recnbr):
        "locate a block or read it."
        try:
            buffer = self.wd[recnbr]
        except KeyError:
            self.file.seek(self.blksize*recnbr)
            buffer = self.buffer(recnbr)
            buffer.puts(self.file.read(self.blksize))
            self.add(recnbr, buffer)
        buffer.timeoflastaccess = time()
        return buffer

    def add(self, recnbr, buffer):
        "add a buffer to the cache"
        self.wd[recnbr] = buffer
        ts = time()
        buffer.timeoflastaccess = ts
        if buffer.inrheap:
            return
        heappush(self.rheap, (ts, buffer))
        buffer.inrheap = True
        while len(self.rheap) > self.maxr:
            t, b = heappop(self.rheap)
            ts = b.timeoflastaccess
            if t == ts:
                b.inrheap = False   # evicted; only the weak reference remains
                continue
            heappush(self.rheap, (ts, b))


class tranfile:
    """
    Cached and transactional block I/O
    self.root is the block number of the btree root block
    self.avail is the number of the next unallocated block
    Block 0 holds blksize, avail and root as three 16 digit numbers.
    """

    def __init__(self, filename, blksize=8*1024, maxrcache=0, maxwcache=0):
        """
        filename names the database; the blocks live in filename+'.wdb'
        and the before-images in filename+'.wdbb'.
        blksize is the size of the blocks
        maxrcache is the number of blocks cached in RAM
        maxwcache is the number of dirty blocks cached in RAM
        (0 selects as many blocks as fit into 1 MB)

        Raises Exception when an existing file was created with another
        blksize.  Before-images left behind by an unfinished transaction
        are written back to the block file.
        """
        blksize = int(blksize)
        if not maxrcache:
            maxrcache = (1024*1024)//blksize
        if not maxwcache:
            maxwcache = (1024*1024)//blksize
        ms = len(UUID())
        self.ms = ms
        self.bblksize = ms+16+blksize
        if not exists(filename+'.wdb'):
            f = file(filename+'.wdb', 'wb', blksize)
            f.write(''.join((str(blksize).zfill(16), '1'.zfill(16), '0'.zfill(16), _RECSEP)))
            f.close()
            # A new database always starts with a blank before-image file.
            f = file(filename+'.wdbb', 'wb', self.bblksize)
            f.write(''.ljust(self.bblksize))
            f.close()
        self.blksize = blksize
        self.file = file(filename+'.wdb', 'r+b', blksize)
        self.cache = cachefile(self.file, self.bimage, maxrcache=maxrcache,
                               maxwcache=maxwcache, blksize=blksize)
        r0 = self.file.read(blksize)
        bsz = long(r0[:16])
        if bsz != blksize:
            self.file.close()
            raise Exception('cachefile has wrong blksize')
        self.bfile = file(filename+'.wdbb', 'r+b', self.bblksize)
        self.before = {}
        self.tran = None
        # Recovery: a non-blank first record means a transaction did not
        # commit; restore every before-image carrying the same id.
        self.bfile.seek(0, 2)
        sz = self.bfile.tell()//self.bblksize   # complete records only
        self.bfile.seek(0)
        bs = self.bfile.read(self.bblksize)
        sz -= 1
        t = bs[:ms]
        if sz >= 0 and t != ''.ljust(ms):
            while True:
                rn = long(bs[ms:ms+16])
                s = bs[ms+16:]
                b = self.cache.buffer(rn)
                b.puts(s)
                # Forced so the restored image reaches the block file before
                # a later transaction overwrites the before-image file.
                self.putb(b, force=True)
                if not sz:
                    break
                bs = self.bfile.read(self.bblksize)
                sz -= 1
                nt = bs[:ms]
                if nt != t:
                    break
            self.file.flush()
        self.bfile.seek(0)
        r0 = self.getb(0).gets()
        self.avail = long(r0[16:32])
        self.root = long(r0[32:48])

    def close(self):
        """Write pending blocks and close the files.

        This is not a commit: block 0 is not rewritten and the before-images
        of an open transaction stay in place.
        """
        self.cache.flush()
        self.file.close()
        self.bfile.close()

    def putnewl(self, l):
        "Allocate 1 block holding the list of records l; returns its number"
        a = self.avail
        self.avail += 1
        b = self.cache.buffer(a)
        b.putl(l)
        self.cache.put(b, force=True)
        return a

    def putb(self, buffer, force=False):
        "write a block"
        self.cache.put(buffer, force=force)

    def getb(self, recnbr):
        "read a block"
        return self.cache.get(recnbr)

    def bimage(self, b):
        """Record the before-image of block b.

        Called by the block itself ahead of every modification.  Does
        nothing for an empty block, outside a transaction, or when the
        block already has a before-image in this transaction.
        """
        if b.empty():
            return
        recnbr = b.recnbr
        if self.tran and (recnbr not in self.before):
            s = b.gets()
            self.bwrite(recnbr, s)

    def bwrite(self, recnbr, s):
        "append a before-image record (transaction id, block number, content)"
        self.before[recnbr] = None
        s = s.ljust(self.blksize)
        bfile = self.bfile
        bfile.write(''.join((self.tran, str(recnbr).zfill(16), s)))
        bfile.flush()

    def save(self, s):
        """
        save a large string (len > blksize/2)
        returns the starting block nbr
        """
        l = len(s)
        s = str(l).zfill(16)+s      # 16 digit length prefix, read back by load
        l += 16
        bs = (l+self.blksize-1)//self.blksize   # whole blocks needed
        s = s.ljust(bs*self.blksize)
        a = self.avail
        self.avail += bs
        self.file.seek(a*self.blksize)
        self.file.write(s)
        return a

    def load(self, recnbr):
        "fetch a large string previously saved"
        self.file.seek(recnbr*self.blksize)
        sl = self.file.read(16)
        l = long(sl)
        return self.file.read(l)

    def transaction(self):
        "start a transaction"
        if self.tran:
            raise Exception('tran in tran')
        self.tran = UUID()
        # Block 0 always gets a before-image, so the first record of the
        # before-image file marks the transaction as open.
        b = self.getb(0)
        self.bwrite(0, b.gets())

    def commit(self):
        "commit a transaction"
        if not self.tran:
            raise Exception('commit before tran')
        r0 = ''.join((str(self.blksize).zfill(16), str(self.avail).zfill(16),
                      str(self.root).zfill(16), _RECSEP))
        b = self.cache.buffer(0)
        b.puts(r0)
        self.cache.put(b, force=True)
        self.cache.flush()
        # Blanking the first before-image record ends the transaction.
        self.bfile.seek(0)
        self.bfile.write(''.ljust(self.bblksize))
        self.bfile.flush()
        self.bfile.seek(0)
        self.before = {}
        self.tran = None


class btree:
    """
    A write once, read many transactional btree file.
    All keys and values must be 7-bit ascii.
    Duplicate keys stored in chronological order.
    Reserved characters:
    chr(255) - record separator
    chr(254) - key/value separator
    chr(253) - indirect value indicator

    A block is the list [type, record, ..., ''] where type is 'L' for a
    leaf (records are key/value) or 'N' for a node (records are
    key/child block number).
    """

    def __init__(self, filename, blksize=1024, maxrcache=0, maxwcache=0):
        """
        filename names the database (see tranfile for the file names).
        blksize is the size of the blocks (at least 64)
        maxrcache is the number of blocks cached in RAM
        maxwcache is the number of dirty blocks cached in RAM
        """
        if blksize < 64:
            blksize = 64
        tf = tranfile(filename, blksize=blksize, maxrcache=maxrcache, maxwcache=maxwcache)
        self.tf = tf
        self.blksize = tf.blksize
        self.lock = Lock()
        self.putcount = 0
        if not tf.root:
            tf.transaction()
            # Same bytes on disk as 'L'+chr(255), but already in the list
            # form the tree code expects when the block comes from the cache.
            tf.root = tf.putnewl(['L', ''])
            tf.commit()

    def close(self):
        "Close the files (does not commit)"
        self.tf.close()

    def tran(self):
        "start a transaction"
        self.tf.transaction()

    def commit(self):
        "commit a transaction"
        self.tf.commit()

    def items(self, key, limit):
        """yields k,v where limit <= k <= key, in descending order by k

        Raises Exception when a put happens between two items, or when more
        than one item is requested inside a transaction (no cursor is
        available there).
        """
        putcount = self.putcount
        c, k, v = self._get(key)
        if not k:
            return
        if k < limit:
            return
        yield k, v
        while True:
            if putcount != self.putcount:
                raise Exception("intervening put invalidates items")
            if c is None:
                raise Exception("items cannot continue inside a transaction")
            c, k, v = self._prior(c)
            if not k:
                return
            if k < limit:
                return
            yield k, v

    def _get(self, key):
        """
        Get a value. Returns cursor,key,value.
        (Cursor is None when called in a transaction.)
        The returned key may be less than the argument key.
        When there is no matching value, returns None,None,None.
        """
        self.lock.acquire()
        try:
            nl, bu = self._getleaf(key)
            kl = bu.getl()
            f = len(kl)-2           # index of the last record
            if f < 1:
                return None, None, None     # empty leaf
            for i in range(f, 0, -1):
                k, v = kl[i].split(_KVSEP)
                if k <= key or i == 1:
                    break
            if k > key:
                return None, None, None
            if v.startswith(_INDIRECT):
                vn = long(v[1:])
                v = self.tf.load(vn)
            nl.append((bu, i))
            if self.tf.tran:
                return None, k, v
            return nl, k, v
        finally:
            self.lock.release()

    def _prior(self, nl):
        """
        Get the prior value. Returns cursor,key,value.
        nl is the cursor returned by _get or _prior; it is modified in place.
        When there is no matching value, returns None,None,None.
        """
        self.lock.acquire()
        try:
            tf = self.tf
            i = 0
            # Climb until a level still has a record to the left.
            while len(nl):
                bu, i = nl.pop()
                if i > 1:
                    break
                i = 0
            if not i:
                return None, None, None
            i -= 1
            nl.append((bu, i))
            kl = bu.getl()
            k, v = kl[i].split(_KVSEP)
            # Descend along the last record of every node down to a leaf.
            while kl[0] == 'N':
                bn = long(v)
                bu = tf.getb(bn)
                kl = bu.getl()
                i = len(kl)-2
                nl.append((bu, i))
                k, v = kl[i].split(_KVSEP)
            if v.startswith(_INDIRECT):
                vn = long(v[1:])
                v = self.tf.load(vn)
            return nl, k, v
        finally:
            self.lock.release()

    def _getleaf(self, key):
        """Locate the leaf responsible for key.

        Returns path,leaf where path lists (node block, record index) from
        the root down to the parent of the leaf.
        """
        nl = []
        tf = self.tf
        bn = tf.root
        while True:
            bu = tf.getb(bn)
            kl = bu.getl()
            if kl[0] == 'L':
                return nl, bu
            f = len(kl)-2
            # Last child whose key is <= key, else the first child.
            for i in range(f, 0, -1):
                k, sbn = kl[i].split(_KVSEP)
                if k <= key or i == 1:
                    break
            nl.append((bu, i))
            bn = long(sbn)

    def put(self, key, value):
        """
        Add a key/value.
        Raises Exception when key is longer than blksize/4 or when no
        transaction is open.  Values longer than blksize/2 are stored out
        of line and referenced by block number.
        """
        tf = self.tf
        if len(key) > self.blksize//4:
            raise Exception('key length is more than blksize/4')
        if not tf.tran:
            raise Exception('put not in tran')
        self.putcount += 1
        if len(value) > self.blksize//2:
            vn = tf.save(value)
            value = _INDIRECT+str(vn)
        nl, bu = self._getleaf(key)
        # Work on a copy: putl() takes the before-image from the block's
        # current content, which must still be the old one at that point.
        kl = list(bu.getl())
        f = len(kl)-2
        if f:
            for i in range(f, 0, -1):
                k, v = kl[i].split(_KVSEP)
                if k <= key or i == 1:
                    break
            if k <= key:
                i += 1      # after the last record with k <= key
        else:
            i = 1           # empty leaf
        nkv = _KVSEP.join((key, value))
        kl.insert(i, nkv)
        while True:
            ln = 0
            for kv in kl:
                ln += len(kv)+1
            if ln <= self.blksize:
                bu.putl(kl)
                tf.putb(bu)
                return
            # Block overflow: move records from the front into kl0 until it
            # is about half a block, the rest stays in kl1.
            ln = 0
            kl1 = kl
            kl0 = []
            t = kl[0]
            while ln < (self.blksize//2) and len(kl1) > 2:
                kv = kl1.pop(0)
                ln += len(kv)+1
                kl0.append(kv)
            kl0.append('')
            kl1.insert(0, t)
            kv0 = kl0[1]
            kv1 = kl1[1]
            i0 = kv0.find(_KVSEP)
            i1 = kv1.find(_KVSEP)
            k0 = kv0[0:i0]      # first key of each half
            k1 = kv1[0:i1]
            bn0 = bu.recnbr
            bu.putl(kl0)
            tf.putb(bu)
            bn1 = tf.putnewl(kl1)
            v0 = str(bn0)
            v1 = str(bn1)
            if not len(nl):
                print('btree root split')
                kl = ['N', k0+_KVSEP+v0, k1+_KVSEP+v1, '']
                bn = tf.putnewl(kl)
                tf.root = bn
                return
            # Replace the parent's record by one record per half and loop
            # to check the parent for overflow in turn.
            bu, i = nl.pop()
            kl = list(bu.getl())
            kl[i] = _KVSEP.join((k0, v0))
            kl.insert(i+1, _KVSEP.join((k1, v1)))