1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
#!/usr/bin/env python3
"""Find cycles in a balance that could be resolved to minimize the balance"""
import itertools
import math
import typing as T
# A cycle is an ordered tuple of vertices. The edge from the last vertex back
# to the first one is implied and closes the cycle.
Cycle = tuple
Cycles = list[Cycle]


def gen_cycles(vertices: set) -> Cycles:
    """Generate all distinct cycles possible in the vertices set

    The vertices set is expected to contain the vertices of a fully connected
    bidirectional graph. Like the balance used by geldschieberbot.

    A cycle must have a path length of at least 3 to be considered a cycle.

    A subset of k vertices can be walked in (k - 1)! / 2 different ways that
    are neither rotations nor mirror images of each other, and each of them
    uses a different set of edges. Emitting only one ordering per subset
    would therefore miss cycles as soon as a subset has 4 or more vertices,
    so every distinct ordering is generated:

    * rotations are removed by pinning the first vertex of the subset
    * mirror images are removed by keeping only the walks whose second
      vertex comes before its last vertex in the iteration order of
      ``vertices``

    Subsets of exactly 3 vertices still yield a single cycle.

    Note that the number of cycles grows factorially with the number of
    vertices.

    Returns a list of vertex tuples; fewer than 3 vertices yield ``[]``.
    """
    # Work on positions instead of the vertices themselves so the vertices
    # do not have to be comparable with each other.
    ordered = list(vertices)
    cycles: Cycles = []
    for length in range(3, len(ordered) + 1):
        for subset in itertools.combinations(range(len(ordered)), length):
            start, rest = subset[0], subset[1:]
            for walk in itertools.permutations(rest):
                # walk and reversed(walk) describe the same cycle in
                # opposite directions; keep only one of the two
                if walk[0] < walk[-1]:
                    cycles.append(
                        tuple(ordered[i] for i in (start, *walk)))
    return cycles


def test_gen_cycles():
    """Test the cycle generation by comparing the number of generated cycles"""
    vertices = {1, 2, 3, 4}
    lengths = range(3, len(vertices) + 1)
    # each subset of k vertices contributes (k - 1)! / 2 distinct cycles
    num_cycles = sum(
        math.comb(len(vertices), k) * math.factorial(k - 1) // 2
        for k in lengths)
    cycles = gen_cycles(vertices)
    assert len(cycles) == num_cycles

    # no two generated cycles may use the same set of undirected edges
    edge_sets = {
        frozenset(frozenset((c[i - 1], c[i])) for i in range(len(c)))
        for c in cycles
    }
    assert len(edge_sets) == num_cycles
# Nested mapping: balance[a][b] is the signed amount between a and b.
Balance = dict[T.Any, dict[T.Any, int]]


def has_same_flow_direction(balance: Balance, cycle: Cycle) -> bool:
    """Tell whether money flows the same way along every edge of a cycle

    The edges are the pairs of consecutive vertices plus the closing pair
    from the last vertex back to the first one. The cycle qualifies only if
    all edge values are greater than 0 or all are less than 0; a single edge
    of 0 disqualifies it.

    Example of a qualifying cycle:
    A -> B 10
    B -> C 10
    C -> A 10
    """
    # The closing edge decides which sign all other edges have to share.
    closing = balance[cycle[-1]][cycle[0]]
    if closing == 0:
        return False
    positive = closing > 0

    for source, target in zip(cycle, cycle[1:]):
        amount = balance[source][target]
        if not amount:
            return False
        if (amount > 0) != positive:
            return False
    return True
def test_has_same_flow_direction():
    """Check the flow detection on a closed and on an interrupted triangle"""
    # Both walking directions of the triangle are checked for each balance.
    orientations = (['A', 'B', 'C'], ['A', 'C', 'B'])

    # yapf: disable
    circular = {'A': {'B': 10, 'C': -10},
                'B': {'A': -10, 'C': 10},
                'C': {'A': 10, 'B': -10}}
    # yapf: enable
    for cycle in orientations:
        assert has_same_flow_direction(circular, cycle)

    # The edge between A and C is 0, so the triangle is not closed.
    # yapf: disable
    interrupted = {'A': {'B': 10, 'C': 0},
                   'B': {'A': -10, 'C': 10},
                   'C': {'A': 0, 'B': -10}}
    # yapf: enable
    for cycle in orientations:
        assert not has_same_flow_direction(interrupted, cycle)
