pollmaster-docker/cogs/poll.py
Matthias Nadler dae2a6e494 Migration to rewrite
some performance chances with message caching/fetching and user fetching
2019-04-12 20:30:36 +02:00

1210 lines
49 KiB
Python

import asyncio
import codecs
import datetime
import logging
import os
import re
import dateparser
import pytz
import regex
import discord
from uuid import uuid4
from string import ascii_lowercase
from matplotlib import rcParams
from matplotlib.afm import AFM
from pytz import UnknownTimeZoneError
from unidecode import unidecode
from essentials.multi_server import get_pre
from essentials.exceptions import *
from essentials.settings import SETTINGS
from utils.misc import possible_timezones
logger = logging.getLogger('bot')
## Helvetica is the closest font to Whitney (discord uses Whitney) in afm
## This is used to estimate text width and adjust the layout of the embeds
afm_fname = os.path.join(rcParams['datapath'], 'fonts', 'afm', 'phvr8a.afm')
with open(afm_fname, 'rb') as fh:
afm = AFM(fh)
## A-Z Emojis for Discord
AZ_EMOJIS = [(b'\\U0001f1a'.replace(b'a', bytes(hex(224 + (6 + i))[2:], "utf-8"))).decode("unicode-escape") for i in
range(26)]
class Poll:
def __init__(self, bot, ctx=None, server=None, channel=None, load=False):
self.bot = bot
self.cursor_pos = 0
if not load and ctx:
if server is None:
server = ctx.message.guild
if channel is None:
channel = ctx.message.channel
self.id = None
self.author = ctx.message.author
self.server = server
self.channel = channel
self.name = "Quick Poll"
self.short = str(uuid4())[0:23]
self.anonymous = False
self.reaction = True
self.multiple_choice = 1
self.options_reaction = ['yes', 'no']
self.options_reaction_default = False
# self.options_traditional = []
# self.options_traditional_default = False
self.roles = ['@everyone']
self.weights_roles = []
self.weights_numbers = []
self.duration = 0
self.duration_tz = 0.0
self.time_created = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
self.open = True
self.active = True
self.activation = 0
self.activation_tz = 0.0
self.votes = {}
async def is_open(self, update_db=True):
if self.server is None:
self.open = True
return
if self.open and self.duration != 0 \
and datetime.datetime.utcnow().replace(tzinfo=pytz.utc) > self.get_duration_with_tz():
self.open = False
if update_db:
await self.save_to_db()
return self.open
async def is_active(self, update_db=True):
if self.server is None:
self.active = False
return
if not self.active and self.activation != 0 \
and datetime.datetime.utcnow().replace(tzinfo=pytz.utc) > self.get_activation_with_tz():
self.active = True
if update_db:
await self.save_to_db()
return self.active
async def wizard_says(self, ctx, text, footer=True):
embed = discord.Embed(title="Poll creation Wizard", description=text, color=SETTINGS.color)
if footer:
embed.set_footer(text="Type `stop` to cancel the wizard.")
return await ctx.send(embed=embed)
async def wizard_says_edit(self, message, text, add=False):
if add and message.embeds.__len__() > 0:
text = message.embeds[0].description + text
embed = discord.Embed(title="Poll creation Wizard", description=text, color=SETTINGS.color)
embed.set_footer(text="Type `stop` to cancel the wizard.")
return await message.edit(embed=embed)
async def add_error(self, message, error):
text = ''
if message.embeds.__len__() > 0:
text = message.embeds[0].description + '\n\n:exclamation: ' + error
return await self.wizard_says_edit(message, text)
async def add_vaild(self, message, string):
text = ''
if message.embeds.__len__() > 0:
text = message.embeds[0].description + '\n\n' + string
return await self.wizard_says_edit(message, text)
async def get_user_reply(self, ctx):
"""Pre-parse user input for wizard"""
def check(m):
return m.author == self.author
reply = await self.bot.wait_for('message', check=check)
if reply and reply.content:
if reply.content.startswith(await get_pre(self.bot, reply)):
await self.wizard_says(ctx, f'You can\'t use bot commands during the Poll Creation Wizard.\n'
f'Stopping the Wizard and then executing the command:\n`{reply.content}`',
footer=False)
raise StopWizard
elif reply.content.lower() == 'stop':
await self.wizard_says(ctx, 'Poll Wizard stopped.', footer=False)
raise StopWizard
else:
return reply.content
else:
raise InvalidInput
def sanitize_string(self, string):
"""Sanitize user input for wizard"""
# sanitize input
if string is None:
raise InvalidInput
string = regex.sub("\p{C}+", "", string)
if set(string).issubset(set(' ')):
raise InvalidInput
return string
async def set_name(self, ctx, force=None):
"""Set the Question / Name of the Poll."""
async def get_valid(in_reply):
if not in_reply:
raise InvalidInput
min_len = 3
max_len = 200
in_reply = self.sanitize_string(in_reply)
if not in_reply:
raise InvalidInput
elif min_len <= in_reply.__len__() <= max_len:
return in_reply
else:
raise InvalidInput
try:
self.name = await get_valid(force)
return
except InputError:
pass
text = ("**What is the question of your poll?**\n"
"Try to be descriptive without writing more than one sentence.")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
self.name = await get_valid(reply)
await self.add_vaild(message, self.name)
break
except InvalidInput:
await self.add_error(message, '**Keep the name between 3 and 200 valid characters**')
async def set_short(self, ctx, force=None):
"""Set the label of the Poll."""
async def get_valid(in_reply):
if not in_reply:
raise InvalidInput
min_len = 2
max_len = 25
in_reply = self.sanitize_string(in_reply)
if not in_reply:
raise InvalidInput
elif in_reply in ['open', 'closed', 'prepared']:
raise ReservedInput
elif await self.bot.db.polls.find_one({'server_id': str(self.server.id), 'short': in_reply}) is not None:
raise DuplicateInput
elif min_len <= in_reply.__len__() <= max_len and in_reply.split(" ").__len__() == 1:
return in_reply
else:
raise InvalidInput
try:
self.short = await get_valid(force)
return
except InputError:
pass
text = """Great. **Now type a unique one word identifier, a label, for your poll.**
This label will be used to refer to the poll. Keep it short and significant."""
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
self.short = await get_valid(reply)
await self.add_vaild(message, self.short)
break
except InvalidInput:
await self.add_error(message, '**Only one word between 2 and 25 valid characters!**')
except ReservedInput:
await self.add_error(message, '**Can\'t use reserved words (open, closed, prepared) as label!**')
except DuplicateInput:
await self.add_error(message,
f'**The label `{reply}` is not unique on this server. Choose a different one!**')
async def set_preparation(self, ctx, force=None):
"""Set the preparation conditions for the Poll."""
async def get_valid(in_reply):
if not in_reply:
raise InvalidInput
in_reply = self.sanitize_string(in_reply)
if not in_reply:
raise InvalidInput
elif in_reply == '0':
return 0
dt = dateparser.parse(in_reply)
if not isinstance(dt, datetime.datetime):
raise InvalidInput
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
dt = dt.astimezone(pytz.utc)
now = datetime.datetime.utcnow().astimezone(pytz.utc)
# print(now, now.tzinfo)
# print("orig",dt , dt.tzinfo)
# dt = dt.astimezone(pytz.utc)
# print("converted", dt, dt.tzinfo)
if dt < now:
raise DateOutOfRange(dt)
return dt
try:
dt = await get_valid(force)
self.activation = dt
if self.activation != 0:
self.activation_tz = dt.utcoffset().total_seconds() / 3600
self.active = False
return
except InputError:
pass
text = ("This poll will be created inactive. You can either schedule activation at a certain date or activate "
"it manually. **Type `0` to activate it manually or tell me when you want to activate it** by "
"typing an absolute or relative date. You can specify a timezone if you want.\n"
"Examples: `in 2 days`, `next week CET`, `may 3rd 2019`, `9.11.2019 9pm EST` ")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
dt = await get_valid(reply)
self.activation = dt
if self.activation == 0:
await self.add_vaild(message, 'manually activated')
else:
self.activation_tz = dt.utcoffset().total_seconds() / 3600
await self.add_vaild(message, self.activation.strftime('%d-%b-%Y %H:%M %Z'))
self.active = False
break
except InvalidInput:
await self.add_error(message, '**I could not understand that format.**')
except TypeError:
await self.add_error(message, '**Type Error.**')
except DateOutOfRange as e:
await self.add_error(message, f'**{e.date.strftime("%d-%b-%Y %H:%M")} is in the past.**')
async def set_anonymous(self, ctx, force=None):
"""Determine if poll is anonymous."""
async def get_valid(in_reply):
if not in_reply:
raise InvalidInput
is_true = ['yes', '1']
is_false = ['no', '0']
in_reply = self.sanitize_string(in_reply)
if not in_reply:
raise InvalidInput
elif in_reply.lower() in is_true:
return True
elif in_reply.lower() in is_false:
return False
else:
raise InvalidInput
try:
self.anonymous = await get_valid(force)
return
except InputError:
pass
text = ("Next you need to decide: **Do you want your poll to be anonymous?**\n"
"\n"
"`0 - No`\n"
"`1 - Yes`\n"
"\n"
"An anonymous poll has the following effects:\n"
"🔹 You will never see who voted for which option\n"
"🔹 Once the poll is closed, you will see who participated (but not their choice)")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
self.anonymous = await get_valid(reply)
await self.add_vaild(message, f'{"Yes" if self.anonymous else "No"}')
break
except InvalidInput:
await self.add_error(message, '**You can only answer with `yes` | `1` or `no` | `0`!**')
async def set_multiple_choice(self, ctx, force=None):
"""Determine if poll is multiple choice."""
async def get_valid(in_reply):
if not in_reply:
raise InvalidInput
in_reply = self.sanitize_string(in_reply)
if not in_reply:
raise InvalidInput
elif not in_reply.isdigit():
raise ExpectedInteger
elif int(in_reply) > self.options_reaction.__len__():
raise OutOfRange
elif int(in_reply) <= self.options_reaction.__len__() >= 0:
return int(in_reply)
else:
raise InvalidInput
try:
self.multiple_choice = await get_valid(force)
return
except InputError:
pass
text = ("**How many options should the voters be able choose?**\n"
"\n"
"`0 - No Limit: Multiple Choice`\n"
"`1 - Single Choice`\n"
"`2+ - Specify exactly how many Choices`\n"
"\n"
"If the maximum choices are reached for a voter, they have to unvote an option before being able to "
"vote for a different one.")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
self.multiple_choice = await get_valid(reply)
await self.add_vaild(message, f'{self.multiple_choice if self.multiple_choice > 0 else "No Limit"}')
break
except InvalidInput:
await self.add_error(message, '**Invalid Input**')
except ExpectedInteger:
await self.add_error(message, '**Enter a positive number**')
except OutOfRange:
await self.add_error(message, '**You can\'t have more choices than options.**')
async def set_options_reaction(self, ctx, force=None):
"""Set the answers / options of the Poll."""
async def get_valid(in_reply):
if not in_reply:
raise InvalidInput
preset = ['1','2','3','4']
split = [r.strip() for r in in_reply.split(",")]
if split.__len__() == 1:
if split[0] in preset:
return int(split[0])
else:
raise WrongNumberOfArguments
elif 1 < split.__len__() <= 26:
split = [self.sanitize_string(o) for o in split]
if any([len(o) < 1 for o in split]):
raise InvalidInput
else:
return split
else:
raise WrongNumberOfArguments
def get_preset_options(number):
if number == 1:
return ['', '']
elif number == 2:
return ['👍', '🤐', '👎']
elif number == 3:
return ['😍', '👍', '🤐', '👎', '🤢']
elif number == 4:
return ['in favour', 'against', 'abstaining']
try:
options = await get_valid(force)
self.options_reaction_default = False
if isinstance(options, int):
self.options_reaction = get_preset_options(options)
if options <= 3:
self.options_reaction_default = True
else:
self.options_reaction = options
return
except InputError:
pass
text = ("**Choose the options/answers for your poll.**\n"
"Either chose a preset of options or type your own options, separated by commas.\n"
"\n"
"**1** - :white_check_mark: :negative_squared_cross_mark:\n"
"**2** - :thumbsup: :zipper_mouth: :thumbsdown:\n"
"**3** - :heart_eyes: :thumbsup: :zipper_mouth: :thumbsdown: :nauseated_face:\n"
"**4** - in favour, against, abstaining\n"
"\n"
"Example for custom options:\n"
"**apple juice, banana ice cream, kiwi slices** ")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
options = await get_valid(reply)
self.options_reaction_default = False
if isinstance(options, int):
self.options_reaction = get_preset_options(options)
if options <= 3:
self.options_reaction_default = True
else:
self.options_reaction = options
await self.add_vaild(message, f'{", ".join(self.options_reaction)}')
break
except InvalidInput:
await self.add_error(message,
'**Invalid entry. Type `1`, `2`, `3` or `4` or a comma separated list of '
'up to 26 options.**')
except WrongNumberOfArguments:
await self.add_error(message,
'**You need more than 1 option! Type them in a comma separated list.**')
async def set_roles(self, ctx, force=None):
"""Set role restrictions for the Poll."""
async def get_valid(in_reply, roles):
n_roles = roles.__len__()
if not in_reply:
raise InvalidInput
split = [self.sanitize_string(r.strip()) for r in in_reply.split(",")]
if split.__len__() == 1 and split[0] in ['0', 'all', 'everyone']:
return ['@everyone']
if n_roles <= 20 and force is None:
if not all([r.isdigit() for r in split]):
raise ExpectedInteger
elif any([int(r) > n_roles for r in split]):
raise OutOfRange
role_names = [r.name for r in roles]
return [role_names[i - 1] for i in [int(r) for r in split]]
else:
invalid_roles = []
for r in split:
if not any([r == sr for sr in [x.name for x in roles]]):
invalid_roles.append(r)
if invalid_roles.__len__() > 0:
raise InvalidRoles(", ".join(invalid_roles))
else:
return split
roles = self.server.roles
n_roles = roles.__len__()
try:
self.roles = await get_valid(force, roles)
return
except InputError:
pass
if n_roles <= 20:
text = ("**Choose which roles are allowed to vote.**\n"
"Type `0`, `all` or `everyone` to have no restrictions.\n"
"If you want multiple roles to be able to vote, separate the numbers with a comma.\n")
text += f'\n`{0} - no restrictions`'
for i, role in enumerate([r.name for r in self.server.roles]):
text += f'\n`{i+1} - {role}`'
text += ("\n"
"\n"
" Example: `2, 3` \n")
else:
text = ("**Choose which roles are allowed to vote.**\n"
"Type `0`, `all` or `everyone` to have no restrictions.\n"
"Type out the role names, separated by a comma, to restrict voting to specific roles:\n"
"`moderators, Editors, vips` (hint: role names are case sensitive!)\n")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
self.roles = await get_valid(reply, roles)
await self.add_vaild(message, f'{", ".join(self.roles)}')
break
except InvalidInput:
await self.add_error(message, '**I can\'t read this input.**')
except ExpectedInteger:
await self.add_error(message, '**Only type positive numbers separated by a comma.**')
except OutOfRange:
await self.add_error(message, '**Only type numbers you can see in the list.**')
except InvalidRoles as e:
await self.add_error(message, f'**The following roles are invalid: {e.roles}**')
async def set_weights(self, ctx, force=None):
"""Set role weights for the poll."""
async def get_valid(in_reply, server_roles):
if not in_reply:
raise InvalidInput
no_weights = ['0', 'none']
pairs = [self.sanitize_string(p.strip()) for p in in_reply.split(",")]
if pairs.__len__() == 1 and pairs[0] in no_weights:
return [[], []]
if not all([":" in p for p in pairs]):
raise ExpectedSeparator(":")
roles = []
weights = []
for e in pairs:
c = [x.strip() for x in e.rsplit(":", 1)]
if not any([c[0] == sr for sr in [x.name for x in server_roles]]):
raise InvalidRoles(c[0])
c[1] = float(c[1]) # Catch ValueError
c[1] = int(c[1]) if c[1].is_integer() else c[1]
roles.append(c[0])
weights.append(c[1])
if len(roles) > len(set(roles)):
raise WrongNumberOfArguments
return [roles, weights]
try:
w_n = await get_valid(force, self.server.roles)
self.weights_roles = w_n[0]
self.weights_numbers = w_n[1]
return
except InputError:
pass
text = ("Almost done.\n"
"**Weights allow you to give certain roles more or less effective votes.\n"
"Type `0` or `none` if you don't need any weights.**\n"
"A weight for the role `moderator` of `2` for example will automatically count the votes of all the moderators twice.\n"
"To assign weights type the role, followed by a colon, followed by the weight like this:\n"
"`moderator: 2, newbie: 0.5`")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
w_n = await get_valid(reply, self.server.roles)
self.weights_roles = w_n[0]
self.weights_numbers = w_n[1]
weights = []
for r, n in zip(self.weights_roles, self.weights_numbers):
weights.append(f'{r}: {n}')
await self.add_vaild(message, ", ".join(weights))
break
except InvalidInput:
await self.add_error(message, '**Can\'t read this input.**')
except ExpectedSeparator as e:
await self.add_error(message, f'**Expected roles and weights to be separated by {e.separator}**')
except InvalidRoles as e:
await self.add_error(message, f'**Invalid role found: {e.roles}**')
except ValueError:
await self.add_error(message, f'**Weights must be numbers.**')
except WrongNumberOfArguments:
await self.add_error(message, f'**Not every role has a weight assigned.**')
async def set_duration(self, ctx, force=None):
"""Set the duration /deadline for the Poll."""
async def get_valid(in_reply):
if not in_reply:
raise InvalidInput
in_reply = self.sanitize_string(in_reply)
if not in_reply:
raise InvalidInput
elif in_reply == '0':
return 0
dt = dateparser.parse(in_reply)
if not isinstance(dt, datetime.datetime):
raise InvalidInput
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
dt = dt.astimezone(pytz.utc)
now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
if dt < now:
raise DateOutOfRange(dt)
return dt
try:
dt = await get_valid(force)
self.duration = dt
if self.duration != 0:
self.duration_tz = dt.utcoffset().total_seconds() / 3600
return
except InputError:
pass
text = ("Last step.\n"
"**When should the poll be closed?**\n"
"If you want the poll to last indefinitely (until you close it), type `0`."
"Otherwise tell me when the poll should close in relative or absolute terms. "
"You can specify a timezone if you want.\n"
"\n"
"Examples: `in 6 hours` or `next week CET` or `aug 15th 5:10` or `15.8.2019 11pm EST`")
message = await self.wizard_says(ctx, text)
while True:
try:
if force:
reply = force
force = None
else:
reply = await self.get_user_reply(ctx)
dt = await get_valid(reply)
self.duration = dt
if self.duration == 0:
await self.add_vaild(message, 'until closed manually')
else:
self.duration_tz = dt.utcoffset().total_seconds() / 3600
await self.add_vaild(message, self.duration.strftime('%d-%b-%Y %H:%M %Z'))
break
except InvalidInput:
await self.add_error(message, '**I could not understand that format.**')
except TypeError:
await self.add_error(message, '**Type Error.**')
except DateOutOfRange as e:
await self.add_error(message, f'**{e.date.strftime("%d-%b-%Y %H:%M")} is in the past.**')
def finalize(self):
self.time_created = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
async def to_dict(self):
if self.channel is None:
cid = 0
else:
cid = self.channel.id
if self.author is None:
aid = 0
else:
aid = self.author.id
return {
'server_id': str(self.server.id),
'channel_id': str(cid),
'author': str(aid),
'name': self.name,
'short': self.short,
'anonymous': self.anonymous,
'reaction': self.reaction,
'multiple_choice': self.multiple_choice,
'options_reaction': self.options_reaction,
'reaction_default': self.options_reaction_default,
#'options_traditional': self.options_traditional,
'roles': self.roles,
'weights_roles': self.weights_roles,
'weights_numbers': self.weights_numbers,
'duration': self.duration,
'duration_tz': self.duration_tz,
'time_created': self.time_created,
'open': self.open,
'active': self.active,
'activation': self.activation,
'activation_tz': self.activation_tz,
'votes': self.votes
}
async def to_export(self):
"""Create report and return string"""
# build string for weights
weight_str = 'No weights'
if self.weights_roles.__len__() > 0:
weights = []
for r, n in zip(self.weights_roles, self.weights_numbers):
weights.append(f'{r}: {n}')
weight_str = ', '.join(weights)
# Determine the poll winner
winning_options = []
winning_votes = 0
for i, o in enumerate(self.options_reaction):
votes = self.count_votes(i, weighted=True)
if votes > winning_votes:
winning_options = [o]
winning_votes = votes
elif votes == winning_votes:
winning_options.append(o)
deadline_str = await self.get_deadline(string=True)
export = (f'--------------------------------------------\n'
f'POLLMASTER DISCORD EXPORT\n'
f'--------------------------------------------\n'
f'Server name (ID): {self.server.name} ({self.server.id})\n'
f'Owner of the poll: {self.author.name}\n'
f'Time of creation: {self.time_created.strftime("%d-%b-%Y %H:%M %Z")}\n'
f'--------------------------------------------\n'
f'POLL SETTINGS\n'
f'--------------------------------------------\n'
f'Question / Name: {self.name}\n'
f'Label: {self.short}\n'
f'Anonymous: {"Yes" if self.anonymous else "No"}\n'
f'Multiple choice: {"Yes" if self.multiple_choice else "No"}\n'
f'Answer options: {", ".join(self.options_reaction)}\n'
f'Allowed roles: {", ".join(self.roles) if self.roles.__len__() > 0 else "@everyone"}\n'
f'Weights for roles: {weight_str}\n'
f'Deadline: {deadline_str}\n'
f'--------------------------------------------\n'
f'POLL RESULTS\n'
f'--------------------------------------------\n'
f'Number of participants: {self.votes.__len__()}\n'
f'Raw results: {", ".join([str(o)+": "+str(self.count_votes(i, weighted=False)) for i,o in enumerate(self.options_reaction)])}\n'
f'Weighted results: {", ".join([str(o)+": "+str(self.count_votes(i, weighted=True)) for i,o in enumerate(self.options_reaction)])}\n'
f'Winning option{"s" if winning_options.__len__() > 1 else ""}: {", ".join(winning_options)} with {winning_votes} votes\n')
if not self.anonymous:
export += '--------------------------------------------\n' \
'DETAILED POLL RESULTS\n' \
'--------------------------------------------'
for user_id in self.votes:
member = self.server.get_member(user_id)
if self.votes[str(member.id)]['choices'].__len__() == 0:
continue
name = member.nick
if not name:
name = member.name
export += f'\n{name}'
if self.votes[str(member.id)]['weight'] != 1:
export += f' (weight: {self.votes[str(user_id)]["weight"]})'
export += ': ' + ', '.join([self.options_reaction[c] for c in self.votes[str(user_id)]['choices']])
export += '\n'
else:
export += '--------------------------------------------\n' \
'LIST OF PARTICIPANTS\n' \
'--------------------------------------------'
for user_id in self.votes:
member = self.server.get_member(user_id)
if self.votes[str(user_id)]['choices'].__len__() == 0:
continue
name = member.nick
if not name:
name = member.name
export += f'\n{name}'
if self.votes[str(user_id)]['weight'] != 1:
export += f' (weight: {self.votes[str(user_id)]["weight"]})'
# export += ': ' + ', '.join([self.options_reaction[c] for c in self.votes[str(user_id)]['choices']])
export += '\n'
export += ('--------------------------------------------\n'
'BOT DETAILS\n'
'--------------------------------------------\n'
'Creator: Newti#0654\n'
'Link to invite, vote for or support Pollmaster:\n'
'https://discordbots.org/bot/444514223075360800\n'
'--------------------------------------------\n'
'END OF FILE\n'
'--------------------------------------------\n')
return export
async def export(self):
"""Create export file and return path"""
if not self.open:
fn = 'export/' + str(self.server.id) + '_' + str(self.short) + '.txt'
with codecs.open(fn, 'w', 'utf-8') as outfile:
outfile.write(await self.to_export())
return fn
else:
return None
async def from_dict(self, d):
self.id = d['_id']
self.server = self.bot.get_guild(int(d['server_id']))
self.channel = self.bot.get_channel(int(d['channel_id']))
if self.server:
self.author = self.server.get_member(int(d['author']))
else:
self.author = None
self.name = d['name']
self.short = d['short']
self.anonymous = d['anonymous']
self.reaction = d['reaction']
# backwards compatibility for multiple choice
if isinstance(d['multiple_choice'], bool):
if d['multiple_choice']:
self.multiple_choice = 0
else:
self.multiple_choice = 1
else:
try:
self.multiple_choice = int(d['multiple_choice'])
except ValueError:
logger.exception('Multiple Choice not an int or bool.')
self.multiple_choice = 0 # default
self.options_reaction = d['options_reaction']
self.options_reaction_default = d['reaction_default']
# self.options_traditional = d['options_traditional']
self.roles = d['roles']
self.weights_roles = d['weights_roles']
self.weights_numbers = d['weights_numbers']
self.duration = d['duration']
self.duration_tz = d['duration_tz']
self.time_created = d['time_created']
self.activation = d['activation']
self.activation_tz = d['activation_tz']
self.active = d['active']
self.open = d['open']
self.cursor_pos = 0
self.votes = d['votes']
self.open = await self.is_open()
self.active = await self.is_active()
async def save_to_db(self):
await self.bot.db.polls.update_one({'server_id': str(self.server.id), 'short': str(self.short)},
{'$set': await self.to_dict()}, upsert=True)
@staticmethod
async def load_from_db(bot, server_id, short, ctx=None, ):
query = await bot.db.polls.find_one({'server_id': str(server_id), 'short': short})
if query is not None:
p = Poll(bot, ctx, load=True)
await p.from_dict(query)
return p
else:
return None
async def add_field_custom(self, name, value, embed):
"""this is used to estimate the width of text and add empty embed fields for a cleaner report
cursor_pos is used to track if we are at the start of a new line in the report. Each line has max 2 slots for info.
If the line is short, we can fit a second field, if it is too long, we get an automatic linebreak.
If it is in between, we create an empty field to prevent the inline from looking ugly"""
name = str(name)
value = str(value)
nwidth = afm.string_width_height(unidecode(name))
vwidth = afm.string_width_height(unidecode(value))
w = max(nwidth[0], vwidth[0])
embed.add_field(name=name, value=value, inline=False if w > 12500 and self.cursor_pos % 2 == 1 else True)
self.cursor_pos += 1
# create an empty field if we are at the second slot and the width of the first slot is between the critical values
if self.cursor_pos % 2 == 1 and 11600 < w < 20000:
embed.add_field(name='\u200b', value='\u200b', inline=True)
self.cursor_pos += 1
return embed
async def generate_embed(self):
"""Generate Discord Report"""
self.cursor_pos = 0
embed = discord.Embed(title='', colour=SETTINGS.color) # f'Status: {"Open" if self.is_open() else "Closed"}'
embed.set_author(name=f' >> {self.short} ',
icon_url=SETTINGS.author_icon)
embed.set_thumbnail(url=SETTINGS.report_icon)
# ## adding fields with custom, length sensitive function
if not await self.is_active():
embed = await self.add_field_custom(name='**INACTIVE**',
value=f'This poll is inactive until '
f'{self.get_activation_date(string=True)}.',
embed=embed
)
embed = await self.add_field_custom(name='**Poll Question**', value=self.name, embed=embed)
embed = await self.add_field_custom(name='**Roles**', value=', '.join(self.roles), embed=embed)
if len(self.weights_roles) > 0:
weights = []
for r, n in zip(self.weights_roles, self.weights_numbers):
weights.append(f'{r}: {n}')
embed = await self.add_field_custom(name='**Weights**', value=', '.join(weights), embed=embed)
embed = await self.add_field_custom(name='**Anonymous**', value=self.anonymous, embed=embed)
# embed = await self.add_field_custom(name='**Multiple Choice**', value=self.multiple_choice, embed=embed)
embed = await self.add_field_custom(name='**Deadline**', value=await self.get_poll_status(), embed=embed)
embed = await self.add_field_custom(name='**Author**', value=self.author.name, embed=embed)
if self.reaction:
if self.options_reaction_default:
if await self.is_open():
text = f'**Score** '
if self.multiple_choice == 0:
text += f'(Multiple Choice)'
elif self.multiple_choice == 1:
text += f'(Single Choice)'
else:
text += f'({self.multiple_choice} Choices)'
else:
text = f'**Final Score**'
vote_display = []
for i, r in enumerate(self.options_reaction):
vote_display.append(f'{r} {self.count_votes(i)}')
embed = await self.add_field_custom(name=text, value=' '.join(vote_display), embed=embed)
else:
embed.add_field(name='\u200b', value='\u200b', inline=False)
if await self.is_open():
text = f'*Vote by adding reactions to the poll*. '
if self.multiple_choice == 0:
text += '*You can vote for multiple options.*'
elif self.multiple_choice == 1:
text += '*You have 1 vote, but can change it.*'
else:
text += f'*You have {self.multiple_choice} choices and can change them.*'
else:
text = f'*Final Results of the Poll* '
if self.multiple_choice == 0:
text += '*(Multiple Choice).*'
elif self.multiple_choice == 1:
text += '*(Single Choice).*'
else:
text += f'*(With up to {self.multiple_choice} choices).*'
embed = await self.add_field_custom(name='**Options**', value=text, embed=embed)
for i, r in enumerate(self.options_reaction):
embed = await self.add_field_custom(
name=f':regional_indicator_{ascii_lowercase[i]}: {self.count_votes(i)}',
value=r,
embed=embed
)
# else:
# embed = await self.add_field_custom(name='**Options**', value=', '.join(self.get_options()), embed=embed)
embed.set_footer(text='React with ❔ to get info. It is not a vote option.')
return embed
async def post_embed(self, destination):
msg = await destination.send(embed=await self.generate_embed())
if self.reaction and await self.is_open() and await self.is_active():
if self.options_reaction_default:
for r in self.options_reaction:
await msg.add_reaction(r)
await msg.add_reaction('')
return msg
else:
for i, r in enumerate(self.options_reaction):
await msg.add_reaction(AZ_EMOJIS[i])
await msg.add_reaction('')
return msg
elif not await self.is_open():
await msg.add_reaction('')
await msg.add_reaction('📎')
else:
return msg
def get_duration_with_tz(self):
if self.duration == 0:
return 0
elif isinstance(self.duration, datetime.datetime):
dt = self.duration
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
dt = pytz.utc.localize(dt)
if isinstance(self.duration_tz, float):
tz = possible_timezones(self.duration_tz, common_only=True)
if not tz:
tz = pytz.timezone('UTC')
else:
# choose one valid timezone with the offset
try:
tz = pytz.timezone(tz[0])
except UnknownTimeZoneError:
tz = pytz.UTC
else:
try:
tz = pytz.timezone(self.duration_tz)
except UnknownTimeZoneError:
tz = pytz.UTC
return dt.astimezone(tz)
def get_activation_with_tz(self):
if self.activation == 0:
return 0
elif isinstance(self.activation, datetime.datetime):
dt = self.activation
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
dt = pytz.utc.localize(dt)
if isinstance(self.activation_tz, float):
tz = possible_timezones(self.activation_tz, common_only=True)
if not tz:
tz = pytz.timezone('UTC')
else:
# choose one valid timezone with the offset
tz = pytz.timezone(tz[0])
else:
tz = pytz.timezone(self.activation_tz)
return dt.astimezone(tz)
async def get_deadline(self, string=False):
if self.duration == 0:
if string:
return 'No deadline'
else:
return 0
else:
deadline = self.get_duration_with_tz()
if string:
return deadline.strftime('%d-%b-%Y %H:%M %Z')
else:
return deadline
def get_activation_date(self, string=False):
if self.activation == 0:
if string:
return 'manually activated'
else:
return 0
else:
activation_date = self.get_activation_with_tz()
if string:
return activation_date.strftime('%d-%b-%Y %H:%M %Z')
else:
return activation_date
async def get_poll_status(self):
if await self.is_open():
return await self.get_deadline(string=True)
else:
return 'Poll is closed.'
def count_votes(self, option, weighted=True):
'''option: number from 0 to n'''
if weighted:
return sum([self.votes[c]['weight'] for c in [u for u in self.votes] if option in self.votes[c]['choices']])
else:
return sum([1 for c in [u for u in self.votes] if option in self.votes[c]['choices']])
async def vote(self, user, option, message, lock):
if not await self.is_open():
# refresh to show closed poll
await message.edit(embed=await self.generate_embed())
await message.clear_reactions()
return
elif not await self.is_active():
return
choice = 'invalid'
# refresh_poll = True
# get weight
weight = 1
if self.weights_roles.__len__() > 0:
valid_weights = [self.weights_numbers[self.weights_roles.index(r)] for r in
list(set([n.name for n in user.roles]).intersection(set(self.weights_roles)))]
if valid_weights.__len__() > 0:
weight = max(valid_weights)
if str(user.id) not in self.votes:
self.votes[str(user.id)] = {'weight': weight, 'choices': []}
else:
self.votes[str(user.id)]['weight'] = weight
if self.options_reaction_default:
if option in self.options_reaction:
choice = self.options_reaction.index(option)
else:
if option in AZ_EMOJIS:
choice = AZ_EMOJIS.index(option)
if choice != 'invalid':
# if self.multiple_choice != 1: # more than 1 choice (0 = no limit)
if choice in self.votes[str(user.id)]['choices']:
if self.anonymous:
# anonymous multiple choice -> can't unreact so we toggle with react
logger.warning("Unvoting, should not happen for non anon polls.")
await self.unvote(user, option, message, lock)
# refresh_poll = False
else:
if self.multiple_choice > 0 and self.votes[str(user.id)]['choices'].__len__() >= self.multiple_choice:
say_text = f'You have reached the **maximum choices of {self.multiple_choice}** for this poll. ' \
f'Before you can vote again, you need to unvote one of your choices.'
embed = discord.Embed(title='', description=say_text, colour=SETTINGS.color)
embed.set_author(name='Pollmaster', icon_url=SETTINGS.author_icon)
await user.send(embed=embed)
# refresh_poll = False
else:
self.votes[str(user.id)]['choices'].append(choice)
self.votes[str(user.id)]['choices'] = list(set(self.votes[str(user.id)]['choices']))
# else:
# if [choice] == self.votes[user.id]['choices']:
# # refresh_poll = False
# # if self.anonymous:
# # undo anonymous vote
# await self.unvote(user, option, message, lock)
# return
# else:
# self.votes[user.id]['choices'] = [choice]
else:
# unknow emoji
return
# commit
await self.save_to_db()
asyncio.ensure_future(message.edit(embed=await self.generate_embed()))
async def unvote(self, user, option, message, lock):
if not await self.is_open():
# refresh to show closed poll
await message.edit(embed=await self.generate_embed())
if not isinstance(message.channel, discord.abc.PrivateChannel):
await message.clear_reactions()
return
elif not await self.is_active():
return
if str(user.id) not in self.votes:
return
choice = 'invalid'
if self.options_reaction_default:
if option in self.options_reaction:
choice = self.options_reaction.index(option)
else:
if option in AZ_EMOJIS:
choice = AZ_EMOJIS.index(option)
if choice != 'invalid' and choice in self.votes[str(user.id)]['choices']:
try:
self.votes[str(user.id)]['choices'].remove(choice)
await self.save_to_db()
asyncio.ensure_future(message.edit(embed=await self.generate_embed()))
except ValueError:
pass
async def has_required_role(self, user):
return not set([r.name for r in user.roles]).isdisjoint(self.roles)