"""Collect CTF challenge details and store them as notes in Joplin.

The script reads the challenge list from a CTF platform's REST API
(``CTF_URL``), creates one Joplin note per challenge through the Joplin
HTTP API on localhost, and records what it has already published in a local
SQLite file so that reruns do not create duplicate notes. Challenges the
team has solved are tagged with the Joplin tag ``SOLVED_ID``.
"""
import argparse
import hashlib
import json
import logging
import os
import sqlite3
from datetime import datetime
from sqlite3 import Error

import requests
from dateutil import tz
from dotenv import load_dotenv

### ------------------
# Vars and initial setup

# Hard-coded from/to timezones. They are only used by toEpoch()/toLocal(),
# which nothing in this file calls at the moment.
fromZone = tz.gettz('UTC')
toZone = tz.gettz('America/New_York')

# load_dotenv will look for a .env file and if it finds one it will load the
# environment variables from it
# Ex: TOKEN=123
load_dotenv()
JOPLIN_TOKEN = os.getenv("JOPLIN_TOKEN")
NOTEBOOK_ID = os.getenv("NOTEBOOK_ID")
SOLVED_ID = os.getenv("SOLVED_ID")
CTF_TOKEN = os.getenv("CTF_TOKEN")
CTF_URL = os.getenv("CTF_URL")

# Local state database and Joplin endpoint used throughout the script
DB_FILE = 'state.db'
JOPLIN_URL = "http://localhost:41184"

# Seconds to wait for any HTTP response; without a timeout requests can block
# forever on an unresponsive server.
REQUEST_TIMEOUT = 30

# Argparse
parser = argparse.ArgumentParser(prog=__file__, description='Collects CTF Challenge details and stores them in Joplin')
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()

# Configure logger
logging.basicConfig(filename="output.log", format='%(asctime)s %(message)s', filemode='a')
logger = logging.getLogger()
if args.verbose:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)

# Setup requests header for CTF platform
HEADER = {"content-type": "application/json", 'Authorization': 'Token {}'.format(CTF_TOKEN)}
### ------------------


### ------------------
# DB Functions
def dbCheck():
    """Create the state database and the ctfState table if they do not exist."""
    if os.path.isfile(DB_FILE):
        logger.debug('DB file exists')
    else:
        logger.debug('Creating empty DB file')

    conn = None
    try:
        # sqlite3.connect() creates the file when it is missing
        conn = sqlite3.connect(DB_FILE)
        conn.execute('CREATE TABLE IF NOT EXISTS ctfState (Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, url text, challengeID integer, noteID text, hash text, PRIMARY KEY (url, challengeID, hash))')
        conn.commit()
    except Error as e:
        logger.error("Could not initialise %s: %s", DB_FILE, e)
    finally:
        if conn:
            conn.close()


def getStoredID():
    """Return the challenge IDs stored for CTF_URL as a list.

    Returns None if the database query fails.
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_FILE)
        # Bound parameters instead of string formatting: values are never
        # interpreted as SQL.
        cursor = conn.execute("select challengeID from ctfState where url = ?", (CTF_URL,))
        return [row[0] for row in cursor.fetchall()]
    except Error as e:
        logger.error("Could not read stored challenge IDs: %s", e)
    finally:
        if conn:
            conn.close()


def getStoredHash(id):
    """Return the description hash stored for challenge ``id`` on CTF_URL.

    Returns 0 when no row exists for the challenge, and None if the database
    query fails.
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_FILE)
        sql = "select hash from ctfState where url = ? and challengeID = ?"
        res = conn.execute(sql, (CTF_URL, id)).fetchone()
        # fetchone() yields a 1-tuple, or None when nothing matched
        return res[0] if res is not None else 0
    except Error as e:
        logger.error("Could not read stored hash for challenge %s: %s", id, e)
    finally:
        if conn:
            conn.close()