def find_money_cicles(balance: Balance, cycles: Cycles) -> Cycles:
    """Filter the cycles down to those with a uniform money flow

    Returns a new list, in the order of ``cycles``, with every cycle for
    which has_same_flow_direction() holds in the given balance.
    """
    return [
        candidate for candidate in cycles
        if has_same_flow_direction(balance, candidate)
    ]
# A suggestion pairs a cycle with the signed amount to cancel along it.
MinimizeSuggestion = tuple[Cycle, int]


def _cycle_delta(balance: Balance, cycle: Cycle) -> int:
    """Return the amount that can be cancelled on every edge of a cycle

    The cycle is expected to have the same flow direction on all its edges.
    The result is the edge value closest to 0 and carries the sign of the
    flow.
    """
    money = [
        balance[vertex][cycle[(i + 1) % len(cycle)]]
        for i, vertex in enumerate(cycle)
    ]
    # For a negative flow the value closest to 0 is the maximum
    return max(money) if money[0] < 0 else min(money)


def minimize(balance: Balance) -> list[MinimizeSuggestion]:
    """Greedy minimize a balance

    Repeatedly pick the money cycle with the biggest resolvable delta,
    record it and apply it to a working copy of the balance until no money
    cycle is left. The passed balance itself is not modified.

    Return the list of (cycle, delta) suggestions in the order they were
    found. The list is empty if the balance contains no money cycle.
    """
    new_balance = {p: sb.copy() for p, sb in balance.items()}

    # The vertices never change while minimizing, so the candidate cycles
    # are generated once instead of on every iteration.
    candidates = gen_cycles(set(new_balance))

    suggestions: list[MinimizeSuggestion] = []
    while True:
        cycles = find_money_cicles(new_balance, candidates)
        if not cycles:
            return suggestions

        deltas = [_cycle_delta(new_balance, cycle) for cycle in cycles]
        delta = max(deltas, key=abs)
        cycle = cycles[deltas.index(delta)]
        suggestions.append((cycle, delta))

        # Resolve the cycle: both directions of each edge move towards 0
        for i, vertex in enumerate(cycle):
            successor = cycle[(i + 1) % len(cycle)]
            new_balance[vertex][successor] -= delta
            new_balance[successor][vertex] += delta
def test_minimize():
    """Run minimize on a closed and on an interrupted triangle balance"""
    # A closed triangle of 10 must be resolved by a single suggestion.
    # yapf: disable
    circular = {'A': {'B': 10, 'C': -10},
                'B': {'A': -10, 'C': 10},
                'C': {'A': 10, 'B': -10}}
    # yapf: enable
    result = minimize(circular)
    assert len(result) == 1
    members, amount = result[0]
    assert abs(amount) == 10
    assert all(person in members for person in ('A', 'B', 'C'))

    # With the edge between A and C at 0 there is nothing to resolve.
    # yapf: disable
    interrupted = {'A': {'B': 10, 'C': 0},
                   'B': {'A': -10, 'C': 10},
                   'C': {'A': 0, 'B': -10}}
    # yapf: enable
    result = minimize(interrupted)
    assert len(result) == 0
def main():
    """Entry point listing minimize steps of a particular state file

    The state file is a JSON document with a top-level 'balance' entry. Its
    path is taken from the first command line argument and defaults to
    'state.json'. The suggestions returned by minimize() are printed.
    """
    # Imported here so that main() does not depend on the imports done in
    # the __main__ guard and also works when called from another module.
    import argparse
    import json

    parser = argparse.ArgumentParser()
    parser.add_argument('state_file', default='state.json', nargs='?')
    args = parser.parse_args()

    with open(args.state_file, 'r', encoding='utf-8') as state_file:
        balance = json.load(state_file)['balance']

    print(minimize(balance))
# Script entry: the CLI-only modules are bound at module level here because
# main() looks them up as globals.
if __name__ == '__main__':
    import argparse
    import json

    main()
|