Source code for beansoup.plugins.clear_transactions

"""Work in progress. It works, but needs documentation and some cleaning.
"""

import argparse
import collections
import datetime
import itertools

from beancount.core import data, getters, flags

from beansoup.plugins import config
from beansoup.utils import dates

# Names of the plugin entry points defined in this module.
# NOTE(review): presumably read by beancount's plugin loader to find the
# functions to call -- confirm against the beancount plugin documentation.
__plugins__ = ('clear_transactions',)


class AccountPairType:
    """Callable used as an argparse ``type`` for ``CLEARING,MAIN`` arguments.

    An instance is built from the ledger entries and remembers the accounts
    reported by ``getters.get_accounts(entries)``. Calling the instance with
    a string splits it on commas and validates the two resulting names.
    """

    def __init__(self, entries):
        """Record the accounts known from ``entries`` for later validation."""
        self.existing_accounts = getters.get_accounts(entries)

    def __call__(self, string):
        """Parse ``string`` into a ``(clearing_account, main_account)`` tuple.

        Raises:
          argparse.ArgumentTypeError: if ``string`` does not contain exactly
            two comma-separated names, or if either name is not one of the
            existing accounts (the first unknown name is reported).
        """
        names = string.split(',')

        # Guard: exactly one comma is expected.
        if len(names) != 2:
            raise argparse.ArgumentTypeError(
                "invalid account pair: '{}'; expecting clearing and main account names separated by a comma (no spaces)".format(string))

        # Guard: both names must be known accounts; report the first one
        # that is not.
        unknown = next(
            (name for name in names if name not in self.existing_accounts),
            None)
        if unknown is not None:
            raise argparse.ArgumentTypeError(
                "account '{}' does not exist".format(unknown))

        return tuple(names)
def clear_transactions(entries, options_map, config_string):
    """Plugin entry point: tag cleared and pending clearing-account transactions.

    Args:
      entries: the list of ledger entries to process.
      options_map: the options mapping; its ``'filename'`` value is handed to
        the configuration parser.
      config_string: the plugin configuration as a whitespace-separated
        command-line-like string; ``None`` or empty is treated as no
        arguments.

    Returns:
      A ``(entries, errors)`` tuple. When the configuration cannot be parsed,
      the original entries are returned unchanged together with a one-element
      list holding the parse error. Otherwise the returned list has every
      entry the processor modified swapped for its replacement, in the
      original order.
    """
    # Describe the accepted configuration.
    arg_parser = config.ArgumentParser(
        prog=__name__,
        description='A plugin that automatically tags cleared and pending transactions.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        add_help=False,
        entries_filename=options_map['filename'])
    add = arg_parser.add_argument
    add('--flag_pending',
        action='store_true',
        default=False,
        help='annotate pending transactions with a {} flag'.format(flags.FLAG_WARNING))
    add('--cleared_tag',
        metavar='TAG',
        default='CLEARED',
        help='tag cleared transactions with %(metavar)s')
    add('--pending_tag',
        metavar='TAG',
        default='PENDING',
        help='tag pending transactions with %(metavar)s')
    add('--ignored_tag',
        metavar='TAG',
        default='PRE_CLEARED',
        help='ignore transactions that have a %(metavar)s tag')
    add('--link_prefix',
        metavar='PREFIX',
        default='cleared',
        help='link pairs of cleared transactions with %(metavar)s string followed by increasing count')
    add('--max_days',
        metavar='N',
        type=int,
        default=7,
        help='only pair transactions if they occurred no more than %(metavar)s days apart')
    add('--skip_weekends',
        action='store_true',
        default=False,
        help='skip weekends when measuring the time gap between transactions')
    add('account_pairs',
        metavar='CLEARING_ACCOUNT,MAIN_ACCOUNT',
        nargs='+',
        type=AccountPairType(entries),
        help='the names of a clearing account and its main account, separated by a comma (no space)')

    # Parse the configuration; a parse failure is reported as a plugin error
    # and leaves the entries untouched.
    tokens = (config_string or '').split()
    try:
        options = arg_parser.parse_args(tokens)
    except config.ParseError as parse_error:
        return entries, [parse_error]

    replacements, errors = Processor(options).clear_transactions(entries)

    # FIXME: Consider printing the pending entries. Maybe return errors for them.
    # Replacements are keyed by the id() of the entry they supersede.
    updated_entries = [replacements.get(id(entry), entry) for entry in entries]
    return updated_entries, errors
