diff options
| author | Florian Fischer <florian.fischer@muhq.space> | 2022-07-04 10:49:06 +0200 |
|---|---|---|
| committer | Florian Fischer <florian.fischer@muhq.space> | 2022-07-04 10:49:06 +0200 |
| commit | 462b01aefc3953063875884f99adb7138c7db227 (patch) | |
| tree | c98a9b96d0909adc9966966a1e4a48eadda7d48b | |
| parent | 45dd321c6dd44f72d2982a9665a2ea88436bacec (diff) | |
| download | geldschieberbot-462b01aefc3953063875884f99adb7138c7db227.tar.gz geldschieberbot-462b01aefc3953063875884f99adb7138c7db227.zip | |
geldschieberbot: refactor bot into a class
Geldschieberbot beeing a class gets rid of the mutable global
variables for dry_run and record_changes.
Misc cleanups:
* Cleanup recording changes
* Add doc-strings
* Uppercase globals
| -rw-r--r-- | geldschieberbot.py | 1511 |
1 files changed, 767 insertions, 744 deletions
diff --git a/geldschieberbot.py b/geldschieberbot.py index f21b7d6..2dab9f2 100644 --- a/geldschieberbot.py +++ b/geldschieberbot.py @@ -8,62 +8,18 @@ import subprocess import sys import tanken + # Path where our data is stored persistent on disk -state_file = os.environ["GSB_STATE_FILE"] - -if os.path.isfile(state_file): - with open(state_file, 'r', encoding='utf-8') as state_f: - state = json.load(state_f) -else: - # Dict containing the whole state of geldschieberbot - # balance - dict of dicts associating two persons to an amount - # name2num, num2name - dicts associating numbers to names and vice versa - # cars - dict associating car names to their service charge - # scheduled_cmds - dict associating names to cmds, their schedule, and the last execution - # changes - dict associating users with their changes - state = { - "balance": {}, - "name2num": {}, - "num2name": {}, - "cars": {}, - "scheduled_cmds": {}, - "changes": {}, - } - -balance = state["balance"] -name2num = state["name2num"] -num2name = state["num2name"] -available_cars = state["cars"] -scheduled_cmds = state["scheduled_cmds"] -changes = state["changes"] - -group_id = os.environ["GSB_GROUP_ID"] - -send_cmd = os.environ["GSB_SEND_CMD"] -group_send_cmd = send_cmd + group_id - -dry_run = False -"""Run without changing the stored state""" - -quiet = False -"""Run without sending messages""" - -record_changes = True -"""Should changes be recorded""" - - -def record(recipient, donor, amount): - """Apply changes to the balance""" - - # Only change anything if this is not a dry run - if dry_run: - return +STATE_FILE = os.environ["GSB_STATE_FILE"] - balance[donor][recipient] += amount - balance[recipient][donor] -= amount +GROUP_ID = os.environ["GSB_GROUP_ID"] + +SEND_CMD = os.environ["GSB_SEND_CMD"] +GROUP_SEND_CMD = SEND_CMD + GROUP_ID def to_cent(euro): + """Parse string containing euros into a cent value""" if '.' in euro: euro = euro.split('.') else: @@ -85,844 +41,892 @@ def to_cent(euro): def to_euro(cents): + """Format cents as euro""" return f"{cents/100:.2f}" -def send(msg, attachment=None, cmd=send_cmd): - if not quiet: - if attachment: - cmd += f' -a {attachment}' - subprocess.run(cmd.split(' '), input=msg.encode(), check=False) - +class Geldschieberbot: + """ + State of the geldschieberbot + """ -def create_summary(user): - msg = '' - cars_summary = "" - total = 0 - cars_total = 0 - p_balances = balance[user] # failes if user is not in balance - for person in p_balances: - amount = p_balances[person] - if amount == 0: - continue - if person in available_cars: - cars_total -= amount - cars_summary += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n' + def load_state(self, state_path=STATE_FILE): + """Load state from disk""" + if os.path.isfile(state_path): + with open(state_path, 'r', encoding='utf-8') as state_f: + self.state = json.load(state_f) else: - total -= amount - msg += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n' - - if not msg: - msg = '\tAll fine :)' - else: - msg += f'\tBalance: {to_euro(total)}' - - ret_summary = f'{user}:\n{msg}' - - if cars_summary: - cars_summary += f'\tLiability: {to_euro(cars_total)}' - ret_summary += f'\n\tCars:\n{cars_summary}' - - return ret_summary - + # Dict containing the whole state of geldschieberbot + # balance - dict of dicts associating two persons to an amount + # name2num, num2name - dicts associating numbers to names and vice versa + # cars - dict associating car names to their service charge + # scheduled_cmds - dict associating names to cmds, their schedule, and the last execution + # changes - dict associating users with their changes + self.state = { + "balance": {}, + "name2num": {}, + "num2name": {}, + "cars": {}, + "scheduled_cmds": {}, + "changes": {}, + } -def create_total_summary() -> str: - msg = 'Summary:' + self.balance = self.state["balance"] + self.name2num = self.state["name2num"] + self.num2name = self.state["num2name"] + self.available_cars = self.state["cars"] + self.scheduled_cmds = self.state["scheduled_cmds"] + self.changes = self.state["changes"] + + def save_state(self, state_path=STATE_FILE): + """Load state from disk""" + with open(state_path, 'w', encoding='utf-8') as f: + json.dump(self.state, f) + + def record(self, recipient, donor, amount): + """Apply changes to the balance""" + + # Only change anything if this is not a dry run + if self.dry_run: + return + + self.balance[donor][recipient] += amount + self.balance[recipient][donor] -= amount + + def send(self, msg, attachment=None, cmd=SEND_CMD): + """Send a message with optional attachment""" + if not self.quiet: + if attachment: + cmd += f' -a {attachment}' + subprocess.run(cmd.split(' '), input=msg.encode(), check=False) + + def create_summary(self, user) -> str: + """Create a summary for a user""" + msg = '' + cars_summary = "" + total = 0 + cars_total = 0 + p_balances = self.balance[user] # failes if user is not in balance + for person in p_balances: + amount = p_balances[person] + if amount == 0: + continue + if person in self.available_cars: + cars_total -= amount + cars_summary += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n' + else: + total -= amount + msg += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n' - cars_summary = '' - for person in balance: - p_summary = create_summary(person) - if person in available_cars: - cars_summary += f'\n{p_summary}' + if not msg: + msg = '\tAll fine :)' else: - msg += f'\n{p_summary}' + msg += f'\tBalance: {to_euro(total)}' - if cars_summary: - msg += f'\nCars:{cars_summary}' - return msg + ret_summary = f'{user}:\n{msg}' + if cars_summary: + cars_summary += f'\tLiability: {to_euro(cars_total)}' + ret_summary += f'\n\tCars:\n{cars_summary}' -def create_members() -> str: - r = "" - for m in name2num: - r += m + ": " + name2num[m] + "\n" - return r + return ret_summary + def create_total_summary(self) -> str: + """Create a summary of all balances""" + msg = 'Summary:' -def add_to_balance(name): - nb = {} - for m in balance: - balance[m][name] = 0 - nb[m] = 0 - balance[name] = nb - - -def remove_from_balance(name): - del balance[name] - for m in balance: - del balance[m][name] - - -def create_help(): - return """ -Usage: send a message starting with '!' followed by a command -Commands: -ls | list - print all registered members -help - print this help message -reg name - register the sender with the name: name -sum [name] - print summary of specific users -full-sum - print summary of all users - -split amount person [persons] - split amount between the sender and persons -teil amount person [persons] - split amount between the sender and persons - -schieb amount recipient - give money to recipient -gib amount recipient - give money to recipient -zieh amount donor - get money from donor -nimm amount donor - get money from donor - -transfer amount source destination - transfer amount of your balance from source to destination + cars_summary = '' + for person in self.balance: + p_summary = self.create_summary(person) + if person in self.available_cars: + cars_summary += f'\n{p_summary}' + else: + msg += f'\n{p_summary}' + + if cars_summary: + msg += f'\nCars:{cars_summary}' + return msg + + def create_members(self) -> str: + """Create a list of all group members""" + r = "" + for m in self.name2num: + r += m + ": " + self.name2num[m] + "\n" + return r + + def add_to_balance(self, name): + """Add a new user balance""" + nb = {} + for m in self.balance: + self.balance[m][name] = 0 + nb[m] = 0 + self.balance[name] = nb + + def remove_from_balance(self, name): + """Remove a user balance""" + del self.balance[name] + for m in self.balance: + del self.balance[m][name] + + @classmethod + def create_help(cls): + return """ + Usage: send a message starting with '!' followed by a command + Commands: + ls | list - print all registered members + help - print this help message + reg name - register the sender with the name: name + sum [name] - print summary of specific users + full-sum - print summary of all users + + split amount person [persons] - split amount between the sender and persons + teil amount person [persons] - split amount between the sender and persons + + schieb amount recipient - give money to recipient + gib amount recipient - give money to recipient + zieh amount donor - get money from donor + nimm amount donor - get money from donor + + transfer amount source destination - transfer amount of your balance from source to destination + + cars [cmd] - interact with the available cars + cars [list | ls] - list available cars and their service charge + cars add car-name service-charge - add new car + cars new car-name service-charge - add new car + cars remove car-name - remove new car + cars pay car-name amount - pay a bill for the specified car + + tanken amount [person] [car] [info] - calculate fuel costs, service charge and add them to the person's and car's balance respectively + + fuck [change] - rewind last or specific change + list-changes [n] [first] - list the last n changes starting at first + + export-state - send the state file + + weekly name cmd - repeat cmd each week + monthly name cmd - repeat cmd each month + yearly name cmd - repeat cmd each year + cancel name - stop repeating cmd + + Happy Geldschieben! + """ + + def register(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Register a new user""" + if len(args) != 2: + return {'err': f'not in form "{args[0]} name"'} + name = args[1] -cars [cmd] - interact with the available cars -cars [list | ls] - list available cars and their service charge -cars add car-name service-charge - add new car -cars new car-name service-charge - add new car -cars remove car-name - remove new car -cars pay car-name amount - pay a bill for the specified car + try: + to_cent(name) + return {'err': 'pure numerical names are not allowed'} + except (ValueError, TypeError): + pass -tanken amount [person] [car] [info] - calculate fuel costs, service charge and add them to the person's and car's balance respectively + if name in self.name2num: + return {'err': f'{name} already registered'} -fuck [change] - rewind last or specific change -list-changes [n] [first] - list the last n changes starting at first + if sender in self.num2name: + return {'err': 'you are already registered'} -export-state - send the state file + self.num2name[sender] = name + self.name2num[name] = sender -weekly name cmd - repeat cmd each week -monthly name cmd - repeat cmd each month -yearly name cmd - repeat cmd each year -cancel name - stop repeating cmd + self.add_to_balance(name) -Happy Geldschieben! -""" + # add changes list + self.changes[name] = [] + return {'msg': f'Happy geldschiebing {name}!'} + def summary(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Print summary for one or multiple balances""" + if len(args) == 1: + if not sender in self.num2name: + return {'err': 'You must register first to print your summary'} + name = self.num2name[sender] + return {'msg': f'Summary:\n{self.create_summary(name)}'} -cmds = {} + msg = "Summary:\n" + for name in args[1:]: + if name in self.name2num or name in self.available_cars: + msg += self.create_summary(name) + "\n" + else: + return {'err': f'name "{name}" not registered'} + return {'msg': msg} + def full_summary(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Print a summary of all balances""" + if len(args) == 1: + return {'msg': self.create_total_summary()} + else: + return {'err': f'{args[0][1:]} takes no arguments'} -def register(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if len(args) != 2: - return {'err': f'not in form "{args[0]} name"'} - name = args[1] + def list_users(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """List all registered users""" + return {'msg': self.create_members()} - try: - to_cent(name) - return {'err': 'pure numerical names are not allowed'} - except (ValueError, TypeError): - pass + @classmethod + def usage(cls, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Return the usage""" + return {'msg': cls.create_help()} - if name in name2num: - return {'err': f'{name} already registered'} + def split(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Split a fixed amount across multiple persons""" + if not sender in self.num2name: + return {'err': 'you must register first'} - if sender in num2name: - return {'err': 'you are already registered'} + if len(args) < 3: + return {'err': f'not in form "{args[0]} amount [name]+"'} - num2name[sender] = name - name2num[name] = sender + try: + amount = to_cent(args[1]) + persons = args[2:] + except (ValueError, TypeError): + # support !split name amount + if len(args) == 3: + try: + amount = to_cent(args[2]) + persons = [args[1]] + except (ValueError, TypeError): + return {'err': 'amount must be a positive number'} + else: + return {'err': 'amount must be a positive number'} - add_to_balance(name) + recipient = self.num2name[sender] + # persons + sender + npersons = len(persons) + 1 + amount_per_person = int(amount / npersons) - # add changes list - changes[name] = [] - return {'msg': f'Happy geldschiebing {name}!'} + output = f"Split {to_euro(amount)} between {npersons} -> {to_euro(amount_per_person)} each\n" + change = [args] + for p in persons: + if p in self.name2num: + if p == recipient: + output += (f'{p}, you will be charged multiple times. ' + 'This may not be what you want\n') + else: + self.record(recipient, p, amount_per_person) + change.append([recipient, p, amount_per_person]) + else: + output += f"{p} not known. Please take care manually\n" + self.may_record_change(recipient, change) -cmds["reg"] = register -cmds["register"] = register + output += "New Balance:\n" + output += self.create_summary(recipient) + return {'msg': output} + def transaction(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Record a transaction""" + if len(args) != 3: + return {'err': f'not in form "{args[0]} amount recipient"'} -def summary(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if len(args) == 1: - if not sender in num2name: - return {'err': 'You must register first to print your summary'} - name = num2name[sender] - return {'msg': f'Summary:\n{create_summary(name)}'} + if not sender in self.balance: + if sender not in self.num2name: + return {'err': 'you must register first'} + sender = self.num2name[sender] - msg = "Summary:\n" - for name in args[1:]: - if name in name2num or name in available_cars: - msg += create_summary(name) + "\n" + if args[1] in self.balance: + recipient, amount = args[1:3] + elif args[2] in self.balance: + amount, recipient = args[1:3] else: - return {'err': f'name "{name}" not registered'} - return {'msg': msg} - - -cmds["sum"] = summary -cmds["summary"] = summary - - -def full_summary(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if len(args) == 1: - return {'msg': create_total_summary()} - else: - return {'err': f'{args[0][1:]} takes no arguments'} - - -cmds["full-sum"] = full_summary -cmds["full-summary"] = full_summary - + return {'err': 'recipient not known'} -def list_users(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - return {'msg': create_members()} + if sender == recipient: + return {'err': 'you can not transfer money to or from yourself'} + try: + amount = to_cent(amount) + except (ValueError, TypeError): + return {'err': 'amount must be a positive number'} -cmds["ls"] = list_users -cmds["list"] = list_users + if args[0] in ["!zieh", "!nimm"]: + amount *= -1 + self.may_record_change(sender, [args, [sender, recipient, amount]]) -def usage(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - return {'msg': create_help()} + self.record(sender, recipient, amount) + p_balance = self.balance[sender][recipient] -cmds["help"] = usage -cmds["usage"] = usage + output = ("New Balance: {} {} {} {}\n".format( + sender, ("->" if p_balance > 0 else "<-"), to_euro(abs(p_balance)), + recipient)) + return {'msg': output} + def transfer(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Transfer amount from one balance to another""" + if len(args) < 4: + return { + 'err': f'not in form "{args[0]} amount source destination"' + } -def split(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if not sender in num2name: - return {'err': 'you must register first'} + if not sender in self.num2name: + return {'err': 'you must register first'} - if len(args) < 3: - return {'err': f'not in form "{args[0]} amount [name]+"'} + sender = self.num2name[sender] - try: - amount = to_cent(args[1]) - persons = args[2:] - except (ValueError, TypeError): - # support !split name amount - if len(args) == 3: - try: - amount = to_cent(args[2]) - persons = [args[1]] - except (ValueError, TypeError): - return {'err': 'amount must be a positive number'} - else: + try: + amount_raw = args[1] + amount_cent = to_cent(amount_raw) + except (ValueError, TypeError): return {'err': 'amount must be a positive number'} - recipient = num2name[sender] - # persons + sender - npersons = len(persons) + 1 - amount_per_person = int(amount / npersons) - - output = f"Split {to_euro(amount)} between {npersons} -> {to_euro(amount_per_person)} each\n" - change = [args] - for p in persons: - if p in name2num: - if p == recipient: - output += (f'{p}, you will be charged multiple times. ' - 'This may not be what you want\n') - else: - record(recipient, p, amount_per_person) - change.append([recipient, p, amount_per_person]) - else: - output += f"{p} not known. Please take care manually\n" - - if record_changes and not dry_run: - changes[recipient].append(change) + source, destination = args[2:4] + if source not in self.balance: + return {'err': f'source "{source}" not known'} - output += "New Balance:\n" - output += create_summary(recipient) - return {'msg': output} + if destination not in self.balance: + return {'err': f'destination "{destination}" not known'} + output = "" + saved_record_changes = self.disable_record_changes() + change = [args] -cmds["split"] = split -cmds["teil"] = split - + ret = self.transaction(sender, ["!zieh", source, amount_raw], "") + if 'err' in ret: + # No changes yet we can fail + return {'err': ret['err']} -def transaction(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if len(args) != 3: - return {'err': f'not in form "{args[0]} amount recipient"'} + output += ret['msg'] + # Sender <- X Source + change.append((sender, source, -amount_cent)) - if not sender in balance: - if sender not in num2name: - return {'err': 'you must register first'} - sender = num2name[sender] + ret = self.transaction(sender, ["!schieb", destination, amount_raw], + "") + err = ret.get('err', None) + if err: + output += err + "\nThe balance may be in a inconsistent state please take care manually" + return {'msg': output} - if args[1] in balance: - recipient, amount = args[1:3] - elif args[2] in balance: - amount, recipient = args[1:3] - else: - return {'err': 'recipient not known'} + output += ret['msg'] + # Sender -> X Destination + change.append((sender, destination, amount_cent)) - if sender == recipient: - return {'err': 'you can not transfer money to or from yourself'} + ret = self.transaction(source, ["!zieh", destination, amount_raw], "") + err = ret.get('err', None) + if err: + output += err + "\nThe balance may be in a inconsistent state please take care manually" + return {'msg': output} - try: - amount = to_cent(amount) - except (ValueError, TypeError): - return {'err': 'amount must be a positive number'} + output += ret['msg'] + # Destination -> X Source + change.append((destination, source, amount_cent)) - if args[0] in ["!zieh", "!nimm"]: - amount *= -1 + self.restore_record_changes(saved_record_changes) + self.may_record_change(sender, change) - if record_changes and not dry_run: - changes[sender].append([args, [sender, recipient, amount]]) + return {'msg': output} - record(sender, recipient, amount) + def cars(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Manage available cars - p_balance = balance[sender][recipient] + List, add, remove or pay a bill for a car. + """ + # list cars + if len(args) < 2 or args[1] in ["ls", "list"]: + if len(self.available_cars) == 0: + return {'msg': 'No cars registered yet.'} - output = ("New Balance: {} {} {} {}\n".format( - sender, ("->" if p_balance > 0 else "<-"), to_euro(abs(p_balance)), - recipient)) - return {'msg': output} + ret_msg = "" + if len(args) > 2: + cars_to_list = args[2:] + else: + cars_to_list = self.available_cars -cmds["schieb"] = transaction -cmds["gib"] = transaction -cmds["zieh"] = transaction -cmds["nimm"] = transaction + for car in cars_to_list: + if car in self.available_cars: + ret_msg += f"{car} - service charge {self.available_cars[car]}ct/km\n" + ret_msg += self.create_summary(car) + "\n" + else: + return {'err': f'"{car}" is no available car\n'} + return {'msg': ret_msg[:-1]} -def transfer(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if len(args) < 4: - return {'err': f'not in form "{args[0]} amount source destination"'} + # add car + if args[1] in ["add", "new"]: + if len(args) < 4: + return { + 'err': + f'not in form "{args[0]} {args[1]} car-name service-charge"' + } - if not sender in num2name: - return {'err': 'you must register first'} + car = args[2] + if car in self.available_cars: + return {'err': f'"{car}" already registered'} - sender = num2name[sender] + if car in self.balance: + return { + 'err': + f'A user named "{car}" already exists. Please use a different name for this car' + } - try: - amount_raw = args[1] - amount_cent = to_cent(amount_raw) - except (ValueError, TypeError): - return {'err': 'amount must be a positive number'} + try: + service_charge = to_cent(args[3]) + except (ValueError, TypeError): + return {'err': 'service-charge must be a positive number'} - source, destination = args[2:4] - if source not in balance: - return {'err': f'source "{source}" not known'} + self.available_cars[car] = service_charge + self.add_to_balance(car) + return {'msg': f'added "{car}" as an available car'} - if destination not in balance: - return {'err': f'destination "{destination}" not known'} + # remove car + if args[1] in ["rm", "remove"]: + if len(args) < 3: + return {'err': f'not in form "{args[0]} {args[1]} car-name"'} - output = "" - global record_changes - saved_record_changes = record_changes - record_changes = False - change = [args] + car = args[2] + if car not in self.available_cars: + return {'err': f'A car with the name "{car}" does not exists'} - ret = transaction(sender, ["!zieh", source, amount_raw], "") - if 'err' in ret: - # No changes yet we can fail - return {'err': ret['err']} + del self.available_cars[car] + self.remove_from_balance(car) + return {'msg': f'removed "{car}" from the available cars'} - output += ret['msg'] - # Sender <- X Source - change.append((sender, source, -amount_cent)) + # pay bill + if args[1] in ["pay"]: + if len(args) < 4: + return { + 'err': f'not in form "{args[0]} {args[1]} car-name amount"' + } - ret = transaction(sender, ["!schieb", destination, amount_raw], "") - err = ret.get('err', None) - if err: - output += err + "\nThe balance may be in a inconsistent state please take care manually" - return {'msg': output} + if not sender in self.num2name: + return {'err': 'you must register first'} - output += ret['msg'] - # Sender -> X Destination - change.append((sender, destination, amount_cent)) + sender_name = self.num2name[sender] - ret = transaction(source, ["!zieh", destination, amount_raw], "") - err = ret.get('err', None) - if err: - output += err + "\nThe balance may be in a inconsistent state please take care manually" - return {'msg': output} + car = args[2] + if car not in self.available_cars: + return {'err': f'car "{car}" not known'} - output += ret['msg'] - # Destination -> X Source - change.append((destination, source, amount_cent)) + try: + amount = to_cent(args[3]) + amount_euro = to_euro(amount) + except (ValueError, TypeError): + return {'err': 'amount must be a positive number'} - record_changes = saved_record_changes - if err is None and record_changes and not dry_run: - changes[sender].append(change) + output = "" - return {'msg': output} + saved_record_changes = self.disable_record_changes() + change = [args] + total_available_charge = 0 + available_charges = [] + for person in self.balance[car]: + _amount = self.balance[car][person] + if _amount < 0: + total_available_charge -= _amount + available_charges.append((person, _amount)) -cmds["transfer"] = transfer + proportion = -1 + if amount < total_available_charge: + proportion = -1 * (amount / total_available_charge) + ret = self.transaction(sender, f'!gib {car} {amount_euro}'.split(), + '') + assert 'err' not in ret + output += f"{sender_name} payed {amount_euro}\n" -def cars(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - # list cars - if len(args) < 2 or args[1] in ["ls", "list"]: - if len(available_cars) == 0: - return {'msg': 'No cars registered yet.'} + # transfer money + output += f"Transferring {proportion * -100:.2f}% of everybody's charges\n" + for person, _amount in available_charges: + if person == sender_name or _amount >= 0: + continue - ret_msg = "" + to_move = int(_amount * proportion) + to_move_euro = to_euro(to_move) + ret = self.transfer(sender, + ['transfer', to_move_euro, car, person], + '') + assert 'err' not in ret - if len(args) > 2: - cars_to_list = args[2:] - else: - cars_to_list = available_cars + output += "Transfer {} from {} to {}\n".format( + to_move_euro, person, sender_name) - for car in cars_to_list: - if car in available_cars: - ret_msg += f"{car} - service charge {available_cars[car]}ct/km\n" - ret_msg += create_summary(car) + "\n" - else: - return {'err': f'"{car}" is no available car\n'} + output += "New Balances:\n" + output += self.create_summary(sender_name) + "\n" + output += self.create_summary(car) - return {'msg': ret_msg[:-1]} + self.restore_record_changes(saved_record_changes) + self.may_record_change(sender_name, change) - # add car - if args[1] in ["add", "new"]: - if len(args) < 4: - return { - 'err': - f'not in form "{args[0]} {args[1]} car-name service-charge"' - } + return {'msg': output} - car = args[2] - if car in available_cars: - return {'err': f'"{car}" already registered'} + return {'err': f'unknown car subcommand "{args[1]}".'} - if car in balance: + def _tanken(self, sender, args, msg) -> dict[str, str]: + """Split a tank across all passengers""" + if len(args) < 2: return { - 'err': - f'A user named "{car}" already exists. Please use a different name for this car' + 'err': f'not in form "{args[0]} amount [person] [car] [info]"' } - try: - service_charge = to_cent(args[3]) + amount = to_cent(args[1]) except (ValueError, TypeError): - return {'err': 'service-charge must be a positive number'} + return {'err': 'amount must be a number'} - available_cars[car] = service_charge - add_to_balance(car) - return {'msg': f'added "{car}" as an available car'} - - # remove car - if args[1] in ["rm", "remove"]: - if len(args) < 3: - return {'err': f'not in form "{args[0]} {args[1]} car-name"'} - - car = args[2] - if car not in available_cars: - return {'err': f'A car with the name "{car}" does not exists'} - - del available_cars[car] - remove_from_balance(car) - return {'msg': f'removed "{car}" from the available cars'} + # find recipient + if len(args) > 2 and args[2] in self.name2num: + recipient = args[2] + elif sender in self.num2name: + recipient = self.num2name[sender] + else: + return {'err': 'recipient unknown'} - # pay bill - if args[1] in ["pay"]: - if len(args) < 4: - return { - 'err': f'not in form "{args[0]} {args[1]} car-name amount"' - } + # find car + car = None + if len(args) > 2 and args[2] in self.available_cars: + car = args[2] + elif len(args) > 3 and args[3] in self.available_cars: + car = args[3] - if not sender in num2name: - return {'err': 'you must register first'} + service_charge = 0 + if car: + service_charge = self.available_cars[car] - sender_name = num2name[sender] + parts, err = tanken.tanken(msg[1:], amount, service_charge) - car = args[2] - if car not in available_cars: - return {'err': f'car "{car}" not known'} + if err: + return {'err': err} - try: - amount = to_cent(args[3]) - amount_euro = to_euro(amount) - except (ValueError, TypeError): - return {'err': 'amount must be a positive number'} + assert parts output = "" - - global record_changes - saved_record_changes = record_changes - record_changes = False change = [args] - - total_available_charge = 0 - available_charges = [] - for person in balance[car]: - _amount = balance[car][person] - if _amount < 0: - total_available_charge -= _amount - available_charges.append((person, _amount)) - - proportion = -1 - if amount < total_available_charge: - proportion = -1 * (amount / total_available_charge) - - ret = transaction(sender, f'!gib {car} {amount_euro}'.split(), '') - assert 'err' not in ret - output += f"{sender_name} payed {amount_euro}\n" - - # transfer money - output += f"Transferring {proportion * -100:.2f}% of everybody's charges\n" - for person, _amount in available_charges: - if person == sender_name or _amount >= 0: + for pname, values in parts.items(): + output += pname + ": {}km = fuel: {}, service charge: {}\n".format( + values["distance"], to_euro(values["cost"]), + to_euro(values["service_charge"])) + # record service charges + if pname not in self.name2num: + output += pname + " not known." + if car: + person_to_charge = pname + if pname not in self.name2num: + person_to_charge = recipient + output += f" {recipient} held accountable for service charge." + + self.record(car, person_to_charge, values["service_charge"]) + change.append( + [car, person_to_charge, values["service_charge"]]) + + # recipient paid the fuel -> don't charge them + if pname == recipient: continue - to_move = int(_amount * proportion) - to_move_euro = to_euro(to_move) - ret = transfer(sender, ['transfer', to_move_euro, car, person], '') - assert 'err' not in ret - - output += "Transfer {} from {} to {}\n".format( - to_move_euro, person, sender_name) - - output += "New Balances:\n" - output += create_summary(sender_name) + "\n" - output += create_summary(car) - - record_changes = saved_record_changes - if record_changes and not dry_run: - changes[sender_name].append(change) - - return {'msg': output} - - return {'err': f'unknown car subcommand "{args[1]}".'} - - -cmds["cars"] = cars - + if pname in self.name2num: + self.record(recipient, pname, values["cost"]) + change.append([recipient, pname, values["cost"]]) + else: + output += " Please collect fuel cost manually\n" -def _tanken(sender, args, msg) -> dict[str, str]: - if len(args) < 2: - return {'err': f'not in form "{args[0]} amount [person] [car] [info]"'} - try: - amount = to_cent(args[1]) - except (ValueError, TypeError): - return {'err': 'amount must be a number'} + self.may_record_change(self.num2name[sender], change) - # find recipient - if len(args) > 2 and args[2] in name2num: - recipient = args[2] - elif sender in num2name: - recipient = num2name[sender] - else: - return {'err': 'recipient unknown'} - - # find car - car = None - if len(args) > 2 and args[2] in available_cars: - car = args[2] - elif len(args) > 3 and args[3] in available_cars: - car = args[3] - - service_charge = 0 - if car: - service_charge = available_cars[car] - - parts, err = tanken.tanken(msg[1:], amount, service_charge) - - if err: - return {'err': err} - - assert parts - - output = "" - change = [args] - for pname, values in parts.items(): - output += pname + ": {}km = fuel: {}, service charge: {}\n".format( - values["distance"], to_euro(values["cost"]), - to_euro(values["service_charge"])) - # record service charges - if pname not in name2num: - output += pname + " not known." + output += "New Balance:\n" + output += self.create_summary(recipient) if car: - person_to_charge = pname - if pname not in name2num: - person_to_charge = recipient - output += f" {recipient} held accountable for service charge." - - record(car, person_to_charge, values["service_charge"]) - change.append([car, person_to_charge, values["service_charge"]]) + output += "\nCar " + output += self.create_summary(car) + return {'msg': output} - # recipient paid the fuel -> don't charge them - if pname == recipient: - continue + def fuck(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Rewind past changes""" + if not sender in self.num2name: + return {'err': 'you must register first'} - if pname in name2num: - record(recipient, pname, values["cost"]) - change.append([recipient, pname, values["cost"]]) - else: - output += " Please collect fuel cost manually\n" + name = self.num2name[sender] - if record_changes and not dry_run: - changes[num2name[sender]].append(change) + nchanges = len(self.changes[name]) + if nchanges == 0: + return {'msg': 'Nothing to rewind'} - output += "New Balance:\n" - output += create_summary(recipient) - if car: - output += "\nCar " - output += create_summary(car) - return {'msg': output} + change_to_rewind = -1 + if len(args) >= 2: + try: + change_to_rewind = int(args[1]) - 1 + except ValueError: + return {'err': 'change to rewind must be a number'} + + if change_to_rewind > nchanges: + return { + 'err': 'change to rewind is bigger than there are changes' + } + + # pop last item + last_changes = self.changes[name].pop(change_to_rewind) + args, last_changes = last_changes[0], last_changes[1:] + + output = name + ": sorry I fucked up!\nRewinding:\n" + output += ' '.join(args) + "\n" + for change in last_changes: + if not change[0] in self.cmds: + output += "{} {} {} {}\n".format( + change[0], ("->" if change[2] < 0 else "<-"), + to_euro(abs(change[2])), change[1]) + self.record(change[1], change[0], change[2]) + + for change in last_changes: + if change[0] in self.cmds: + ret = self.cmds[change[0]](sender, change, "") + if 'err' in ret: + output += "ERROR: " + ret['err'] + else: + output += ret['msg'] -cmds["tanken"] = _tanken + return {'msg': output} + def list_changes(self, sender, args, msg) -> dict[str, str]: + """List changes made by the sender""" + if not sender in self.num2name: + return {'err': 'you must register first'} -def fuck(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if not sender in num2name: - return {'err': 'you must register first'} + sender_name = self.num2name[sender] - name = num2name[sender] + changes_to_list = 5 + if len(args) >= 2: + try: + changes_to_list = int(args[1]) + except ValueError: + return { + 'err': 'the amount of changes to list must be a number' + } - nchanges = len(changes[name]) - if nchanges == 0: - return {'msg': 'Nothing to rewind'} + nchanges = len(self.changes[sender_name]) + if nchanges == 0: + return {'msg': 'Nothing to list'} - change_to_rewind = -1 - if len(args) >= 2: - try: - change_to_rewind = int(args[1]) - 1 - except ValueError: - return {'err': 'change to rewind must be a number'} - - if change_to_rewind > nchanges: - return {'err': 'change to rewind is bigger than there are changes'} - - # pop last item - last_changes = changes[name].pop(change_to_rewind) - args, last_changes = last_changes[0], last_changes[1:] - - output = name + ": sorry I fucked up!\nRewinding:\n" - output += ' '.join(args) + "\n" - for change in last_changes: - if not change[0] in cmds: - output += "{} {} {} {}\n".format(change[0], - ("->" if change[2] < 0 else "<-"), - to_euro(abs(change[2])), - change[1]) - record(change[1], change[0], change[2]) - - for change in last_changes: - if change[0] in cmds: - ret = cmds[change[0]](sender, change, "") + first_to_list = max(nchanges - changes_to_list, 0) - if 'err' in ret: - output += "ERROR: " + ret['err'] - else: - output += ret['msg'] + if len(args) == 3: + try: + first_to_list = int(args[2]) - 1 + except ValueError: + return {'err': 'the first change to list must be a number'} + + if first_to_list > nchanges: + return { + 'err': + 'the first change to list is bigger than there are changes' + } + + msg = "" + i = 0 + for i, change in enumerate(self.changes[sender_name]): + if i < first_to_list: + continue - return {'msg': output} + if i >= first_to_list + changes_to_list: + # i is used to track how many changes we listed + # This change will not be listed so decrement i before extiting the loop. + i -= 1 + break + msg += f'Change {i + 1}:\n' + msg += f'\t{" ".join(change[0])}\n' + for sender, recipient, amount in change[1:]: + msg += "\t{} {} {} {}\n".format(sender, + ("->" if amount < 0 else "<-"), + to_euro(abs(amount)), + recipient) -cmds["fuck"] = fuck -cmds["rewind"] = fuck -cmds["undo"] = fuck + # prepend message header because we want to know how much changes we actually listed + msg = f'Changes from {sender_name} {first_to_list + 1}-{i + 1}\n' + msg + return {'msg': msg} -def list_changes(sender, args, msg) -> dict[str, str]: - if not sender in num2name: - return {'err': 'you must register first'} + def export_state(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Send the state file as attachment""" + if not sender in self.num2name: + return {'err': 'you must register first'} - sender_name = num2name[sender] + msg = f'State from {datetime.now().date().isoformat()}' + return {'msg': msg, 'attachment': STATE_FILE} - changes_to_list = 5 - if len(args) >= 2: - try: - changes_to_list = int(args[1]) - except ValueError: - return {'err': 'the amount of changes to list must be a number'} + def schedule(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Schedule a command for periodic execution""" + if not sender in self.num2name: + return {'err': 'you must register first'} - nchanges = len(changes[sender_name]) - if nchanges == 0: - return {'msg': 'Nothing to list'} + sender_name = self.num2name[sender] - first_to_list = max(nchanges - changes_to_list, 0) + if len(args) < 3: + return {'err': f'not in form "{args[0]} name cmd"'} - if len(args) == 3: - try: - first_to_list = int(args[2]) - 1 - except ValueError: - return {'err': 'the first change to list must be a number'} + name = args[1] + cmd = args[2:] - if first_to_list > nchanges: + if name in self.scheduled_cmds: return { - 'err': - 'the first change to list is bigger than there are changes' + 'err': f'there is already a scheduled command named "{name}"' } - msg = "" - i = 0 - for i, change in enumerate(changes[sender_name]): - if i < first_to_list: - continue - - if i >= first_to_list + changes_to_list: - # i is used to track how many changes we listed - # This change will not be listed so decrement i before extiting the loop. - i -= 1 - break - - msg += f'Change {i + 1}:\n' - msg += f'\t{" ".join(change[0])}\n' - for sender, recipient, amount in change[1:]: - msg += "\t{} {} {} {}\n".format(sender, - ("->" if amount < 0 else "<-"), - to_euro(abs(amount)), recipient) - - # prepend message header because we want to know how much changes we actually listed - msg = f'Changes from {sender_name} {first_to_list + 1}-{i + 1}\n' + msg - - return {'msg': msg} - - -cmds["list-changes"] = list_changes - - -def export_state(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if not sender in num2name: - return {'err': 'you must register first'} - - msg = f'State from {datetime.now().date().isoformat()}' - return {'msg': msg, 'attachment': state_file} - - -cmds["export-state"] = export_state - - -def schedule(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - if not sender in num2name: - return {'err': 'you must register first'} - - sender_name = num2name[sender] - - if len(args) < 3: - return {'err': f'not in form "{args[0]} name cmd"'} - - name = args[1] - cmd = args[2:] - - if name in scheduled_cmds: - return {'err': f'there is already a scheduled command named "{name}"'} - - # Test the command - global dry_run - old_dry_run, dry_run = dry_run, True - - ret = cmds[cmd[0]](sender, cmd, '') - - dry_run = old_dry_run - - if 'err' in ret: - return {'err': 'the command "{}" failed and will not be recorded'} - - scheduled_cmd = { - "schedule": args[0][1:], - "last_time": None, - "sender": sender, - "cmd": cmd - } + # Test the command + saved_dry_run = self.enable_dry_run() + ret = self.cmds[cmd[0]](sender, cmd, '') + self.restore_dry_run(saved_dry_run) - scheduled_cmds[name] = scheduled_cmd - output = 'Recorded the {} command "{}" as "{}"\n'.format( - args[0][1:], ' '.join(cmd), name) + if 'err' in ret: + return {'err': 'the command "{}" failed and will not be recorded'} - output += "Running {} command {} for {} initially\n".format( - scheduled_cmd["schedule"], name, sender_name) - - ret = cmds[cmd[0]](sender, cmd, "") - if 'err' in ret: - output += 'ERROR: ' + ret['err'] - else: - output += ret['msg'] - - changes[sender_name][0].append(["cancel", name]) - - now = datetime.now().date() - scheduled_cmd["last_time"] = now.isoformat() - - return {'msg': output} - - -cmds["weekly"] = schedule -cmds["monthly"] = schedule -cmds["yearly"] = schedule - - -def cancel(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - cmd_name = args[1] - if not cmd_name in scheduled_cmds: - return {'err': f'"{cmd_name}" is not a scheduled command'} - cmd = scheduled_cmds[cmd_name] - - if not cmd["sender"] == sender: - return {'err': 'only the original creator can cancel this command'} - - del scheduled_cmds[cmd_name] - return {'msg': f'Cancelled the {cmd["schedule"]} cmd "{cmd_name}"'} - - -cmds["cancel"] = cancel + scheduled_cmd = { + "schedule": args[0][1:], + "last_time": None, + "sender": sender, + "cmd": cmd + } + self.scheduled_cmds[name] = scheduled_cmd + output = 'Recorded the {} command "{}" as "{}"\n'.format( + args[0][1:], ' '.join(cmd), name) -def thanks(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument - sender_name = num2name.get(sender, sender) - msg = f'You are welcome. It is a pleasure to work with you, {sender_name}.' - nick = None if len(args) == 1 else args[1] - if nick: - msg = f"{msg}\nBut don't call me {nick}." + output += "Running {} command {} for {} initially\n".format( + scheduled_cmd["schedule"], name, sender_name) - return {'msg': msg} - - -cmds["thanks"] = thanks + ret = self.cmds[cmd[0]](sender, cmd, "") + if 'err' in ret: + output += 'ERROR: ' + ret['err'] + else: + output += ret['msg'] + self.changes[sender_name][0].append(["cancel", name]) -def main(): - if len(sys.argv) > 1 and sys.argv[1] in ["-d", "--dry-run"]: - global dry_run - dry_run = True - print("Dry Run no changes will apply!") + now = datetime.now().date() + scheduled_cmd["last_time"] = now.isoformat() - # Read cmds from stdin - for l in sys.stdin.read().splitlines(): - try: - message = json.loads(l)["envelope"] - except: - print(datetime.now(), l, "not valid json") - continue + return {'msg': output} + def cancel(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Cancel a previously scheduled command""" + cmd_name = args[1] + if not cmd_name in self.scheduled_cmds: + return {'err': f'"{cmd_name}" is not a scheduled command'} + cmd = self.scheduled_cmds[cmd_name] + + if not cmd["sender"] == sender: + return {'err': 'only the original creator can cancel this command'} + + del self.scheduled_cmds[cmd_name] + return {'msg': f'Cancelled the {cmd["schedule"]} cmd "{cmd_name}"'} + + def thanks(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument + """Thank geldschieberbot for its loyal service""" + sender_name = self.num2name.get(sender, sender) + msg = f'You are welcome. It is a pleasure to work with you, {sender_name}.' + nick = None if len(args) == 1 else args[1] + if nick: + msg = f"{msg}\nBut don't call me {nick}." + + return {'msg': msg} + + def __init__(self, dry_run=False): + self.dry_run = dry_run # Run without changing the stored state + self.load_state() + self.quiet = False # Run without sending messages + self.record_changes = True # Should changes be recorded + + # Command dispatch table + self.cmds = { + 'reg': self.register, + 'register': self.register, + 'sum': self.summary, + 'summary': self.summary, + 'full-sum': self.full_summary, + 'full-summary': self.full_summary, + 'ls': self.list_users, + 'list': self.list_users, + 'split': self.split, + 'teil': self.split, + 'schieb': self.transaction, + 'gib': self.transaction, + 'zieh': self.transaction, + 'nimm': self.transaction, + 'help': self.usage, + 'usage': self.usage, + 'transfer': self.transfer, + 'cars': self.cars, + 'tanken': self._tanken, + 'fuck': self.fuck, + 'rewind': self.fuck, + 'undo': self.fuck, + 'list-changes': self.list_changes, + 'export-state': self.export_state, + 'weekly': self.schedule, + 'monthly': self.schedule, + 'yearly': self.schedule, + 'cancel': self.cancel, + 'thanks': self.thanks, + } + + def __del__(self): + self.save_state() + + def enable_dry_run(self) -> bool: + """Enable dry run""" + old_value = self.dry_run + self.dry_run = True + return old_value + + def restore_dry_run(self, old_value: bool): + """Restore dry run setting to old value""" + self.dry_run = old_value + + def disable_record_changes(self) -> bool: + """Disable change recording and return previous value""" + old_value = self.record_changes + self.record_changes = False + return old_value + + def restore_record_changes(self, old_value: bool): + """Restore record change setting to old value""" + self.record_changes = old_value + + def may_record_change(self, user: str, change): + """Record a change for a user if change recording is enabled""" + if self.record_changes and not self.dry_run: + self.changes[user].append(change) + + def handle(self, message: dict): + """Parse and respond to a message""" sender_number = message["source"] if not "dataMessage" in message or not message[ "dataMessage"] or not message["dataMessage"]["message"]: - continue - else: - message = message["dataMessage"] - if message["groupInfo"] and message["groupInfo"][ - "groupId"] != group_id: - continue - body = [l.strip() for l in message["message"].lower().splitlines()] + return - if len(body) == 0: - continue + message = message["dataMessage"] + if message["groupInfo"] and message["groupInfo"]["groupId"] != GROUP_ID: + return - args = body[0].split(' ') + body = [l.strip() for l in message["message"].lower().splitlines()] - if args[0].startswith("!"): - cmd = args[0][1:] - if cmd in cmds: - ret = cmds[cmd](sender_number, args, body) - if 'err' in ret: - send(f'ERROR: {ret["err"]}') - else: - if not 'msg' in ret: - print(ret) - send(ret['msg'], attachment=ret.get('attachment', None)) + if len(body) == 0 or not body[0].startswith('!'): + return + + args = body[0].split(' ') + cmd = args[0][1:] + if cmd in self.cmds: + ret = self.cmds[cmd](sender_number, args, body) + if 'err' in ret: + self.send(f'ERROR: {ret["err"]}') else: - send('ERROR: unknown cmd. Enter !help for a list of commands.') + if not 'msg' in ret: + print(ret) + self.send(ret['msg'], attachment=ret.get('attachment', None)) + else: + self.send( + 'ERROR: unknown cmd. Enter !help for a list of commands.') + + self.save_state() - # Handle scheduled commands - global record_changes - record_changes = False + def run_scheduled_cmds(self): + """Progress the scheduled commands""" + self.record_changes = False now = datetime.now().date() week_delta = timedelta(days=7) - for name, cmd in scheduled_cmds.items(): + for name, cmd in self.scheduled_cmds.items(): last_time = cmd["last_time"] if hasattr(date, "fromisoformat"): @@ -945,24 +949,43 @@ def main(): d = d + week_delta if d <= now: - send("Running {} command {} for {} triggered on {}\n". - format(cmd["schedule"], name, num2name[cmd["sender"]], - d.isoformat())) + self.send("Running {} command {} for {} triggered on {}\n". + format(cmd["schedule"], + name, self.num2name[cmd["sender"]], + d.isoformat())) - ret = cmds[cmd["cmd"][0]](cmd["sender"], cmd["cmd"], "") + ret = self.cmds[cmd["cmd"][0]](cmd["sender"], cmd["cmd"], + "") if 'err' in ret: - send("ERROR: " + ret['err']) + self.send("ERROR: " + ret['err']) else: - send(ret['msg'], - attachment=ret.get('attachment', None)) + self.send(ret['msg'], + attachment=ret.get('attachment', None)) cmd["last_time"] = d.isoformat() else: break - with open(state_file, 'w', encoding='utf-8') as f: - json.dump(state, f) + +def main(): + dry_run = len(sys.argv) > 1 and sys.argv[1] in ["-d", "--dry-run"] + if dry_run: + print("Dry Run no changes will apply!") + + bot = Geldschieberbot(dry_run=dry_run) + + # Read cmds from stdin + for l in sys.stdin.read().splitlines(): + try: + message = json.loads(l)["envelope"] + except json.JSONDecodeError: + print(datetime.now(), l, "not valid json") + continue + + bot.handle(message) + + bot.run_scheduled_cmds() if __name__ == "__main__": |
