Package Bio :: Package Mindy :: Module BaseDB
[hide private]
[frames | no frames]

Source Code for Module Bio.Mindy.BaseDB

  1  import os 
  2  import Bio 
  3  import compression 
  4   
5 -def _int_str(i):
6 s = str(i) 7 if s[-1:] == "l": 8 return s[:-1] 9 return s
10
class WriteDB:
    """Mixin holding the file-registration and record-loading logic.

    Subclasses must define:
      self.filename_map -- mapping from filename -> fileid
      self.fileid_info  -- mapping from fileid -> (filename, size)

    load() additionally reads and may update self.formatname, and calls
    self.add_record(fileid, start_position, length, document) for every
    record it finds.
    """

    def add_filename(self, filename, size, fileid_info):
        """Register filename and return its file id (a decimal string).

        A filename that is already present in self.filename_map keeps its
        existing id.  Otherwise the next id is the current number of
        registered files, and (filename, size) is stored under that id in
        self.fileid_info.

        NOTE(review): the duplicate check looks at the 'fileid_info'
        argument while the new entry is stored on self.fileid_info;
        presumably callers pass the same mapping -- confirm.
        """
        fileid = self.filename_map.get(filename, None)
        if fileid is not None:
            return fileid
        new_id = str(len(self.filename_map))
        self.filename_map[filename] = new_id  # map from filename -> id
        assert new_id not in fileid_info.keys(), \
            "Duplicate entry! %s" % (new_id,)
        self.fileid_info[new_id] = (filename, size)
        return new_id

    def load(self, filename, builder, fileid_info, record_tag="record"):
        """Parse filename and hand every record to self.add_record().

        filename    -- path of the data file; its size is recorded through
                       add_filename()
        builder     -- passed to the iterator as 'cont_handler'; its
                       uses_tags() result selects the tags to parse
        fileid_info -- forwarded unchanged to add_filename()
        record_tag  -- name of the tag that delimits one record

        If self.formatname is "unknown" the file is identified as a
        "sequence" format, and self.formatname is then replaced by the
        first parent format found among fasta/embl/swissprot/genbank, or
        by the detected format's own name if there is none.

        Raises TypeError when the file cannot be identified.
        """
        formatname = self.formatname
        size = os.path.getsize(filename)
        filetag = self.add_filename(filename, size, fileid_info)

        source = compression.open_file(filename, "rb")
        try:
            if formatname == "unknown":
                formatname = "sequence"

            detected = Bio.formats.normalize(formatname).identifyFile(source)
            if detected is None:
                raise TypeError("Cannot identify file as a %s format" %
                                (self.formatname,))
            if self.formatname == "unknown":
                expected_names = ["fasta", "embl", "swissprot", "genbank"]
                for node in detected._parents:
                    if node.name in expected_names:
                        self.formatname = node.name
                        break
                else:
                    # No well-known parent format: keep the detected name.
                    self.formatname = detected.name

            iterator = detected.make_iterator(
                record_tag,
                select_names=tuple(builder.uses_tags()) + (record_tag,),
                debug_level=0)

            for record in iterator.iterate(source, cont_handler=builder):
                start = iterator.start_position
                self.add_record(filetag,
                                start,
                                iterator.end_position - start,
                                record.document)
        finally:
            # The original never released the handle.  Close it whether
            # parsing succeeded or failed, if the object supports closing.
            close = getattr(source, "close", None)
            if close is not None:
                close()
57
class DictLookup:
    """Read-only mapping helpers built on top of two subclass hooks.

    A subclass supplies __getitem__() and keys(); values(), items() and
    get() are derived from those two.
    """

    def __getitem__(self, key):
        raise NotImplementedError("Must be implemented in subclass")

    def keys(self):
        raise NotImplementedError("Must be implemented in subclass")

    def values(self):
        """Return a list with the value for each key, in keys() order."""
        found = []
        for key in self.keys():
            found.append(self[key])
        return found

    def items(self):
        """Return a list of (key, value) pairs, in keys() order."""
        pairs = []
        for key in self.keys():
            pairs.append((key, self[key]))
        return pairs

    def get(self, key, default=None):
        """Return self[key], or default when the lookup raises KeyError."""
        try:
            value = self[key]
        except KeyError:
            return default
        return value