class Processor:
    """Pairs transactions that offset each other on a clearing account.

    Matched pairs receive the cleared tag and a shared link; transactions
    left without a partner receive the pending tag (and optionally the
    warning flag). Replacement entries are collected in ``modified_entries``,
    keyed by the ``id()`` of the entry they supersede.
    """

    def __init__(self, args):
        """Copy the parsed plugin options from ``args`` onto the instance."""
        # Tagging / flagging options.
        self.flag_pending = args.flag_pending
        self.cleared_tag_name = args.cleared_tag
        self.pending_tag_name = args.pending_tag
        self.ignored_tag_name = args.ignored_tag
        self.cleared_link_prefix = args.link_prefix
        # Matching-window options.
        self.max_delta_days = args.max_days
        self.skip_weekends = args.skip_weekends
        # Maps each clearing account name to its main account name.
        self.clearing_accounts = dict(args.account_pairs)
        # Populated by clear_transactions().
        self.modified_entries = None
        # Source of the numeric suffix for link names; starts at 1.
        self.link_count = itertools.count(start=1)

    def clear_transactions(self, entries):
        """Process ``entries`` and return ``(modified_entries, errors)``.

        ``modified_entries`` maps ``id(original_entry)`` to its replacement;
        ``errors`` is a list that this method never adds to.
        """
        errors = []
        self.modified_entries = {}

        # Bucket the candidate transactions by clearing account.
        by_account = collections.defaultdict(list)
        for entry in entries:
            if not isinstance(entry, data.Transaction):
                continue
            if entry.tags and self.ignored_tag_name in entry.tags:
                continue
            clearing_posting = self.get_txn_clearing_posting(entry)
            if clearing_posting:
                by_account[clearing_posting.account].append(
                    data.TxnPosting(entry, clearing_posting))

        # NOTE: sorting is only needed to support testing
        for account in sorted(by_account):
            self.clear_transaction_group(by_account[account])

        return self.modified_entries, errors

    def get_txn_clearing_posting(self, txn):
        """Return the first posting of ``txn`` on a clearing account, or None.

        Only the first such posting is considered, so this implicitly assumes
        a transaction has at most one posting to a clearing account.
        """
        return next(
            (posting for posting in txn.postings
             if posting.account in self.clearing_accounts),
            None)

    def clear_transaction_group(self, txn_postings):
        """Pair up the transactions of one clearing account.

        ``txn_postings`` holds (transaction, clearing posting) pairs that all
        share the same clearing account. Results are written to
        ``self.modified_entries``.
        """
        # Other plugins could have changed the order, so sort again.
        queue = collections.deque(
            sorted(txn_postings, key=lambda item: data.entry_sortkey(item.txn)))

        while queue:
            current = queue.popleft()
            if id(current.txn) in self.modified_entries:
                # Already cleared as the partner of an earlier transaction.
                continue

            # Scan the later transactions that fall inside the time window.
            deadline = self.max_matching_date(current.txn)
            partner = None
            for candidate in queue:
                within_window = candidate.txn.date <= deadline
                if not within_window:
                    break
                if id(candidate.txn) in self.modified_entries:
                    # Already cleared against some other transaction.
                    continue
                if self.match_txn_postings(current, candidate):
                    partner = candidate
                    break

            if partner is None:
                # No match; mark the transaction as pending.
                self._mark_pending(current.txn)
            else:
                # Found a match; link both transactions and tag them cleared.
                link_name = '{}-{}'.format(self.cleared_link_prefix,
                                           next(self.link_count))
                self._mark_cleared(current.txn, link_name)
                self._mark_cleared(partner.txn, link_name)

    def _mark_cleared(self, txn, link_name):
        """Record a copy of ``txn`` carrying the cleared tag and ``link_name``."""
        self.modified_entries[id(txn)] = txn._replace(
            tags=(txn.tags or set()) | {self.cleared_tag_name},
            links=(txn.links or set()) | {link_name})

    def _mark_pending(self, txn):
        """Record a copy of ``txn`` carrying the pending tag.

        The flag becomes ``flags.FLAG_WARNING`` when ``flag_pending`` is set;
        otherwise the transaction keeps its own flag.
        """
        self.modified_entries[id(txn)] = txn._replace(
            flag=flags.FLAG_WARNING if self.flag_pending else txn.flag,
            tags=(txn.tags or set()) | {self.pending_tag_name})

    def max_matching_date(self, txn):
        """Return the latest date a partner of ``txn`` may have.

        Adds ``max_delta_days`` calendar days to the transaction date, or
        delegates to ``dates.add_biz_days`` when ``skip_weekends`` is set.
        """
        if not self.skip_weekends:
            return txn.date + datetime.timedelta(days=self.max_delta_days)
        return dates.add_biz_days(txn.date, self.max_delta_days)

    def match_txn_postings(self, txn_posting, txn_posting2):
        """Tell whether two (transaction, clearing posting) pairs clear each other.

        The caller has already established that both transactions are within
        the maximum time gap and share a clearing account.
        """
        first = txn_posting.posting
        second = txn_posting2.posting

        # The two postings to the clearing account must balance out to 0.
        if first.units != -second.units:
            return False

        # Across both transactions there must be exactly one posting to the
        # main account associated with the shared clearing account.
        main_account = self.clearing_accounts[first.account]
        combined = txn_posting.txn.postings + txn_posting2.txn.postings
        main_hits = sum(
            1 for posting in combined if posting.account == main_account)
        return main_hits == 1