# Source code for beansoup.plugins.deposit_in_transit

"""Work in progress. It works, but needs documentation and some cleaning.

A plugin that automatically ties split deposit-in-transit transactions.

usage: beansoup.plugins.deposit_in_transit [--dit_component NAME]
                                           [--auto_open] [--same_day_merge]
                                           [--flag_pending]
                                           [--cleared_tag TAG]
                                           [--pending_tag TAG]
                                           [--ignored_tag TAG]
                                           [--link_prefix PREFIX]
                                           [--skip_re REGEX]

optional arguments:
  --dit_component NAME  use NAME as the component name distinguishing deposit-
                        in-transit accounts (default: DIT)
  --auto_open           automatically open deposit-in-transit accounts
                        (default: False)
  --same_day_merge      merge same-day transactions with matching deposit-in-
                        transit postings (default: False)
  --flag_pending        annotate pending transactions with a ! flag (default:
                        False)
  --cleared_tag TAG     tag cleared transactions with TAG (default: DEPOSITED)
  --pending_tag TAG     tag pending transactions with TAG (default: IN-
                        TRANSIT)
  --ignored_tag TAG     ignore transactions that have a TAG tag (default:
                        IGNORED)
  --link_prefix PREFIX  link pairs of cleared transactions with PREFIX string
                        followed by increasing count; otherwise it uses UUIDs
                        (default: None)
  --skip_re REGEX       disable plugin if REGEX matches any sys.argv (default:
                        None)
"""

import argparse
import collections
import itertools
import sys

from beancount.core import data, flags, getters
from beancount.core.account import has_component

from beansoup.plugins import config
from beansoup.utils import links

# Names of the plugin entry points defined in this module; the only one is
# the `plugin` function below.
# NOTE(review): presumably read by the Beancount plugin loader -- confirm.
__plugins__ = ('plugin',)


# Error record produced by this plugin. Fields, in order:
#   source:  metadata locating the problem (split_entries passes entry.meta)
#   message: human-readable description of the problem
#   entry:   the directive the error refers to
DITError = collections.namedtuple('DITError', 'source message entry')


def plugin(entries, options_map, config_string):
    """Tie split deposit-in-transit transactions found in `entries`.

    Args:
      entries: A list of directives.
      options_map: An options mapping; only its 'filename' item is read here
        and it is handed to the configuration parser.
      config_string: The plugin configuration, i.e. whitespace-separated
        options as listed in the module docstring. May be empty or None.
    Returns:
      A tuple of (entries, errors). If the configuration cannot be parsed,
      the input entries are returned along with the parse error. If the
      --skip_re pattern matches any item of sys.argv, the input entries are
      returned with no errors. Otherwise the unchanged and the new entries
      are returned together, sorted by data.entry_sortkey, along with the
      errors reported by process_entries.
    """
    # Parse plugin config; report errors if any
    parser = config.ArgumentParser(
        prog=__name__,
        description='A plugin that automatically ties split deposit-in-transit transactions.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        add_help=False,
        entries_filename=options_map['filename'])

    # Options are registered in this order, which is also the order in which
    # they show up in the usage message.
    option_table = (
        ('--dit_component', dict(
            metavar='NAME',
            default='DIT',
            help='use %(metavar)s as the component name distinguishing deposit-in-transit accounts')),
        ('--auto_open', dict(
            action='store_true',
            default=False,
            help='automatically open deposit-in-transit accounts')),
        ('--same_day_merge', dict(
            action='store_true',
            default=False,
            help='merge same-day transactions with matching deposit-in-transit postings')),
        ('--flag_pending', dict(
            action='store_true',
            default=False,
            help='annotate pending transactions with a {} flag'.format(flags.FLAG_WARNING))),
        ('--cleared_tag', dict(
            metavar='TAG',
            default='DEPOSITED',
            help='tag cleared transactions with %(metavar)s')),
        ('--pending_tag', dict(
            metavar='TAG',
            default='IN-TRANSIT',
            help='tag pending transactions with %(metavar)s')),
        ('--ignored_tag', dict(
            metavar='TAG',
            default='IGNORED',
            help='ignore transactions that have a %(metavar)s tag')),
        ('--link_prefix', dict(
            metavar='PREFIX',
            default=None,
            help='link pairs of cleared transactions with %(metavar)s string followed by increasing count; otherwise it uses UUIDs')),
        ('--skip_re', dict(
            metavar='REGEX',
            default=None,
            type=config.re_type,
            help='disable plugin if %(metavar)s matches any sys.argv')),
    )
    for option_name, option_settings in option_table:
        parser.add_argument(option_name, **option_settings)

    config_tokens = (config_string or '').split()
    try:
        args = parser.parse_args(config_tokens)
    except config.ParseError as error:
        return entries, [error]

    # If the plugin was called with the --skip_re option and the given
    # regular expression matches any of the arguments in sys.argv,
    # do not run the plugin and return the original entries instead.
    skip_pattern = args.skip_re
    if skip_pattern and any(skip_pattern.match(arg) for arg in sys.argv):
        return entries, []

    unchanged_entries, new_entries, errors = process_entries(entries, args)

    # FIXME: Consider printing the pending entries. Maybe return errors for them.
    all_entries = unchanged_entries + new_entries
    all_entries.sort(key=data.entry_sortkey)
    return all_entries, errors
