diff options
Diffstat (limited to 'minimize.py')
| -rwxr-xr-x | minimize.py | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/minimize.py b/minimize.py new file mode 100755 index 0000000..1252475 --- /dev/null +++ b/minimize.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +"""Find cycles in a balance that could be resolved to minimize the balance""" +import itertools +import math +import typing as T + +Cycle = tuple +Cycles = list[Cycle] + + +def gen_cycles(vertices: set) -> Cycles: + """Generate all cycles possible in the vertices set + + The vertices set is expected to contain the vertices of a fully connected + bidirectional graph. Like the balance used by geldschieberbot. + A cycle must have a path length of at least 3 to be considered a cycle. + The nature of the graph makes the cycle generation easy. All possible cycles + are simply all possible subsets with at least 3 elements. + """ + cycles = [] + for length in range(3, len(vertices) + 1): + for subset in set(itertools.combinations(vertices, length)): + cycles.append(subset) + return cycles + + +def test_gen_cycles(): + """Test the cycle generation by comparing the number of generated cycles""" + vertices = {1, 2, 3, 4} + lenghts = range(3, len(vertices) + 1) + num_cycles = sum(math.comb(len(vertices), x) for x in lenghts) + assert len(gen_cycles(vertices)) == num_cycles + + +Balance = dict[T.Any, dict[T.Any, int]] + + +def has_same_flow_direction(balance: Balance, cycle: Cycle) -> bool: + """Check if a cycle has the same money flow + + A cycle is considered to have the same money flow if all pairs have + either a value greater or less than 0. + Example: + A -> B 10 + B -> C 10 + C -> A 10 + """ + edge = balance[cycle[-1]][cycle[0]] + if edge == 0: + return False + + direction = edge > 0 + for i in range(0, len(cycle) - 1): + edge = balance[cycle[i]][cycle[i + 1]] + if not edge or (edge > 0) != direction: + return False + return True + + +def test_has_same_flow_direction(): + """Test the same flow detection logic""" + # yapf: disable + balance = {'A': {'B': 10, 'C': -10}, + 'B': {'A': -10, 'C': 10}, + 'C': {'A': 10, 'B': -10}} + # yapf: enable + + assert has_same_flow_direction(balance, ['A', 'B', 'C']) + assert has_same_flow_direction(balance, ['A', 'C', 'B']) + + # yapf: disable + balance = {'A': {'B': 10, 'C': 0}, + 'B': {'A': -10, 'C': 10}, + 'C': {'A': 0, 'B': -10}} + # yapf: enable + + assert not has_same_flow_direction(balance, ['A', 'B', 'C']) + assert not has_same_flow_direction(balance, ['A', 'C', 'B']) + + +def find_money_cicles(balance: Balance, cycles: Cycles) -> Cycles: + """Find all cycles with the same money flow in a balance""" + found = [] + for cycle in cycles: + if has_same_flow_direction(balance, cycle): + found.append(cycle) + + return found + + +MinimizeSuggestion = tuple[Cycle, int] + + +def minimize(balance: Balance) -> list[MinimizeSuggestion]: + """Greedy minimize a balance + + Return a list of cycles with the biggest delta that could be resolved + to minimize the balance. + """ + new_balance = {p: sb.copy() for p, sb in balance.items()} + suggestions: list[MinimizeSuggestion] = [] + + while True: + cycles = find_money_cicles(new_balance, + gen_cycles(set(new_balance.keys()))) + if not cycles: + return suggestions + deltas = [] + + for cycle in cycles: + money = [] + for i, vertex in enumerate(cycle): + successor = cycle[(i + 1) % len(cycle)] + money.append(new_balance[vertex][successor]) + + if money[0] < 0: + delta = max(money) + else: + delta = min(money) + deltas.append(delta) + + delta = max(deltas, key=abs) + cycle = cycles[deltas.index(delta)] + suggestions.append((cycle, delta)) + + for i, vertex in enumerate(cycle): + successor = cycle[(i + 1) % len(cycle)] + new_balance[vertex][successor] -= delta + new_balance[successor][vertex] += delta + + +def test_minimize(): + """Test the minimize logic with simple balances""" + # yapf: disable + balance = {'A': {'B': 10, 'C': -10}, + 'B': {'A': -10, 'C': 10}, + 'C': {'A': 10, 'B': -10}} + # yapf: enable + + suggestions = minimize(balance) + assert len(suggestions) == 1 + cycle, delta = suggestions[0] + assert abs(delta) == 10 + assert 'A' in cycle and 'B' in cycle and 'C' in cycle + + # yapf: disable + balance = {'A': {'B': 10, 'C': 0}, + 'B': {'A': -10, 'C': 10}, + 'C': {'A': 0, 'B': -10}} + # yapf: enable + + suggestions = minimize(balance) + assert len(suggestions) == 0 + + +def main(): + """Entry point listing minimize steps of a particular state file""" + parser = argparse.ArgumentParser() + parser.add_argument('state_file', default='state.json', nargs='?') + args = parser.parse_args() + with open(args.state_file, 'r', encoding='utf-8') as state_file: + balance = json.load(state_file)['balance'] + print(minimize(balance)) + + +if __name__ == '__main__': + import json + import argparse + + main() |
