aboutsummaryrefslogtreecommitdiff
path: root/minimize.py
blob: 70fd738cbac86d33411e8013d79f432e236626f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/env python3
"""Find cycles in a balance that could be resolved to minimize the balance"""
import itertools
import math
import typing as T

# A cycle is an ordered tuple of vertices. The edge from the last vertex back
# to the first one closes the cycle (see has_same_flow_direction).
Cycle = tuple
# Any iterable of cycles, e.g. the generator returned by gen_cycles.
Cycles = T.Iterable[Cycle]


def gen_cycles(vertices: set) -> Cycles:
    """Lazily enumerate every cycle that can be formed from the vertices

    The vertices are treated as the nodes of a fully connected bidirectional
    graph, like the balance used by geldschieberbot. In such a graph any
    ordering of at least 3 distinct vertices is a valid cycle, so no edge
    information is needed to generate them.

    Cycles are yielded as tuples, shortest first. Every ordering of every
    subset is produced, which means rotations and reversals of the same
    cycle show up as separate tuples.
    """
    sizes = range(3, len(vertices) + 1)
    for size in sizes:
        for members in itertools.combinations(vertices, size):
            # permutations() defaults to full-length orderings of members
            for ordering in itertools.permutations(members):
                yield ordering


def test_gen_cycles():
    """Check gen_cycles by counting the cycles it produces

    For n vertices the expected count is the number of k-permutations of n
    summed over every cycle length k from 3 to n.
    """
    vertices = {1, 2, 3, 4}
    total = len(vertices)
    # perm(n, k) == k! * comb(n, k): orderings of each k-element subset
    expected = sum(math.perm(total, size) for size in range(3, total + 1))
    generated = sum(1 for _ in gen_cycles(vertices))
    assert generated == expected


# Nested mapping: balance[a][b] is the signed amount on the edge a -> b.
# The fixtures in this file keep balance[b][a] == -balance[a][b].
Balance = dict[T.Any, dict[T.Any, int]]


def has_same_flow_direction(balance: Balance, cycle: Cycle) -> bool:
    """Tell whether money flows the same way along every edge of a cycle

    The edges looked at are those between consecutive vertices of the cycle
    plus the closing edge from the last vertex back to the first. The cycle
    qualifies only if all of those amounts are non-zero and share one sign.
    Example of a qualifying cycle (A, B, C):
      A -> B 10
      B -> C 10
      C -> A 10
    """
    # The closing edge is checked first and fixes the reference direction
    closing = balance[cycle[-1]][cycle[0]]
    if closing == 0:
        return False

    flows_forward = closing > 0
    hops = zip(cycle, cycle[1:])
    amounts = (balance[src][dst] for src, dst in hops)
    # A zero amount breaks the cycle just like a reversed one does
    return all(amount and (amount > 0) == flows_forward for amount in amounts)


def test_has_same_flow_direction():
    """Exercise the flow detection on a closed and on an interrupted triangle"""
    # yapf: disable
    circular = {'A': {'B': 10, 'C': -10},
                'B': {'A': -10, 'C': 10},
                'C': {'A': 10, 'B': -10}}
    interrupted = {'A': {'B': 10, 'C': 0},
                   'B': {'A': -10, 'C': 10},
                   'C': {'A': 0, 'B': -10}}
    # yapf: enable

    # Both traversal directions of the triangle must give the same verdict
    for order in (['A', 'B', 'C'], ['A', 'C', 'B']):
        assert has_same_flow_direction(circular, order)
        assert not has_same_flow_direction(interrupted, order)


def find_money_cicles(balance: Balance, cycles: Cycles) -> Cycles:
    """Lazily filter cycles down to those with a uniform money flow

    Each candidate is passed through has_same_flow_direction together with
    the balance; candidates are yielded unchanged and in their input order.
    """
    yield from (candidate for candidate in cycles
                if has_same_flow_direction(balance, candidate))