def getNoteID(dbHash, challengeID=None):
    """Return the Joplin note ID stored for ``dbHash`` on CTF_URL.

    ``challengeID`` is optional; when given, the lookup is also restricted to
    that challenge, which avoids picking another challenge's note when two
    challenges share the same description hash.

    Returns None when no row matches or the database query fails.
    """
    conn = None
    try:
        conn = sqlite3.connect(DB_FILE)
        sql = "select noteID from ctfState where url = ? and hash = ?"
        params = [CTF_URL, dbHash]
        if challengeID is not None:
            sql += " and challengeID = ?"
            params.append(challengeID)
        res = conn.execute(sql, params).fetchone()
        # No match must not blow up on None[0]
        return res[0] if res is not None else None
    except Error as e:
        logger.error("Could not read note ID for hash %s: %s", dbHash, e)
    finally:
        if conn:
            conn.close()


def updateDB(challengeID, noteID, hash):
    """Record that ``challengeID`` was published as ``noteID`` with ``hash``."""
    conn = None
    try:
        conn = sqlite3.connect(DB_FILE)
        sql = "insert into ctfState (url, challengeId, noteID, hash) values (?, ?, ?, ?)"
        params = (CTF_URL, challengeID, noteID, hash)
        logger.debug("%s | params=%s", sql, params)
        conn.execute(sql, params)
        conn.commit()
    except Error as e:
        logger.error("Could not store state for challenge %s: %s", challengeID, e)
    finally:
        if conn:
            conn.close()
### ------------------


### ------------------
# Misc functions
def toEpoch(dt):
    """Convert 2021-11-18T19:00:24+00:00 to epoch milliseconds (1637262024000.0).

    Only timestamps with a literal ``+00:00`` offset are accepted; anything
    else raises ValueError from strptime.
    """
    epoch = datetime(1970, 1, 1, tzinfo=fromZone)
    newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S+00:00').replace(tzinfo=fromZone).astimezone(toZone)
    return (newDT - epoch).total_seconds() * 1000.0


def toLocal(dt):
    """Convert 2021-11-18T19:00:24+00:00 to November 18, 2021 2:00 PM.

    Only timestamps with a literal ``+00:00`` offset are accepted; anything
    else raises ValueError from strptime.
    """
    newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S+00:00').replace(tzinfo=fromZone).astimezone(toZone)
    # '%-I' (hour without zero padding) is a platform extension and is not
    # available everywhere, so the 12-hour value is computed by hand.
    hour = newDT.hour % 12 or 12
    return "{} {}:{}".format(newDT.strftime("%B %d, %Y"), hour, newDT.strftime("%M %p"))


def _request(method, url, **kwargs):
    """Send an HTTP request with the shared timeout.

    Returns the response object, or None if the request could not be
    completed (connection error, timeout, ...). The failure is logged.
    """
    kwargs.setdefault("timeout", REQUEST_TIMEOUT)
    try:
        return requests.request(method, url, **kwargs)
    except requests.RequestException as e:
        logger.error("%s %s failed: %s", method.upper(), url, e)
        return None


def getChallenges():
    """Return the parsed challenge list from the CTF API, or None on failure."""
    r = _request("get", "{}/api/v1/challenges".format(CTF_URL))
    if r is None:
        return None
    if r.status_code != 200:
        logger.info("API returned a status code of {}.".format(r.status_code))
        return None
    allChallenges = json.loads(r.text)
    logger.debug(allChallenges['data'])
    return allChallenges


def getSolvedChallenges():
    """Return a list of solved challenge IDs, or None on failure."""
    r = _request("get", "{}/api/v1/teams/me/solves".format(CTF_URL), headers=HEADER)
    if r is None:
        return None
    if r.status_code != 200:
        logger.info("API returned a status code of {}.".format(r.status_code))
        return None
    solvedChallenges = json.loads(r.text)
    logger.debug(solvedChallenges['data'])
    # Loop vs list comprehension: the comprehension below replaces
    #   for id in range(len(solvedChallenges['data'])):
    #       print(solvedChallenges['data'][id]['challenge_id'])
    # https://stackoverflow.com/questions/39189272/python-list-comprehension-and-json-parsing
    return [solve['challenge_id'] for solve in solvedChallenges['data']]


def getChallengeDetails(challengeID):
    """Return the parsed details of one challenge, or None on failure."""
    r = _request("get", "{}/api/v1/challenges/{}".format(CTF_URL, challengeID))
    if r is None or r.status_code != 200:
        logger.info('Could not get challenge details for {}'.format(challengeID))
        return None
    challengeDetails = json.loads(r.text)
    logger.debug(challengeDetails['data'])
    return challengeDetails


