summaryrefslogtreecommitdiff
path: root/ofxparse/ofxutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'ofxparse/ofxutil.py')
-rw-r--r--ofxparse/ofxutil.py262
1 files changed, 262 insertions, 0 deletions
diff --git a/ofxparse/ofxutil.py b/ofxparse/ofxutil.py
new file mode 100644
index 0000000..17c2f36
--- /dev/null
+++ b/ofxparse/ofxutil.py
@@ -0,0 +1,262 @@
+from __future__ import absolute_import, with_statement
+
+import os
+import copy
+import collections
+import xml.etree.ElementTree as ET
+
+if 'OrderedDict' in dir(collections):
+ odict = collections
+else:
+ import ordereddict as odict
+
+import six
+
+class InvalidOFXStructureException(Exception):
+ pass
+
+class OfxData(object):
+ def __init__(self, tag):
+ self.nodes = odict.OrderedDict()
+ self.tag = tag
+ self.data = ""
+
+ def add_tag(self, name):
+ name = name.lower()
+ if name not in self.nodes:
+ self.nodes[name] = OfxData(name.upper())
+ return self.nodes[name]
+ elif not isinstance(self.nodes[name], list):
+ self.nodes[name] = [self.nodes[name], OfxData(name.upper())]
+ else:
+ self.nodes[name].append(OfxData(name.upper()))
+ return self.nodes[name][-1]
+
+ def del_tag(self, name):
+ name = name.lower()
+ if name in self.nodes:
+ del self.nodes[name]
+
+ def __setattr__(self, name, value):
+ if name in self.__dict__ or name in ['nodes', 'tag', 'data', 'headers', 'xml']:
+ self.__dict__[name] = value
+ else:
+ self.del_tag(name)
+ if isinstance(value, list):
+ for val in value:
+ tag = self.add_tag(name)
+ tag.nodes = val.nodes
+ tag.data = val.data
+ elif isinstance(value, OfxData):
+ tag = self.add_tag(name)
+ tag.nodes = value.nodes
+ tag.data = value.data
+ else:
+ tag = self.add_tag(name)
+ tag.data = str(value)
+
+ def __getattr__(self, name):
+ if name in self.__dict__:
+ return self.__dict__[name]
+ elif name in self.__dict__['nodes']:
+ return self.__dict__['nodes'][name]
+ else:
+ tag = self.add_tag(name)
+ return tag
+
+ def __delattr__(self, name):
+ if name in self.__dict__:
+ del self.__dict__[name]
+ elif name in self.__dict__['nodes']:
+ del self.__dict__['nodes'][name]
+ else:
+ raise AttributeError
+
+ def __getitem__(self, name):
+ item_list = []
+ self.find(name, item_list)
+ return item_list
+
+ def find(self, name, item_list):
+ for n, child in six.iteritems(self.nodes):
+ if isinstance(child, OfxData):
+ if child.tag.lower() == name:
+ item_list.append(child)
+ child.find(name, item_list)
+ else:
+ for grandchild in child:
+ if grandchild.tag.lower() == name:
+ item_list.append(grandchild)
+ grandchild.find(name, item_list)
+
+ def __iter__(self):
+ for k, v in six.iteritems(self.nodes):
+ yield v
+
+ def __contains__(self, item):
+ return item in self.nodes
+
+ def __len__(self):
+ return len(self.nodes)
+
+ def __str__(self):
+ return os.linesep.join("\t" * line[1] + line[0] for line in self.format())
+
+ def format(self):
+ if self.data or not self.nodes:
+ if self.tag.upper() == "OFX":
+ return [["<%s>%s</%s>" % (self.tag, self.data if self.data else "", self.tag), 0]]
+ return [["<%s>%s" % (self.tag, self.data), 0]]
+ else:
+ ret = [["<%s>" % self.tag, -1]]
+ for name, child in six.iteritems(self.nodes):
+ if isinstance(child, OfxData):
+ ret.extend(child.format())
+ else:
+ for grandchild in child:
+ ret.extend(grandchild.format())
+ ret.append(["</%s>" % self.tag, -1])
+
+ for item in ret:
+ item[1] += 1
+
+ return ret
+
+
+class OfxUtil(OfxData):
+ def __init__(self, ofx_data=None):
+ super(OfxUtil, self).__init__('OFX')
+ self.headers = odict.OrderedDict()
+ self.xml = ""
+ if ofx_data:
+ if isinstance(ofx_data, six.string_types) and not ofx_data.lower().endswith('.ofx'):
+ self.parse(ofx_data)
+ else:
+ self.parse(open(ofx_data).read() if isinstance(ofx_data, six.string_types) else ofx_data.read())
+
+ def parse(self, ofx):
+ try:
+ for line in ofx.splitlines():
+ if line.strip() == "":
+ break
+ header, value = line.split(":")
+ self.headers[header] = value
+ except ValueError:
+ pass
+ except:
+ raise
+ finally:
+ if "OFXHEADER" not in self.headers:
+ self.headers["OFXHEADER"] = "100"
+ if "VERSION" not in self.headers:
+ self.headers["VERSION"] = "102"
+ if "SECURITY" not in self.headers:
+ self.headers["SECURITY"] = "NONE"
+ if "OLDFILEUID" not in self.headers:
+ self.headers["OLDFILEUID"] = "NONE"
+ if "NEWFILEUID" not in self.headers:
+ self.headers["NEWFILEUID"] = "NONE"
+
+ try:
+ tags = ofx.split("<")
+ if len(tags) > 1:
+ tags = ["<" + t.strip() for t in tags[1:]]
+
+ heirarchy = []
+ can_open = True
+
+ for i, tag in enumerate(tags):
+ gt = tag.index(">")
+ if tag[1] != "/":
+ #Is an opening tag
+ if not can_open:
+ tags[i - 1] = tags[i - 1] + "</" + heirarchy.pop() + ">"
+ can_open = True
+ tag_name = tag[1:gt].split()[0]
+ heirarchy.append(tag_name)
+ if len(tag) > gt + 1:
+ can_open = False
+ else:
+ #Is a closing tag
+ tag_name = tag[2:gt].split()[0]
+ if tag_name not in heirarchy:
+ #Close tag with no matching open, so delete it
+ tags[i] = tag[gt + 1:]
+ else:
+ #Close tag with matching open, but other open tags that need to be closed first
+ while(tag_name != heirarchy[-1]):
+ tags[i - 1] = tags[i - 1] + "</" + heirarchy.pop() + ">"
+ can_open = True
+ heirarchy.pop()
+
+ self.xml = ET.fromstringlist(tags)
+ self.load_from_xml(self, self.xml)
+ except:
+ raise InvalidOFXStructureException
+
+ def load_from_xml(self, ofx, xml):
+ ofx.data = xml.text
+ for child in xml:
+ tag = ofx.add_tag(child.tag)
+ self.load_from_xml(tag, child)
+
+ def reload_xml(self):
+ super(OfxUtil, self).__init__('OFX')
+ self.load_from_xml(self, self.xml)
+
+ def write(self, output_file):
+ with open(output_file, 'wb') as f:
+ f.write(str(self))
+
+ def __str__(self):
+ ret = os.linesep.join(":".join(line) for line in six.iteritems(self.headers)) + os.linesep * 2
+ ret += super(OfxUtil, self).__str__()
+ return ret
+
+if __name__ == "__main__":
+ here = os.path.dirname(__file__)
+ fixtures = os.path.join(here, '../tests/fixtures/')
+ ofx = OfxUtil(fixtures + 'checking.ofx')
+# ofx = OfxUtil(fixtures + 'fidelity.ofx')
+
+
+ #Manipulate OFX file via XML library
+# for transaction in ofx.xml.iter('STMTTRN'):
+# transaction.find('NAME').text = transaction.find('MEMO').text
+# transaction.remove(transaction.find('MEMO'))
+# ofx.reload_xml()
+
+
+ #Manipulate OFX file via object tree built from XML
+# for transaction in ofx.bankmsgsrsv1.stmttrnrs.stmtrs.banktranlist.stmttrn:
+# transaction.name = transaction.memo
+# del transaction.memo
+# transaction.notes = "Acknowledged"
+
+ #Modified sytnax for object tree data manipulation
+ #I'm using the __getitem__ method like the xml.iter method from ElementTree, as a recursive search
+ for transaction in ofx['stmttrn']:
+ transaction.name = transaction.memo
+ del transaction.memo
+ transaction.notes = "Acknowledged"
+
+# for bal in ofx['bal']:
+# print(bal)
+
+# ofx.test = "First assignment operation"
+# ofx.test = "Second assignment operation"
+#
+ print(ofx)
+
+ #Write OFX data to output file
+# ofx.write('out.ofx')
+
+# for file_name in os.listdir(fixtures):
+# if os.path.isfile(fixtures + file_name):
+# print "Attempting to parse", file_name
+# ofx = OfxParser('fixtures/' + file_name)
+#
+# file_parts = file_name.split(".")
+# file_parts.insert(1, 'v2')
+# with open('fixtures/' + ".".join(file_parts), 'wb') as f:
+# f.write(str(ofx))