aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--geldschieberbot.py1511
1 files changed, 767 insertions, 744 deletions
diff --git a/geldschieberbot.py b/geldschieberbot.py
index f21b7d6..2dab9f2 100644
--- a/geldschieberbot.py
+++ b/geldschieberbot.py
@@ -8,62 +8,18 @@ import subprocess
import sys
import tanken
+
# Path where our data is stored persistent on disk
-state_file = os.environ["GSB_STATE_FILE"]
-
-if os.path.isfile(state_file):
- with open(state_file, 'r', encoding='utf-8') as state_f:
- state = json.load(state_f)
-else:
- # Dict containing the whole state of geldschieberbot
- # balance - dict of dicts associating two persons to an amount
- # name2num, num2name - dicts associating numbers to names and vice versa
- # cars - dict associating car names to their service charge
- # scheduled_cmds - dict associating names to cmds, their schedule, and the last execution
- # changes - dict associating users with their changes
- state = {
- "balance": {},
- "name2num": {},
- "num2name": {},
- "cars": {},
- "scheduled_cmds": {},
- "changes": {},
- }
-
-balance = state["balance"]
-name2num = state["name2num"]
-num2name = state["num2name"]
-available_cars = state["cars"]
-scheduled_cmds = state["scheduled_cmds"]
-changes = state["changes"]
-
-group_id = os.environ["GSB_GROUP_ID"]
-
-send_cmd = os.environ["GSB_SEND_CMD"]
-group_send_cmd = send_cmd + group_id
-
-dry_run = False
-"""Run without changing the stored state"""
-
-quiet = False
-"""Run without sending messages"""
-
-record_changes = True
-"""Should changes be recorded"""
-
-
-def record(recipient, donor, amount):
- """Apply changes to the balance"""
-
- # Only change anything if this is not a dry run
- if dry_run:
- return
+STATE_FILE = os.environ["GSB_STATE_FILE"]
- balance[donor][recipient] += amount
- balance[recipient][donor] -= amount
+GROUP_ID = os.environ["GSB_GROUP_ID"]
+
+SEND_CMD = os.environ["GSB_SEND_CMD"]
+GROUP_SEND_CMD = SEND_CMD + GROUP_ID
def to_cent(euro):
+ """Parse string containing euros into a cent value"""
if '.' in euro:
euro = euro.split('.')
else:
@@ -85,844 +41,892 @@ def to_cent(euro):
def to_euro(cents):
+ """Format cents as euro"""
return f"{cents/100:.2f}"
-def send(msg, attachment=None, cmd=send_cmd):
- if not quiet:
- if attachment:
- cmd += f' -a {attachment}'
- subprocess.run(cmd.split(' '), input=msg.encode(), check=False)
-
+class Geldschieberbot:
+ """
+ State of the geldschieberbot
+ """
-def create_summary(user):
- msg = ''
- cars_summary = ""
- total = 0
- cars_total = 0
- p_balances = balance[user] # failes if user is not in balance
- for person in p_balances:
- amount = p_balances[person]
- if amount == 0:
- continue
- if person in available_cars:
- cars_total -= amount
- cars_summary += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n'
+ def load_state(self, state_path=STATE_FILE):
+ """Load state from disk"""
+ if os.path.isfile(state_path):
+ with open(state_path, 'r', encoding='utf-8') as state_f:
+ self.state = json.load(state_f)
else:
- total -= amount
- msg += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n'
-
- if not msg:
- msg = '\tAll fine :)'
- else:
- msg += f'\tBalance: {to_euro(total)}'
-
- ret_summary = f'{user}:\n{msg}'
-
- if cars_summary:
- cars_summary += f'\tLiability: {to_euro(cars_total)}'
- ret_summary += f'\n\tCars:\n{cars_summary}'
-
- return ret_summary
-
+ # Dict containing the whole state of geldschieberbot
+ # balance - dict of dicts associating two persons to an amount
+ # name2num, num2name - dicts associating numbers to names and vice versa
+ # cars - dict associating car names to their service charge
+ # scheduled_cmds - dict associating names to cmds, their schedule, and the last execution
+ # changes - dict associating users with their changes
+ self.state = {
+ "balance": {},
+ "name2num": {},
+ "num2name": {},
+ "cars": {},
+ "scheduled_cmds": {},
+ "changes": {},
+ }
-def create_total_summary() -> str:
- msg = 'Summary:'
+ self.balance = self.state["balance"]
+ self.name2num = self.state["name2num"]
+ self.num2name = self.state["num2name"]
+ self.available_cars = self.state["cars"]
+ self.scheduled_cmds = self.state["scheduled_cmds"]
+ self.changes = self.state["changes"]
+
+ def save_state(self, state_path=STATE_FILE):
+ """Load state from disk"""
+ with open(state_path, 'w', encoding='utf-8') as f:
+ json.dump(self.state, f)
+
+ def record(self, recipient, donor, amount):
+ """Apply changes to the balance"""
+
+ # Only change anything if this is not a dry run
+ if self.dry_run:
+ return
+
+ self.balance[donor][recipient] += amount
+ self.balance[recipient][donor] -= amount
+
+ def send(self, msg, attachment=None, cmd=SEND_CMD):
+ """Send a message with optional attachment"""
+ if not self.quiet:
+ if attachment:
+ cmd += f' -a {attachment}'
+ subprocess.run(cmd.split(' '), input=msg.encode(), check=False)
+
+ def create_summary(self, user) -> str:
+ """Create a summary for a user"""
+ msg = ''
+ cars_summary = ""
+ total = 0
+ cars_total = 0
+ p_balances = self.balance[user] # failes if user is not in balance
+ for person in p_balances:
+ amount = p_balances[person]
+ if amount == 0:
+ continue
+ if person in self.available_cars:
+ cars_total -= amount
+ cars_summary += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n'
+ else:
+ total -= amount
+ msg += f'\t{"<-" if amount < 0 else "->"} {person} {to_euro(abs(amount))}\n'
- cars_summary = ''
- for person in balance:
- p_summary = create_summary(person)
- if person in available_cars:
- cars_summary += f'\n{p_summary}'
+ if not msg:
+ msg = '\tAll fine :)'
else:
- msg += f'\n{p_summary}'
+ msg += f'\tBalance: {to_euro(total)}'
- if cars_summary:
- msg += f'\nCars:{cars_summary}'
- return msg
+ ret_summary = f'{user}:\n{msg}'
+ if cars_summary:
+ cars_summary += f'\tLiability: {to_euro(cars_total)}'
+ ret_summary += f'\n\tCars:\n{cars_summary}'
-def create_members() -> str:
- r = ""
- for m in name2num:
- r += m + ": " + name2num[m] + "\n"
- return r
+ return ret_summary
+ def create_total_summary(self) -> str:
+ """Create a summary of all balances"""
+ msg = 'Summary:'
-def add_to_balance(name):
- nb = {}
- for m in balance:
- balance[m][name] = 0
- nb[m] = 0
- balance[name] = nb
-
-
-def remove_from_balance(name):
- del balance[name]
- for m in balance:
- del balance[m][name]
-
-
-def create_help():
- return """
-Usage: send a message starting with '!' followed by a command
-Commands:
-ls | list - print all registered members
-help - print this help message
-reg name - register the sender with the name: name
-sum [name] - print summary of specific users
-full-sum - print summary of all users
-
-split amount person [persons] - split amount between the sender and persons
-teil amount person [persons] - split amount between the sender and persons
-
-schieb amount recipient - give money to recipient
-gib amount recipient - give money to recipient
-zieh amount donor - get money from donor
-nimm amount donor - get money from donor
-
-transfer amount source destination - transfer amount of your balance from source to destination
+ cars_summary = ''
+ for person in self.balance:
+ p_summary = self.create_summary(person)
+ if person in self.available_cars:
+ cars_summary += f'\n{p_summary}'
+ else:
+ msg += f'\n{p_summary}'
+
+ if cars_summary:
+ msg += f'\nCars:{cars_summary}'
+ return msg
+
+ def create_members(self) -> str:
+ """Create a list of all group members"""
+ r = ""
+ for m in self.name2num:
+ r += m + ": " + self.name2num[m] + "\n"
+ return r
+
+ def add_to_balance(self, name):
+ """Add a new user balance"""
+ nb = {}
+ for m in self.balance:
+ self.balance[m][name] = 0
+ nb[m] = 0
+ self.balance[name] = nb
+
+ def remove_from_balance(self, name):
+ """Remove a user balance"""
+ del self.balance[name]
+ for m in self.balance:
+ del self.balance[m][name]
+
+ @classmethod
+ def create_help(cls):
+ return """
+ Usage: send a message starting with '!' followed by a command
+ Commands:
+ ls | list - print all registered members
+ help - print this help message
+ reg name - register the sender with the name: name
+ sum [name] - print summary of specific users
+ full-sum - print summary of all users
+
+ split amount person [persons] - split amount between the sender and persons
+ teil amount person [persons] - split amount between the sender and persons
+
+ schieb amount recipient - give money to recipient
+ gib amount recipient - give money to recipient
+ zieh amount donor - get money from donor
+ nimm amount donor - get money from donor
+
+ transfer amount source destination - transfer amount of your balance from source to destination
+
+ cars [cmd] - interact with the available cars
+ cars [list | ls] - list available cars and their service charge
+ cars add car-name service-charge - add new car
+ cars new car-name service-charge - add new car
+ cars remove car-name - remove new car
+ cars pay car-name amount - pay a bill for the specified car
+
+ tanken amount [person] [car] [info] - calculate fuel costs, service charge and add them to the person's and car's balance respectively
+
+ fuck [change] - rewind last or specific change
+ list-changes [n] [first] - list the last n changes starting at first
+
+ export-state - send the state file
+
+ weekly name cmd - repeat cmd each week
+ monthly name cmd - repeat cmd each month
+ yearly name cmd - repeat cmd each year
+ cancel name - stop repeating cmd
+
+ Happy Geldschieben!
+ """
+
+ def register(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Register a new user"""
+ if len(args) != 2:
+ return {'err': f'not in form "{args[0]} name"'}
+ name = args[1]
-cars [cmd] - interact with the available cars
-cars [list | ls] - list available cars and their service charge
-cars add car-name service-charge - add new car
-cars new car-name service-charge - add new car
-cars remove car-name - remove new car
-cars pay car-name amount - pay a bill for the specified car
+ try:
+ to_cent(name)
+ return {'err': 'pure numerical names are not allowed'}
+ except (ValueError, TypeError):
+ pass
-tanken amount [person] [car] [info] - calculate fuel costs, service charge and add them to the person's and car's balance respectively
+ if name in self.name2num:
+ return {'err': f'{name} already registered'}
-fuck [change] - rewind last or specific change
-list-changes [n] [first] - list the last n changes starting at first
+ if sender in self.num2name:
+ return {'err': 'you are already registered'}
-export-state - send the state file
+ self.num2name[sender] = name
+ self.name2num[name] = sender
-weekly name cmd - repeat cmd each week
-monthly name cmd - repeat cmd each month
-yearly name cmd - repeat cmd each year
-cancel name - stop repeating cmd
+ self.add_to_balance(name)
-Happy Geldschieben!
-"""
+ # add changes list
+ self.changes[name] = []
+ return {'msg': f'Happy geldschiebing {name}!'}
+ def summary(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Print summary for one or multiple balances"""
+ if len(args) == 1:
+ if not sender in self.num2name:
+ return {'err': 'You must register first to print your summary'}
+ name = self.num2name[sender]
+ return {'msg': f'Summary:\n{self.create_summary(name)}'}
-cmds = {}
+ msg = "Summary:\n"
+ for name in args[1:]:
+ if name in self.name2num or name in self.available_cars:
+ msg += self.create_summary(name) + "\n"
+ else:
+ return {'err': f'name "{name}" not registered'}
+ return {'msg': msg}
+ def full_summary(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Print a summary of all balances"""
+ if len(args) == 1:
+ return {'msg': self.create_total_summary()}
+ else:
+ return {'err': f'{args[0][1:]} takes no arguments'}
-def register(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if len(args) != 2:
- return {'err': f'not in form "{args[0]} name"'}
- name = args[1]
+ def list_users(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """List all registered users"""
+ return {'msg': self.create_members()}
- try:
- to_cent(name)
- return {'err': 'pure numerical names are not allowed'}
- except (ValueError, TypeError):
- pass
+ @classmethod
+ def usage(cls, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Return the usage"""
+ return {'msg': cls.create_help()}
- if name in name2num:
- return {'err': f'{name} already registered'}
+ def split(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Split a fixed amount across multiple persons"""
+ if not sender in self.num2name:
+ return {'err': 'you must register first'}
- if sender in num2name:
- return {'err': 'you are already registered'}
+ if len(args) < 3:
+ return {'err': f'not in form "{args[0]} amount [name]+"'}
- num2name[sender] = name
- name2num[name] = sender
+ try:
+ amount = to_cent(args[1])
+ persons = args[2:]
+ except (ValueError, TypeError):
+ # support !split name amount
+ if len(args) == 3:
+ try:
+ amount = to_cent(args[2])
+ persons = [args[1]]
+ except (ValueError, TypeError):
+ return {'err': 'amount must be a positive number'}
+ else:
+ return {'err': 'amount must be a positive number'}
- add_to_balance(name)
+ recipient = self.num2name[sender]
+ # persons + sender
+ npersons = len(persons) + 1
+ amount_per_person = int(amount / npersons)
- # add changes list
- changes[name] = []
- return {'msg': f'Happy geldschiebing {name}!'}
+ output = f"Split {to_euro(amount)} between {npersons} -> {to_euro(amount_per_person)} each\n"
+ change = [args]
+ for p in persons:
+ if p in self.name2num:
+ if p == recipient:
+ output += (f'{p}, you will be charged multiple times. '
+ 'This may not be what you want\n')
+ else:
+ self.record(recipient, p, amount_per_person)
+ change.append([recipient, p, amount_per_person])
+ else:
+ output += f"{p} not known. Please take care manually\n"
+ self.may_record_change(recipient, change)
-cmds["reg"] = register
-cmds["register"] = register
+ output += "New Balance:\n"
+ output += self.create_summary(recipient)
+ return {'msg': output}
+ def transaction(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Record a transaction"""
+ if len(args) != 3:
+ return {'err': f'not in form "{args[0]} amount recipient"'}
-def summary(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if len(args) == 1:
- if not sender in num2name:
- return {'err': 'You must register first to print your summary'}
- name = num2name[sender]
- return {'msg': f'Summary:\n{create_summary(name)}'}
+ if not sender in self.balance:
+ if sender not in self.num2name:
+ return {'err': 'you must register first'}
+ sender = self.num2name[sender]
- msg = "Summary:\n"
- for name in args[1:]:
- if name in name2num or name in available_cars:
- msg += create_summary(name) + "\n"
+ if args[1] in self.balance:
+ recipient, amount = args[1:3]
+ elif args[2] in self.balance:
+ amount, recipient = args[1:3]
else:
- return {'err': f'name "{name}" not registered'}
- return {'msg': msg}
-
-
-cmds["sum"] = summary
-cmds["summary"] = summary
-
-
-def full_summary(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if len(args) == 1:
- return {'msg': create_total_summary()}
- else:
- return {'err': f'{args[0][1:]} takes no arguments'}
-
-
-cmds["full-sum"] = full_summary
-cmds["full-summary"] = full_summary
-
+ return {'err': 'recipient not known'}
-def list_users(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- return {'msg': create_members()}
+ if sender == recipient:
+ return {'err': 'you can not transfer money to or from yourself'}
+ try:
+ amount = to_cent(amount)
+ except (ValueError, TypeError):
+ return {'err': 'amount must be a positive number'}
-cmds["ls"] = list_users
-cmds["list"] = list_users
+ if args[0] in ["!zieh", "!nimm"]:
+ amount *= -1
+ self.may_record_change(sender, [args, [sender, recipient, amount]])
-def usage(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- return {'msg': create_help()}
+ self.record(sender, recipient, amount)
+ p_balance = self.balance[sender][recipient]
-cmds["help"] = usage
-cmds["usage"] = usage
+ output = ("New Balance: {} {} {} {}\n".format(
+ sender, ("->" if p_balance > 0 else "<-"), to_euro(abs(p_balance)),
+ recipient))
+ return {'msg': output}
+ def transfer(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Transfer amount from one balance to another"""
+ if len(args) < 4:
+ return {
+ 'err': f'not in form "{args[0]} amount source destination"'
+ }
-def split(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if not sender in num2name:
- return {'err': 'you must register first'}
+ if not sender in self.num2name:
+ return {'err': 'you must register first'}
- if len(args) < 3:
- return {'err': f'not in form "{args[0]} amount [name]+"'}
+ sender = self.num2name[sender]
- try:
- amount = to_cent(args[1])
- persons = args[2:]
- except (ValueError, TypeError):
- # support !split name amount
- if len(args) == 3:
- try:
- amount = to_cent(args[2])
- persons = [args[1]]
- except (ValueError, TypeError):
- return {'err': 'amount must be a positive number'}
- else:
+ try:
+ amount_raw = args[1]
+ amount_cent = to_cent(amount_raw)
+ except (ValueError, TypeError):
return {'err': 'amount must be a positive number'}
- recipient = num2name[sender]
- # persons + sender
- npersons = len(persons) + 1
- amount_per_person = int(amount / npersons)
-
- output = f"Split {to_euro(amount)} between {npersons} -> {to_euro(amount_per_person)} each\n"
- change = [args]
- for p in persons:
- if p in name2num:
- if p == recipient:
- output += (f'{p}, you will be charged multiple times. '
- 'This may not be what you want\n')
- else:
- record(recipient, p, amount_per_person)
- change.append([recipient, p, amount_per_person])
- else:
- output += f"{p} not known. Please take care manually\n"
-
- if record_changes and not dry_run:
- changes[recipient].append(change)
+ source, destination = args[2:4]
+ if source not in self.balance:
+ return {'err': f'source "{source}" not known'}
- output += "New Balance:\n"
- output += create_summary(recipient)
- return {'msg': output}
+ if destination not in self.balance:
+ return {'err': f'destination "{destination}" not known'}
+ output = ""
+ saved_record_changes = self.disable_record_changes()
+ change = [args]
-cmds["split"] = split
-cmds["teil"] = split
-
+ ret = self.transaction(sender, ["!zieh", source, amount_raw], "")
+ if 'err' in ret:
+ # No changes yet we can fail
+ return {'err': ret['err']}
-def transaction(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if len(args) != 3:
- return {'err': f'not in form "{args[0]} amount recipient"'}
+ output += ret['msg']
+ # Sender <- X Source
+ change.append((sender, source, -amount_cent))
- if not sender in balance:
- if sender not in num2name:
- return {'err': 'you must register first'}
- sender = num2name[sender]
+ ret = self.transaction(sender, ["!schieb", destination, amount_raw],
+ "")
+ err = ret.get('err', None)
+ if err:
+ output += err + "\nThe balance may be in a inconsistent state please take care manually"
+ return {'msg': output}
- if args[1] in balance:
- recipient, amount = args[1:3]
- elif args[2] in balance:
- amount, recipient = args[1:3]
- else:
- return {'err': 'recipient not known'}
+ output += ret['msg']
+ # Sender -> X Destination
+ change.append((sender, destination, amount_cent))
- if sender == recipient:
- return {'err': 'you can not transfer money to or from yourself'}
+ ret = self.transaction(source, ["!zieh", destination, amount_raw], "")
+ err = ret.get('err', None)
+ if err:
+ output += err + "\nThe balance may be in a inconsistent state please take care manually"
+ return {'msg': output}
- try:
- amount = to_cent(amount)
- except (ValueError, TypeError):
- return {'err': 'amount must be a positive number'}
+ output += ret['msg']
+ # Destination -> X Source
+ change.append((destination, source, amount_cent))
- if args[0] in ["!zieh", "!nimm"]:
- amount *= -1
+ self.restore_record_changes(saved_record_changes)
+ self.may_record_change(sender, change)
- if record_changes and not dry_run:
- changes[sender].append([args, [sender, recipient, amount]])
+ return {'msg': output}
- record(sender, recipient, amount)
+ def cars(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Manage available cars
- p_balance = balance[sender][recipient]
+ List, add, remove or pay a bill for a car.
+ """
+ # list cars
+ if len(args) < 2 or args[1] in ["ls", "list"]:
+ if len(self.available_cars) == 0:
+ return {'msg': 'No cars registered yet.'}
- output = ("New Balance: {} {} {} {}\n".format(
- sender, ("->" if p_balance > 0 else "<-"), to_euro(abs(p_balance)),
- recipient))
- return {'msg': output}
+ ret_msg = ""
+ if len(args) > 2:
+ cars_to_list = args[2:]
+ else:
+ cars_to_list = self.available_cars
-cmds["schieb"] = transaction
-cmds["gib"] = transaction
-cmds["zieh"] = transaction
-cmds["nimm"] = transaction
+ for car in cars_to_list:
+ if car in self.available_cars:
+ ret_msg += f"{car} - service charge {self.available_cars[car]}ct/km\n"
+ ret_msg += self.create_summary(car) + "\n"
+ else:
+ return {'err': f'"{car}" is no available car\n'}
+ return {'msg': ret_msg[:-1]}
-def transfer(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if len(args) < 4:
- return {'err': f'not in form "{args[0]} amount source destination"'}
+ # add car
+ if args[1] in ["add", "new"]:
+ if len(args) < 4:
+ return {
+ 'err':
+ f'not in form "{args[0]} {args[1]} car-name service-charge"'
+ }
- if not sender in num2name:
- return {'err': 'you must register first'}
+ car = args[2]
+ if car in self.available_cars:
+ return {'err': f'"{car}" already registered'}
- sender = num2name[sender]
+ if car in self.balance:
+ return {
+ 'err':
+ f'A user named "{car}" already exists. Please use a different name for this car'
+ }
- try:
- amount_raw = args[1]
- amount_cent = to_cent(amount_raw)
- except (ValueError, TypeError):
- return {'err': 'amount must be a positive number'}
+ try:
+ service_charge = to_cent(args[3])
+ except (ValueError, TypeError):
+ return {'err': 'service-charge must be a positive number'}
- source, destination = args[2:4]
- if source not in balance:
- return {'err': f'source "{source}" not known'}
+ self.available_cars[car] = service_charge
+ self.add_to_balance(car)
+ return {'msg': f'added "{car}" as an available car'}
- if destination not in balance:
- return {'err': f'destination "{destination}" not known'}
+ # remove car
+ if args[1] in ["rm", "remove"]:
+ if len(args) < 3:
+ return {'err': f'not in form "{args[0]} {args[1]} car-name"'}
- output = ""
- global record_changes
- saved_record_changes = record_changes
- record_changes = False
- change = [args]
+ car = args[2]
+ if car not in self.available_cars:
+ return {'err': f'A car with the name "{car}" does not exists'}
- ret = transaction(sender, ["!zieh", source, amount_raw], "")
- if 'err' in ret:
- # No changes yet we can fail
- return {'err': ret['err']}
+ del self.available_cars[car]
+ self.remove_from_balance(car)
+ return {'msg': f'removed "{car}" from the available cars'}
- output += ret['msg']
- # Sender <- X Source
- change.append((sender, source, -amount_cent))
+ # pay bill
+ if args[1] in ["pay"]:
+ if len(args) < 4:
+ return {
+ 'err': f'not in form "{args[0]} {args[1]} car-name amount"'
+ }
- ret = transaction(sender, ["!schieb", destination, amount_raw], "")
- err = ret.get('err', None)
- if err:
- output += err + "\nThe balance may be in a inconsistent state please take care manually"
- return {'msg': output}
+ if not sender in self.num2name:
+ return {'err': 'you must register first'}
- output += ret['msg']
- # Sender -> X Destination
- change.append((sender, destination, amount_cent))
+ sender_name = self.num2name[sender]
- ret = transaction(source, ["!zieh", destination, amount_raw], "")
- err = ret.get('err', None)
- if err:
- output += err + "\nThe balance may be in a inconsistent state please take care manually"
- return {'msg': output}
+ car = args[2]
+ if car not in self.available_cars:
+ return {'err': f'car "{car}" not known'}
- output += ret['msg']
- # Destination -> X Source
- change.append((destination, source, amount_cent))
+ try:
+ amount = to_cent(args[3])
+ amount_euro = to_euro(amount)
+ except (ValueError, TypeError):
+ return {'err': 'amount must be a positive number'}
- record_changes = saved_record_changes
- if err is None and record_changes and not dry_run:
- changes[sender].append(change)
+ output = ""
- return {'msg': output}
+ saved_record_changes = self.disable_record_changes()
+ change = [args]
+ total_available_charge = 0
+ available_charges = []
+ for person in self.balance[car]:
+ _amount = self.balance[car][person]
+ if _amount < 0:
+ total_available_charge -= _amount
+ available_charges.append((person, _amount))
-cmds["transfer"] = transfer
+ proportion = -1
+ if amount < total_available_charge:
+ proportion = -1 * (amount / total_available_charge)
+ ret = self.transaction(sender, f'!gib {car} {amount_euro}'.split(),
+ '')
+ assert 'err' not in ret
+ output += f"{sender_name} payed {amount_euro}\n"
-def cars(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- # list cars
- if len(args) < 2 or args[1] in ["ls", "list"]:
- if len(available_cars) == 0:
- return {'msg': 'No cars registered yet.'}
+ # transfer money
+ output += f"Transferring {proportion * -100:.2f}% of everybody's charges\n"
+ for person, _amount in available_charges:
+ if person == sender_name or _amount >= 0:
+ continue
- ret_msg = ""
+ to_move = int(_amount * proportion)
+ to_move_euro = to_euro(to_move)
+ ret = self.transfer(sender,
+ ['transfer', to_move_euro, car, person],
+ '')
+ assert 'err' not in ret
- if len(args) > 2:
- cars_to_list = args[2:]
- else:
- cars_to_list = available_cars
+ output += "Transfer {} from {} to {}\n".format(
+ to_move_euro, person, sender_name)
- for car in cars_to_list:
- if car in available_cars:
- ret_msg += f"{car} - service charge {available_cars[car]}ct/km\n"
- ret_msg += create_summary(car) + "\n"
- else:
- return {'err': f'"{car}" is no available car\n'}
+ output += "New Balances:\n"
+ output += self.create_summary(sender_name) + "\n"
+ output += self.create_summary(car)
- return {'msg': ret_msg[:-1]}
+ self.restore_record_changes(saved_record_changes)
+ self.may_record_change(sender_name, change)
- # add car
- if args[1] in ["add", "new"]:
- if len(args) < 4:
- return {
- 'err':
- f'not in form "{args[0]} {args[1]} car-name service-charge"'
- }
+ return {'msg': output}
- car = args[2]
- if car in available_cars:
- return {'err': f'"{car}" already registered'}
+ return {'err': f'unknown car subcommand "{args[1]}".'}
- if car in balance:
+ def _tanken(self, sender, args, msg) -> dict[str, str]:
+ """Split a tank across all passengers"""
+ if len(args) < 2:
return {
- 'err':
- f'A user named "{car}" already exists. Please use a different name for this car'
+ 'err': f'not in form "{args[0]} amount [person] [car] [info]"'
}
-
try:
- service_charge = to_cent(args[3])
+ amount = to_cent(args[1])
except (ValueError, TypeError):
- return {'err': 'service-charge must be a positive number'}
+ return {'err': 'amount must be a number'}
- available_cars[car] = service_charge
- add_to_balance(car)
- return {'msg': f'added "{car}" as an available car'}
-
- # remove car
- if args[1] in ["rm", "remove"]:
- if len(args) < 3:
- return {'err': f'not in form "{args[0]} {args[1]} car-name"'}
-
- car = args[2]
- if car not in available_cars:
- return {'err': f'A car with the name "{car}" does not exists'}
-
- del available_cars[car]
- remove_from_balance(car)
- return {'msg': f'removed "{car}" from the available cars'}
+ # find recipient
+ if len(args) > 2 and args[2] in self.name2num:
+ recipient = args[2]
+ elif sender in self.num2name:
+ recipient = self.num2name[sender]
+ else:
+ return {'err': 'recipient unknown'}
- # pay bill
- if args[1] in ["pay"]:
- if len(args) < 4:
- return {
- 'err': f'not in form "{args[0]} {args[1]} car-name amount"'
- }
+ # find car
+ car = None
+ if len(args) > 2 and args[2] in self.available_cars:
+ car = args[2]
+ elif len(args) > 3 and args[3] in self.available_cars:
+ car = args[3]
- if not sender in num2name:
- return {'err': 'you must register first'}
+ service_charge = 0
+ if car:
+ service_charge = self.available_cars[car]
- sender_name = num2name[sender]
+ parts, err = tanken.tanken(msg[1:], amount, service_charge)
- car = args[2]
- if car not in available_cars:
- return {'err': f'car "{car}" not known'}
+ if err:
+ return {'err': err}
- try:
- amount = to_cent(args[3])
- amount_euro = to_euro(amount)
- except (ValueError, TypeError):
- return {'err': 'amount must be a positive number'}
+ assert parts
output = ""
-
- global record_changes
- saved_record_changes = record_changes
- record_changes = False
change = [args]
-
- total_available_charge = 0
- available_charges = []
- for person in balance[car]:
- _amount = balance[car][person]
- if _amount < 0:
- total_available_charge -= _amount
- available_charges.append((person, _amount))
-
- proportion = -1
- if amount < total_available_charge:
- proportion = -1 * (amount / total_available_charge)
-
- ret = transaction(sender, f'!gib {car} {amount_euro}'.split(), '')
- assert 'err' not in ret
- output += f"{sender_name} payed {amount_euro}\n"
-
- # transfer money
- output += f"Transferring {proportion * -100:.2f}% of everybody's charges\n"
- for person, _amount in available_charges:
- if person == sender_name or _amount >= 0:
+ for pname, values in parts.items():
+ output += pname + ": {}km = fuel: {}, service charge: {}\n".format(
+ values["distance"], to_euro(values["cost"]),
+ to_euro(values["service_charge"]))
+ # record service charges
+ if pname not in self.name2num:
+ output += pname + " not known."
+ if car:
+ person_to_charge = pname
+ if pname not in self.name2num:
+ person_to_charge = recipient
+ output += f" {recipient} held accountable for service charge."
+
+ self.record(car, person_to_charge, values["service_charge"])
+ change.append(
+ [car, person_to_charge, values["service_charge"]])
+
+ # recipient paid the fuel -> don't charge them
+ if pname == recipient:
continue
- to_move = int(_amount * proportion)
- to_move_euro = to_euro(to_move)
- ret = transfer(sender, ['transfer', to_move_euro, car, person], '')
- assert 'err' not in ret
-
- output += "Transfer {} from {} to {}\n".format(
- to_move_euro, person, sender_name)
-
- output += "New Balances:\n"
- output += create_summary(sender_name) + "\n"
- output += create_summary(car)
-
- record_changes = saved_record_changes
- if record_changes and not dry_run:
- changes[sender_name].append(change)
-
- return {'msg': output}
-
- return {'err': f'unknown car subcommand "{args[1]}".'}
-
-
-cmds["cars"] = cars
-
+ if pname in self.name2num:
+ self.record(recipient, pname, values["cost"])
+ change.append([recipient, pname, values["cost"]])
+ else:
+ output += " Please collect fuel cost manually\n"
-def _tanken(sender, args, msg) -> dict[str, str]:
- if len(args) < 2:
- return {'err': f'not in form "{args[0]} amount [person] [car] [info]"'}
- try:
- amount = to_cent(args[1])
- except (ValueError, TypeError):
- return {'err': 'amount must be a number'}
+ self.may_record_change(self.num2name[sender], change)
- # find recipient
- if len(args) > 2 and args[2] in name2num:
- recipient = args[2]
- elif sender in num2name:
- recipient = num2name[sender]
- else:
- return {'err': 'recipient unknown'}
-
- # find car
- car = None
- if len(args) > 2 and args[2] in available_cars:
- car = args[2]
- elif len(args) > 3 and args[3] in available_cars:
- car = args[3]
-
- service_charge = 0
- if car:
- service_charge = available_cars[car]
-
- parts, err = tanken.tanken(msg[1:], amount, service_charge)
-
- if err:
- return {'err': err}
-
- assert parts
-
- output = ""
- change = [args]
- for pname, values in parts.items():
- output += pname + ": {}km = fuel: {}, service charge: {}\n".format(
- values["distance"], to_euro(values["cost"]),
- to_euro(values["service_charge"]))
- # record service charges
- if pname not in name2num:
- output += pname + " not known."
+ output += "New Balance:\n"
+ output += self.create_summary(recipient)
if car:
- person_to_charge = pname
- if pname not in name2num:
- person_to_charge = recipient
- output += f" {recipient} held accountable for service charge."
-
- record(car, person_to_charge, values["service_charge"])
- change.append([car, person_to_charge, values["service_charge"]])
+ output += "\nCar "
+ output += self.create_summary(car)
+ return {'msg': output}
- # recipient paid the fuel -> don't charge them
- if pname == recipient:
- continue
+ def fuck(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Rewind past changes"""
+ if not sender in self.num2name:
+ return {'err': 'you must register first'}
- if pname in name2num:
- record(recipient, pname, values["cost"])
- change.append([recipient, pname, values["cost"]])
- else:
- output += " Please collect fuel cost manually\n"
+ name = self.num2name[sender]
- if record_changes and not dry_run:
- changes[num2name[sender]].append(change)
+ nchanges = len(self.changes[name])
+ if nchanges == 0:
+ return {'msg': 'Nothing to rewind'}
- output += "New Balance:\n"
- output += create_summary(recipient)
- if car:
- output += "\nCar "
- output += create_summary(car)
- return {'msg': output}
+ change_to_rewind = -1
+ if len(args) >= 2:
+ try:
+ change_to_rewind = int(args[1]) - 1
+ except ValueError:
+ return {'err': 'change to rewind must be a number'}
+
+ if change_to_rewind > nchanges:
+ return {
+ 'err': 'change to rewind is bigger than there are changes'
+ }
+
+ # pop last item
+ last_changes = self.changes[name].pop(change_to_rewind)
+ args, last_changes = last_changes[0], last_changes[1:]
+
+ output = name + ": sorry I fucked up!\nRewinding:\n"
+ output += ' '.join(args) + "\n"
+ for change in last_changes:
+ if not change[0] in self.cmds:
+ output += "{} {} {} {}\n".format(
+ change[0], ("->" if change[2] < 0 else "<-"),
+ to_euro(abs(change[2])), change[1])
+ self.record(change[1], change[0], change[2])
+
+ for change in last_changes:
+ if change[0] in self.cmds:
+ ret = self.cmds[change[0]](sender, change, "")
+ if 'err' in ret:
+ output += "ERROR: " + ret['err']
+ else:
+ output += ret['msg']
-cmds["tanken"] = _tanken
+ return {'msg': output}
+ def list_changes(self, sender, args, msg) -> dict[str, str]:
+ """List changes made by the sender"""
+ if not sender in self.num2name:
+ return {'err': 'you must register first'}
-def fuck(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if not sender in num2name:
- return {'err': 'you must register first'}
+ sender_name = self.num2name[sender]
- name = num2name[sender]
+ changes_to_list = 5
+ if len(args) >= 2:
+ try:
+ changes_to_list = int(args[1])
+ except ValueError:
+ return {
+ 'err': 'the amount of changes to list must be a number'
+ }
- nchanges = len(changes[name])
- if nchanges == 0:
- return {'msg': 'Nothing to rewind'}
+ nchanges = len(self.changes[sender_name])
+ if nchanges == 0:
+ return {'msg': 'Nothing to list'}
- change_to_rewind = -1
- if len(args) >= 2:
- try:
- change_to_rewind = int(args[1]) - 1
- except ValueError:
- return {'err': 'change to rewind must be a number'}
-
- if change_to_rewind > nchanges:
- return {'err': 'change to rewind is bigger than there are changes'}
-
- # pop last item
- last_changes = changes[name].pop(change_to_rewind)
- args, last_changes = last_changes[0], last_changes[1:]
-
- output = name + ": sorry I fucked up!\nRewinding:\n"
- output += ' '.join(args) + "\n"
- for change in last_changes:
- if not change[0] in cmds:
- output += "{} {} {} {}\n".format(change[0],
- ("->" if change[2] < 0 else "<-"),
- to_euro(abs(change[2])),
- change[1])
- record(change[1], change[0], change[2])
-
- for change in last_changes:
- if change[0] in cmds:
- ret = cmds[change[0]](sender, change, "")
+ first_to_list = max(nchanges - changes_to_list, 0)
- if 'err' in ret:
- output += "ERROR: " + ret['err']
- else:
- output += ret['msg']
+ if len(args) == 3:
+ try:
+ first_to_list = int(args[2]) - 1
+ except ValueError:
+ return {'err': 'the first change to list must be a number'}
+
+ if first_to_list > nchanges:
+ return {
+ 'err':
+ 'the first change to list is bigger than there are changes'
+ }
+
+ msg = ""
+ i = 0
+ for i, change in enumerate(self.changes[sender_name]):
+ if i < first_to_list:
+ continue
- return {'msg': output}
+ if i >= first_to_list + changes_to_list:
+ # i is used to track how many changes we listed
+ # This change will not be listed so decrement i before extiting the loop.
+ i -= 1
+ break
+ msg += f'Change {i + 1}:\n'
+ msg += f'\t{" ".join(change[0])}\n'
+ for sender, recipient, amount in change[1:]:
+ msg += "\t{} {} {} {}\n".format(sender,
+ ("->" if amount < 0 else "<-"),
+ to_euro(abs(amount)),
+ recipient)
-cmds["fuck"] = fuck
-cmds["rewind"] = fuck
-cmds["undo"] = fuck
+ # prepend message header because we want to know how much changes we actually listed
+ msg = f'Changes from {sender_name} {first_to_list + 1}-{i + 1}\n' + msg
+ return {'msg': msg}
-def list_changes(sender, args, msg) -> dict[str, str]:
- if not sender in num2name:
- return {'err': 'you must register first'}
+ def export_state(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Send the state file as attachment"""
+ if not sender in self.num2name:
+ return {'err': 'you must register first'}
- sender_name = num2name[sender]
+ msg = f'State from {datetime.now().date().isoformat()}'
+ return {'msg': msg, 'attachment': STATE_FILE}
- changes_to_list = 5
- if len(args) >= 2:
- try:
- changes_to_list = int(args[1])
- except ValueError:
- return {'err': 'the amount of changes to list must be a number'}
+ def schedule(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Schedule a command for periodic execution"""
+ if not sender in self.num2name:
+ return {'err': 'you must register first'}
- nchanges = len(changes[sender_name])
- if nchanges == 0:
- return {'msg': 'Nothing to list'}
+ sender_name = self.num2name[sender]
- first_to_list = max(nchanges - changes_to_list, 0)
+ if len(args) < 3:
+ return {'err': f'not in form "{args[0]} name cmd"'}
- if len(args) == 3:
- try:
- first_to_list = int(args[2]) - 1
- except ValueError:
- return {'err': 'the first change to list must be a number'}
+ name = args[1]
+ cmd = args[2:]
- if first_to_list > nchanges:
+ if name in self.scheduled_cmds:
return {
- 'err':
- 'the first change to list is bigger than there are changes'
+ 'err': f'there is already a scheduled command named "{name}"'
}
- msg = ""
- i = 0
- for i, change in enumerate(changes[sender_name]):
- if i < first_to_list:
- continue
-
- if i >= first_to_list + changes_to_list:
- # i is used to track how many changes we listed
- # This change will not be listed so decrement i before extiting the loop.
- i -= 1
- break
-
- msg += f'Change {i + 1}:\n'
- msg += f'\t{" ".join(change[0])}\n'
- for sender, recipient, amount in change[1:]:
- msg += "\t{} {} {} {}\n".format(sender,
- ("->" if amount < 0 else "<-"),
- to_euro(abs(amount)), recipient)
-
- # prepend message header because we want to know how much changes we actually listed
- msg = f'Changes from {sender_name} {first_to_list + 1}-{i + 1}\n' + msg
-
- return {'msg': msg}
-
-
-cmds["list-changes"] = list_changes
-
-
-def export_state(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if not sender in num2name:
- return {'err': 'you must register first'}
-
- msg = f'State from {datetime.now().date().isoformat()}'
- return {'msg': msg, 'attachment': state_file}
-
-
-cmds["export-state"] = export_state
-
-
-def schedule(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- if not sender in num2name:
- return {'err': 'you must register first'}
-
- sender_name = num2name[sender]
-
- if len(args) < 3:
- return {'err': f'not in form "{args[0]} name cmd"'}
-
- name = args[1]
- cmd = args[2:]
-
- if name in scheduled_cmds:
- return {'err': f'there is already a scheduled command named "{name}"'}
-
- # Test the command
- global dry_run
- old_dry_run, dry_run = dry_run, True
-
- ret = cmds[cmd[0]](sender, cmd, '')
-
- dry_run = old_dry_run
-
- if 'err' in ret:
- return {'err': 'the command "{}" failed and will not be recorded'}
-
- scheduled_cmd = {
- "schedule": args[0][1:],
- "last_time": None,
- "sender": sender,
- "cmd": cmd
- }
+ # Test the command
+ saved_dry_run = self.enable_dry_run()
+ ret = self.cmds[cmd[0]](sender, cmd, '')
+ self.restore_dry_run(saved_dry_run)
- scheduled_cmds[name] = scheduled_cmd
- output = 'Recorded the {} command "{}" as "{}"\n'.format(
- args[0][1:], ' '.join(cmd), name)
+ if 'err' in ret:
+ return {'err': 'the command "{}" failed and will not be recorded'}
- output += "Running {} command {} for {} initially\n".format(
- scheduled_cmd["schedule"], name, sender_name)
-
- ret = cmds[cmd[0]](sender, cmd, "")
- if 'err' in ret:
- output += 'ERROR: ' + ret['err']
- else:
- output += ret['msg']
-
- changes[sender_name][0].append(["cancel", name])
-
- now = datetime.now().date()
- scheduled_cmd["last_time"] = now.isoformat()
-
- return {'msg': output}
-
-
-cmds["weekly"] = schedule
-cmds["monthly"] = schedule
-cmds["yearly"] = schedule
-
-
-def cancel(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- cmd_name = args[1]
- if not cmd_name in scheduled_cmds:
- return {'err': f'"{cmd_name}" is not a scheduled command'}
- cmd = scheduled_cmds[cmd_name]
-
- if not cmd["sender"] == sender:
- return {'err': 'only the original creator can cancel this command'}
-
- del scheduled_cmds[cmd_name]
- return {'msg': f'Cancelled the {cmd["schedule"]} cmd "{cmd_name}"'}
-
-
-cmds["cancel"] = cancel
+ scheduled_cmd = {
+ "schedule": args[0][1:],
+ "last_time": None,
+ "sender": sender,
+ "cmd": cmd
+ }
+ self.scheduled_cmds[name] = scheduled_cmd
+ output = 'Recorded the {} command "{}" as "{}"\n'.format(
+ args[0][1:], ' '.join(cmd), name)
-def thanks(sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
- sender_name = num2name.get(sender, sender)
- msg = f'You are welcome. It is a pleasure to work with you, {sender_name}.'
- nick = None if len(args) == 1 else args[1]
- if nick:
- msg = f"{msg}\nBut don't call me {nick}."
+ output += "Running {} command {} for {} initially\n".format(
+ scheduled_cmd["schedule"], name, sender_name)
- return {'msg': msg}
-
-
-cmds["thanks"] = thanks
+ ret = self.cmds[cmd[0]](sender, cmd, "")
+ if 'err' in ret:
+ output += 'ERROR: ' + ret['err']
+ else:
+ output += ret['msg']
+ self.changes[sender_name][0].append(["cancel", name])
-def main():
- if len(sys.argv) > 1 and sys.argv[1] in ["-d", "--dry-run"]:
- global dry_run
- dry_run = True
- print("Dry Run no changes will apply!")
+ now = datetime.now().date()
+ scheduled_cmd["last_time"] = now.isoformat()
- # Read cmds from stdin
- for l in sys.stdin.read().splitlines():
- try:
- message = json.loads(l)["envelope"]
- except:
- print(datetime.now(), l, "not valid json")
- continue
+ return {'msg': output}
+ def cancel(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Cancel a previously scheduled command"""
+ cmd_name = args[1]
+ if not cmd_name in self.scheduled_cmds:
+ return {'err': f'"{cmd_name}" is not a scheduled command'}
+ cmd = self.scheduled_cmds[cmd_name]
+
+ if not cmd["sender"] == sender:
+ return {'err': 'only the original creator can cancel this command'}
+
+ del self.scheduled_cmds[cmd_name]
+ return {'msg': f'Cancelled the {cmd["schedule"]} cmd "{cmd_name}"'}
+
+ def thanks(self, sender, args, msg) -> dict[str, str]: # pylint: disable=unused-argument
+ """Thank geldschieberbot for its loyal service"""
+ sender_name = self.num2name.get(sender, sender)
+ msg = f'You are welcome. It is a pleasure to work with you, {sender_name}.'
+ nick = None if len(args) == 1 else args[1]
+ if nick:
+ msg = f"{msg}\nBut don't call me {nick}."
+
+ return {'msg': msg}
+
+ def __init__(self, dry_run=False):
+ self.dry_run = dry_run # Run without changing the stored state
+ self.load_state()
+ self.quiet = False # Run without sending messages
+ self.record_changes = True # Should changes be recorded
+
+ # Command dispatch table
+ self.cmds = {
+ 'reg': self.register,
+ 'register': self.register,
+ 'sum': self.summary,
+ 'summary': self.summary,
+ 'full-sum': self.full_summary,
+ 'full-summary': self.full_summary,
+ 'ls': self.list_users,
+ 'list': self.list_users,
+ 'split': self.split,
+ 'teil': self.split,
+ 'schieb': self.transaction,
+ 'gib': self.transaction,
+ 'zieh': self.transaction,
+ 'nimm': self.transaction,
+ 'help': self.usage,
+ 'usage': self.usage,
+ 'transfer': self.transfer,
+ 'cars': self.cars,
+ 'tanken': self._tanken,
+ 'fuck': self.fuck,
+ 'rewind': self.fuck,
+ 'undo': self.fuck,
+ 'list-changes': self.list_changes,
+ 'export-state': self.export_state,
+ 'weekly': self.schedule,
+ 'monthly': self.schedule,
+ 'yearly': self.schedule,
+ 'cancel': self.cancel,
+ 'thanks': self.thanks,
+ }
+
+ def __del__(self):
+ self.save_state()
+
+ def enable_dry_run(self) -> bool:
+ """Enable dry run"""
+ old_value = self.dry_run
+ self.dry_run = True
+ return old_value
+
+ def restore_dry_run(self, old_value: bool):
+ """Restore dry run setting to old value"""
+ self.dry_run = old_value
+
+ def disable_record_changes(self) -> bool:
+ """Disable change recording and return previous value"""
+ old_value = self.record_changes
+ self.record_changes = False
+ return old_value
+
+ def restore_record_changes(self, old_value: bool):
+ """Restore record change setting to old value"""
+ self.record_changes = old_value
+
+ def may_record_change(self, user: str, change):
+ """Record a change for a user if change recording is enabled"""
+ if self.record_changes and not self.dry_run:
+ self.changes[user].append(change)
+
+ def handle(self, message: dict):
+ """Parse and respond to a message"""
sender_number = message["source"]
if not "dataMessage" in message or not message[
"dataMessage"] or not message["dataMessage"]["message"]:
- continue
- else:
- message = message["dataMessage"]
- if message["groupInfo"] and message["groupInfo"][
- "groupId"] != group_id:
- continue
- body = [l.strip() for l in message["message"].lower().splitlines()]
+ return
- if len(body) == 0:
- continue
+ message = message["dataMessage"]
+ if message["groupInfo"] and message["groupInfo"]["groupId"] != GROUP_ID:
+ return
- args = body[0].split(' ')
+ body = [l.strip() for l in message["message"].lower().splitlines()]
- if args[0].startswith("!"):
- cmd = args[0][1:]
- if cmd in cmds:
- ret = cmds[cmd](sender_number, args, body)
- if 'err' in ret:
- send(f'ERROR: {ret["err"]}')
- else:
- if not 'msg' in ret:
- print(ret)
- send(ret['msg'], attachment=ret.get('attachment', None))
+ if len(body) == 0 or not body[0].startswith('!'):
+ return
+
+ args = body[0].split(' ')
+ cmd = args[0][1:]
+ if cmd in self.cmds:
+ ret = self.cmds[cmd](sender_number, args, body)
+ if 'err' in ret:
+ self.send(f'ERROR: {ret["err"]}')
else:
- send('ERROR: unknown cmd. Enter !help for a list of commands.')
+ if not 'msg' in ret:
+ print(ret)
+ self.send(ret['msg'], attachment=ret.get('attachment', None))
+ else:
+ self.send(
+ 'ERROR: unknown cmd. Enter !help for a list of commands.')
+
+ self.save_state()
- # Handle scheduled commands
- global record_changes
- record_changes = False
+ def run_scheduled_cmds(self):
+ """Progress the scheduled commands"""
+ self.record_changes = False
now = datetime.now().date()
week_delta = timedelta(days=7)
- for name, cmd in scheduled_cmds.items():
+ for name, cmd in self.scheduled_cmds.items():
last_time = cmd["last_time"]
if hasattr(date, "fromisoformat"):
@@ -945,24 +949,43 @@ def main():
d = d + week_delta
if d <= now:
- send("Running {} command {} for {} triggered on {}\n".
- format(cmd["schedule"], name, num2name[cmd["sender"]],
- d.isoformat()))
+ self.send("Running {} command {} for {} triggered on {}\n".
+ format(cmd["schedule"],
+ name, self.num2name[cmd["sender"]],
+ d.isoformat()))
- ret = cmds[cmd["cmd"][0]](cmd["sender"], cmd["cmd"], "")
+ ret = self.cmds[cmd["cmd"][0]](cmd["sender"], cmd["cmd"],
+ "")
if 'err' in ret:
- send("ERROR: " + ret['err'])
+ self.send("ERROR: " + ret['err'])
else:
- send(ret['msg'],
- attachment=ret.get('attachment', None))
+ self.send(ret['msg'],
+ attachment=ret.get('attachment', None))
cmd["last_time"] = d.isoformat()
else:
break
- with open(state_file, 'w', encoding='utf-8') as f:
- json.dump(state, f)
+
+def main():
+ dry_run = len(sys.argv) > 1 and sys.argv[1] in ["-d", "--dry-run"]
+ if dry_run:
+ print("Dry Run no changes will apply!")
+
+ bot = Geldschieberbot(dry_run=dry_run)
+
+ # Read cmds from stdin
+ for l in sys.stdin.read().splitlines():
+ try:
+ message = json.loads(l)["envelope"]
+ except json.JSONDecodeError:
+ print(datetime.now(), l, "not valid json")
+ continue
+
+ bot.handle(message)
+
+ bot.run_scheduled_cmds()
if __name__ == "__main__":