commit 99a9f9b57e0a3cea0e48b7ef43f9a6175deba43b Author: Kris Crawford Date: Mon Oct 30 07:54:47 2023 -0400 Initial save diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3552b9d --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +__pycache__/ +bin/ +include/ +lib/ +.vscode/ +lib64 +share/ +pyvenv.cfg +output.log +.env +state.db diff --git a/collectChallenges.py b/collectChallenges.py new file mode 100644 index 0000000..18cc0f8 --- /dev/null +++ b/collectChallenges.py @@ -0,0 +1,257 @@ +import os +import logging +import argparse +from dotenv import load_dotenv +import requests +import json +import sqlite3 +import hashlib +from sqlite3 import Error +from datetime import datetime +from dateutil import tz + + +### ------------------ +# Vars and inital setup + +# Hard-code from/to timezones +fromZone = tz.gettz('UTC') +toZone = tz.gettz('America/New_York') +# These are currently not used + +# load_dotenv will look for a .env file and if it finds one it will load the environment variables from it +# Ex: TOKEN=123 +load_dotenv() +JOPLIN_TOKEN = os.getenv("JOPLIN_TOKEN") +NOTEBOOK_ID = os.getenv("NOTEBOOK_ID") +SOLVED_ID = os.getenv("SOLVED_ID") +CTF_TOKEN = os.getenv("CTF_TOKEN") +CTF_URL = os.getenv("CTF_URL") + +# Argparse +parser = argparse.ArgumentParser(prog=__file__, description='Collects CTF Challenge details and stores them in Joplin') +parser.add_argument('-v', '--verbose', action='store_true') +args = parser.parse_args() + +# Configure logger +logging.basicConfig(filename="output.log", format='%(asctime)s %(message)s', filemode='a') +logger=logging.getLogger() +if args.verbose: + logger.setLevel(logging.DEBUG) +else: + logger.setLevel(logging.INFO) + +# Setup requests header for CTF platform +HEADER = {"content-type": "application/json",'Authorization':'Token {}'.format(CTF_TOKEN)} + +### ------------------ + +### ------------------ +# DB Functions + +def dbCheck(): + if os.path.isfile('state.db'): + logger.debug('DB file exists') + else: + logger.debug('Creating empty DB file') + conn = None + try: + conn = sqlite3.connect('state.db') + conn.execute('CREATE TABLE IF NOT EXISTS ctfState (Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, url text, challengeID integer, noteID text, hash text, PRIMARY KEY (url, challengeID, hash))') + conn.commit() + except Error as e: + logger.info(e) + finally: + if conn: + conn.close() + +# Returns challenge ID's from the database as a list +def getStoredID(): + conn = None + try: + conn = sqlite3.connect('state.db') + cursor = conn.execute("select challengeID from ctfState where url = '{}'".format(CTF_URL)) + return [r[0] for r in cursor.fetchall()] + except Error as e: + logger.info(e) + finally: + if conn: + conn.close() + +def getStoredHash(id): + conn = None + try: + conn = sqlite3.connect('state.db') + sql = "select hash from ctfState where url = '{}' and challengeID = '{}'".format(CTF_URL, id) + cursor = conn.execute(sql) + # Return back zero if no hash is present + res = cursor.fetchone() + if res is not None: + # We get back a tuple from the db + storedHash = res[0] + else: + storedHash = 0 + return storedHash + except Error as e: + logger.info(e) + finally: + if conn: + conn.close() + +def getNoteID(dbHash): + conn = None + try: + conn = sqlite3.connect('state.db') + sql = "select noteID from ctfState where url = '{}' and hash = '{}'".format(CTF_URL, dbHash) + cursor = conn.execute(sql) + # We get back a tuple from the db + noteID = cursor.fetchone()[0] + return noteID + except Error as e: + logger.info(e) + finally: + if conn: + conn.close() + + +def updateDB(challengeID, noteID, hash): + conn = None + try: + conn = sqlite3.connect('state.db') + sql = "insert into ctfState (url, challengeId, noteID, hash) values ('{}', '{}', '{}', '{}')".format(CTF_URL, challengeID, noteID, hash) + logger.debug(sql) + cur = conn.cursor() + cur.execute(sql) + conn.commit() + except Error as e: + logger.info(e) + finally: + if conn: + conn.close() + +### ------------------ + +### ------------------ +# Misc functions + +# Converts 2021-11-18T19:00:24+00:00 to epoch milliseconds (1637262024000.0) +def toEpoch(dt): + epoch = datetime.utcfromtimestamp(0).replace(tzinfo=fromZone) + newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S+00:00').replace(tzinfo=fromZone).astimezone(toZone) + return (newDT - epoch).total_seconds() * 1000.0 + +# Converts 2021-11-18T19:00:24+00:00 to November 18, 2021 2:00 PM +def toLocal(dt): + newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S+00:00').replace(tzinfo=fromZone).astimezone(toZone) + return newDT.strftime("%B %d, %Y %-I:%M %p") + +def getChallenges(): + r = requests.get("{}/api/v1/challenges".format(CTF_URL)) + if r.status_code == 200: + allChallenges = json.loads(r.text) + logger.debug(allChallenges['data']) + return allChallenges + else: + logger.info("API returned a status code of {}.".format(r.status_code)) + +# Returns a list of solved challenge ID's +def getSolvedChallenges(): + r = requests.get("{}/api/v1/teams/me/solves".format(CTF_URL), headers=HEADER) + if r.status_code == 200: + solvedChallenges = json.loads(r.text) + logger.debug(solvedChallenges['data']) + solvedChallenges = [i['challenge_id'] for i in solvedChallenges['data']] + return solvedChallenges + else: + logger.info("API returned a status code of {}.".format(r.status_code)) + +## Loop vs list compreshension +## >>> for id in range(len(solvedChallenges['data'])): +## ... print(solvedChallenges['data'][id]['challenge_id']) +## ... +## +## [i['challenge_id'] for i in solvedChallenges['data']] +# https://stackoverflow.com/questions/39189272/python-list-comprehension-and-json-parsing + + +def getChallengeDetails(challengeID): + r = requests.get("{}/api/v1/challenges/{}".format(CTF_URL, challengeID)) + if r.status_code == 200: + challengeDetails = json.loads(r.text) + logger.debug(challengeDetails['data']) + return challengeDetails + else: + logger.info('Could not get challenge details for {}'.format(challengeID)) + + +def publishNote(challengeDetails): + # Format note + name = challengeDetails['data']['name'] + category = challengeDetails['data']['category'].capitalize() + # Some CTF's use decaying point values for challenges + #points = challengeDetails['data']['value'] + points = challengeDetails['data']['initial'] + link = challengeDetails['data']['connection_info'] + noteBody = "# Challenge Details" + noteBody += "\n\nValue: {} points".format(points) + noteBody += "\nCategory: {}".format(category) + noteBody += "\nDescription: {}".format(challengeDetails['data']['description']) + if link: + noteBody += "\nLink: {}".format(link) + noteBody += "\n\n# Solution" + noteDetails = { "title": name, "parent_id": NOTEBOOK_ID, "body": noteBody} + # Push to api + r = requests.post("http://localhost:41184/notes?token={}".format(JOPLIN_TOKEN), json = noteDetails) + if r.status_code == 200: + noteID = r.json()['id'] + logger.debug("Details of new note are: {}".format(noteDetails)) + return noteID + else: + logger.info('Could not create note in Joplin') + +def tagSolvedChallenge(noteID): + data = {"id": noteID} + r = requests.post("http://localhost:41184/tags/{}/notes?token={}".format(SOLVED_ID, JOPLIN_TOKEN), json = data) + +### ------------------ + + + +def main(): + dbCheck() + + allChallenges = getChallenges() + solvedChallenges = getSolvedChallenges() + + # List of challenge ID's already processed + #knownIDList = getStoredID(CTF_URL) + #logger.debug("knownIDList from db is {}".format(knownIDList)) + # Unused, can be removed/refactored + + for i in range(len(allChallenges['data'])): + nextID = allChallenges['data'][i]['id'] + challengeDetails = getChallengeDetails(nextID) + # Hash the description to detect if it changes + apiHash = hashlib.sha256(str(challengeDetails['data']['description']).encode('utf-8')).hexdigest() + logger.debug("Hash of challenge from api is: {}".format(apiHash)) + + # Check if the current loop iteration needs to write state + dbHash = getStoredHash(nextID) + logger.debug("DB hash is {}".format(dbHash)) + + if dbHash == 0: + noteID = publishNote(challengeDetails) + updateDB( nextID, noteID, apiHash) + elif dbHash != apiHash: + logger.info('Challenge details have changed on the remote end') + else: + logger.info('Challenge details match what is currently stored.') + # TODO: Determine if this is the best place to add tags + if nextID in solvedChallenges: + logger.debug("Tagging challenge id {} as solved.".format(nextID)) + noteID = getNoteID(dbHash) + tagSolvedChallenge(noteID) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..218647c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +certifi==2023.7.22 +charset-normalizer==3.3.1 +idna==3.4 +python-dateutil==2.8.2 +python-dotenv==1.0.0 +requests==2.31.0 +six==1.16.0 +urllib3==2.0.7