ctfChallenges/collectChallenges.py

258 lines
8.5 KiB
Python
Raw Permalink Normal View History

2023-10-30 11:54:47 +00:00
import os
import logging
import argparse
from dotenv import load_dotenv
import requests
import json
import sqlite3
import hashlib
from sqlite3 import Error
from datetime import datetime
from dateutil import tz
### ------------------
# Vars and initial setup

# Hard-coded from/to timezones used by the timestamp helpers.
# NOTE(review): the original comment here read "These are currently not used";
# it presumably refers to these two zones -- confirm before removing them.
fromZone = tz.gettz('UTC')
toZone = tz.gettz('America/New_York')

# load_dotenv() looks for a .env file and, if it finds one, loads the
# environment variables defined in it (for example: TOKEN=123).
load_dotenv()
JOPLIN_TOKEN = os.environ.get("JOPLIN_TOKEN")
NOTEBOOK_ID = os.environ.get("NOTEBOOK_ID")
SOLVED_ID = os.environ.get("SOLVED_ID")
CTF_TOKEN = os.environ.get("CTF_TOKEN")
CTF_URL = os.environ.get("CTF_URL")

# Argparse: the only option is -v/--verbose, which enables debug logging.
parser = argparse.ArgumentParser(
    prog=__file__,
    description='Collects CTF Challenge details and stores them in Joplin',
)
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()

# Configure logger: messages are appended to output.log via the root logger.
logging.basicConfig(filename="output.log", format='%(asctime)s %(message)s', filemode='a')
logger = logging.getLogger()
logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

# Setup requests header for CTF platform (token-based authorization).
HEADER = {
    "content-type": "application/json",
    'Authorization': 'Token {}'.format(CTF_TOKEN),
}
### ------------------
### ------------------
# DB Functions
def dbCheck():
    """Make sure the SQLite state file and its ``ctfState`` table exist.

    Logs (at debug level) whether ``state.db`` was already present, then
    always issues ``CREATE TABLE IF NOT EXISTS``. The statement is idempotent,
    so running it unconditionally also repairs a ``state.db`` file that exists
    but does not yet contain the table.

    Returns:
        None. sqlite3 errors are logged and swallowed, matching the other DB
        helpers in this file.
    """
    if os.path.isfile('state.db'):
        logger.debug('DB file exists')
    else:
        logger.debug('Creating empty DB file')

    conn = None
    try:
        # sqlite3.connect() creates the file on disk when it is missing.
        conn = sqlite3.connect('state.db')
        conn.execute(
            'CREATE TABLE IF NOT EXISTS ctfState ('
            'Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, '
            'url text, challengeID integer, noteID text, hash text, '
            'PRIMARY KEY (url, challengeID, hash))'
        )
        conn.commit()
    except Error as e:
        logger.info(e)
    finally:
        if conn:
            conn.close()
# Returns challenge ID's from the database as a list
def getStoredID():
    """Return the challenge IDs already recorded for the configured CTF.

    Returns:
        A list with the ``challengeID`` of every ``ctfState`` row whose
        ``url`` equals ``CTF_URL``. ``None`` when a sqlite3 error occurred
        (the error is logged).
    """
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        # Bind the URL as a parameter instead of formatting it into the SQL
        # text, so quotes in the value can neither break nor alter the query.
        cursor = conn.execute(
            "select challengeID from ctfState where url = ?",
            (CTF_URL,),
        )
        return [row[0] for row in cursor.fetchall()]
    except Error as e:
        logger.info(e)
    finally:
        if conn:
            conn.close()
def getStoredHash(id):
    """Look up the description hash stored for one challenge.

    Args:
        id: Challenge ID to look up for the configured ``CTF_URL``.

    Returns:
        The stored hash string; ``0`` when no row exists for this challenge;
        ``None`` when a sqlite3 error occurred (the error is logged).
    """
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        # Parameter binding replaces the string-formatted SQL of the original.
        cursor = conn.execute(
            "select hash from ctfState where url = ? and challengeID = ?",
            (CTF_URL, id),
        )
        # fetchone() yields a 1-tuple, or None when nothing matched.
        res = cursor.fetchone()
        # Return back zero if no hash is present
        return res[0] if res is not None else 0
    except Error as e:
        logger.info(e)
    finally:
        if conn:
            conn.close()
def getNoteID(dbHash):
    """Return the Joplin note ID stored alongside a description hash.

    Args:
        dbHash: Hash value to search for among the rows of ``CTF_URL``.

    Returns:
        The ``noteID`` of the matching row, or ``None`` when no row matches
        or a sqlite3 error occurred (errors are logged).
    """
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        cursor = conn.execute(
            "select noteID from ctfState where url = ? and hash = ?",
            (CTF_URL, dbHash),
        )
        # We get back a tuple from the db -- or None when nothing matched.
        # Indexing None raised an uncaught TypeError in the original.
        row = cursor.fetchone()
        if row is None:
            logger.debug("No note stored for hash {}".format(dbHash))
            return None
        return row[0]
    except Error as e:
        logger.info(e)
    finally:
        if conn:
            conn.close()
