from __future__ import absolute_import, with_statement
import sys
import decimal
import datetime
import codecs
import re
import collections
import contextlib
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
import six
if 'OrderedDict' in dir(collections):
odict = collections
else:
import ordereddict as odict
from . import mcc
def skip_headers(fh):
'''
Prepare `fh` for parsing by BeautifulSoup by skipping its OFX
headers.
'''
if fh is None or isinstance(fh, six.string_types):
return
fh.seek(0)
header_re = re.compile(r"^\s*\w+:\s*\w+\s*$")
while True:
pos = fh.tell()
line = fh.readline()
if not line:
break
if header_re.search(line) is None:
fh.seek(pos)
return
def soup_maker(fh):
skip_headers(fh)
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(fh, "html.parser")
for tag in soup.findAll():
tag.name = tag.name.lower()
except ImportError:
from BeautifulSoup import BeautifulStoneSoup
soup = BeautifulStoneSoup(fh)
return soup
def try_decode(string, encoding):
if hasattr(string, 'decode'):
string = string.decode(encoding)
return string
def is_iterable(candidate):
if sys.version_info < (2, 6):
return hasattr(candidate, 'next')
return isinstance(candidate, collections.Iterable)
@contextlib.contextmanager
def save_pos(fh):
"""
Save the position of the file handle, seek to the beginning, and
then restore the position.
"""
orig_pos = fh.tell()
fh.seek(0)
try:
yield fh
finally:
fh.seek(orig_pos)
class OfxFile(object):
def __init__(self, fh):
"""
fh should be a seekable file-like byte stream object
"""
self.headers = odict.OrderedDict()
self.fh = fh
if not is_iterable(self.fh):
return
if not hasattr(self.fh, "seek"):
return # fh is not a file object, we're doomed.
with save_pos(self.fh):
self.read_headers()
self.handle_encoding()
self.replace_NONE_headers()
def read_headers(self):
head_data = self.fh.read(1024 * 10)
head_data = head_data[:head_data.find(six.b('<'))]
for line in head_data.splitlines():
# Newline?
if line.strip() == six.b(""):
break
header, value = line.split(six.b(":"))
header, value = header.strip().upper(), value.strip()
self.headers[header] = value
def handle_encoding(self):
"""
Decode the headers and wrap self.fh in a decoder such that it
subsequently returns only text.
"""
# decode the headers using ascii
ascii_headers = odict.OrderedDict(
(
key.decode('ascii', 'replace'),
value.decode('ascii', 'replace'),
)
for key, value in six.iteritems(self.headers)
)
enc_type = ascii_headers.get('ENCODING')
if not enc_type:
# no encoding specified, use the ascii-decoded headers
self.headers = ascii_headers
# decode the body as ascii as well
self.fh = codecs.lookup('ascii').streamreader(self.fh)
return
if enc_type == "USASCII":
cp = ascii_headers.get("CHARSET", "1252")
encoding = "cp%s" % (cp, )
elif enc_type in ("UNICODE", "UTF-8"):
encoding = "utf-8"
codec = codecs.lookup(encoding)
self.fh = codec.streamreader(self.fh)
# Decode the headers using the encoding
self.headers = odict.OrderedDict(
(key.decode(encoding), value.decode(encoding))
for key, value in six.iteritems(self.headers)
)
def replace_NONE_headers(self):
"""
Any headers that indicate 'none' should be replaced with Python
None values
"""
for header in self.headers:
if self.headers[header].upper() == 'NONE':
self.headers[header] = None
class OfxPreprocessedFile(OfxFile):
def __init__(self, fh):
super(OfxPreprocessedFile, self).__init__(fh)
if self.fh is None:
return
ofx_string = self.fh.read()
# find all closing tags as hints
closing_tags = [t.upper() for t in re.findall(r'(?i)([a-z0-9_\.]+)>',
ofx_string)]
# close all tags that don't have closing tags and
# leave all other data intact
last_open_tag = None
tokens = re.split(r'(?i)(?[a-z0-9_\.]+>)', ofx_string)
new_fh = StringIO()
for idx, token in enumerate(tokens):
is_closing_tag = token.startswith('')
is_processing_tag = token.startswith('')
is_cdata = token.startswith('" % last_open_tag)
last_open_tag = None
if is_open_tag:
tag_name = re.findall(r'(?i)<([a-z0-9_\.]+)>', token)[0]
if tag_name.upper() not in closing_tags:
last_open_tag = tag_name
new_fh.write(token)
new_fh.seek(0)
self.fh = new_fh
class Ofx(object):
def __str__(self):
return ""
# headers = "\r\n".join(":".join(el if el else "NONE" for el in item)
# for item in six.iteritems(self.headers))
# headers += "\r\n\r\n"
#
# return headers + str(self.signon)
class AccountType(object):
(Unknown, Bank, CreditCard, Investment) = range(0, 4)
class Account(object):
def __init__(self):
self.curdef = None
self.statement = None
self.account_id = ''
self.routing_number = ''
self.branch_id = ''
self.account_type = ''
self.institution = None
self.type = AccountType.Unknown
# Used for error tracking
self.warnings = []
@property
def number(self):
# For backwards compatibility. Remove in version 1.0.
return self.account_id
class InvestmentAccount(Account):
def __init__(self):
super(InvestmentAccount, self).__init__()
self.brokerid = ''
class BrokerageBalance:
def __init__(self):
self.name = None
self.description = None
self.value = None # decimal
class Security:
def __init__(self, uniqueid, name, ticker, memo):
self.uniqueid = uniqueid
self.name = name
self.ticker = ticker
self.memo = memo
class Signon:
def __init__(self, keys):
self.code = keys['code']
self.severity = keys['severity']
self.message = keys['message']
self.dtserver = keys['dtserver']
self.language = keys['language']
self.dtprofup = keys['dtprofup']
self.fi_org = keys['org']
self.fi_fid = keys['fid']
self.intu_bid = keys['intu.bid']
if int(self.code) == 0:
self.success = True
else:
self.success = False
def __str__(self):
ret = "\t\r\n" + "\t\t\r\n" + \
"\t\t\t\r\n"
ret += "\t\t\t\t%s\r\n" % self.code
ret += "\t\t\t\t%s\r\n" % self.severity
if self.message:
ret += "\t\t\t\t%s\r\n" % self.message
ret += "\t\t\t
\r\n"
if self.dtserver is not None:
ret += "\t\t\t" + self.dtserver + "\r\n"
if self.language is not None:
ret += "\t\t\t" + self.language + "\r\n"
if self.dtprofup is not None:
ret += "\t\t\t" + self.dtprofup + "\r\n"
if (self.fi_org is not None) or (self.fi_fid is not None):
ret += "\t\t\t\r\n"
if self.fi_org is not None:
ret += "\t\t\t\t" + self.fi_org + "\r\n"
if self.fi_fid is not None:
ret += "\t\t\t\t" + self.fi_fid + "\r\n"
ret += "\t\t\t\r\n"
if self.intu_bid is not None:
ret += "\t\t\t" + self.intu_bid + "\r\n"
ret += "\t\t\r\n"
ret += "\t\r\n"
return ret
class Statement(object):
def __init__(self):
self.start_date = ''
self.end_date = ''
self.currency = ''
self.transactions = []
# Error tracking:
self.discarded_entries = []
self.warnings = []
class InvestmentStatement(object):
def __init__(self):
self.positions = []
self.transactions = []
# Error tracking:
self.discarded_entries = []
self.warnings = []
class Transaction(object):
def __init__(self):
self.payee = ''
self.type = ''
self.date = None
self.amount = None
self.id = ''
self.memo = ''
self.sic = None
self.mcc = ''
self.checknum = ''
def __repr__(self):
return ""
class InvestmentTransaction(object):
AGGREGATE_TYPES = ['buydebt', 'buymf', 'buyopt', 'buyother',
'buystock', 'closureopt', 'income',
'invexpense', 'jrnlfund', 'jrnlsec',
'margininterest', 'reinvest', 'retofcap',
'selldebt', 'sellmf', 'sellopt', 'sellother',
'sellstock', 'split', 'transfer']
def __init__(self, type):
self.type = type.lower()
self.tradeDate = None
self.settleDate = None
self.memo = ''
self.security = ''
self.income_type = ''
self.units = decimal.Decimal(0)
self.unit_price = decimal.Decimal(0)
self.commission = decimal.Decimal(0)
self.fees = decimal.Decimal(0)
self.total = decimal.Decimal(0)
def __repr__(self):
return ""
class Position(object):
def __init__(self):
self.security = ''
self.units = decimal.Decimal(0)
self.unit_price = decimal.Decimal(0)
class Institution(object):
def __init__(self):
self.organization = ''
self.fid = ''
class OfxParserException(Exception):
pass
class OfxParser(object):
@classmethod
def parse(cls_, file_handle, fail_fast=True):
'''
parse is the main entry point for an OfxParser. It takes a file
handle and an optional log_errors flag.
If fail_fast is True, the parser will fail on any errors.
If fail_fast is False, the parser will log poor statements in the
statement class and continue to run. Note: the library does not
guarantee that no exceptions will be raised to the caller, only
that statements will include bad transactions (which are marked).
'''
cls_.fail_fast = fail_fast
if not hasattr(file_handle, 'seek'):
raise TypeError(six.u('parse() accepts a seek-able file handle\
, not %s' % type(file_handle).__name__))
ofx_obj = Ofx()
# Store the headers
ofx_file = OfxPreprocessedFile(file_handle)
ofx_obj.headers = ofx_file.headers
ofx_obj.accounts = []
ofx_obj.signon = None
skip_headers(ofx_file.fh)
ofx = soup_maker(ofx_file.fh)
if ofx.find('ofx') is None:
raise OfxParserException('The ofx file is empty!')
sonrs_ofx = ofx.find('sonrs')
if sonrs_ofx:
ofx_obj.signon = cls_.parseSonrs(sonrs_ofx)
stmttrnrs = ofx.find('stmttrnrs')
if stmttrnrs:
stmttrnrs_trnuid = stmttrnrs.find('trnuid')
if stmttrnrs_trnuid:
ofx_obj.trnuid = stmttrnrs_trnuid.contents[0].strip()
stmttrnrs_status = stmttrnrs.find('status')
if stmttrnrs_status:
ofx_obj.status = {}
ofx_obj.status['code'] = int(
stmttrnrs_status.find('code').contents[0].strip()
)
ofx_obj.status['severity'] = \
stmttrnrs_status.find('severity').contents[0].strip()
stmtrs_ofx = ofx.findAll('stmtrs')
if stmtrs_ofx:
ofx_obj.accounts += cls_.parseStmtrs(stmtrs_ofx, AccountType.Bank)
ccstmtrs_ofx = ofx.findAll('ccstmtrs')
if ccstmtrs_ofx:
ofx_obj.accounts += cls_.parseStmtrs(
ccstmtrs_ofx, AccountType.CreditCard)
invstmtrs_ofx = ofx.findAll('invstmtrs')
if invstmtrs_ofx:
ofx_obj.accounts += cls_.parseInvstmtrs(invstmtrs_ofx)
seclist_ofx = ofx.find('seclist')
if seclist_ofx:
ofx_obj.security_list = cls_.parseSeclist(seclist_ofx)
else:
ofx_obj.security_list = None
acctinfors_ofx = ofx.find('acctinfors')
if acctinfors_ofx:
ofx_obj.accounts += cls_.parseAcctinfors(acctinfors_ofx, ofx)
fi_ofx = ofx.find('fi')
if fi_ofx:
for account in ofx_obj.accounts:
account.institution = cls_.parseOrg(fi_ofx)
if ofx_obj.accounts:
ofx_obj.account = ofx_obj.accounts[0]
return ofx_obj
@classmethod
def parseOfxDateTime(cls_, ofxDateTime):
# dateAsString looks something like 20101106160000.00[-5:EST]
# for 6 Nov 2010 4pm UTC-5 aka EST
# Some places (e.g. Newfoundland) have non-integer offsets.
res = re.search("\[(?P[-+]?\d+\.?\d*)\:\w*\]$", ofxDateTime)
if res:
tz = float(res.group('tz'))
else:
tz = 0
timeZoneOffset = datetime.timedelta(hours=tz)
res = re.search("^[0-9]*\.([0-9]{0,5})", ofxDateTime)
if res:
msec = datetime.timedelta(seconds=float("0." + res.group(1)))
else:
msec = datetime.timedelta(seconds=0)
try:
local_date = datetime.datetime.strptime(
ofxDateTime[:14], '%Y%m%d%H%M%S'
)
return local_date - timeZoneOffset + msec
except:
if ofxDateTime[:8] == "00000000":
return None
return datetime.datetime.strptime(
ofxDateTime[:8], '%Y%m%d') - timeZoneOffset + msec
@classmethod
def parseAcctinfors(cls_, acctinfors_ofx, ofx):
all_accounts = []
for i in acctinfors_ofx.findAll('acctinfo'):
accounts = []
if i.find('invacctinfo'):
accounts += cls_.parseInvstmtrs([i])
elif i.find('ccacctinfo'):
accounts += cls_.parseStmtrs([i], AccountType.CreditCard)
elif i.find('bankacctinfo'):
accounts += cls_.parseStmtrs([i], AccountType.Bank)
else:
continue
fi_ofx = ofx.find('fi')
if fi_ofx:
for account in ofx_obj.accounts:
account.institution = cls_.parseOrg(fi_ofx)
desc = i.find('desc')
if hasattr(desc, 'contents'):
for account in accounts:
account.desc = desc.contents[0].strip()
all_accounts += accounts
return all_accounts
@classmethod
def parseInvstmtrs(cls_, invstmtrs_list):
ret = []
for invstmtrs_ofx in invstmtrs_list:
account = InvestmentAccount()
acctid_tag = invstmtrs_ofx.find('acctid')
if (hasattr(acctid_tag, 'contents')):
try:
account.account_id = acctid_tag.contents[0].strip()
except IndexError:
account.warnings.append(
six.u("Empty acctid tag for %s") % invstmtrs_ofx)
if cls_.fail_fast:
raise
brokerid_tag = invstmtrs_ofx.find('brokerid')
if (hasattr(brokerid_tag, 'contents')):
try:
account.brokerid = brokerid_tag.contents[0].strip()
except IndexError:
account.warnings.append(
six.u("Empty brokerid tag for %s") % invstmtrs_ofx)
if cls_.fail_fast:
raise
account.type = AccountType.Investment
if (invstmtrs_ofx):
account.statement = cls_.parseInvestmentStatement(
invstmtrs_ofx)
ret.append(account)
return ret
@classmethod
def parseSeclist(cls_, seclist_ofx):
securityList = []
for secinfo_ofx in seclist_ofx.findAll('secinfo'):
uniqueid_tag = secinfo_ofx.find('uniqueid')
name_tag = secinfo_ofx.find('secname')
ticker_tag = secinfo_ofx.find('ticker')
memo_tag = secinfo_ofx.find('memo')
if uniqueid_tag and name_tag:
try:
ticker = ticker_tag.contents[0].strip()
except AttributeError:
# ticker can be empty
ticker = None
try:
memo = memo_tag.contents[0].strip()
except AttributeError:
# memo can be empty
memo = None
securityList.append(
Security(uniqueid_tag.contents[0].strip(),
name_tag.contents[0].strip(),
ticker,
memo))
return securityList
@classmethod
def parseInvestmentPosition(cls_, ofx):
position = Position()
tag = ofx.find('uniqueid')
if (hasattr(tag, 'contents')):
position.security = tag.contents[0].strip()
tag = ofx.find('units')
if (hasattr(tag, 'contents')):
position.units = cls_.toDecimal(tag)
tag = ofx.find('unitprice')
if (hasattr(tag, 'contents')):
position.unit_price = cls_.toDecimal(tag)
tag = ofx.find('dtpriceasof')
if (hasattr(tag, 'contents')):
try:
position.date = cls_.parseOfxDateTime(tag.contents[0].strip())
except ValueError:
raise
return position
@classmethod
def parseInvestmentTransaction(cls_, ofx):
transaction = InvestmentTransaction(ofx.name)
tag = ofx.find('fitid')
if (hasattr(tag, 'contents')):
transaction.id = tag.contents[0].strip()
tag = ofx.find('memo')
if (hasattr(tag, 'contents')):
transaction.memo = tag.contents[0].strip()
tag = ofx.find('dttrade')
if (hasattr(tag, 'contents')):
try:
transaction.tradeDate = cls_.parseOfxDateTime(
tag.contents[0].strip())
except ValueError:
raise
tag = ofx.find('dtsettle')
if (hasattr(tag, 'contents')):
try:
transaction.settleDate = cls_.parseOfxDateTime(
tag.contents[0].strip())
except ValueError:
raise
tag = ofx.find('uniqueid')
if (hasattr(tag, 'contents')):
transaction.security = tag.contents[0].strip()
tag = ofx.find('incometype')
if (hasattr(tag, 'contents')):
transaction.income_type = tag.contents[0].strip()
tag = ofx.find('units')
if (hasattr(tag, 'contents')):
transaction.units = cls_.toDecimal(tag)
tag = ofx.find('unitprice')
if (hasattr(tag, 'contents')):
transaction.unit_price = cls_.toDecimal(tag)
tag = ofx.find('commission')
if (hasattr(tag, 'contents')):
transaction.commission = cls_.toDecimal(tag)
tag = ofx.find('fees')
if (hasattr(tag, 'contents')):
transaction.fees = cls_.toDecimal(tag)
tag = ofx.find('total')
if (hasattr(tag, 'contents')):
transaction.total = cls_.toDecimal(tag)
tag = ofx.find('inv401ksource')
if (hasattr(tag, 'contents')):
transaction.inv401ksource = tag.contents[0].strip()
return transaction
@classmethod
def parseInvestmentStatement(cls_, invstmtrs_ofx):
statement = InvestmentStatement()
currency_tag = invstmtrs_ofx.find('curdef')
if hasattr(currency_tag, "contents"):
statement.currency = currency_tag.contents[0].strip().lower()
invtranlist_ofx = invstmtrs_ofx.find('invtranlist')
if (invtranlist_ofx is not None):
tag = invtranlist_ofx.find('dtstart')
if (hasattr(tag, 'contents')):
try:
statement.start_date = cls_.parseOfxDateTime(
tag.contents[0].strip())
except IndexError:
statement.warnings.append(six.u('Empty start date.'))
if cls_.fail_fast:
raise
except ValueError:
e = sys.exc_info()[1]
statement.warnings.append(six.u('Invalid start date:\
%s') % e)
if cls_.fail_fast:
raise
tag = invtranlist_ofx.find('dtend')
if (hasattr(tag, 'contents')):
try:
statement.end_date = cls_.parseOfxDateTime(
tag.contents[0].strip())
except IndexError:
statement.warnings.append(six.u('Empty end date.'))
except ValueError:
e = sys.exc_info()[1]
statement.warnings.append(six.u('Invalid end date: \
%s') % e)
if cls_.fail_fast:
raise
for transaction_type in ['posmf', 'posstock', 'posopt']:
try:
for investment_ofx in invstmtrs_ofx.findAll(transaction_type):
statement.positions.append(
cls_.parseInvestmentPosition(investment_ofx))
except (ValueError, IndexError, decimal.InvalidOperation,
TypeError):
e = sys.exc_info()[1]
if cls_.fail_fast:
raise
statement.discarded_entries.append(
{six.u('error'): six.u("Error parsing positions: \
") + str(e), six.u('content'): investment_ofx}
)
for transaction_type in InvestmentTransaction.AGGREGATE_TYPES:
try:
for investment_ofx in invstmtrs_ofx.findAll(transaction_type):
statement.transactions.append(
cls_.parseInvestmentTransaction(investment_ofx))
except (ValueError, IndexError, decimal.InvalidOperation):
e = sys.exc_info()[1]
if cls_.fail_fast:
raise
statement.discarded_entries.append(
{six.u('error'): transaction_type + ": " + str(e),
six.u('content'): investment_ofx}
)
invbal_ofx = invstmtrs_ofx.find('invbal')
if invbal_ofx is not None:
#18073.98+00000000000.00+00000000000.00+00000000000.00
availcash_ofx = invbal_ofx.find('availcash')
if availcash_ofx is not None:
statement.available_cash = cls_.toDecimal(availcash_ofx)
margin_balance_ofx = invbal_ofx.find('marginbalance')
if margin_balance_ofx is not None:
statement.margin_balance = cls_.toDecimal(margin_balance_ofx)
short_balance_ofx = invbal_ofx.find('shortbalance')
if short_balance_ofx is not None:
statement.short_balance = cls_.toDecimal(short_balance_ofx)
buy_power_ofx = invbal_ofx.find('buypower')
if buy_power_ofx is not None:
statement.buy_power = cls_.toDecimal(buy_power_ofx)
ballist_ofx = invbal_ofx.find('ballist')
if ballist_ofx is not None:
statement.balance_list = []
for balance_ofx in ballist_ofx.findAll('bal'):
brokerage_balance = BrokerageBalance()
name_ofx = balance_ofx.find('name')
if name_ofx is not None:
brokerage_balance.name = name_ofx.contents[0].strip()
description_ofx = balance_ofx.find('desc')
if description_ofx is not None:
brokerage_balance.description = description_ofx.contents[0].strip()
value_ofx = balance_ofx.find('value')
if value_ofx is not None:
brokerage_balance.value = cls_.toDecimal(value_ofx)
statement.balance_list.append(brokerage_balance)
return statement
@classmethod
def parseOrg(cls_, fi_ofx):
institution = Institution()
org = fi_ofx.find('org')
if hasattr(org, 'contents'):
institution.organization = org.contents[0].strip()
fid = fi_ofx.find('fid')
if hasattr(fid, 'contents'):
institution.fid = fid.contents[0].strip()
return institution
@classmethod
def parseSonrs(cls_, sonrs):
items = [
'code',
'severity',
'dtserver',
'language',
'dtprofup',
'org',
'fid',
'intu.bid',
'message'
]
idict = {}
for i in items:
try:
idict[i] = sonrs.find(i).contents[0].strip()
except:
idict[i] = None
idict['code'] = int(idict['code'])
if idict['message'] is None:
idict['message'] = ''
return Signon(idict)
@classmethod
def parseStmtrs(cls_, stmtrs_list, accountType):
''' Parse the tags and return a list of Accounts object. '''
ret = []
for stmtrs_ofx in stmtrs_list:
account = Account()
act_curdef = stmtrs_ofx.find('curdef')
if act_curdef:
account.curdef = act_curdef.contents[0].strip()
acctid_tag = stmtrs_ofx.find('acctid')
if hasattr(acctid_tag, 'contents'):
account.account_id = acctid_tag.contents[0].strip()
bankid_tag = stmtrs_ofx.find('bankid')
if hasattr(bankid_tag, 'contents'):
account.routing_number = bankid_tag.contents[0].strip()
branchid_tag = stmtrs_ofx.find('branchid')
if hasattr(branchid_tag, 'contents'):
account.branch_id = branchid_tag.contents[0].strip()
type_tag = stmtrs_ofx.find('accttype')
if hasattr(type_tag, 'contents'):
account.account_type = type_tag.contents[0].strip()
account.type = accountType
if stmtrs_ofx:
account.statement = cls_.parseStatement(stmtrs_ofx)
ret.append(account)
return ret
@classmethod
def parseBalance(cls_, statement, stmt_ofx, bal_tag_name, bal_attr,
bal_date_attr, bal_type_string):
bal_tag = stmt_ofx.find(bal_tag_name)
if hasattr(bal_tag, "contents"):
balamt_tag = bal_tag.find('balamt')
dtasof_tag = bal_tag.find('dtasof')
if hasattr(balamt_tag, "contents"):
try:
setattr(statement, bal_attr, cls_.toDecimal(balamt_tag))
except (IndexError, decimal.InvalidOperation):
ex = sys.exc_info()[1]
statement.warnings.append(
six.u("%s balance amount was empty for \
%s") % (bal_type_string, stmt_ofx))
if cls_.fail_fast:
raise OfxParserException("Empty %s balance\
" % bal_type_string)
if hasattr(dtasof_tag, "contents"):
try:
setattr(statement, bal_date_attr, cls_.parseOfxDateTime(
dtasof_tag.contents[0].strip()))
except IndexError:
statement.warnings.append(
six.u("%s balance date was empty for %s\
") % (bal_type_string, stmt_ofx))
if cls_.fail_fast:
raise
except ValueError:
statement.warnings.append(
six.u("%s balance date was not allowed for \
%s") % (bal_type_string, stmt_ofx))
if cls_.fail_fast:
raise
@classmethod
def parseStatement(cls_, stmt_ofx):
'''
Parse a statement in ofx-land and return a Statement object.
'''
statement = Statement()
dtstart_tag = stmt_ofx.find('dtstart')
if hasattr(dtstart_tag, "contents"):
try:
statement.start_date = cls_.parseOfxDateTime(
dtstart_tag.contents[0].strip())
except IndexError:
statement.warnings.append(
six.u("Statement start date was empty for %s") % stmt_ofx)
if cls_.fail_fast:
raise
except ValueError:
statement.warnings.append(
six.u("Statement start date was not allowed for \
%s") % stmt_ofx)
if cls_.fail_fast:
raise
dtend_tag = stmt_ofx.find('dtend')
if hasattr(dtend_tag, "contents"):
try:
statement.end_date = cls_.parseOfxDateTime(
dtend_tag.contents[0].strip())
except IndexError:
statement.warnings.append(
six.u("Statement start date was empty for %s") % stmt_ofx)
if cls_.fail_fast:
raise
except ValueError:
ve = sys.exc_info()[1]
msg = six.u("Statement start date was not formatted "
"correctly for %s")
statement.warnings.append(msg % stmt_ofx)
if cls_.fail_fast:
raise
except TypeError:
statement.warnings.append(
six.u("Statement start date was not allowed for \
%s") % stmt_ofx)
if cls_.fail_fast:
raise
currency_tag = stmt_ofx.find('curdef')
if hasattr(currency_tag, "contents"):
try:
statement.currency = currency_tag.contents[0].strip().lower()
except IndexError:
statement.warnings.append(
six.u("Currency definition was empty for %s") % stmt_ofx)
if cls_.fail_fast:
raise
cls_.parseBalance(statement, stmt_ofx, 'ledgerbal',
'balance', 'balance_date', 'ledger')
cls_.parseBalance(statement, stmt_ofx, 'availbal', 'available_balance',
'available_balance_date', 'ledger')
for transaction_ofx in stmt_ofx.findAll('stmttrn'):
try:
statement.transactions.append(
cls_.parseTransaction(transaction_ofx))
except OfxParserException:
ofxError = sys.exc_info()[1]
statement.discarded_entries.append(
{'error': str(ofxError), 'content': transaction_ofx})
if cls_.fail_fast:
raise
return statement
@classmethod
def parseTransaction(cls_, txn_ofx):
'''
Parse a transaction in ofx-land and return a Transaction object.
'''
transaction = Transaction()
type_tag = txn_ofx.find('trntype')
if hasattr(type_tag, 'contents'):
try:
transaction.type = type_tag.contents[0].lower().strip()
except IndexError:
raise OfxParserException(six.u("Empty transaction type"))
except TypeError:
raise OfxParserException(
six.u("No Transaction type (a required field)"))
name_tag = txn_ofx.find('name')
if hasattr(name_tag, "contents"):
try:
transaction.payee = name_tag.contents[0].strip()
except IndexError:
raise OfxParserException(six.u("Empty transaction name"))
except TypeError:
raise OfxParserException(
six.u("No Transaction name (a required field)"))
memo_tag = txn_ofx.find('memo')
if hasattr(memo_tag, "contents"):
try:
transaction.memo = memo_tag.contents[0].strip()
except IndexError:
# Memo can be empty.
pass
except TypeError:
pass
amt_tag = txn_ofx.find('trnamt')
if hasattr(amt_tag, "contents"):
try:
transaction.amount = cls_.toDecimal(amt_tag)
except IndexError:
raise OfxParserException("Invalid Transaction Date")
except decimal.InvalidOperation:
# Some banks use a null transaction for including interest
# rate changes on your statement.
if amt_tag.contents[0].strip() in ('null', '-null'):
transaction.amount = 0
else:
raise OfxParserException(
six.u("Invalid Transaction Amount: '%s'") % amt_tag.contents[0])
except TypeError:
raise OfxParserException(
six.u("No Transaction Amount (a required field)"))
else:
raise OfxParserException(
six.u("Missing Transaction Amount (a required field)"))
date_tag = txn_ofx.find('dtposted')
if hasattr(date_tag, "contents"):
try:
transaction.date = cls_.parseOfxDateTime(
date_tag.contents[0].strip())
except IndexError:
raise OfxParserException("Invalid Transaction Date")
except ValueError:
ve = sys.exc_info()[1]
raise OfxParserException(str(ve))
except TypeError:
raise OfxParserException(
six.u("No Transaction Date (a required field)"))
else:
raise OfxParserException(
six.u("Missing Transaction Date (a required field)"))
id_tag = txn_ofx.find('fitid')
if hasattr(id_tag, "contents"):
try:
transaction.id = id_tag.contents[0].strip()
except IndexError:
raise OfxParserException(six.u("Empty FIT id (a required \
field)"))
except TypeError:
raise OfxParserException(six.u("No FIT id (a required field)"))
else:
raise OfxParserException(six.u("Missing FIT id (a required \
field)"))
sic_tag = txn_ofx.find('sic')
if hasattr(sic_tag, 'contents'):
try:
transaction.sic = sic_tag.contents[0].strip()
except IndexError:
raise OfxParserException(six.u("Empty transaction Standard \
Industry Code (SIC)"))
if transaction.sic is not None and transaction.sic in mcc.codes:
try:
transaction.mcc = mcc.codes.get(transaction.sic, '').get('combined \
description')
except IndexError:
raise OfxParserException(six.u("Empty transaction Merchant Category \
Code (MCC)"))
except AttributeError:
if cls._fail_fast:
raise
checknum_tag = txn_ofx.find('checknum')
if hasattr(checknum_tag, 'contents'):
try:
transaction.checknum = checknum_tag.contents[0].strip()
except IndexError:
raise OfxParserException(six.u("Empty Check (or other reference) \
number"))
return transaction
@classmethod
def toDecimal(cls_, tag):
d = tag.contents[0].strip()
if '.' not in d and ',' in d:
d = d.replace(',', '.')
return decimal.Decimal(d)