aboutsummaryrefslogtreecommitdiff
path: root/minimize.py
diff options
context:
space:
mode:
Diffstat (limited to 'minimize.py')
-rwxr-xr-xminimize.py170
1 files changed, 170 insertions, 0 deletions
diff --git a/minimize.py b/minimize.py
new file mode 100755
index 0000000..1252475
--- /dev/null
+++ b/minimize.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+"""Find cycles in a balance that could be resolved to minimize the balance"""
+import itertools
+import math
+import typing as T
+
+Cycle = tuple
+Cycles = list[Cycle]
+
+
+def gen_cycles(vertices: set) -> Cycles:
+ """Generate all cycles possible in the vertices set
+
+ The vertices set is expected to contain the vertices of a fully connected
+ bidirectional graph. Like the balance used by geldschieberbot.
+ A cycle must have a path length of at least 3 to be considered a cycle.
+ The nature of the graph makes the cycle generation easy. All possible cycles
+ are simply all possible subsets with at least 3 elements.
+ """
+ cycles = []
+ for length in range(3, len(vertices) + 1):
+ for subset in set(itertools.combinations(vertices, length)):
+ cycles.append(subset)
+ return cycles
+
+
+def test_gen_cycles():
+ """Test the cycle generation by comparing the number of generated cycles"""
+ vertices = {1, 2, 3, 4}
+ lenghts = range(3, len(vertices) + 1)
+ num_cycles = sum(math.comb(len(vertices), x) for x in lenghts)
+ assert len(gen_cycles(vertices)) == num_cycles
+
+
+Balance = dict[T.Any, dict[T.Any, int]]
+
+
+def has_same_flow_direction(balance: Balance, cycle: Cycle) -> bool:
+ """Check if a cycle has the same money flow
+
+ A cycle is considered to have the same money flow if all pairs have
+ either a value greater or less than 0.
+ Example:
+ A -> B 10
+ B -> C 10
+ C -> A 10
+ """
+ edge = balance[cycle[-1]][cycle[0]]
+ if edge == 0:
+ return False
+
+ direction = edge > 0
+ for i in range(0, len(cycle) - 1):
+ edge = balance[cycle[i]][cycle[i + 1]]
+ if not edge or (edge > 0) != direction:
+ return False
+ return True
+
+
+def test_has_same_flow_direction():
+ """Test the same flow detection logic"""
+ # yapf: disable
+ balance = {'A': {'B': 10, 'C': -10},
+ 'B': {'A': -10, 'C': 10},
+ 'C': {'A': 10, 'B': -10}}
+ # yapf: enable
+
+ assert has_same_flow_direction(balance, ['A', 'B', 'C'])
+ assert has_same_flow_direction(balance, ['A', 'C', 'B'])
+
+ # yapf: disable
+ balance = {'A': {'B': 10, 'C': 0},
+ 'B': {'A': -10, 'C': 10},
+ 'C': {'A': 0, 'B': -10}}
+ # yapf: enable
+
+ assert not has_same_flow_direction(balance, ['A', 'B', 'C'])
+ assert not has_same_flow_direction(balance, ['A', 'C', 'B'])
+
+
+def find_money_cicles(balance: Balance, cycles: Cycles) -> Cycles:
+ """Find all cycles with the same money flow in a balance"""
+ found = []
+ for cycle in cycles:
+ if has_same_flow_direction(balance, cycle):
+ found.append(cycle)
+
+ return found
+
+
+MinimizeSuggestion = tuple[Cycle, int]
+
+
+def minimize(balance: Balance) -> list[MinimizeSuggestion]:
+ """Greedy minimize a balance
+
+ Return a list of cycles with the biggest delta that could be resolved
+ to minimize the balance.
+ """
+ new_balance = {p: sb.copy() for p, sb in balance.items()}
+ suggestions: list[MinimizeSuggestion] = []
+
+ while True:
+ cycles = find_money_cicles(new_balance,
+ gen_cycles(set(new_balance.keys())))
+ if not cycles:
+ return suggestions
+ deltas = []
+
+ for cycle in cycles:
+ money = []
+ for i, vertex in enumerate(cycle):
+ successor = cycle[(i + 1) % len(cycle)]
+ money.append(new_balance[vertex][successor])
+
+ if money[0] < 0:
+ delta = max(money)
+ else:
+ delta = min(money)
+ deltas.append(delta)
+
+ delta = max(deltas, key=abs)
+ cycle = cycles[deltas.index(delta)]
+ suggestions.append((cycle, delta))
+
+ for i, vertex in enumerate(cycle):
+ successor = cycle[(i + 1) % len(cycle)]
+ new_balance[vertex][successor] -= delta
+ new_balance[successor][vertex] += delta
+
+
+def test_minimize():
+ """Test the minimize logic with simple balances"""
+ # yapf: disable
+ balance = {'A': {'B': 10, 'C': -10},
+ 'B': {'A': -10, 'C': 10},
+ 'C': {'A': 10, 'B': -10}}
+ # yapf: enable
+
+ suggestions = minimize(balance)
+ assert len(suggestions) == 1
+ cycle, delta = suggestions[0]
+ assert abs(delta) == 10
+ assert 'A' in cycle and 'B' in cycle and 'C' in cycle
+
+ # yapf: disable
+ balance = {'A': {'B': 10, 'C': 0},
+ 'B': {'A': -10, 'C': 10},
+ 'C': {'A': 0, 'B': -10}}
+ # yapf: enable
+
+ suggestions = minimize(balance)
+ assert len(suggestions) == 0
+
+
+def main():
+ """Entry point listing minimize steps of a particular state file"""
+ parser = argparse.ArgumentParser()
+ parser.add_argument('state_file', default='state.json', nargs='?')
+ args = parser.parse_args()
+ with open(args.state_file, 'r', encoding='utf-8') as state_file:
+ balance = json.load(state_file)['balance']
+ print(minimize(balance))
+
+
+if __name__ == '__main__':
+ import json
+ import argparse
+
+ main()