def updateDB(challengeID, noteID, hash):
    """Record a newly published challenge in the ``ctfState`` table.

    Args:
        challengeID: ID of the challenge on the CTF platform.
        noteID: ID of the Joplin note created for the challenge.
        hash: Hash of the challenge description at publication time.

    Returns:
        None. sqlite3 errors (including primary-key conflicts) are logged
        and swallowed.
    """
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        # Values are bound as parameters; the original formatted them into
        # the statement, which breaks on any value containing a quote.
        sql = "insert into ctfState (url, challengeId, noteID, hash) values (?, ?, ?, ?)"
        params = (CTF_URL, challengeID, noteID, hash)
        logger.debug("{} {}".format(sql, params))
        cur = conn.cursor()
        cur.execute(sql, params)
        conn.commit()
    except Error as e:
        logger.info(e)
    finally:
        if conn:
            conn.close()
### ------------------
### ------------------
# Misc functions
# Converts 2021-11-18T19:00:24+00:00 to epoch milliseconds (1637262024000.0)
def toEpoch(dt):
    """Convert an ISO-8601 timestamp with a UTC offset to epoch milliseconds.

    Args:
        dt: String such as ``2021-11-18T19:00:24+00:00``. Any numeric UTC
            offset is accepted, not only ``+00:00``.

    Returns:
        float: Milliseconds since 1970-01-01T00:00:00 UTC.

    Raises:
        ValueError: If ``dt`` does not match the expected format.
    """
    # %z yields a timezone-aware datetime, so timestamp() is an absolute
    # instant. This avoids the deprecated datetime.utcfromtimestamp() and the
    # conversion to a display timezone, which never changed the result.
    parsed = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S%z')
    return parsed.timestamp() * 1000.0
# Converts 2021-11-18T19:00:24+00:00 to November 18, 2021 2:00 PM
def toLocal(dt):
    """Render an ISO-8601 timestamp as a human-readable time in ``toZone``.

    Args:
        dt: String such as ``2021-11-18T19:00:24+00:00``. Any numeric UTC
            offset is accepted.

    Returns:
        str: For example ``November 18, 2021 2:00 PM``.

    Raises:
        ValueError: If ``dt`` does not match the expected format.
    """
    newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S%z').astimezone(toZone)
    # "%-I" (hour without zero padding) is a platform-specific strftime flag
    # that is not available everywhere, so the 12-hour value is computed here.
    hour12 = newDT.hour % 12 or 12
    return "{} {}:{}".format(
        newDT.strftime("%B %d, %Y"),
        hour12,
        newDT.strftime("%M %p"),
    )
def getChallenges():
    """Fetch the challenge listing from the CTF platform.

    Returns:
        The decoded JSON document of ``/api/v1/challenges`` when the API
        answers with status 200; otherwise ``None`` after logging the status.
    """
    response = requests.get("{}/api/v1/challenges".format(CTF_URL))
    if response.status_code != 200:
        logger.info("API returned a status code of {}.".format(response.status_code))
        return None

    listing = json.loads(response.text)
    logger.debug(listing['data'])
    return listing
# Returns a list of solved challenge ID's
def getSolvedChallenges():
    """Fetch the IDs of the challenges the current team has solved.

    The request is sent with the module-level ``HEADER``.

    Returns:
        A list with the ``challenge_id`` of every entry under ``data`` in the
        ``/api/v1/teams/me/solves`` response when the API answers with status
        200; otherwise ``None`` after logging the status.
    """
    response = requests.get("{}/api/v1/teams/me/solves".format(CTF_URL), headers=HEADER)
    if response.status_code != 200:
        logger.info("API returned a status code of {}.".format(response.status_code))
        return None

    payload = json.loads(response.text)
    logger.debug(payload['data'])

    solvedIDs = []
    for solve in payload['data']:
        solvedIDs.append(solve['challenge_id'])
    return solvedIDs
## Loop vs list comprehension
## >>> for id in range(len(solvedChallenges['data'])):
## ... print(solvedChallenges['data'][id]['challenge_id'])
## ...
##
## [i['challenge_id'] for i in solvedChallenges['data']]
# https://stackoverflow.com/questions/39189272/python-list-comprehension-and-json-parsing
def getChallengeDetails(challengeID):
    """Fetch the full record of a single challenge.

    Args:
        challengeID: ID appended to ``/api/v1/challenges/``.

    Returns:
        The decoded JSON document when the API answers with status 200;
        otherwise ``None`` after logging that the details were unavailable.
    """
    detailsURL = "{}/api/v1/challenges/{}".format(CTF_URL, challengeID)
    response = requests.get(detailsURL)
    if response.status_code != 200:
        logger.info('Could not get challenge details for {}'.format(challengeID))
        return None

    details = json.loads(response.text)
    logger.debug(details['data'])
    return details