def process_entries(entries, args):
    """Run the deposit-in-transit processing over `entries`.

    Args:
      entries: A list of directives.
      args: The parsed plugin options. The attributes read here are
        auto_open, dit_component, ignored_tag, link_prefix, cleared_tag,
        same_day_merge, flag_pending and pending_tag.
    Returns:
      A tuple of (unchanged_entries, new_entries, errors): the entries left
      untouched, the entries created by this plugin (optional Open
      directives first, then the entries produced for each matched pair,
      then the entries produced for each unmatched transaction), and the
      errors collected by split_entries and pair_dits.
    """
    if args.auto_open:
        new_entries = list(open_dit_accounts(entries, args.dit_component))
    else:
        new_entries = []

    # Find all DIT transactions to be processed; their original entries
    # will be replaced by new ones
    dits, unchanged_entries, errors = split_entries(
        entries,
        dit_component=args.dit_component,
        ignored_tag=args.ignored_tag)

    pairs, singletons, pairing_errors = pair_dits(
        dits, dit_component=args.dit_component)
    errors += pairing_errors

    # A single link source is shared by all pairs so that every pair gets
    # its own link.
    cleared_links = links.count(args.link_prefix)
    for matched_pair in pairs:
        replacement_entries = process_pair(
            matched_pair,
            cleared_tag=args.cleared_tag,
            cleared_links=cleared_links,
            same_day_merge=args.same_day_merge)
        new_entries.extend(replacement_entries)

    for unmatched in singletons:
        new_entries.append(process_singleton(
            unmatched,
            flag_pending=args.flag_pending,
            pending_tag=args.pending_tag))

    return unchanged_entries, new_entries, errors
def open_dit_accounts(entries, dit_component):
    """Create Open directives for deposit-in-transit accounts lacking one.

    Minimally adapted from beancount.plugins.auto_accounts.

    Args:
      entries: A list of directives.
      dit_component: The account component that marks an account as a
        deposit-in-transit account (tested with has_component).
    Returns:
      A list of new data.Open directives, one for each account that is used
      in `entries`, has the `dit_component` component and has no Open
      directive among `entries`. Each is dated at the account's first use.
    """
    already_open = {directive.account
                    for directive in entries
                    if isinstance(directive, data.Open)}
    first_use_by_account, _ = getters.get_accounts_use_map(entries)

    # The metadata index is the position of the account among *all* used
    # accounts in sorted order, not only among the ones opened here.
    return [
        data.Open(data.new_metadata(__name__, position),
                  first_use, account, None, None)
        for position, (account, first_use)
        in enumerate(sorted(first_use_by_account.items()))
        if account not in already_open
        and has_component(account, dit_component)
    ]
