import asyncio
import codecs
import datetime
import logging
import math
import os
import re

import dateparser
import pytz
import regex

import discord

from uuid import uuid4
from string import ascii_lowercase

from matplotlib import rcParams
from matplotlib.afm import AFM
from pytz import UnknownTimeZoneError
from unidecode import unidecode

from essentials.multi_server import get_pre
from essentials.exceptions import *
from essentials.settings import SETTINGS
from utils.misc import possible_timezones

logger = logging.getLogger('bot')

## Helvetica is the closest font to Whitney (discord uses Whitney) in afm
## This is used to estimate text width and adjust the layout of the embeds
afm_fname = os.path.join(rcParams['datapath'], 'fonts', 'afm', 'phvr8a.afm')
with open(afm_fname, 'rb') as fh:
    afm = AFM(fh)

## A-Z Emojis for Discord: the 26 regional indicator symbols U+1F1E6..U+1F1FF
AZ_EMOJIS = [chr(0x1F1E6 + i) for i in range(26)]


class Poll:
    """A single poll: its settings, its votes and the wizard that configures it.

    The ``set_*`` coroutines each configure one setting. They first try the
    ``force`` value (if any) and only fall back to an interactive wizard in
    the channel of ``ctx`` when that value is missing or invalid.
    """

    def __init__(self, bot, ctx=None, server=None, channel=None, load=False):
        """Create a poll with default settings.

        When ``load`` is true (or no ``ctx`` is given) only ``bot`` and
        ``cursor_pos`` are initialised; the remaining attributes are expected
        to be filled in by :meth:`from_dict`.
        """
        self.bot = bot
        self.cursor_pos = 0

        if not load and ctx:
            if server is None:
                server = ctx.message.guild

            if channel is None:
                channel = ctx.message.channel

            self.id = None

            self.author = ctx.message.author

            self.server = server
            self.channel = channel

            self.name = "Quick Poll"
            self.short = str(uuid4())[0:23]
            self.anonymous = False
            self.reaction = True
            # 0 = unlimited choices, 1 = single choice, n = at most n choices
            self.multiple_choice = 1
            self.options_reaction = ['yes', 'no']
            self.options_reaction_default = False
            # self.options_traditional = []
            # self.options_traditional_default = False
            self.roles = ['@everyone']
            self.weights_roles = []
            self.weights_numbers = []
            # 0 means "no deadline"; otherwise a datetime
            self.duration = 0
            self.duration_tz = 0.0
            self.time_created = self._now_utc()

            self.open = True
            self.active = True
            # 0 means "activated manually"; otherwise a datetime
            self.activation = 0
            self.activation_tz = 0.0
            # maps str(user id) -> {'weight': number, 'choices': [option index, ...]}
            self.votes = {}

    @staticmethod
    def _now_utc():
        """Return the current time as a timezone-aware datetime in UTC."""
        return datetime.datetime.now(tz=pytz.utc)

    async def is_open(self, update_db=True):
        """Return whether the poll is open, closing it if the deadline passed.

        A poll without a server is always reported as open. When the state
        flips to closed and ``update_db`` is true, the poll is saved.
        """
        if self.server is None:
            self.open = True
            # return the flag (not None) so callers storing the result keep a bool
            return self.open
        if self.open and self.duration != 0 \
                and self._now_utc() > self.get_duration_with_tz():
            self.open = False
            if update_db:
                await self.save_to_db()
        return self.open

    async def is_active(self, update_db=True):
        """Return whether the poll is active, activating it if the date passed.

        A poll without a server is always reported as inactive. When the state
        flips to active and ``update_db`` is true, the poll is saved.
        """
        if self.server is None:
            self.active = False
            return self.active
        if not self.active and self.activation != 0 \
                and self._now_utc() > self.get_activation_with_tz():
            self.active = True
            if update_db:
                await self.save_to_db()
        return self.active

    async def wizard_says(self, ctx, text, footer=True):
        """Send a new wizard embed with ``text`` and return the sent message."""
        embed = discord.Embed(title="Poll creation Wizard", description=text, color=SETTINGS.color)
        if footer:
            embed.set_footer(text="Type `stop` to cancel the wizard.")
        return await ctx.send(embed=embed)

    async def wizard_says_edit(self, message, text, add=False):
        """Replace the wizard embed of ``message``.

        With ``add`` the new ``text`` is appended to the existing description
        instead of replacing it.
        """
        if add and len(message.embeds) > 0:
            text = message.embeds[0].description + text
        embed = discord.Embed(title="Poll creation Wizard", description=text, color=SETTINGS.color)
        embed.set_footer(text="Type `stop` to cancel the wizard.")
        return await message.edit(embed=embed)

    async def add_error(self, message, error):
        """Append an error line to the wizard embed of ``message``."""
        text = ''
        if len(message.embeds) > 0:
            text = message.embeds[0].description + '\n\n:exclamation: ' + error
        return await self.wizard_says_edit(message, text)

    async def add_vaild(self, message, string):
        """Append a confirmation line (check mark + ``string``) to the wizard embed."""
        text = ''
        if len(message.embeds) > 0:
            # \u2705 = white heavy check mark
            text = message.embeds[0].description + '\n\n\u2705 ' + string
        return await self.wizard_says_edit(message, text)

    async def get_user_reply(self, ctx):
        """Pre-parse user input for wizard

        Waits for the next message of the poll author and returns its content.
        Raises StopWizard if the reply is a bot command or ``stop`` and
        InvalidInput if the reply has no text content.
        """
        def check(m):
            return m.author == self.author

        reply = await self.bot.wait_for('message', check=check)
        if not reply or not reply.content:
            raise InvalidInput

        if reply.content.startswith(await get_pre(self.bot, reply)):
            await self.wizard_says(ctx, f'You can\'t use bot commands during the Poll Creation Wizard.\n'
                                        f'Stopping the Wizard and then executing the command:\n`{reply.content}`',
                                   footer=False)
            raise StopWizard
        if reply.content.lower() == 'stop':
            await self.wizard_says(ctx, 'Poll Wizard stopped.', footer=False)
            raise StopWizard
        return reply.content

    def sanitize_string(self, string):
        """Sanitize user input for wizard

        Strips all characters of the Unicode "Other" category (control,
        format, unassigned, ...) and raises InvalidInput if the input is None
        or nothing but spaces remains.
        """
        if string is None:
            raise InvalidInput
        # raw string: "\p" is not a valid Python escape sequence
        string = regex.sub(r"\p{C}+", "", string)
        if set(string).issubset(set(' ')):
            raise InvalidInput
        return string

    async def set_name(self, ctx, force=None):
        """Set the Question / Name of the Poll."""
        async def get_valid(in_reply):
            if not in_reply:
                raise InvalidInput

            min_len = 3
            max_len = 200
            in_reply = self.sanitize_string(in_reply)
            if not in_reply:
                raise InvalidInput
            if min_len <= len(in_reply) <= max_len:
                return in_reply
            raise InvalidInput

        try:
            self.name = await get_valid(force)
            return
        except InputError:
            pass

        text = ("**What is the question of your poll?**\n"
                "Try to be descriptive without writing more than one sentence.")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    # show the rejected forced value once, then ask the user
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                self.name = await get_valid(reply)
                await self.add_vaild(message, self.name)
                break
            except InvalidInput:
                await self.add_error(message, '**Keep the name between 3 and 200 valid characters**')

    async def set_short(self, ctx, force=None):
        """Set the label of the Poll."""
        async def get_valid(in_reply):
            if not in_reply:
                raise InvalidInput

            min_len = 2
            max_len = 25
            in_reply = self.sanitize_string(in_reply)

            if not in_reply:
                raise InvalidInput
            if in_reply in ['open', 'closed', 'prepared']:
                raise ReservedInput
            # the label has to be unique per server
            if await self.bot.db.polls.find_one({'server_id': str(self.server.id), 'short': in_reply}) is not None:
                raise DuplicateInput
            if min_len <= len(in_reply) <= max_len and len(in_reply.split(" ")) == 1:
                return in_reply
            raise InvalidInput

        try:
            self.short = await get_valid(force)
            return
        except InputError:
            pass

        text = ("Great. **Now type a unique one word identifier, a label, for your poll.** "
                "This label will be used to refer to the poll. Keep it short and significant.")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                self.short = await get_valid(reply)
                await self.add_vaild(message, self.short)
                break
            except InvalidInput:
                await self.add_error(message, '**Only one word between 2 and 25 valid characters!**')
            except ReservedInput:
                await self.add_error(message, '**Can\'t use reserved words (open, closed, prepared) as label!**')
            except DuplicateInput:
                await self.add_error(message,
                                     f'**The label `{reply}` is not unique on this server. Choose a different one!**')

    def _parse_future_date(self, in_reply):
        """Parse a user supplied date shared by the activation and deadline steps.

        Returns ``0`` for the literal input ``'0'``, otherwise a timezone-aware
        datetime that lies in the future. Raises InvalidInput if the text is
        empty or cannot be parsed and DateOutOfRange if the date is in the past.
        """
        if not in_reply:
            raise InvalidInput
        in_reply = self.sanitize_string(in_reply)
        if not in_reply:
            raise InvalidInput
        if in_reply == '0':
            return 0

        dt = dateparser.parse(in_reply)
        if not isinstance(dt, datetime.datetime):
            raise InvalidInput

        if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
            # astimezone() on a naive datetime interprets it as system local time
            dt = dt.astimezone(pytz.utc)

        if dt < self._now_utc():
            raise DateOutOfRange(dt)
        return dt

    async def set_preparation(self, ctx, force=None):
        """Set the preparation conditions for the Poll."""
        try:
            dt = self._parse_future_date(force)
            self.activation = dt
            if self.activation != 0:
                self.activation_tz = dt.utcoffset().total_seconds() / 3600
            self.active = False
            return
        except InputError:
            pass

        text = ("This poll will be created inactive. You can either schedule activation at a certain date or activate "
                "it manually. **Type `0` to activate it manually or tell me when you want to activate it** by "
                "typing an absolute or relative date. You can specify a timezone if you want.\n"
                "Examples: `in 2 days`, `next week CET`, `may 3rd 2019`, `9.11.2019 9pm EST` ")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                dt = self._parse_future_date(reply)
                self.activation = dt
                if self.activation == 0:
                    await self.add_vaild(message, 'manually activated')
                else:
                    self.activation_tz = dt.utcoffset().total_seconds() / 3600
                    await self.add_vaild(message, self.activation.strftime('%d-%b-%Y %H:%M %Z'))
                self.active = False
                break
            except InvalidInput:
                await self.add_error(message, '**I could not understand that format.**')
            except TypeError:
                await self.add_error(message, '**Type Error.**')
            except DateOutOfRange as e:
                await self.add_error(message, f'**{e.date.strftime("%d-%b-%Y %H:%M")} is in the past.**')

    async def set_anonymous(self, ctx, force=None):
        """Determine if poll is anonymous."""
        async def get_valid(in_reply):
            if not in_reply:
                raise InvalidInput
            is_true = ['yes', '1']
            is_false = ['no', '0']
            in_reply = self.sanitize_string(in_reply)
            if not in_reply:
                raise InvalidInput
            answer = in_reply.lower()
            if answer in is_true:
                return True
            if answer in is_false:
                return False
            raise InvalidInput

        try:
            self.anonymous = await get_valid(force)
            return
        except InputError:
            pass

        # \U0001F539 = small blue diamond
        text = ("Next you need to decide: **Do you want your poll to be anonymous?**\n"
                "\n"
                "`0 - No`\n"
                "`1 - Yes`\n"
                "\n"
                "An anonymous poll has the following effects:\n"
                "\U0001F539 You will never see who voted for which option\n"
                "\U0001F539 Once the poll is closed, you will see who participated (but not their choice)")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                self.anonymous = await get_valid(reply)
                await self.add_vaild(message, f'{"Yes" if self.anonymous else "No"}')
                break
            except InvalidInput:
                await self.add_error(message, '**You can only answer with `yes` | `1` or `no` | `0`!**')

    async def set_multiple_choice(self, ctx, force=None):
        """Determine if poll is multiple choice."""
        async def get_valid(in_reply):
            if not in_reply:
                raise InvalidInput
            in_reply = self.sanitize_string(in_reply)
            if not in_reply:
                raise InvalidInput
            if not in_reply.isdigit():
                raise ExpectedInteger
            try:
                number = int(in_reply)
            except ValueError:
                # str.isdigit() accepts characters such as superscripts that int() rejects
                raise ExpectedInteger
            if number > len(self.options_reaction):
                raise OutOfRange
            return number

        try:
            self.multiple_choice = await get_valid(force)
            return
        except InputError:
            pass

        text = ("**How many options should the voters be able choose?**\n"
                "\n"
                "`0 - No Limit: Multiple Choice`\n"
                "`1 - Single Choice`\n"
                "`2+ - Specify exactly how many Choices`\n"
                "\n"
                "If the maximum choices are reached for a voter, they have to unvote an option before being able to "
                "vote for a different one.")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                self.multiple_choice = await get_valid(reply)
                await self.add_vaild(message, f'{self.multiple_choice if self.multiple_choice > 0 else "No Limit"}')
                break
            except InvalidInput:
                await self.add_error(message, '**Invalid Input**')
            except ExpectedInteger:
                await self.add_error(message, '**Enter a positive number**')
            except OutOfRange:
                await self.add_error(message, '**You can\'t have more choices than options.**')

    async def set_options_reaction(self, ctx, force=None):
        """Set the answers / options of the Poll."""
        async def get_valid(in_reply):
            """Return a preset number (int) or a list of 2 to 26 custom options."""
            if not in_reply:
                raise InvalidInput

            preset = ['1', '2', '3', '4']
            split = [r.strip() for r in in_reply.split(",")]

            if len(split) == 1:
                if split[0] in preset:
                    return int(split[0])
                raise WrongNumberOfArguments
            if 1 < len(split) <= 26:
                split = [self.sanitize_string(o) for o in split]
                if any(len(o) < 1 for o in split):
                    raise InvalidInput
                return split
            raise WrongNumberOfArguments

        def get_preset_options(number):
            if number == 1:
                # white check mark, negative squared cross mark
                return ['\u2705', '\u274E']
            elif number == 2:
                # thumbs up, zipper mouth, thumbs down
                return ['\U0001F44D', '\U0001F910', '\U0001F44E']
            elif number == 3:
                # heart eyes, thumbs up, zipper mouth, thumbs down, nauseated face
                return ['\U0001F60D', '\U0001F44D', '\U0001F910', '\U0001F44E', '\U0001F922']
            elif number == 4:
                return ['in favour', 'against', 'abstaining']

        def apply(options):
            """Store validated options; presets 1-3 are emoji used directly as reactions."""
            self.options_reaction_default = False
            if isinstance(options, int):
                self.options_reaction = get_preset_options(options)
                if options <= 3:
                    self.options_reaction_default = True
            else:
                self.options_reaction = options

        try:
            apply(await get_valid(force))
            return
        except InputError:
            pass

        text = ("**Choose the options/answers for your poll.**\n"
                "Either chose a preset of options or type your own options, separated by commas.\n"
                "\n"
                "**1** - :white_check_mark: :negative_squared_cross_mark:\n"
                "**2** - :thumbsup: :zipper_mouth: :thumbsdown:\n"
                "**3** - :heart_eyes: :thumbsup: :zipper_mouth: :thumbsdown: :nauseated_face:\n"
                "**4** - in favour, against, abstaining\n"
                "\n"
                "Example for custom options:\n"
                "**apple juice, banana ice cream, kiwi slices** ")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                apply(await get_valid(reply))
                await self.add_vaild(message, f'{", ".join(self.options_reaction)}')
                break
            except InvalidInput:
                await self.add_error(message,
                                     '**Invalid entry. Type `1`, `2`, `3` or `4` or a comma separated list of '
                                     'up to 26 options.**')
            except WrongNumberOfArguments:
                await self.add_error(message,
                                     '**You need more than 1 option! Type them in a comma separated list.**')

    async def set_roles(self, ctx, force=None):
        """Set role restrictions for the Poll."""
        async def get_valid(in_reply, roles):
            n_roles = len(roles)
            if not in_reply:
                raise InvalidInput

            split = [self.sanitize_string(r.strip()) for r in in_reply.split(",")]
            if len(split) == 1 and split[0] in ['0', 'all', 'everyone']:
                return ['@everyone']

            # `force` is read from the enclosing scope at call time: a pending
            # forced value is always matched by role name, never by number
            if n_roles <= 20 and force is None:
                if not all(r.isdigit() for r in split):
                    raise ExpectedInteger
                try:
                    numbers = [int(r) for r in split]
                except ValueError:
                    # str.isdigit() accepts characters that int() rejects
                    raise ExpectedInteger
                # list positions are 1-based; 0 would silently select the last role
                if any(not 1 <= n <= n_roles for n in numbers):
                    raise OutOfRange

                role_names = [r.name for r in roles]
                return [role_names[n - 1] for n in numbers]

            known_names = [x.name for x in roles]
            invalid_roles = [r for r in split if r not in known_names]
            if len(invalid_roles) > 0:
                raise InvalidRoles(", ".join(invalid_roles))
            return split

        roles = self.server.roles
        n_roles = len(roles)
        try:
            self.roles = await get_valid(force, roles)
            return
        except InputError:
            pass

        if n_roles <= 20:
            text = ("**Choose which roles are allowed to vote.**\n"
                    "Type `0`, `all` or `everyone` to have no restrictions.\n"
                    "If you want multiple roles to be able to vote, separate the numbers with a comma.\n")
            text += f'\n`{0} - no restrictions`'

            for i, role in enumerate([r.name for r in self.server.roles]):
                text += f'\n`{i+1} - {role}`'
            text += ("\n"
                     "\n"
                     " Example: `2, 3` \n")
        else:
            text = ("**Choose which roles are allowed to vote.**\n"
                    "Type `0`, `all` or `everyone` to have no restrictions.\n"
                    "Type out the role names, separated by a comma, to restrict voting to specific roles:\n"
                    "`moderators, Editors, vips` (hint: role names are case sensitive!)\n")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                self.roles = await get_valid(reply, roles)
                await self.add_vaild(message, f'{", ".join(self.roles)}')
                break
            except InvalidInput:
                await self.add_error(message, '**I can\'t read this input.**')
            except ExpectedInteger:
                await self.add_error(message, '**Only type positive numbers separated by a comma.**')
            except OutOfRange:
                await self.add_error(message, '**Only type numbers you can see in the list.**')
            except InvalidRoles as e:
                await self.add_error(message, f'**The following roles are invalid: {e.roles}**')

    async def set_weights(self, ctx, force=None):
        """Set role weights for the poll."""
        async def get_valid(in_reply, server_roles):
            """Return ``[role names, weights]`` parsed from ``role: weight`` pairs.

            Raises ValueError when a weight is not a finite number.
            """
            if not in_reply:
                raise InvalidInput
            no_weights = ['0', 'none']
            pairs = [self.sanitize_string(p.strip()) for p in in_reply.split(",")]
            if len(pairs) == 1 and pairs[0] in no_weights:
                return [[], []]
            if not all(":" in p for p in pairs):
                raise ExpectedSeparator(":")

            known_names = [x.name for x in server_roles]
            roles = []
            weights = []
            for e in pairs:
                # split on the last colon, so role names may contain colons
                role, raw_weight = [x.strip() for x in e.rsplit(":", 1)]
                if role not in known_names:
                    raise InvalidRoles(role)

                weight = float(raw_weight)  # raises ValueError for non numeric input
                if not math.isfinite(weight):
                    # float() also parses 'nan' and 'inf', which cannot be used as weights
                    raise ValueError(f'weight must be finite: {raw_weight}')
                weight = int(weight) if weight.is_integer() else weight
                roles.append(role)
                weights.append(weight)
            # every role may only be listed once
            if len(roles) > len(set(roles)):
                raise WrongNumberOfArguments

            return [roles, weights]

        try:
            w_n = await get_valid(force, self.server.roles)
            self.weights_roles = w_n[0]
            self.weights_numbers = w_n[1]
            return
        except (InputError, ValueError):
            # a bad forced value falls through to the wizard, which reports the error
            pass

        text = ("Almost done.\n"
                "**Weights allow you to give certain roles more or less effective votes.\n"
                "Type `0` or `none` if you don't need any weights.**\n"
                "A weight for the role `moderator` of `2` for example will automatically count the votes of all the moderators twice.\n"
                "To assign weights type the role, followed by a colon, followed by the weight like this:\n"
                "`moderator: 2, newbie: 0.5`")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                w_n = await get_valid(reply, self.server.roles)
                self.weights_roles = w_n[0]
                self.weights_numbers = w_n[1]
                weights = [f'{r}: {n}' for r, n in zip(self.weights_roles, self.weights_numbers)]
                await self.add_vaild(message, ", ".join(weights))
                break
            except InvalidInput:
                await self.add_error(message, '**Can\'t read this input.**')
            except ExpectedSeparator as e:
                await self.add_error(message, f'**Expected roles and weights to be separated by {e.separator}**')
            except InvalidRoles as e:
                await self.add_error(message, f'**Invalid role found: {e.roles}**')
            except ValueError:
                await self.add_error(message, f'**Weights must be numbers.**')
            except WrongNumberOfArguments:
                await self.add_error(message, f'**Not every role has a weight assigned.**')

    async def set_duration(self, ctx, force=None):
        """Set the duration /deadline for the Poll."""
        try:
            dt = self._parse_future_date(force)
            self.duration = dt
            if self.duration != 0:
                self.duration_tz = dt.utcoffset().total_seconds() / 3600
            return
        except InputError:
            pass

        text = ("Last step.\n"
                "**When should the poll be closed?**\n"
                "If you want the poll to last indefinitely (until you close it), type `0`. "
                "Otherwise tell me when the poll should close in relative or absolute terms. "
                "You can specify a timezone if you want.\n"
                "\n"
                "Examples: `in 6 hours` or `next week CET` or `aug 15th 5:10` or `15.8.2019 11pm EST`")
        message = await self.wizard_says(ctx, text)

        while True:
            try:
                if force:
                    reply = force
                    force = None
                else:
                    reply = await self.get_user_reply(ctx)
                dt = self._parse_future_date(reply)
                self.duration = dt
                if self.duration == 0:
                    await self.add_vaild(message, 'until closed manually')
                else:
                    self.duration_tz = dt.utcoffset().total_seconds() / 3600
                    await self.add_vaild(message, self.duration.strftime('%d-%b-%Y %H:%M %Z'))
                break
            except InvalidInput:
                await self.add_error(message, '**I could not understand that format.**')
            except TypeError:
                await self.add_error(message, '**Type Error.**')
            except DateOutOfRange as e:
                await self.add_error(message, f'**{e.date.strftime("%d-%b-%Y %H:%M")} is in the past.**')

    def finalize(self):
        """Stamp the poll with the current UTC time as its creation time."""
        self.time_created = self._now_utc()

    async def to_dict(self):
        """Return the poll as a dict in the layout stored in the database."""
        cid = 0 if self.channel is None else self.channel.id
        aid = 0 if self.author is None else self.author.id

        return {
            'server_id': str(self.server.id),
            'channel_id': str(cid),
            'author': str(aid),
            'name': self.name,
            'short': self.short,
            'anonymous': self.anonymous,
            'reaction': self.reaction,
            'multiple_choice': self.multiple_choice,
            'options_reaction': self.options_reaction,
            'reaction_default': self.options_reaction_default,
            #'options_traditional': self.options_traditional,
            'roles': self.roles,
            'weights_roles': self.weights_roles,
            'weights_numbers': self.weights_numbers,
            'duration': self.duration,
            'duration_tz': self.duration_tz,
            'time_created': self.time_created,
            'open': self.open,
            'active': self.active,
            'activation': self.activation,
            'activation_tz': self.activation_tz,
            'votes': self.votes
        }

    def _export_participant_lines(self, include_choices):
        """Return one export line per voter, optionally followed by their choices."""
        n_options = len(self.options_reaction)
        lines = ''
        for user_id, ballot in self.votes.items():
            # vote keys are stringified user ids, get_member() looks up integer ids
            member = self.server.get_member(int(user_id))
            if member and len(ballot['choices']) == 0:
                continue
            name = (member.nick or member.name) if member else ""
            lines += f'\n{name}'
            if ballot['weight'] != 1:
                lines += f' (weight: {ballot["weight"]})'
            if include_choices:
                # ignore stored choices that do not map to an existing option
                lines += ': ' + ', '.join([self.options_reaction[c] for c in ballot['choices']
                                           if 0 <= c < n_options])
        lines += '\n'
        return lines

    async def to_export(self):
        """Create report and return string"""
        # build string for weights
        weight_str = 'No weights'
        if len(self.weights_roles) > 0:
            weight_str = ', '.join([f'{r}: {n}' for r, n in zip(self.weights_roles, self.weights_numbers)])

        # Determine the poll winner (all options tied for the most weighted votes)
        winning_options = []
        winning_votes = 0
        for i, o in enumerate(self.options_reaction):
            votes = self.count_votes(i, weighted=True)
            if votes > winning_votes:
                winning_options = [o]
                winning_votes = votes
            elif votes == winning_votes:
                winning_options.append(o)

        deadline_str = await self.get_deadline(string=True)
        raw_results = ", ".join([str(o) + ": " + str(self.count_votes(i, weighted=False))
                                 for i, o in enumerate(self.options_reaction)])
        weighted_results = ", ".join([str(o) + ": " + str(self.count_votes(i, weighted=True))
                                      for i, o in enumerate(self.options_reaction)])
        # multiple_choice: 1 is single choice, 0 (no limit) and 2+ allow several choices
        multiple_choice_str = "No" if self.multiple_choice == 1 else "Yes"

        export = (f'--------------------------------------------\n'
                  f'POLLMASTER DISCORD EXPORT\n'
                  f'--------------------------------------------\n'
                  f'Server name (ID): {self.server.name} ({self.server.id})\n'
                  f'Owner of the poll: {self.author.name}\n'
                  f'Time of creation: {self.time_created.strftime("%d-%b-%Y %H:%M %Z")}\n'
                  f'--------------------------------------------\n'
                  f'POLL SETTINGS\n'
                  f'--------------------------------------------\n'
                  f'Question / Name: {self.name}\n'
                  f'Label: {self.short}\n'
                  f'Anonymous: {"Yes" if self.anonymous else "No"}\n'
                  f'Multiple choice: {multiple_choice_str}\n'
                  f'Answer options: {", ".join(self.options_reaction)}\n'
                  f'Allowed roles: {", ".join(self.roles) if len(self.roles) > 0 else "@everyone"}\n'
                  f'Weights for roles: {weight_str}\n'
                  f'Deadline: {deadline_str}\n'
                  f'--------------------------------------------\n'
                  f'POLL RESULTS\n'
                  f'--------------------------------------------\n'
                  f'Number of participants: {len(self.votes)}\n'
                  f'Raw results: {raw_results}\n'
                  f'Weighted results: {weighted_results}\n'
                  f'Winning option{"s" if len(winning_options) > 1 else ""}: '
                  f'{", ".join(winning_options)} with {winning_votes} votes\n')

        if not self.anonymous:
            export += '--------------------------------------------\n' \
                      'DETAILED POLL RESULTS\n' \
                      '--------------------------------------------'
            export += self._export_participant_lines(include_choices=True)
        else:
            # anonymous polls only list who participated, never what they chose
            export += '--------------------------------------------\n' \
                      'LIST OF PARTICIPANTS\n' \
                      '--------------------------------------------'
            export += self._export_participant_lines(include_choices=False)

        export += ('--------------------------------------------\n'
                   'BOT DETAILS\n'
                   '--------------------------------------------\n'
                   'Creator: Newti#0654\n'
                   'Link to invite, vote for or support Pollmaster:\n'
                   'https://discordbots.org/bot/444514223075360800\n'
                   '--------------------------------------------\n'
                   'END OF FILE\n'
                   '--------------------------------------------\n')
        return export

    async def export(self):
        """Create export file and return path

        Returns None while the poll is still open.
        """
        if self.open:
            return None

        fn = 'export/' + str(self.server.id) + '_' + str(self.short) + '.txt'
        # build the report first so a failure does not leave an empty file behind
        report = await self.to_export()
        with codecs.open(fn, 'w', 'utf-8') as outfile:
            outfile.write(report)
        return fn

    async def from_dict(self, d):
        """Populate this poll from a database document ``d``."""
        self.id = d['_id']
        self.server = self.bot.get_guild(int(d['server_id']))
        self.channel = self.bot.get_channel(int(d['channel_id']))
        if self.server:
            self.author = self.server.get_member(int(d['author']))
        else:
            self.author = None
        self.name = d['name']
        self.short = d['short']
        self.anonymous = d['anonymous']
        self.reaction = d['reaction']

        # backwards compatibility for multiple choice
        if isinstance(d['multiple_choice'], bool):
            self.multiple_choice = 0 if d['multiple_choice'] else 1
        else:
            try:
                self.multiple_choice = int(d['multiple_choice'])
            except (ValueError, TypeError):
                # int(None) raises TypeError rather than ValueError
                logger.exception('Multiple Choice not an int or bool.')
                self.multiple_choice = 0  # default

        self.options_reaction = d['options_reaction']
        self.options_reaction_default = d['reaction_default']
        # self.options_traditional = d['options_traditional']
        self.roles = d['roles']
        self.weights_roles = d['weights_roles']
        self.weights_numbers = d['weights_numbers']
        self.duration = d['duration']
        self.duration_tz = d['duration_tz']
        self.time_created = d['time_created']

        self.activation = d['activation']
        self.activation_tz = d['activation_tz']
        self.active = d['active']
        self.open = d['open']

        self.cursor_pos = 0
        self.votes = d['votes']

        # re-evaluate the stored state against the deadline / activation date
        self.open = await self.is_open()
        self.active = await self.is_active()

    async def save_to_db(self):
        """Insert or update this poll, identified by server id and label."""
        await self.bot.db.polls.update_one({'server_id': str(self.server.id), 'short': str(self.short)},
                                           {'$set': await self.to_dict()}, upsert=True)

    @staticmethod
    async def load_from_db(bot, server_id, short, ctx=None, ):
        """Load the poll with label ``short`` of a server; None if it does not exist."""
        query = await bot.db.polls.find_one({'server_id': str(server_id), 'short': short})
        if query is None:
            return None

        p = Poll(bot, ctx, load=True)
        await p.from_dict(query)
        return p

    async def add_field_custom(self, name, value, embed):
        """this is used to estimate the width of text and add empty embed fields for a cleaner report
        cursor_pos is used to track if we are at the start of a new line in the report. Each line has max 2 slots for info.
        If the line is short, we can fit a second field, if it is too long, we get an automatic linebreak.
        If it is in between, we create an empty field to prevent the inline from looking ugly"""

        name = str(name)
        value = str(value)

        # the AFM metrics only cover basic characters, so transliterate first
        nwidth = afm.string_width_height(unidecode(name))
        vwidth = afm.string_width_height(unidecode(value))
        w = max(nwidth[0], vwidth[0])

        # a wide field in the second slot is not inlined
        inline = not (w > 12500 and self.cursor_pos % 2 == 1)
        embed.add_field(name=name, value=value, inline=inline)
        self.cursor_pos += 1

        # create an empty field if we are at the second slot and the
        # width of the first slot is between the critical values
        if self.cursor_pos % 2 == 1 and 11600 < w < 20000:
            embed.add_field(name='\u200b', value='\u200b', inline=True)
            self.cursor_pos += 1

        return embed

    def _choice_mode_text(self, unlimited, single, limited):
        """Pick the text matching multiple_choice (0 = unlimited, 1 = single, n = limited)."""
        if self.multiple_choice == 0:
            return unlimited
        if self.multiple_choice == 1:
            return single
        return limited

    async def generate_embed(self):
        """Generate Discord Report"""
        self.cursor_pos = 0
        embed = discord.Embed(title='', colour=SETTINGS.color)  # f'Status: {"Open" if self.is_open() else "Closed"}'
        embed.set_author(name=f' >> {self.short} ',
                         icon_url=SETTINGS.author_icon)
        embed.set_thumbnail(url=SETTINGS.report_icon)

        # ## adding fields with custom, length sensitive function
        if not await self.is_active():
            embed = await self.add_field_custom(name='**INACTIVE**',
                                                value=f'This poll is inactive until '
                                                      f'{self.get_activation_date(string=True)}.',
                                                embed=embed
                                                )

        embed = await self.add_field_custom(name='**Poll Question**', value=self.name, embed=embed)

        embed = await self.add_field_custom(name='**Roles**', value=', '.join(self.roles), embed=embed)
        if len(self.weights_roles) > 0:
            weights = [f'{r}: {n}' for r, n in zip(self.weights_roles, self.weights_numbers)]
            embed = await self.add_field_custom(name='**Weights**', value=', '.join(weights), embed=embed)

        embed = await self.add_field_custom(name='**Anonymous**', value=self.anonymous, embed=embed)

        # embed = await self.add_field_custom(name='**Multiple Choice**', value=self.multiple_choice, embed=embed)
        embed = await self.add_field_custom(name='**Deadline**', value=await self.get_poll_status(), embed=embed)
        embed = await self.add_field_custom(name='**Author**', value=self.author.name, embed=embed)

        if self.reaction:
            if self.options_reaction_default:
                # preset options are emoji: show all scores in a single field
                if await self.is_open():
                    text = f'**Score** '
                    text += self._choice_mode_text(f'(Multiple Choice)',
                                                   f'(Single Choice)',
                                                   f'({self.multiple_choice} Choices)')
                else:
                    text = f'**Final Score**'

                vote_display = [f'{r} {self.count_votes(i)}' for i, r in enumerate(self.options_reaction)]
                embed = await self.add_field_custom(name=text, value=' '.join(vote_display), embed=embed)
            else:
                # custom options: one field per option, labelled with its letter
                embed.add_field(name='\u200b', value='\u200b', inline=False)
                if await self.is_open():
                    text = f'*Vote by adding reactions to the poll*. '
                    text += self._choice_mode_text(
                        '*You can vote for multiple options.*',
                        '*You have 1 vote, but can change it.*',
                        f'*You have {self.multiple_choice} choices and can change them.*')
                else:
                    text = f'*Final Results of the Poll* '
                    text += self._choice_mode_text('*(Multiple Choice).*',
                                                   '*(Single Choice).*',
                                                   f'*(With up to {self.multiple_choice} choices).*')

                embed = await self.add_field_custom(name='**Options**', value=text, embed=embed)
                for i, r in enumerate(self.options_reaction):
                    embed = await self.add_field_custom(
                        name=f':regional_indicator_{ascii_lowercase[i]}: {self.count_votes(i)}',
                        value=r,
                        embed=embed
                    )
        # else:
        #     embed = await self.add_field_custom(name='**Options**', value=', '.join(self.get_options()), embed=embed)

        # \u2754 = white question mark ornament
        embed.set_footer(text='React with \u2754 to get info. It is not a vote option.')

        return embed

    async def post_embed(self, destination):
        """Send the poll embed to ``destination``, add its reactions and return the message."""
        msg = await destination.send(embed=await self.generate_embed())
        if self.reaction and await self.is_open() and await self.is_active():
            if self.options_reaction_default:
                for r in self.options_reaction:
                    await msg.add_reaction(r)
            else:
                for i, r in enumerate(self.options_reaction):
                    await msg.add_reaction(AZ_EMOJIS[i])
            await msg.add_reaction('\u2754')  # white question mark ornament
        elif not await self.is_open():
            await msg.add_reaction('\u2754')  # white question mark ornament
            await msg.add_reaction('\U0001F4CE')  # paperclip
        # every branch hands the message back to the caller
        return msg

    @staticmethod
    def _with_tz(dt, tz_info):
        """Convert ``dt`` (naive values are taken as UTC) into the timezone ``tz_info``.

        ``tz_info`` is either a float that is passed to ``possible_timezones``
        to find a matching zone name, or a timezone name. Unknown or missing
        zones fall back to UTC.
        """
        if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
            dt = pytz.utc.localize(dt)

        if isinstance(tz_info, float):
            candidates = possible_timezones(tz_info, common_only=True)
            # choose one valid timezone with the offset
            zone_name = candidates[0] if candidates else 'UTC'
        else:
            zone_name = tz_info

        try:
            tz = pytz.timezone(zone_name)
        except UnknownTimeZoneError:
            tz = pytz.UTC
        return dt.astimezone(tz)

    def get_duration_with_tz(self):
        """Return the deadline in its stored timezone, or 0 if there is none."""
        if self.duration == 0:
            return 0
        elif isinstance(self.duration, datetime.datetime):
            return self._with_tz(self.duration, self.duration_tz)

    def get_activation_with_tz(self):
        """Return the activation date in its stored timezone, or 0 if there is none."""
        if self.activation == 0:
            return 0
        elif isinstance(self.activation, datetime.datetime):
            return self._with_tz(self.activation, self.activation_tz)

    async def get_deadline(self, string=False):
        """Return the deadline as datetime (0 if none) or, with ``string``, as text."""
        if self.duration == 0:
            return 'No deadline' if string else 0

        deadline = self.get_duration_with_tz()
        if string:
            return deadline.strftime('%d-%b-%Y %H:%M %Z')
        return deadline

    def get_activation_date(self, string=False):
        """Return the activation date as datetime (0 if none) or, with ``string``, as text."""
        if self.activation == 0:
            return 'manually activated' if string else 0

        activation_date = self.get_activation_with_tz()
        if string:
            return activation_date.strftime('%d-%b-%Y %H:%M %Z')
        return activation_date

    async def get_poll_status(self):
        """Return the deadline text while the poll is open, else a closed notice."""
        if await self.is_open():
            return await self.get_deadline(string=True)
        else:
            return 'Poll is closed.'

    def count_votes(self, option, weighted=True):
        '''option: number from 0 to n

        Returns the sum of the voters' weights for that option, or the plain
        number of voters when ``weighted`` is false.
        '''
        if weighted:
            return sum(ballot['weight'] for ballot in self.votes.values() if option in ballot['choices'])
        return sum(1 for ballot in self.votes.values() if option in ballot['choices'])

    def _choice_from_reaction(self, option):
        """Return the option index a reaction emoji stands for, or None if it is no option."""
        if self.options_reaction_default:
            if option in self.options_reaction:
                return self.options_reaction.index(option)
            return None

        if option in AZ_EMOJIS:
            index = AZ_EMOJIS.index(option)
            # letters beyond the number of options are not valid choices
            if index < len(self.options_reaction):
                return index
        return None

    async def _refresh_closed(self, message):
        """Redraw a closed poll and remove its reactions where that is possible."""
        await message.edit(embed=await self.generate_embed())
        if not isinstance(message.channel, discord.abc.PrivateChannel):
            await message.clear_reactions()

    async def vote(self, user, option, message, lock):
        """Register the reaction ``option`` of ``user`` as a vote and refresh ``message``.

        ``lock`` is accepted for interface compatibility and not used here.
        """
        if not await self.is_open():
            # refresh to show closed poll
            await self._refresh_closed(message)
            return
        elif not await self.is_active():
            return

        choice = self._choice_from_reaction(option)
        if choice is None:
            # unknown emoji: do not register the user as participant
            return

        # get weight: the highest weight among the roles of the user
        weight = 1
        if len(self.weights_roles) > 0:
            user_roles = set(n.name for n in user.roles)
            valid_weights = [self.weights_numbers[self.weights_roles.index(r)]
                             for r in user_roles.intersection(self.weights_roles)]
            if len(valid_weights) > 0:
                weight = max(valid_weights)

        ballot = self.votes.setdefault(str(user.id), {'weight': weight, 'choices': []})
        ballot['weight'] = weight

        if choice in ballot['choices']:
            if self.anonymous:
                # anonymous multiple choice -> can't unreact so we toggle with react
                logger.warning("Unvoting, should not happen for non anon polls.")
                await self.unvote(user, option, message, lock)
        elif self.multiple_choice > 0 and len(ballot['choices']) >= self.multiple_choice:
            say_text = f'You have reached the **maximum choices of {self.multiple_choice}** for this poll. ' \
                       f'Before you can vote again, you need to unvote one of your choices.'
            embed = discord.Embed(title='', description=say_text, colour=SETTINGS.color)
            embed.set_author(name='Pollmaster', icon_url=SETTINGS.author_icon)
            await user.send(embed=embed)
        else:
            ballot['choices'].append(choice)
            ballot['choices'] = list(set(ballot['choices']))

        # commit
        await self.save_to_db()
        asyncio.ensure_future(message.edit(embed=await self.generate_embed()))

    async def unvote(self, user, option, message, lock):
        """Remove the vote of ``user`` for the reaction ``option`` and refresh ``message``.

        ``lock`` is accepted for interface compatibility and not used here.
        """
        if not await self.is_open():
            # refresh to show closed poll
            await self._refresh_closed(message)
            return
        elif not await self.is_active():
            return

        if str(user.id) not in self.votes:
            return

        choice = self._choice_from_reaction(option)
        choices = self.votes[str(user.id)]['choices']
        if choice is not None and choice in choices:
            choices.remove(choice)
            await self.save_to_db()
            asyncio.ensure_future(message.edit(embed=await self.generate_embed()))

    async def has_required_role(self, user):
        """Return True if ``user`` has at least one of the roles allowed to vote."""
        return not set([r.name for r in user.roles]).isdisjoint(self.roles)