#!/usr/bin/env python3
"""Find cycles in a balance that could be resolved to minimize the balance"""

import argparse
import itertools
import json
import math
import typing as T

# A cycle is an ordered sequence of vertices; the edge from the last vertex
# back to the first one closes the cycle.
Cycle = tuple
Cycles = T.Iterable[Cycle]

# balance[a][b] is the signed amount on the edge a -> b.
# The code below relies on balance[b][a] == -balance[a][b].
Balance = dict[T.Any, dict[T.Any, int]]

# A cycle together with the signed amount that can be resolved along it.
MinimizeSuggestion = tuple[Cycle, int]


def gen_cycles(vertices: T.Collection) -> Cycles:
    """Iterate all cycles possible in the vertices collection

    The vertices are expected to be the vertices of a fully connected
    bidirectional graph. Like the balance used by geldschieberbot.
    A cycle must have a path length of at least 3 to be considered a cycle.

    The nature of the graph makes the cycle generation easy.
    All possible cycles are simply all permutations of all subsets with at
    least 3 elements. Rotations and reversals of the same cycle are therefore
    generated as separate tuples.

    The cycles are generated in the iteration order of `vertices`, shortest
    cycles first. Fewer than 3 vertices yield nothing.
    """
    for length in range(3, len(vertices) + 1):
        for subset in itertools.combinations(vertices, length):
            yield from itertools.permutations(subset, length)


def test_gen_cycles():
    """Test the cycle generation by comparing the number of generated cycles"""
    vertices = {1, 2, 3, 4}
    lengths = range(3, len(vertices) + 1)
    # math.perm(x) == x! orderings for each of the comb(n, x) subsets
    num_cycles = sum(
        math.perm(x) * math.comb(len(vertices), x) for x in lengths)
    assert len(list(gen_cycles(vertices))) == num_cycles


def has_same_flow_direction(balance: Balance, cycle: Cycle) -> bool:
    """Check if a cycle has the same money flow

    A cycle is considered to have the same money flow if all of its edges,
    including the closing edge from the last to the first vertex, are
    non-zero and share the same sign.

    Example:
        A -> B 10
        B -> C 10
        C -> A 10
    """
    # The closing edge defines the direction all other edges must match
    closing_edge = balance[cycle[-1]][cycle[0]]
    if closing_edge == 0:
        return False

    direction = closing_edge > 0
    for vertex, successor in zip(cycle, cycle[1:]):
        edge = balance[vertex][successor]
        if not edge or (edge > 0) != direction:
            return False
    return True


def test_has_same_flow_direction():
    """Test the same flow detection logic"""
    # yapf: disable
    balance = {'A': {'B': 10, 'C': -10},
               'B': {'A': -10, 'C': 10},
               'C': {'A': 10, 'B': -10}}
    # yapf: enable
    assert has_same_flow_direction(balance, ['A', 'B', 'C'])
    assert has_same_flow_direction(balance, ['A', 'C', 'B'])

    # yapf: disable
    balance = {'A': {'B': 10, 'C': 0},
               'B': {'A': -10, 'C': 10},
               'C': {'A': 0, 'B': -10}}
    # yapf: enable
    assert not has_same_flow_direction(balance, ['A', 'B', 'C'])
    assert not has_same_flow_direction(balance, ['A', 'C', 'B'])


def find_money_cicles(balance: Balance, cycles: Cycles) -> Cycles:
    """Find all cycles with the same money flow in a balance

    Lazily yields those of `cycles` for which `has_same_flow_direction`
    holds, preserving their order.
    """
    for cycle in cycles:
        if has_same_flow_direction(balance, cycle):
            yield cycle


def exclude_dead_ends(balance: Balance) -> T.Iterable[str]:
    """Return all vertices that are no dead ends

    A vertex is no dead end if it has at least one edge with a positive and
    at least one edge with a negative value. Only such vertices can be part
    of a cycle with the same money flow.

    Each vertex is yielded at most once, in the order of the balance.
    """
    for vert, edges in balance.items():
        values = edges.values()
        if any(edge > 0 for edge in values) and any(
                edge < 0 for edge in values):
            yield vert