def exclude_dead_ends(balance: Balance) -> T.Iterable[str]:
    """Yield every vertex that is not a dead end, each exactly once

    A vertex is a dead end when its non-zero edges all have the same sign, or
    when it has no non-zero edge at all. Assuming the balance is
    antisymmetric (balance[a][b] == -balance[b][a]), such a vertex cannot be
    part of a cycle with a uniform flow direction, so callers can drop it
    before generating cycles.

    Vertices are yielded in the iteration order of balance. A vertex whose
    edges change sign several times is still reported only once.
    """
    for vert, edges in balance.items():
        amounts = edges.values()
        # Money must both arrive at and leave the vertex
        if any(amount > 0 for amount in amounts) and any(
                amount < 0 for amount in amounts):
            yield vert


# A cycle paired with the signed amount that minimize() subtracts along it.
MinimizeSuggestion = tuple[Cycle, int]


def minimize(balance: Balance) -> list[MinimizeSuggestion]:
    """Greedily reduce a balance by resolving money cycles

    In every round all cycles with a uniform money flow are collected, the
    one that allows the largest absolute amount to be cancelled is picked
    (the first one on ties) and that amount is removed from each of its
    edges. The rounds stop once no such cycle is left.

    The passed balance is not modified; the work happens on a copy. The
    result lists the resolved cycles with their signed amounts in the order
    they were applied.
    """
    working = {person: debts.copy() for person, debts in balance.items()}
    steps: list[MinimizeSuggestion] = []

    while True:
        candidates = set(exclude_dead_ends(working))
        money_cycles = list(
            find_money_cicles(working, gen_cycles(candidates)))
        if not money_cycles:
            return steps

        best_cycle = None
        best_amount = None
        for candidate in money_cycles:
            # Pair every vertex with its successor, wrapping around at the end
            successors = candidate[1:] + candidate[:1]
            flows = [working[src][dst]
                     for src, dst in zip(candidate, successors)]

            # All flows share one sign, so the amount closest to zero is the
            # most that can be cancelled along the whole cycle
            amount = max(flows) if flows[0] < 0 else min(flows)
            if best_amount is None or abs(amount) > abs(best_amount):
                best_cycle, best_amount = candidate, amount

        steps.append((best_cycle, best_amount))

        # Cancel the amount on each edge and mirror it on the reverse edge
        for src, dst in zip(best_cycle, best_cycle[1:] + best_cycle[:1]):
            working[src][dst] -= best_amount
            working[dst][src] += best_amount


def test_minimize():
    """Run minimize on small balances with and without a resolvable cycle"""
    # yapf: disable
    triangle = {'A': {'B': 10, 'C': -10},
                'B': {'A': -10, 'C': 10},
                'C': {'A': 10, 'B': -10}}
    open_chain = {'A': {'B': 10, 'C': 0},
                  'B': {'A': -10, 'C': 10},
                  'C': {'A': 0, 'B': -10}}
    square = {'A': {'B': 10, 'C': 0, 'D': -10},
              'B': {'A': -10, 'C': 10, 'D': 0},
              'C': {'A': 0, 'B': -10, 'D': 10},
              'D': {'A': 10, 'B': 0, 'C': -10}}
    # yapf: enable

    # A closed loop is resolved in a single step covering all its members
    for balance, members in ((triangle, 'ABC'), (square, 'ABCD')):
        suggestions = minimize(balance)
        assert len(suggestions) == 1
        (cycle, delta), = suggestions
        assert abs(delta) == 10
        assert all(member in cycle for member in members)

    # Without a closed loop there is nothing to suggest
    assert len(minimize(open_chain)) == 0


def main():
    """Command line entry point printing the minimize steps for a state file

    The optional positional argument names a JSON file (default: state.json)
    whose 'balance' entry is handed to minimize.
    """
    import argparse
    import json

    cli = argparse.ArgumentParser()
    cli.add_argument('state_file', default='state.json', nargs='?')
    path = cli.parse_args().state_file

    with open(path, 'r', encoding='utf-8') as handle:
        state = json.load(handle)
    print(minimize(state['balance']))


# Run the command line entry point only when executed as a script, so that
# importing this module (e.g. from a test runner) has no side effects.
if __name__ == '__main__':
    main()