# Copyright (c) 2017-2023 Florian Fischer. All rights reserved.
#
# This file is part of geldschieberbot.
#
# geldschieberbot is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# geldschieberbot is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# geldschieberbot found in the LICENSE file. If not,
# see <https://www.gnu.org/licenses/>.
"""Bot to manage a groups finances"""

from datetime import date, datetime, timedelta
import json
import os
import typing as T

from models import MessageContext, Modification, Change, GeldschieberbotJSONEncoder, Reply
from utils import to_euro, to_cent


class Geldschieberbot:
    """
    State of the geldschieberbot

    The state is stored in a central dict for convenient writing to and
    reading from disk.
    The state contains:
    * balance - dict of dicts associating two persons to an amount
    * name2num, num2name - dicts associating numbers to names and vice versa
    * available_cars - dict associating car names to their service charge
    * scheduled_cmds - dict associating names to cmds, their schedule, and
                       their last execution
    * changes - dict associating users with their changes
    * aliases - dict of names mapping to multiple other names

    The central component of the geldschieberbot's state is the balance
    dictionary assigning a cent amount to each pair of participants
    (following A and B).
    The tracked amount represents the money the first component has to
    provide to the second to even out the balance.
    If money is transferred from A to B it is expected from B to give the
    same amount of money back to A.
    Moving money from A to B is tracked in both balances A->B and B->A.
    The amount is subtracted from the A->B balance and added to the B->A
    balance meaning that B has to give A money to even out the balance.
    """

    STATE_KEYS = ("balance", "name2num", "num2name", "available_cars",
                  "scheduled_cmds", "changes", "aliases")

    def load_state(self, state_path=''):
        """Load state from disk

        Reads the JSON state from state_path (default: self.state_path) or
        starts with an empty state if the file does not exist. Stored changes
        are decoded into Change/Modification objects, old state layouts are
        converted and every state key is exposed as an attribute.
        """
        state_path = state_path or self.state_path
        if os.path.isfile(state_path):
            with open(state_path, 'r', encoding='utf-8') as state_f:
                self.state = json.load(state_f)
        else:
            self.state = {key: {} for key in self.STATE_KEYS}

        # A state written by an old version may not contain any changes.
        # Default the key before decoding so neither the decoding nor the
        # conversion below fails with a KeyError.
        changes = self.state.setdefault('changes', {})

        try:
            # decode JSON changes
            self.state['changes'] = {
                name: [
                    Change(ch['cmd'], [
                        Modification(*mod.values())
                        for mod in ch['modifications']
                    ], ch['timestamp'], ch['rewind_cmds'])
                    for ch in user_changes
                ]
                for name, user_changes in changes.items()
            }
        except (KeyError, TypeError):
            # convert from old plain changes format
            if changes and isinstance(next(iter(changes.values())), list):
                for name, user_changes in changes.items():
                    converted = []
                    for ch in user_changes:
                        try:
                            converted.append(
                                Change(ch[0], [
                                    Modification(r, d, a)
                                    for r, d, a in ch[1:]
                                ], None))
                        except (ValueError, TypeError):
                            # skip changes which can not be converted
                            continue
                    # replacing the value of an existing key is safe
                    # while iterating the dict
                    changes[name] = converted

        for key in self.STATE_KEYS:
            # add missing keys to an existing state
            setattr(self, key, self.state.setdefault(key, {}))

        # Do this simply to allow pylint to lint the Geldschieberbot and
        # prevent false positive 'no-member' errors.
        self.balance = self.state["balance"]
        self.name2num = self.state["name2num"]
        self.num2name = self.state["num2name"]

        # Workaround old states using the cars key instead of available_cars
        if 'cars' in self.state:
            all_cars = self.state['cars']
            del self.state['cars']
            all_cars.update(self.state['available_cars'])
            self.state['available_cars'] = all_cars

        self.available_cars = self.state["available_cars"]
        self.scheduled_cmds = self.state["scheduled_cmds"]
        self.changes = self.state["changes"]
        self.aliases = self.state["aliases"]

    def save_state(self, state_path=''):
        """Save state to disk

        Writes the state as JSON to state_path (default: self.state_path).
        """
        state_path = state_path or self.state_path
        with open(state_path, 'w', encoding='utf-8') as state_file:
            json.dump(self.state, state_file, cls=GeldschieberbotJSONEncoder)

    def record(self, recipient: str, donor: str,
               amount: int) -> T.Optional[Modification]:
        """Apply changes to the balance

        Returns the applied Modification or None during a dry run.
        """
        return self.apply(Modification(recipient, donor, amount))

    def apply(self, mod: Modification) -> T.Optional[Modification]:
        """Apply a single modification to the balance

        Returns the modification or None if nothing was changed because
        this is a dry run.
        """
        # Only change anything if this is not a dry run
        if self.dry_run:
            return None

        self.balance[mod.donor][mod.recipient] += mod.amount
        self.balance[mod.recipient][mod.donor] -= mod.amount
        return mod

    def reverse(self, mod: Modification) -> Modification:
        """Reverse the effect of a single modification to the balance

        Returns a new Modification with swapped recipient and donor.
        """
        self.balance[mod.donor][mod.recipient] -= mod.amount
        self.balance[mod.recipient][mod.donor] += mod.amount
        return Modification(mod.donor, mod.recipient, mod.amount)

    def create_summary(self, user, include=None) -> str:
        """Create a summary for a user

        If include is given and not empty only the balances with the
        included names are listed. Balances with cars are listed separately.
        """
        msg = ''
        cars_summary = ""
        total = 0
        cars_total = 0
        p_balances = self.balance[user]  # fails if user is not in balance
        for person, amount in p_balances.items():
            if include and person not in include:
                continue

            if amount == 0:
                continue

            line = f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n'
            if person in self.available_cars:
                cars_total -= amount
                cars_summary += line
            else:
                total -= amount
                msg += line

        if not msg:
            msg = '\tAll fine :)'
        else:
            msg += f'\tBalance: {to_euro(total)}'

        ret_summary = f'{user}:\n{msg}'

        if cars_summary:
            cars_summary += f'\tLiability: {to_euro(cars_total)}'
            ret_summary += f'\n\tCars:\n{cars_summary}'

        return ret_summary

    def create_total_summary(self) -> str:
        """Create a summary of all balances"""
        msg = 'Summary:'
        cars_summary = ''
        for person in self.balance:
            p_summary = self.create_summary(person)
            if person in self.available_cars:
                cars_summary += f'\n{p_summary}'
            else:
                msg += f'\n{p_summary}'

        if cars_summary:
            msg += f'\nCars:{cars_summary}'
        return msg

    def create_members(self) -> str:
        """Create a list of all group members"""
        return ''.join(f'{member}: {number}\n'
                       for member, number in self.name2num.items())

    def add_to_balance(self, name: str):
        """Add a new user balance"""
        new_balance = {}
        for member in self.balance:
            self.balance[member][name] = 0
            new_balance[member] = 0
        self.balance[name] = new_balance

    def remove_from_balance(self, name: str):
        """Remove a user balance"""
        del self.balance[name]
        for member in self.balance:
            del self.balance[member][name]

    def expand_aliases(self,
                       users: list[str],
                       exclude_users=None) -> list[str]:
        """Expand any alias

        The special name 'all' expands to all registered users.
        Any user name in exclude_users is dropped from an expanded alias.
        This is useful when using aliases with commands
        implicitly adding a user like !split.
        """
        ret = []
        for user in users:
            if user not in self.aliases and user != 'all':
                ret.append(user)
                continue

            if user in self.aliases:
                expanded_users = list(self.aliases[user])
            else:
                expanded_users = list(self.name2num.keys())

            ret.extend(u for u in expanded_users
                       if not exclude_users or u not in exclude_users)

        return ret

    @classmethod
    def create_help(cls) -> str:
        """Return a help message explaining how to use geldschieberbot"""
        return """
Usage: send a message starting with '!' followed by a command
Commands:
ls | list - print all registered members
help - print this help message
reg name - register the sender with the name: name
sum [name] - print summary of specific users
full-sum - print summary of all users

alias - list registered aliases
alias name person [persons] - create an alias for one or multiple persons
alias remove name - remove an alias

split amount person [persons] - split amount between the sender and persons
teil amount person [persons] - split amount between the sender and persons

schieb amount recipient [recipients ...] - give money to one or more recipients
gib amount recipient [recipients ...] - give money to one or more recipients
zieh amount donor [donor ...] - get money from one or more donors
nimm amount donor [donor ...] - get money from one or more donors

transfer amount source destination - transfer amount of your balance from source to destination

cars [cmd] - interact with the available cars
cars [list | ls] - list available cars and their service charge
cars add car-name service-charge - add new car
cars new car-name service-charge - add new car
cars remove car-name - remove a car
cars pay car-name amount - pay a bill for the specified car

tanken amount [person] [car] [info] - calculate fuel costs, service charge and add them to the person's and car's balance respectively

fuck [change] - rewind last or specific change
list-changes [n] [first] - list the last n changes starting at first

export-state - send the state file

weekly name cmd - repeat cmd each week
monthly name cmd - repeat cmd each month
yearly name cmd - repeat cmd each year
cancel name - stop repeating cmd

Happy Geldschieben!
"""

    def register(self, msg: MessageContext) -> dict[str, str]:
        """Register a new user

        Pure numerical names are rejected because they could not be
        distinguished from amounts.
        """
        if len(msg.args) != 2:
            return {'err': f'not in form "{msg.args[0]} name"'}
        name = msg.args[1]

        try:
            to_cent(name)
            return {'err': 'pure numerical names are not allowed'}
        except (ValueError, TypeError):
            pass

        if name in self.name2num:
            return {'err': f'{name} already registered'}

        if msg.sender:
            return {'err': f'you are already registered as {msg.sender}'}

        self.num2name[msg.sender_number] = name
        self.name2num[name] = msg.sender_number

        self.add_to_balance(name)

        # add changes list
        self.changes[name] = []
        return {'msg': f'Happy geldschiebing {name}!'}

    def summary(self, msg: MessageContext) -> dict[str, str]:
        """Print summary for one or multiple balances

        Without arguments the summary of the sender is returned.
        """
        if len(msg.args) == 1:
            if not msg.sender:
                return {'err': 'You must register first to print your summary'}
            return {'msg': f'Summary:\n{self.create_summary(msg.sender)}'}

        out = "Summary:\n"
        for name in self.expand_aliases(msg.args[1:]):
            if name in self.name2num or name in self.available_cars:
                out += self.create_summary(name) + "\n"
            else:
                return {'err': f'name "{name}" not registered'}
        return {'msg': out}

    def full_summary(self, msg: MessageContext) -> dict[str, str]:
        """Print a summary of all balances"""
        if len(msg.args) == 1:
            return {'msg': self.create_total_summary()}
        return {'err': f'{msg.args[0][1:]} takes no arguments'}

    def list_users(self, msg: MessageContext) -> dict[str, str]:  # pylint: disable=unused-argument
        """List all registered users"""
        return {'msg': self.create_members()}

    @classmethod
    def usage(cls, msg: MessageContext) -> dict[str, str]:  # pylint: disable=unused-argument
        """Return the usage"""
        return {'msg': cls.create_help()}

    def split(self, msg: MessageContext) -> dict[str, str]:
        """Split a fixed amount across multiple persons

        The sender is implicitly part of the split and receives the share
        of every other known person.
        """
        if not msg.sender:
            return {'err': 'you must register first'}

        if len(msg.args) < 3:
            return {'err': f'not in form "{msg.args[0]} amount [name]+"'}

        try:
            amount = to_cent(msg.args[1])
            persons = msg.args[2:]
        except (ValueError, TypeError):
            # support !split name amount
            if len(msg.args) != 3:
                return {'err': 'amount must be a positive number'}
            try:
                amount = to_cent(msg.args[2])
                persons = [msg.args[1]]
            except (ValueError, TypeError):
                return {'err': 'amount must be a positive number'}

        recipient = msg.sender
        # exclude the implicit recipient from alias expansion
        persons = self.expand_aliases(persons, exclude_users=[recipient])

        # persons + sender
        npersons = len(persons) + 1
        amount_per_person = int(amount / npersons)

        output = f"Split {to_euro(amount)} between {npersons} -> {to_euro(amount_per_person)} each\n"
        modifications: list[Modification] = []
        for person in persons:
            if person not in self.name2num:
                output += f"{person} not known. Please take care manually\n"
            elif person == recipient:
                output += (f'{person}, you will be charged multiple times. '
                           'This may not be what you want\n')
            else:
                modification = self.record(recipient, person,
                                           amount_per_person)
                if modification:
                    modifications.append(modification)

        self.may_record_change(recipient,
                               Change(msg.args, modifications, msg.timestamp))

        output += "New Balance:\n"
        output += self.create_summary(recipient)
        return {'msg': output}

    def _transaction(self, initiator: str, recipients: list[str],
                     amount: int) -> tuple[str, list[Modification]]:
        """Record a transaction

        Moves amount from the initiator to each recipient and returns the
        message describing the new balance and the applied modifications.
        """
        transaction_sum = ''
        modifications: list[Modification] = []
        for recipient in recipients:
            modification = self.record(initiator, recipient, amount)
            if modification:
                modifications.append(modification)

            transaction_sum += f'{"->" if amount > 0 else "<-"} {recipient} {to_euro(abs(amount))}\n'

        output = ''
        if len(recipients) > 1:
            output += transaction_sum
            output += 'New Balance:\n'
            output += self.create_summary(initiator, recipients)
        else:
            # read the balance explicitly instead of relying on a
            # variable leaking from the loop above
            p_balance = self.balance[initiator][recipients[0]]
            output = f'New Balance: {initiator} {"->" if p_balance > 0 else "<-"} {to_euro(abs(p_balance))} {recipients[0]}\n'
        return output, modifications

    def transaction(self, msg: MessageContext) -> dict[str, str]:
        """Record a transaction received in a message

        Handles schieb/gib (give money) and zieh/nimm (get money).
        With exactly one recipient the order of amount and recipient does
        not matter and the recipient may be an alias.
        """
        if len(msg.args) < 3:
            return {
                'err':
                f'not in form "{msg.args[0]} amount recipient [recipient ...]"'
            }

        if not msg.sender:
            return {'err': 'you must register first'}

        if len(msg.args) == 3:
            if msg.args[1] in self.balance or msg.args[1] in self.aliases:
                recipient, _amount = msg.args[1:3]
            elif msg.args[2] in self.balance or msg.args[2] in self.aliases:
                _amount, recipient = msg.args[1:3]
            else:
                return {'err': 'recipient not known'}
            recipients = self.expand_aliases([recipient])
        else:
            _amount, recipients = msg.args[1], msg.args[2:]

        if msg.sender in recipients:
            return {'err': 'you can not transfer money to or from yourself'}

        try:
            amount = to_cent(_amount)
        except (ValueError, TypeError):
            return {'err': 'amount must be a positive number'}

        for recipient in recipients:
            if recipient not in self.balance:
                return {'err': f'recipient "{recipient}" not known'}

        # Commands received in a message start with '!' but scheduled
        # commands are stored and executed without it.
        if msg.args[0].lstrip('!') in ("zieh", "nimm"):
            amount *= -1

        output, modifications = self._transaction(msg.sender, recipients,
                                                  amount)
        self.may_record_change(msg.sender,
                               Change(msg.args, modifications, msg.timestamp))
        return {'msg': output}

    def _transfer(self, sender: str, source: str, destination: str,
                  amount: int) -> tuple[str, list[Modification]]:
        """Transfer amount from one balance to another"""
        # Sender <- X Source
        output, modifications = self._transaction(sender, [source], -amount)

        # Sender -> X Destination
        out, modification = self._transaction(sender, [destination], amount)
        output += out
        modifications += modification

        # Destination -> X Source
        out, modification = self._transaction(source, [destination], -amount)
        output += out
        modifications += modification

        return output, modifications

    def transfer(self, msg: MessageContext) -> dict[str, str]:
        """Message handler wrapping the transfer function"""
        if len(msg.args) < 4:
            return {
                'err': f'not in form "{msg.args[0]} amount source destination"'
            }

        if not msg.sender:
            return {'err': 'you must register first'}

        try:
            amount_cent = to_cent(msg.args[1])
        except (ValueError, TypeError):
            return {'err': 'amount must be a positive number'}

        source, destination = msg.args[2:4]
        if source not in self.balance:
            return {'err': f'source "{source}" not known'}

        if destination not in self.balance:
            return {'err': f'destination "{destination}" not known'}

        out, modifications = self._transfer(msg.sender, source, destination,
                                            amount_cent)
        self.may_record_change(msg.sender,
                               Change(msg.args, modifications, msg.timestamp))
        return {'msg': out}

    def cars(self, msg: MessageContext) -> dict[str, str]:
        """Manage available cars

        List, add, remove or pay a bill for a car.
        """
        # list cars
        if len(msg.args) < 2 or msg.args[1] in ["ls", "list"]:
            if len(self.available_cars) == 0:
                return {'msg': 'No cars registered yet.'}

            ret_msg = ""

            if len(msg.args) > 2:
                cars_to_list = msg.args[2:]
            else:
                cars_to_list = self.available_cars

            for car in cars_to_list:
                if car in self.available_cars:
                    ret_msg += f"{car} - service charge {self.available_cars[car]}ct/km\n"
                    ret_msg += self.create_summary(car) + "\n"
                else:
                    return {'err': f'"{car}" is no available car\n'}

            # strip the trailing newline
            return {'msg': ret_msg[:-1]}

        # add car
        if msg.args[1] in ["add", "new"]:
            if len(msg.args) < 4:
                return {
                    'err':
                    f'not in form "{msg.args[0]} {msg.args[1]} car-name service-charge"'
                }

            car = msg.args[2]
            if car in self.available_cars:
                return {'err': f'"{car}" already registered'}

            if car in self.balance:
                return {
                    'err':
                    f'A user named "{car}" already exists. Please use a different name for this car'
                }

            try:
                service_charge = to_cent(msg.args[3])
            except (ValueError, TypeError):
                return {'err': 'service-charge must be a positive number'}

            self.available_cars[car] = service_charge
            self.add_to_balance(car)
            return {'msg': f'added "{car}" as an available car'}

        # remove car
        if msg.args[1] in ["rm", "remove"]:
            if len(msg.args) < 3:
                return {
                    'err':
                    f'not in form "{msg.args[0]} {msg.args[1]} car-name"'
                }

            car = msg.args[2]
            if car not in self.available_cars:
                return {'err': f'A car with the name "{car}" does not exists'}

            del self.available_cars[car]
            self.remove_from_balance(car)
            return {'msg': f'removed "{car}" from the available cars'}

        # pay bill
        if msg.args[1] in ["pay"]:
            if len(msg.args) < 4:
                return {
                    'err':
                    f'not in form "{msg.args[0]} {msg.args[1]} car-name amount"'
                }

            if not msg.sender:
                return {'err': 'you must register first'}

            car = msg.args[2]
            if car not in self.available_cars:
                return {'err': f'car "{car}" not known'}

            try:
                amount = to_cent(msg.args[3])
                amount_euro = to_euro(amount)
            except (ValueError, TypeError):
                return {'err': 'amount must be a positive number'}

            output = ""

            # collect the charges (negative balances) the car can pass on
            total_available_charge = 0
            available_charges = []
            for person, _amount in self.balance[car].items():
                if _amount < 0:
                    total_available_charge -= _amount
                    available_charges.append((person, _amount))

            # The proportion is negative because the charges are negative.
            # Never move more than 100% of the charges.
            proportion = -1.0
            if amount < total_available_charge:
                proportion = -1 * (amount / total_available_charge)

            _, modifications = self._transaction(msg.sender, [car], amount)
            output += f"{msg.sender} payed {amount_euro}\n"

            # transfer money
            output += f"Transferring {proportion * -100:.2f}% of everybody's charges\n"
            for person, _amount in available_charges:
                if person == msg.sender or _amount >= 0:
                    continue

                to_move = int(_amount * proportion)
                to_move_euro = to_euro(to_move)
                _, modification = self._transfer(msg.sender, car, person,
                                                 to_move)
                modifications += modification

                output += f'Transfer {to_move_euro} from {person} to {msg.sender}\n'

            output += "New Balances:\n"
            output += self.create_summary(msg.sender) + "\n"
            output += self.create_summary(car)

            self.may_record_change(
                msg.sender, Change(msg.args, modifications, msg.timestamp))
            return {'msg': output}

        return {'err': f'unknown car subcommand "{msg.args[1]}".'}

    def parse_tank_bill(self, _drives: list[str], fuel_charge: int,
                        service_charge: int):
        """Parse and distribute the tank bill across all passengers

        Each drive is a line in the form "distance passenger [passenger ...]".
        Returns a tuple (passengers, error). On success passengers maps each
        passenger to its driven distance, fuel cost and service charge and
        error is None. On failure passengers is None and error is a message.
        """
        passengers: dict[str, dict[str, int]] = {}
        distance = 0.
        # (distance, passengers) of each drive
        drives: list[tuple[int, list[str]]] = []

        for drive in (d.split(' ') for d in _drives):
            try:
                drive_distance = int(drive[0])
            except (IndexError, ValueError):
                return None, "Lines have to start with the driven distance!"

            drive_passengers = self.expand_aliases(drive[1:])
            if not drive_passengers:
                # the cost of a drive can not be split among nobody
                return None, "Each drive needs at least one passenger!"
            drives.append((drive_distance, drive_passengers))

            # calculate overall distance
            distance += drive_distance

            # collect distances per passenger
            for passenger in drive_passengers:
                passengers.setdefault(passenger, {
                    "distance": 0,
                    "cost": 0,
                    "service_charge": 0
                })["distance"] += drive_distance

        # calculate cost per kilometer
        if distance <= 0:
            return None, "Driven distance must be greater than 0!"

        c_km = fuel_charge / distance

        for drive_distance, drive_passengers in drives:
            # calculate cost per drive split among passengers
            c_d = int(c_km * drive_distance / len(drive_passengers))
            sc_d = int(service_charge * drive_distance /
                       len(drive_passengers))
            for passenger in drive_passengers:
                passengers[passenger]["cost"] += c_d
                passengers[passenger]["service_charge"] += sc_d

        return passengers, None

    def tanken(self, msg: MessageContext) -> dict[str, str]:
        """Split a tank across all passengers

        The drives are read from the lines following the command.
        """
        if len(msg.args) < 2:
            return {
                'err':
                f'not in form "{msg.args[0]} amount [person] [car] [info]"'
            }
        try:
            amount = to_cent(msg.args[1])
        except (ValueError, TypeError):
            return {'err': 'amount must be a number'}

        # find recipient
        if len(msg.args) > 2 and msg.args[2] in self.name2num:
            recipient = msg.args[2]
        elif msg.sender:
            recipient = msg.sender
        else:
            return {'err': 'recipient unknown'}

        # find car
        car = None
        if len(msg.args) > 2 and msg.args[2] in self.available_cars:
            car = msg.args[2]
        elif len(msg.args) > 3 and msg.args[3] in self.available_cars:
            car = msg.args[3]

        service_charge = self.available_cars.get(car, 0)

        parts, err = self.parse_tank_bill(msg.body[1:], amount, service_charge)

        if err:
            return {'err': err}

        assert parts

        output = ""
        modifications: list[Modification] = []
        for pname, values in parts.items():
            output += f'{pname}: {values["distance"]}km = fuel: {to_euro(values["cost"])}, service charge: {to_euro(values["service_charge"])}\n'

            known = pname in self.name2num

            # record service charges
            if not known:
                output += f'{pname} not known.'
            if car:
                person_to_charge = pname
                if not known:
                    person_to_charge = recipient
                    output += f" {recipient} held accountable for service charge."

                modification = self.record(car, person_to_charge,
                                           values["service_charge"])
                if modification:
                    modifications.append(modification)

            # recipient paid the fuel -> don't charge them
            if pname == recipient:
                continue

            if known:
                modification = self.record(recipient, pname, values["cost"])
                if modification:
                    modifications.append(modification)
            else:
                output += " Please collect fuel cost manually\n"

        if msg.sender:
            self.may_record_change(
                msg.sender, Change(msg.args, modifications, msg.timestamp))

        output += "New Balance:\n"
        output += self.create_summary(recipient)
        if car:
            output += "\nCar "
            output += self.create_summary(car)
        return {'msg': output}

    def fuck(self, msg: MessageContext) -> dict[str, str]:
        """Rewind past changes

        Without an argument the last change of the sender is rewound.
        The optional argument is the 1-based number of the change as printed
        by list-changes.
        """
        if not msg.sender:
            return {'err': 'you must register first'}

        name = msg.sender

        nchanges = len(self.changes[name])
        if nchanges == 0:
            return {'msg': 'Nothing to rewind'}

        # rewind the last change by default
        change_to_rewind = -1
        if len(msg.args) >= 2:
            try:
                change_to_rewind = int(msg.args[1]) - 1
            except ValueError:
                return {'err': 'change to rewind must be a number'}

            # change_to_rewind is a 0-based index -> nchanges is out of range
            if change_to_rewind >= nchanges:
                return {
                    'err': 'change to rewind is bigger than there are changes'
                }

            # guard list.pop against out of range negative indices
            if change_to_rewind < -nchanges:
                return {'err': 'change to rewind must be a positive number'}

        change = self.changes[name].pop(change_to_rewind)

        output = name + ": sorry I fucked up!\nRewinding:\n"
        output += ' '.join(change.cmd) + "\n"
        for mod in change.modifications:
            output += f'{self.reverse(mod).out_string()}\n'

        for cmd_args in change.rewind_cmds:
            ret = self.cmds[cmd_args[0]](MessageContext(
                msg.sender_number, msg.sender, cmd_args,
                [' '.join(cmd_args)], msg.timestamp))

            if 'err' in ret:
                output += "ERROR: " + ret['err']
            else:
                output += ret['msg']

        return {'msg': output}

    def list_changes(self, msg: MessageContext) -> dict[str, str]:
        """List changes made by the sender

        By default the last 5 changes are listed. The first optional
        argument is the amount of changes to list, the second is the 1-based
        number of the first change to list.
        """
        if not msg.sender:
            return {'err': 'you must register first'}

        changes_to_list = 5
        if len(msg.args) >= 2:
            try:
                changes_to_list = int(msg.args[1])
            except ValueError:
                return {
                    'err': 'the amount of changes to list must be a number'
                }

        nchanges = len(self.changes[msg.sender])
        if nchanges == 0:
            return {'msg': 'Nothing to list'}

        first_to_list = max(nchanges - changes_to_list, 0)

        if len(msg.args) == 3:
            try:
                first_to_list = int(msg.args[2]) - 1
            except ValueError:
                return {'err': 'the first change to list must be a number'}

            # first_to_list is a 0-based index -> nchanges is out of range
            if first_to_list >= nchanges:
                return {
                    'err':
                    'the first change to list is bigger than there are changes'
                }

        out = ""
        i = 0
        for i, change in enumerate(self.changes[msg.sender]):
            if i < first_to_list:
                continue

            if i >= first_to_list + changes_to_list:
                # i is used to track how many changes we listed
                # This change will not be listed so decrement i before exiting the loop.
                i -= 1
                break

            out += f'Change {i + 1}:\n'
            out += f'\t{" ".join(change.cmd)}\n'
            for mod in change.modifications:
                out += f'\t{mod.in_string()}\n'

        # prepend message header because we want to know how much changes we actually listed
        out = f'Changes from {msg.sender} {first_to_list + 1}-{i + 1}\n' + out

        return {'msg': out}

    def export_state(self, msg: MessageContext) -> dict[str, str]:
        """Send the state file as attachment"""
        if not msg.sender:
            return {'err': 'you must register first'}

        out = f'State from {datetime.now().date().isoformat()}'
        return {'msg': out, 'attachment': self.state_path}

    def schedule(self, msg: MessageContext) -> dict[str, str]:
        """Schedule a command for periodic execution

        The command is tested in a dry run first. If it succeeds it is
        recorded and executed once immediately.
        """
        if not msg.sender:
            return {'err': 'you must register first'}

        if len(msg.args) < 3:
            return {'err': f'not in form "{msg.args[0]} name cmd"'}

        name = msg.args[1]
        cmd = msg.args[2:]

        if cmd[0] not in self.cmds:
            return {
                'err':
                f'the command "{name}" can not be registered because "{cmd[0]}" is unknown'
            }

        initial_command_msg = MessageContext(msg.sender_number, msg.sender,
                                             cmd, [], msg.timestamp)

        if name in self.scheduled_cmds:
            return {
                'err': f'there is already a scheduled command named "{name}"'
            }

        # Test the command
        saved_dry_run = self.enable_dry_run()
        try:
            ret = self.cmds[cmd[0]](initial_command_msg)
        finally:
            # never leave the bot in dry run mode if the command raises
            self.restore_dry_run(saved_dry_run)

        if 'err' in ret:
            return {
                'err':
                f'the command "{name}" failed with "{ret["err"]}" and will not be recorded'
            }

        scheduled_cmd = {
            "schedule": msg.args[0][1:],
            "last_time": None,
            "sender": msg.sender,
            "cmd": cmd
        }

        self.scheduled_cmds[name] = scheduled_cmd
        output = f'Recorded the {msg.args[0][1:]} command "{" ".join(cmd)}" as "{name}"\n'

        output += f'Running {scheduled_cmd["schedule"]} command {name} for {msg.sender} initially\n'

        previous_change_count = len(self.changes[msg.sender])

        ret = self.cmds[cmd[0]](initial_command_msg)
        if 'err' in ret:
            output += 'ERROR: ' + ret['err']
        else:
            output += ret['msg']

        # The executed command did not create a new Change
        # -> create a dummy change to cancel the scheduled command
        if previous_change_count == len(self.changes[msg.sender]):
            self.changes[msg.sender].append(Change(cmd, [], msg.timestamp))

        # rewinding the initial execution also cancels the scheduled command
        self.changes[msg.sender][-1].rewind_cmds.append(["cancel", name])

        now = datetime.now().date()
        scheduled_cmd["last_time"] = now.isoformat()

        return {'msg': output}

    def cancel(self, msg: MessageContext) -> dict[str, str]:
        """Cancel a previously scheduled command

        Only the user who scheduled the command can cancel it.
        """
        if len(msg.args) < 2:
            return {'err': f'not in form "{msg.args[0]} name"'}

        cmd_name = msg.args[1]
        if cmd_name not in self.scheduled_cmds:
            return {'err': f'"{cmd_name}" is not a scheduled command'}
        cmd = self.scheduled_cmds[cmd_name]

        if cmd["sender"] != msg.sender:
            return {'err': 'only the original creator can cancel this command'}

        del self.scheduled_cmds[cmd_name]
        return {'msg': f'Cancelled the {cmd["schedule"]} cmd "{cmd_name}"'}

    @classmethod
    def thanks(cls, msg: MessageContext) -> dict[str, str]:
        """Thank geldschieberbot for its loyal service"""
        sender_name = msg.sender or msg.sender_number
        out = f'You are welcome. It is a pleasure to work with you, {sender_name}.'
        nick = None if len(msg.args) == 1 else msg.args[1]
        if nick:
            out = f"{out}\nBut don't call me {nick}."

        return {'msg': out}

    def alias(self, msg: MessageContext) -> dict[str, str]:
        """List, create or remove aliases"""
        if len(msg.args) == 1:
            out = ''
            for alias, users in self.aliases.items():
                out += f'\n\t{alias}: {" ".join(users)}'
            return {'msg': f'Aliases:\n\tall: {" ".join(self.name2num)}{out}'}

        if len(msg.args) == 2:
            return {'err': 'Not a valid alias command'}

        if msg.args[1] == 'remove':
            alias = msg.args[2]
            if alias not in self.aliases:
                return {'err': f'Alias "{alias}" not registered'}

            del self.aliases[alias]
            return {'msg': f'Alias "{alias}" removed'}

        # create a new alias
        alias = msg.args[1]
        if alias in self.aliases or alias == 'all':
            return {'err': f'Alias "{alias}" is already registered'}

        if alias in self.name2num:
            return {'err': f'A user "{alias}" is already registered'}

        try:
            to_cent(alias)
            return {'err': 'Pure numerical aliases are not allowed'}
        except (ValueError, TypeError):
            pass

        users = msg.args[2:]
        for user in users:
            if user not in self.name2num:
                return {'err': f'User {user} is not registered'}

        self.aliases[alias] = users
        return {'msg': f'New alias "{alias}" registered'}

    def __init__(self, state_path, group_id, dry_run=False):
        """Create a new Geldschieberbot

        The state is loaded from state_path. Only messages without group
        information or sent to the group group_id are handled.
        """
        self.state_path = state_path
        self.group_id = group_id
        self.dry_run = dry_run  # Run without changing the stored state
        self.record_changes = True  # Should changes be recorded
        self.load_state()

        # Command dispatch table
        self.cmds = {
            'reg': self.register,
            'register': self.register,
            'alias': self.alias,
            'sum': self.summary,
            'summary': self.summary,
            'balance': self.summary,
            'full-sum': self.full_summary,
            'full-summary': self.full_summary,
            'ls': self.list_users,
            'list': self.list_users,
            'split': self.split,
            'teil': self.split,
            'schieb': self.transaction,
            'gib': self.transaction,
            'zieh': self.transaction,
            'nimm': self.transaction,
            'help': self.usage,
            'usage': self.usage,
            'transfer': self.transfer,
            'cars': self.cars,
            'tanken': self.tanken,
            'fuck': self.fuck,
            'rewind': self.fuck,
            'undo': self.fuck,
            'list-changes': self.list_changes,
            'export-state': self.export_state,
            'weekly': self.schedule,
            'monthly': self.schedule,
            'yearly': self.schedule,
            'cancel': self.cancel,
            'thanks': self.thanks,
        }

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # only persist the state if the with block exited cleanly
        if not exc_type:
            self.save_state()

    def enable_dry_run(self) -> bool:
        """Enable dry run and return previous value"""
        old_value = self.dry_run
        self.dry_run = True
        return old_value

    def restore_dry_run(self, old_value: bool):
        """Restore dry run setting to old value"""
        self.dry_run = old_value

    def disable_record_changes(self) -> bool:
        """Disable change recording and return previous value"""
        old_value = self.record_changes
        self.record_changes = False
        return old_value

    def restore_record_changes(self, old_value: bool):
        """Restore record change setting to old value"""
        self.record_changes = old_value

    def may_record_change(self, user: str, change: Change):
        """Record a change for a user if change recording is enabled"""
        if self.record_changes and not self.dry_run:
            self.changes[user].append(change)

    def handle(self, envelope: dict) -> list[Reply]:
        """Parse and respond to a message

        Messages without text, sent to a different group or not starting
        with '!' are ignored and produce no reply. The state is saved after
        a command was handled.
        """
        sender_number = envelope["source"]
        message = envelope.get("dataMessage")
        if not message or not message.get("message"):
            return []

        # a missing groupInfo is treated like an empty one
        group_info = message.get("groupInfo")
        if group_info and group_info["groupId"] != self.group_id:
            return []

        body = [l.strip() for l in message["message"].lower().splitlines()]

        if len(body) == 0 or not body[0].startswith('!'):
            return []

        replies: list[Reply] = []

        args = body[0].split(' ')
        msg_context = MessageContext(sender_number,
                                     self.num2name.get(sender_number, None),
                                     args, body, message['timestamp'])

        # strip the leading '!'
        cmd = args[0][1:]
        if cmd in self.cmds:
            ret = self.cmds[cmd](msg_context)
            if 'err' in ret:
                replies.append(Reply(f'ERROR: {ret["err"]}', None))
            else:
                if 'msg' not in ret:
                    print(ret)
                replies.append(Reply(ret['msg'], ret.get('attachment', None)))
        else:
            replies.append(
                Reply(
                    'ERROR: unknown cmd. Enter !help for a list of commands.',
                    None))

        self.save_state()
        return replies

    @staticmethod
    def _next_due_date(current: date, interval: str) -> date:
        """Return the date following current in a schedule's interval"""
        if interval == "yearly":
            # February 29th has no counterpart in a non-leap year
            day = 28 if (current.month, current.day) == (2, 29) else current.day
            return date(current.year + 1, current.month, day)

        if interval == "monthly":
            # clamp the day to one existing in every month
            day = min(current.day, 28)
            if current.month == 12:
                return date(current.year + 1, 1, day)
            return date(current.year, current.month + 1, day)

        return current + timedelta(days=7)

    def run_scheduled_cmds(self) -> list[Reply]:
        """Progress the scheduled commands

        Executes each scheduled command once for every interval elapsed
        since its last execution. The executed commands do not record
        changes.
        """
        replies: list[Reply] = []

        saved_record_changes = self.disable_record_changes()
        try:
            now = datetime.now().date()
            # iterate a copy because an executed command may modify
            # the scheduled commands
            for name, scheduled_cmd in list(self.scheduled_cmds.items()):
                last_time = scheduled_cmd["last_time"]
                interval = scheduled_cmd["schedule"]
                if hasattr(date, "fromisoformat"):
                    last_time = date.fromisoformat(last_time)
                else:
                    last_time = date(*map(int, last_time.split("-")))

                d = last_time
                while True:
                    d = self._next_due_date(d, interval)
                    if d > now:
                        break

                    cmd_args = scheduled_cmd['cmd']
                    runas = scheduled_cmd['sender']
                    runas_name = self.num2name.get(runas, runas)
                    out = f'Running {interval} command {name} for {runas_name} triggered on {d.isoformat()}'
                    # TODO: use d as timestamp
                    cmd_ctx = MessageContext(runas, runas_name, cmd_args, [],
                                             '')
                    ret = self.cmds[cmd_args[0]](cmd_ctx)

                    if 'err' in ret:
                        replies.append(
                            Reply(f'{out}\nERROR: {ret["err"]}', None))
                    else:
                        replies.append(
                            Reply(f'{out}\n{ret["msg"]}',
                                  ret.get('attachment', None)))

                    scheduled_cmd["last_time"] = d.isoformat()
        finally:
            # do not leak the disabled change recording to later commands
            self.restore_record_changes(saved_record_changes)

        return replies