def _cycle_pairs(cycle: Cycle) -> list[tuple]:
    """Return the (vertex, successor) pairs of a cycle incl. the closing one"""
    return [(vertex, cycle[(i + 1) % len(cycle)])
            for i, vertex in enumerate(cycle)]


def _cycle_delta(balance: Balance, cycle: Cycle) -> int:
    """Return the signed edge value of a cycle that is closest to zero

    The cycle is expected to have the same money flow, so the sign of the
    first edge tells which of max/min is the value closest to zero.
    """
    money = [balance[vertex][successor]
             for vertex, successor in _cycle_pairs(cycle)]
    return max(money) if money[0] < 0 else min(money)


def minimize(balance: Balance) -> list[MinimizeSuggestion]:
    """Greedy minimize a balance

    Return a list of cycles with the biggest delta that could be resolved
    to minimize the balance.

    The passed balance is not modified. In each round the cycle with the
    biggest absolute delta is resolved on an internal copy of the balance
    until no cycle with the same money flow is left. Ties are broken by the
    vertex order of the balance, which keeps the result reproducible.
    """
    new_balance = {p: sb.copy() for p, sb in balance.items()}
    suggestions: list[MinimizeSuggestion] = []

    while True:
        # A list keeps the order of the balance; a set of strings would
        # iterate in a different order on each interpreter run.
        relevant_vertices = list(exclude_dead_ends(new_balance))
        candidates = [
            (cycle, _cycle_delta(new_balance, cycle))
            for cycle in find_money_cicles(new_balance,
                                           gen_cycles(relevant_vertices))
        ]
        if not candidates:
            return suggestions

        cycle, delta = max(candidates, key=lambda cand: abs(cand[1]))
        suggestions.append((cycle, delta))

        # Resolving the cycle zeroes at least one of its edges, so every
        # round removes a cycle and the loop terminates.
        for vertex, successor in _cycle_pairs(cycle):
            new_balance[vertex][successor] -= delta
            new_balance[successor][vertex] += delta


def test_minimize():
    """Test the minimize logic with simple balances"""
    # yapf: disable
    balance = {'A': {'B': 10, 'C': -10},
               'B': {'A': -10, 'C': 10},
               'C': {'A': 10, 'B': -10}}
    # yapf: enable
    suggestions = minimize(balance)
    assert len(suggestions) == 1
    cycle, delta = suggestions[0]
    assert abs(delta) == 10
    assert 'A' in cycle and 'B' in cycle and 'C' in cycle

    # yapf: disable
    balance = {'A': {'B': 10, 'C': 0},
               'B': {'A': -10, 'C': 10},
               'C': {'A': 0, 'B': -10}}
    # yapf: enable
    suggestions = minimize(balance)
    assert len(suggestions) == 0

    # yapf: disable
    balance = {'A': {'B': 10, 'C': 0, 'D': -10},
               'B': {'A': -10, 'C': 10, 'D': 0},
               'C': {'A': 0, 'B': -10, 'D': 10},
               'D': {'A': 10, 'B': 0, 'C': -10}}
    # yapf: enable
    suggestions = minimize(balance)
    assert len(suggestions) == 1
    cycle, delta = suggestions[0]
    assert abs(delta) == 10
    assert 'A' in cycle and 'B' in cycle and 'C' in cycle and 'D' in cycle


def main():
    """Entry point listing minimize steps of a particular state file

    The state file is a JSON document with a top-level 'balance' entry and
    defaults to 'state.json'.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('state_file', default='state.json', nargs='?')
    args = parser.parse_args()

    with open(args.state_file, 'r', encoding='utf-8') as state_file:
        balance = json.load(state_file)['balance']

    print(minimize(balance))


if __name__ == '__main__':
    main()