Added black formatting

This commit is contained in:
Jacob Mastel
2025-09-17 09:39:57 -07:00
parent 629942d447
commit 6cf6ecc78e

View File

@@ -13,38 +13,40 @@ SCRIPTPATH = os.path.dirname(os.path.abspath(__file__))
# parse config file
config = {}
configfile = configparser.ConfigParser()
configfile.read(SCRIPTPATH + '/mail-filter.conf')
config['FILTERS_FILE'] = configfile.get('main', 'Filters')
config['IS_DEBUG'] = configfile.getboolean('main', 'EnableDebugging')
config['CHECK_INTERVAL'] = int(configfile.get('main', 'MailCheckInterval'))
config['LOG_DIR'] = configfile.get('logging', 'LogDir')
config['TIMEZONE'] = configfile.get('logging', 'Timezone')
config['APP_CLIENT_ID'] = os.getenv('APP_CLIENT_ID')
config['APP_SECRET_KEY'] = os.getenv('APP_SECRET_KEY')
config['APP_TENANT_ID'] = os.getenv('APP_TENANT_ID')
configfile.read(SCRIPTPATH + "/mail-filter.conf")
config["FILTERS_FILE"] = configfile.get("main", "Filters")
config["IS_DEBUG"] = configfile.getboolean("main", "EnableDebugging")
config["CHECK_INTERVAL"] = int(configfile.get("main", "MailCheckInterval"))
config["LOG_DIR"] = configfile.get("logging", "LogDir")
config["TIMEZONE"] = configfile.get("logging", "Timezone")
config["APP_CLIENT_ID"] = os.getenv("APP_CLIENT_ID")
config["APP_SECRET_KEY"] = os.getenv("APP_SECRET_KEY")
config["APP_TENANT_ID"] = os.getenv("APP_TENANT_ID")
# convert timestamp to local time
def local_time(record, datefmt=None):
return pendulum.from_timestamp(
record.created,
tz=pendulum.timezone(config['TIMEZONE'])
).strftime('%Y-%m-%d %H:%M:%S %z')
record.created, tz=pendulum.timezone(config["TIMEZONE"])
).strftime("%Y-%m-%d %H:%M:%S %z")
# set up logger
logger = logging.getLogger('o365mf')
if config['IS_DEBUG']:
logger = logging.getLogger("o365mf")
if config["IS_DEBUG"]:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s %(module)s [%(levelname)s] %(message)s')
formatter = logging.Formatter("%(asctime)s %(module)s [%(levelname)s] %(message)s")
formatter.formatTime = local_time
log_filename = f"{config['LOG_DIR']}/mail-filter.log"
handler = logging.handlers.TimedRotatingFileHandler(
log_filename, when='midnight', backupCount=5)
log_filename, when="midnight", backupCount=5
)
handler.setFormatter(formatter)
logger.addHandler(handler)
# helper function for logging
def log(msg, level=logging.INFO):
print(msg)
@@ -53,9 +55,9 @@ def log(msg, level=logging.INFO):
class O365MailFilter(object):
_scopes = [
'basic',
'https://graph.microsoft.com/Mail.ReadWrite',
'https://graph.microsoft.com/MailboxSettings.Read'
"basic",
"https://graph.microsoft.com/Mail.ReadWrite",
"https://graph.microsoft.com/MailboxSettings.Read",
]
def __init__(self, config):
@@ -63,32 +65,30 @@ class O365MailFilter(object):
self._is_canceled = False
self._folders = {}
self._categories = {}
self._filtered_cache = {
'last_reset': None,
'ids': set()
}
self._filtered_cache = {"last_reset": None, "ids": set()}
# auth with O365
self._authenticate()
def _authenticate(self):
token_backend = FileSystemTokenBackend(token_path='.cache',
token_filename='token.txt')
token_backend = FileSystemTokenBackend(
token_path=".cache", token_filename="token.txt"
)
self._account = Account(
(self._config['APP_CLIENT_ID'], self._config['APP_SECRET_KEY']),
tenant_id=self._config['APP_TENANT_ID'],
token_backend=token_backend
(self._config["APP_CLIENT_ID"], self._config["APP_SECRET_KEY"]),
tenant_id=self._config["APP_TENANT_ID"],
token_backend=token_backend,
)
if not self._account.is_authenticated:
self._account.authenticate(scopes=self._scopes)
log('Authentication successful')
log("Authentication successful")
def _load_filters(self):
""" load filter code from a file on disk """
loader = SourceFileLoader('filters', self._config['FILTERS_FILE'])
"""load filter code from a file on disk"""
loader = SourceFileLoader("filters", self._config["FILTERS_FILE"])
module = loader.load_module()
module.init_filters(self)
# make 'filter_message()' implemented in the file available for use
@@ -96,14 +96,14 @@ class O365MailFilter(object):
self._filter_message = module.filter_message
def _load_folders(self, mailbox, folders=None, folder_path=None):
""" recursively cache folder IDs for this mailbox """
"""recursively cache folder IDs for this mailbox"""
if folders is None:
folders = mailbox.get_folders()
self._folders = {}
folder_path = ''
folder_path = ""
for folder in folders:
if folder_path == '':
if folder_path == "":
current_folder_path = f"{folder.name}"
else:
current_folder_path = f"{folder_path}/{folder.name}"
@@ -113,26 +113,22 @@ class O365MailFilter(object):
else:
# add child folders to the cache, because get_folders() doesn't
# descend into sub-folders by default
self._load_folders(mailbox, folder.get_folders(),
current_folder_path)
self._load_folders(mailbox, folder.get_folders(), current_folder_path)
def _load_categories(self):
""" cache Outlook categories for this account """
"""cache Outlook categories for this account"""
oc = self._account.outlook_categories()
categories = oc.get_categories()
for category in categories:
self._categories[category.name] = category
def _clear_cache(self):
""" clear the filtered message cache """
log('Clearing filtered message cache...', logging.DEBUG)
self._filtered_cache = {
'last_reset': pendulum.now(),
'ids': set()
}
"""clear the filtered message cache"""
log("Clearing filtered message cache...", logging.DEBUG)
self._filtered_cache = {"last_reset": pendulum.now(), "ids": set()}
def _repr_message(self, message):
""" returns a str representation of a message suitable for logging """
"""returns a str representation of a message suitable for logging"""
# to = ','.join([r.address for r in message.to])
return f"[FROM: {message.sender.address} SUBJ: {message.subject}]"
@@ -157,50 +153,56 @@ class O365MailFilter(object):
# the O365 library will not paginate results correctly
limit = self._account.protocol.max_top_value
query = inbox.new_query()
query = query.on_attribute('isRead').equals(False).select(
'to_recipients', 'from', 'subject', 'body',
'internet_message_headers'
query = (
query.on_attribute("isRead")
.equals(False)
.select(
"to_recipients", "from", "subject", "body", "internet_message_headers"
)
)
log(f" {pendulum.now()} Getting messages from inbox...", logging.DEBUG)
messages = inbox.get_messages(query=query, limit=limit, batch=25)
for message in messages:
log(f" {pendulum.now()} {message}", logging.DEBUG)
if message.object_id in self._filtered_cache['ids']:
if message.object_id in self._filtered_cache["ids"]:
# we've already filtered this message, so skip it
continue
self._filter_message(self, message)
self._filtered_cache['ids'].add(message.object_id)
self._filtered_cache["ids"].add(message.object_id)
def run(self):
""" run filter as a loop """
"""run filter as a loop"""
is_first_run = True
while not self._is_canceled:
# clear the filtered message cache if it's older than 4 hours
if (self._filtered_cache['last_reset'] is None or
self._filtered_cache['last_reset'] < pendulum.now().subtract(hours=4)):
if self._filtered_cache["last_reset"] is None or self._filtered_cache[
"last_reset"
] < pendulum.now().subtract(hours=4):
self._clear_cache()
log(f"Filtering the sludge @ {pendulum.now()}...", logging.DEBUG)
self.filter(is_first_run)
is_first_run = False
time.sleep(self._config['CHECK_INTERVAL'])
time.sleep(self._config["CHECK_INTERVAL"])
log('Done.')
log("Done.")
def exit(self):
self._is_canceled = True
log('Initializing O365 mail filter...')
log("Initializing O365 mail filter...")
o365mf = O365MailFilter(config)
def exit(signum, frame):
""" signal handler for a clean exit """
"""signal handler for a clean exit"""
log(f"Caught signal {signum}, exiting...")
o365mf.exit()
if __name__ == '__main__':
if __name__ == "__main__":
# register signal handlers
signal.signal(signal.SIGTERM, exit)
signal.signal(signal.SIGHUP, exit)