def publishNote(challengeDetails):
    """Create a Joplin note describing a challenge.

    Args:
        challengeDetails: Decoded challenge document as returned by
            ``getChallengeDetails``; the fields are read from its ``data``
            entry.

    Returns:
        The ID of the created note when Joplin answers with status 200;
        otherwise ``None`` after logging the failure.
    """
    data = challengeDetails['data']

    # Format note
    name = data['name']
    category = data['category'].capitalize()
    # Some CTF's use decaying point values for challenges, so the initial
    # value is preferred. Fall back to the current 'value' when 'initial' is
    # absent instead of raising KeyError.
    # NOTE(review): presumably only dynamic-value challenges carry 'initial'
    # -- confirm against the platform's API.
    points = data.get('initial')
    if points is None:
        points = data.get('value')
    # Connection info is optional; treat a missing key like an empty value.
    link = data.get('connection_info')

    noteBody = "# Challenge Details"
    noteBody += "\n\nValue: {} points".format(points)
    noteBody += "\nCategory: {}".format(category)
    noteBody += "\nDescription: {}".format(data['description'])
    if link:
        noteBody += "\nLink: {}".format(link)
    noteBody += "\n\n# Solution"
    noteDetails = {"title": name, "parent_id": NOTEBOOK_ID, "body": noteBody}

    # Push to api
    r = requests.post("http://localhost:41184/notes?token={}".format(JOPLIN_TOKEN), json=noteDetails)
    if r.status_code == 200:
        noteID = r.json()['id']
        logger.debug("Details of new note are: {}".format(noteDetails))
        return noteID
    else:
        logger.info('Could not create note in Joplin')
def tagSolvedChallenge(noteID):
    """Attach the "solved" tag (``SOLVED_ID``) to a Joplin note.

    Args:
        noteID: ID of the note to tag.

    Returns:
        None. A non-200 answer from Joplin is logged rather than silently
        ignored, consistent with ``publishNote``.
    """
    data = {"id": noteID}
    r = requests.post("http://localhost:41184/tags/{}/notes?token={}".format(SOLVED_ID, JOPLIN_TOKEN), json=data)
    if r.status_code != 200:
        logger.info('Could not tag note {} as solved in Joplin (status {})'.format(noteID, r.status_code))
### ------------------
def main():
    """Sync every CTF challenge into Joplin and tag the solved ones.

    For each challenge returned by the platform:
      * hash its description,
      * publish a note and record state when the challenge is new,
      * log whether the stored hash still matches otherwise,
      * tag the note as solved when the team has solved the challenge.

    Failures of individual API or DB calls (which the helpers report by
    returning ``None``) skip the affected challenge instead of crashing.
    """
    dbCheck()

    allChallenges = getChallenges()
    if not allChallenges:
        # getChallenges() already logged the HTTP status.
        logger.info('No challenge list available; nothing to do.')
        return

    # getSolvedChallenges() returns None on failure; treat that as "none
    # solved". A set gives constant-time membership tests in the loop.
    solvedChallenges = set(getSolvedChallenges() or [])

    # getStoredID() can list the already processed challenge IDs, but is not
    # needed here because state is checked per challenge via its hash.
    for challenge in allChallenges['data']:
        nextID = challenge['id']
        challengeDetails = getChallengeDetails(nextID)
        if not challengeDetails:
            # Already logged by getChallengeDetails().
            continue

        # Hash the description to detect if it changes
        apiHash = hashlib.sha256(str(challengeDetails['data']['description']).encode('utf-8')).hexdigest()
        logger.debug("Hash of challenge from api is: {}".format(apiHash))

        # Check if the current loop iteration needs to write state
        dbHash = getStoredHash(nextID)
        logger.debug("DB hash is {}".format(dbHash))

        if dbHash is None:
            # The lookup failed (error already logged); the state is unknown,
            # so neither publish a duplicate note nor guess a note ID.
            continue

        if dbHash == 0:
            noteID = publishNote(challengeDetails)
            if noteID is None:
                # Do not record state for a note that was never created, so
                # the next run retries the publication.
                continue
            updateDB(nextID, noteID, apiHash)
        else:
            if dbHash != apiHash:
                logger.info('Challenge details have changed on the remote end')
            else:
                logger.info('Challenge details match what is currently stored.')
            # Only look the note up for a real stored hash. The original
            # queried with the 0 sentinel for newly published challenges,
            # which matched no row and crashed.
            noteID = getNoteID(dbHash)

        # TODO: Determine if this is the best place to add tags
        if nextID in solvedChallenges:
            if noteID:
                logger.debug("Tagging challenge id {} as solved.".format(nextID))
                tagSolvedChallenge(noteID)
            else:
                logger.info('No note found for solved challenge id {}'.format(nextID))


if __name__ == "__main__":
    main()