def split_entries(entries, dit_component, ignored_tag):
    """Separate the deposit-in-transit transactions from all other entries.

    A transaction is selected when it does not carry the `ignored_tag` tag
    and has at least one posting to an account with the `dit_component`
    component. Only the first such posting is kept; if there are more, an
    error is reported for the entry.

    Args:
      entries: A list of directives.
      dit_component: The account component that marks deposit-in-transit
        accounts.
      ignored_tag: Transactions with this tag are left untouched.
    Returns:
      A tuple of (dits, unchanged_entries, errors), where dits is a list of
      data.TxnPosting objects (transaction, first DIT posting),
      unchanged_entries is the list of all the other entries and errors is
      a list of DITError objects.
    """
    dits = []
    unchanged_entries = []
    errors = []

    for entry in entries:
        dit_postings = []
        if isinstance(entry, data.Transaction):
            is_ignored = entry.tags and ignored_tag in entry.tags
            if not is_ignored:
                dit_postings = [posting
                                for posting in entry.postings
                                if has_component(posting.account, dit_component)]

        if not dit_postings:
            unchanged_entries.append(entry)
            continue

        first_dit_posting = dit_postings[0]
        dits.append(data.TxnPosting(entry, first_dit_posting))
        if len(dit_postings) > 1:
            message = (
                "(deposit_in_transit) Found entry with multiple postings to DIT accounts; "
                "only processing posting to {} account".format(
                    first_dit_posting.account))
            errors.append(DITError(entry.meta, message, entry))

    return dits, unchanged_entries, errors
def pair_dits(dits, dit_component):
    """Pair up deposit-in-transit postings whose units cancel out.

    The postings are visited in order; each one not yet paired is offered,
    through match_dit, the still unpaired postings whose units are the
    negation of its own.

    Args:
      dits: A list of TxnPosting objects, as returned by split_entries.
      dit_component: The account component that marks deposit-in-transit
        accounts; only forwarded to match_dit.
    Returns:
      A tuple of (pairs, singletons, errors): a list of 2-tuples of matched
      TxnPosting objects, a list of the TxnPosting objects left without a
      match, and a list of errors (always empty in this implementation).
    """
    # A map from amounts to all DIT postings (as a TxnPosting) sharing
    # that amount
    unpaired_by_units = collections.defaultdict(list)
    for txn_posting in dits:
        unpaired_by_units[txn_posting.posting.units].append(txn_posting)

    pairs = []
    singletons = []
    errors = []
    paired_txn_ids = set()

    for current in dits:
        if id(current.txn) in paired_txn_ids:
            # Already consumed as the second half of an earlier pair.
            continue

        units = current.posting.units
        # A posting must never be matched with itself.
        unpaired_by_units[units].remove(current)

        partner = match_dit(current,
                            unpaired_by_units.get(-units),
                            dit_component)
        if not partner:
            singletons.append(current)
            continue

        # Found matching DIT transaction
        pairs.append((current, partner))
        paired_txn_ids.add(id(partner.txn))
        unpaired_by_units[partner.posting.units].remove(partner)

    return pairs, singletons, errors
def match_dit(dit, candidate_dits, dit_component):
    """Choose the match for `dit` among `candidate_dits`.

    Args:
      dit: The posting to be matched (currently unused).
      candidate_dits: A sequence of candidate matches; may be empty or None.
      dit_component: The account component that marks deposit-in-transit
        accounts (currently unused).
    Returns:
      The first candidate, or None if there are no candidates.
    """
    # FIXME: check DIT- and base-accounts match
    if not candidate_dits:
        return None
    return candidate_dits[0]