def publishNote(challengeDetails):
    """Create a Joplin note for a challenge and return the new note's ID.

    ``challengeDetails`` is the dict returned by getChallengeDetails().
    Returns None if Joplin could not be reached or did not answer with 200.
    """
    # Format note
    data = challengeDetails['data']
    name = data['name']
    category = data['category'].capitalize()
    # Some CTF's use decaying point values for challenges, so prefer the
    # initial value. Fall back to the current value when the API response has
    # no 'initial' field instead of failing with a KeyError.
    points = data.get('initial', data.get('value'))
    link = data.get('connection_info')

    noteBody = "# Challenge Details"
    noteBody += "\n\nValue: {} points".format(points)
    noteBody += "\nCategory: {}".format(category)
    noteBody += "\nDescription: {}".format(data['description'])
    if link:
        noteBody += "\nLink: {}".format(link)
    noteBody += "\n\n# Solution"
    noteDetails = {
        "title": name,
        "parent_id": NOTEBOOK_ID,
        "body": noteBody}

    # Push to api. The token is passed through params so it is URL-encoded and
    # kept out of the logged URL.
    r = _request("post", "{}/notes".format(JOPLIN_URL), params={"token": JOPLIN_TOKEN}, json=noteDetails)
    if r is None or r.status_code != 200:
        logger.info('Could not create note in Joplin')
        return None
    noteID = r.json()['id']
    logger.debug("Details of new note are: {}".format(noteDetails))
    return noteID


def tagSolvedChallenge(noteID):
    """Attach the SOLVED_ID tag to the Joplin note ``noteID``."""
    data = {"id": noteID}
    r = _request("post", "{}/tags/{}/notes".format(JOPLIN_URL, SOLVED_ID), params={"token": JOPLIN_TOKEN}, json=data)
    if r is not None and r.status_code != 200:
        logger.info("Could not tag note %s as solved (status %s)", noteID, r.status_code)
### ------------------


def main():
    """Publish a note for every new challenge and tag the solved ones."""
    dbCheck()
    allChallenges = getChallenges()
    if not allChallenges or 'data' not in allChallenges:
        logger.error('Could not retrieve the challenge list; nothing to do.')
        return

    # A failed solves lookup must not abort publishing; treat it as "none solved".
    solvedChallenges = set(getSolvedChallenges() or [])

    # NOTE: getStoredID() returns the challenge IDs already processed, but it
    # is currently unused here; the per-challenge hash lookup is used instead.

    for challenge in allChallenges['data']:
        nextID = challenge['id']
        challengeDetails = getChallengeDetails(nextID)
        if not challengeDetails:
            # Failure already logged by getChallengeDetails()
            continue

        # Hash the description to detect if it changes
        apiHash = hashlib.sha256(str(challengeDetails['data']['description']).encode('utf-8')).hexdigest()
        logger.debug("Hash of challenge from api is: {}".format(apiHash))

        # Check if the current loop iteration needs to write state
        dbHash = getStoredHash(nextID)
        logger.debug("DB hash is {}".format(dbHash))

        noteID = None
        if dbHash is None:
            # The DB lookup itself failed; publishing now could create a duplicate note.
            logger.error("Skipping challenge id %s: stored state is unavailable.", nextID)
            continue
        elif dbHash == 0:
            noteID = publishNote(challengeDetails)
            # Only record state when the note really exists, so that a failed
            # publish is retried on the next run.
            if noteID is not None:
                updateDB(nextID, noteID, apiHash)
        elif dbHash != apiHash:
            logger.info('Challenge details have changed on the remote end')
        else:
            logger.info('Challenge details match what is currently stored.')

        # TODO: Determine if this is the best place to add tags
        if nextID in solvedChallenges:
            # A note created in this iteration is not findable through the
            # placeholder hash 0, so reuse the ID returned by publishNote().
            if noteID is None and dbHash:
                noteID = getNoteID(dbHash, nextID)
            if noteID is None:
                logger.info("No note found for solved challenge id %s; not tagging.", nextID)
                continue
            logger.debug("Tagging challenge id {} as solved.".format(nextID))
            tagSolvedChallenge(noteID)


if __name__ == "__main__":
    main()