74 75
class OpenDB(DictLookup):
    """Base class for opening an existing index directory.

    The constructor reads "config.dat" from the directory 'dbname' and
    exposes its contents as attributes: primary_namespace,
    secondary_namespaces, formatname, filename_map (filename -> fileid)
    and fileid_info (fileid -> (filename, size)).

    Subclasses must implement __getitem__(namespace).
    """

    def __init__(self, dbname, index_type):
        """Load the configuration and verify the indexed files.

        Raises TypeError if the stored index type differs from
        'index_type', or if an indexed file no longer has the size that
        was recorded in the configuration.
        """
        self.dbname = dbname

        config = read_config(os.path.join(dbname, "config.dat"))
        if config["index"] != index_type:
            raise TypeError("FlatDB does not support %r index" %
                            (config["index"],))
        self.primary_namespace = config["primary_namespace"]
        self.secondary_namespaces = config["secondary_namespaces"]
        self.formatname = config["format"]

        filename_map = {}
        fileid_info = {}
        for k, v in config.items():
            if not k.startswith("fileid_"):
                continue
            fileid = k[7:]  # drop the "fileid_" prefix
            filename, size = v
            fileid_info[fileid] = v
            filename_map[filename] = fileid
            current_size = os.path.getsize(filename)
            if current_size != size:
                # The original supplied only two values for the three
                # placeholders, so the intended message was never built.
                raise TypeError(
                    "File %s has changed size from %d to %d bytes!" %
                    (filename, size, current_size))

        self.filename_map = filename_map
        self.fileid_info = fileid_info

    def lookup(self, *args, **kwargs):
        """Look up one identifier.

        lookup(name) searches the primary namespace; lookup(ns=name)
        searches the namespace 'ns'.  Exactly one identifier must be
        given, either positionally or by keyword; anything else raises
        TypeError.
        """
        if args:
            if kwargs:
                raise TypeError("Cannot specify both args and kwargs")
            if len(args) != 1:
                raise TypeError("Only one identifier handled")
            namespace, name = self.primary_namespace, args[0]
        else:
            if len(kwargs) != 1:
                raise TypeError("lookup takes a single key")
            # list() keeps this working where items() is not indexable.
            namespace, name = list(kwargs.items())[0]
        return self[namespace][name]

    def __getitem__(self, namespace):
        """return the database table lookup for the given namespace"""
        raise NotImplementedError("must be implemented in the derived class")

    def keys(self):
        """Return the primary namespace followed by the secondary ones."""
        return [self.primary_namespace] + self.secondary_namespaces
126 127 # Write the configuration
def write_config(config_filename,
                 index_type,
                 primary_namespace,
                 secondary_namespaces,
                 fileid_info,
                 formatname):
    """Write the tab-delimited configuration file for an index.

    One line is written per entry, with the key first and its values
    separated by tabs:

      index                 -- index_type
      primary_namespace     -- primary_namespace
      secondary_namespaces  -- the namespaces, sorted (possibly none)
      format                -- formatname
      fileid_<id>           -- filename and size, one line per entry of
                               fileid_info, sorted by file id

    The values to write are prepared before the file is opened, so bad
    input cannot leave a truncated file behind, and the handle is closed
    even if a write fails.
    """
    namespaces = list(secondary_namespaces)
    namespaces.sort()
    file_entries = list(fileid_info.items())
    file_entries.sort()

    configfile = open(config_filename, "wb")
    try:
        # Write the header
        configfile.write("index\t" + index_type + "\n")

        # Write the namespace information
        configfile.write("primary_namespace\t%s\n" % primary_namespace)
        configfile.write("secondary_namespaces\t")
        configfile.write("\t".join(namespaces) + "\n")

        # Format name
        configfile.write("format\t" + formatname + "\n")

        # Write the fileid table
        for fileid, (filename, size) in file_entries:
            configfile.write("fileid_%s\t%s\t%s\n" %
                             (fileid, filename, _int_str(size)))
    finally:
        configfile.close()
157 158
def read_config(config_filename):
    """Parse a tab-delimited configuration file into a dictionary.

    Each non-blank line holds a key followed by tab-separated values:

      index, primary_namespace, format -- exactly one value, stored as
                                          a string
      fileid_*                         -- exactly two values, stored as
                                          (filename, integer size)
      secondary_namespaces             -- zero or more values, stored
                                          as a list
      anything else                    -- stored as a list of values

    Blank lines are skipped.  The original turned the empty string after
    the final newline into a "" key, and a second blank line then failed
    the duplicate-key check.

    Raises AssertionError for a duplicated key or a wrong number of
    values, and ValueError if a file size is not an integer.
    """
    infile = open(config_filename, "rb")
    try:
        text = infile.read()
    finally:
        infile.close()
    if not isinstance(text, str):
        # Binary reads give bytes on Python 3; the parsing below needs
        # text.  On Python 2 the data is already a str.
        text = text.decode()

    d = {}
    for line in text.split("\n"):
        stripped = line.rstrip()
        if not stripped:
            continue
        words = stripped.split("\t")
        key = words[0]
        # Raised explicitly so the check still runs under "python -O".
        if key in d:
            raise AssertionError(
                "Duplicate key %r in config file: old = %r, new = %r" %
                (key, d[key], line))
        if key in ("index", "primary_namespace", "format"):
            if len(words) != 2:
                raise AssertionError(
                    "%s should only have one value, not %r" %
                    (key, words[1:]))
            d[key] = words[1]

        elif key.startswith("fileid_"):
            if len(words) != 3:
                raise AssertionError(
                    "%s should only have two values, not %r" %
                    (key, words[1:]))
            # int() handles arbitrarily large sizes and, unlike long(),
            # exists on every Python version.
            d[key] = (words[1], int(words[2]))

        else:
            # "secondary_namespaces" (0 or more values) and any unknown
            # word are both saved as the list of their values.
            d[key] = words[1:]

    return d
189