def process_pair(pair, cleared_tag, cleared_links, same_day_merge):
    """Build the cleared entries for a pair of matching DIT transactions.

    Args:
      pair: A 2-tuple of TxnPosting objects, as produced by pair_dits.
      cleared_tag: The tag added to every entry returned.
      cleared_links: An iterator of link names; one name is consumed for
        each pair that is not merged.
      same_day_merge: If true, a pair accepted by is_pair_mergeable is
        replaced by a single merged transaction.
    Returns:
      Either a 1-tuple holding the merged transaction (all postings of both
      transactions minus the two DIT postings), or a 3-tuple holding the
      two original transactions, tagged and linked, followed by a new
      transaction that reverses the two DIT postings.
    """
    def mark_cleared(entry, cleared_link):
        # Copy of `entry` carrying the cleared tag and the pair's link.
        tags = (entry.tags or set()) | {cleared_tag}
        links = (entry.links or set()) | {cleared_link}
        return entry._replace(tags=tags, links=links)

    def reverse_posting(posting):
        # Bare posting to the same account with the opposite units.
        return data.Posting(posting.account, -posting.units,
                            None, None, None, None)

    # The first in the pair should be the sender; the second, the receiver.
    sender, receiver = pair[0], pair[1]
    if sender.posting.units < receiver.posting.units:
        sender, receiver = receiver, sender
    sender_txn = sender.txn
    receiver_txn = receiver.txn

    date = max(sender_txn.date, receiver_txn.date)

    narration = (sender_txn.narration
                 if sender_txn.narration == receiver_txn.narration
                 else '{} / {}'.format(sender_txn.narration,
                                       receiver_txn.narration))

    sender_payee = sender_txn.payee
    receiver_payee = receiver_txn.payee
    if sender_payee is None:
        payee = receiver_payee
    elif receiver_payee is None or sender_payee == receiver_payee:
        payee = sender_payee
    else:
        payee = '{} / {}'.format(sender_payee, receiver_payee)

    if same_day_merge and is_pair_mergeable((sender, receiver)):
        # Merge the two transactions
        merged_tags = ((sender_txn.tags or set()) |
                       (receiver_txn.tags or set()) |
                       {cleared_tag})
        merged_links = ((sender_txn.links or set()) |
                        (receiver_txn.links or set())) or None
        # The two DIT postings cancel each other and are dropped.
        merged_postings = [posting for posting in sender_txn.postings
                           if posting is not sender.posting]
        merged_postings.extend(posting for posting in receiver_txn.postings
                               if posting is not receiver.posting)
        merged_entry = data.Transaction(
            sender_txn.meta, date, sender_txn.flag, payee, narration,
            merged_tags, merged_links, merged_postings)
        return (merged_entry, )

    # Make sure the connecting entry will be shown between the two existing
    # ones when looking at the list of entries for their common link
    lineno = int((sender_txn.meta.get('lineno', 0) +
                  receiver_txn.meta.get('lineno', 0)) / 2)
    meta = data.new_metadata(__name__, lineno)

    cleared_link = next(cleared_links)
    connecting_entry = data.Transaction(
        meta, date, flags.FLAG_OKAY, payee, narration,
        {cleared_tag}, {cleared_link},
        [reverse_posting(sender.posting), reverse_posting(receiver.posting)])

    return (mark_cleared(sender_txn, cleared_link),
            mark_cleared(receiver_txn, cleared_link),
            connecting_entry)
def is_pair_mergeable(pair):
    """Tell whether the two transactions of a pair can be merged into one.

    Args:
      pair: A 2-tuple of objects with `txn` and `posting` attributes.
    Returns:
      True if both transactions have the same flag and the same date and
      neither of the two postings has a cost or a price; False otherwise.
    """
    first, second = pair[0], pair[1]

    same_flag = first.txn.flag == second.txn.flag
    if not same_flag:
        return False

    same_date = first.txn.date == second.txn.date
    if not same_date:
        return False

    has_cost_or_price = (first.posting.cost or first.posting.price or
                         second.posting.cost or second.posting.price)
    return not has_cost_or_price
def process_singleton(singleton, flag_pending, pending_tag):
    """Mark an unmatched deposit-in-transit transaction as pending.

    Args:
      singleton: An object whose `txn` attribute is the transaction.
      flag_pending: If true, the transaction flag is replaced with
        flags.FLAG_WARNING; otherwise the flag is left as it is.
      pending_tag: The tag added to the transaction.
    Returns:
      A copy of the transaction with `pending_tag` added to its tags and,
      if requested, the warning flag.
    """
    txn = singleton.txn
    if flag_pending:
        pending_flag = flags.FLAG_WARNING
    else:
        pending_flag = txn.flag
    pending_tags = (txn.tags or set()) | {pending_tag}
    return txn._replace(flag=pending_flag, tags=pending_tags)