258 lines
8.5 KiB
Python
258 lines
8.5 KiB
Python
import os
|
|
import logging
|
|
import argparse
|
|
from dotenv import load_dotenv
|
|
import requests
|
|
import json
|
|
import sqlite3
|
|
import hashlib
|
|
from sqlite3 import Error
|
|
from datetime import datetime
|
|
from dateutil import tz
|
|
|
|
|
|
### ------------------
# Variables and initial setup

# Source and target timezones are fixed here rather than being configurable.
fromZone = tz.gettz('UTC')
toZone = tz.gettz('America/New_York')
# These are currently not used by the main workflow.

# load_dotenv() searches for a .env file and, when one is found, loads the
# variables it defines into the process environment.
# Ex: TOKEN=123
load_dotenv()
JOPLIN_TOKEN = os.getenv("JOPLIN_TOKEN")
NOTEBOOK_ID = os.getenv("NOTEBOOK_ID")
SOLVED_ID = os.getenv("SOLVED_ID")
CTF_TOKEN = os.getenv("CTF_TOKEN")
CTF_URL = os.getenv("CTF_URL")

# Command-line arguments
parser = argparse.ArgumentParser(
    prog=__file__,
    description='Collects CTF Challenge details and stores them in Joplin',
)
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()

# Logging: append to output.log; -v/--verbose switches INFO to DEBUG.
logging.basicConfig(
    filename="output.log",
    filemode='a',
    format='%(asctime)s %(message)s',
)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

# Request headers sent to the CTF platform (token authentication)
HEADER = {
    "content-type": "application/json",
    'Authorization': 'Token {}'.format(CTF_TOKEN),
}
|
|
|
|
### ------------------
|
|
|
|
### ------------------
|
|
# DB Functions
|
|
|
|
def dbCheck():
    """Make sure the SQLite state database and its ``ctfState`` table exist.

    The table creation statement is idempotent (``CREATE TABLE IF NOT
    EXISTS``), so it is executed on every call. This also repairs a
    ``state.db`` file that exists but has no table yet, for example one left
    behind by an earlier run that failed after ``sqlite3.connect`` had
    already created the file.

    Returns:
        None. Database errors are logged and not re-raised.
    """
    if os.path.isfile('state.db'):
        logger.debug('DB file exists')
    else:
        logger.debug('Creating empty DB file')

    conn = None
    try:
        # connect() creates the file when it is missing.
        conn = sqlite3.connect('state.db')
        conn.execute(
            'CREATE TABLE IF NOT EXISTS ctfState ('
            'Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, '
            'url text, '
            'challengeID integer, '
            'noteID text, '
            'hash text, '
            'PRIMARY KEY (url, challengeID, hash))'
        )
        conn.commit()
    except Error as e:
        logger.error(e)
    finally:
        if conn:
            conn.close()
|
|
|
|
# Returns challenge ID's from the database as a list
|
|
def getStoredID():
    """Return the challenge IDs stored in the database for ``CTF_URL``.

    Returns:
        list: the ``challengeID`` value of every ``ctfState`` row whose
        ``url`` equals ``CTF_URL``. ``None`` is returned when the query
        fails; the error is logged.
    """
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        # Bind the URL as a parameter instead of formatting it into the SQL,
        # so quotes or other special characters cannot alter the statement.
        cursor = conn.execute(
            "select challengeID from ctfState where url = ?",
            (CTF_URL,),
        )
        return [row[0] for row in cursor.fetchall()]
    except Error as e:
        logger.error(e)
    finally:
        if conn:
            conn.close()
|
|
|
|
def getStoredHash(id):
    """Look up the stored description hash for a challenge of ``CTF_URL``.

    Args:
        id: challenge ID to look up.

    Returns:
        The stored hash string when a row exists, ``0`` when no row exists
        for this URL/challenge pair, or ``None`` when the query fails (the
        error is logged).
    """
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        # Parameter binding keeps the values out of the SQL text.
        cursor = conn.execute(
            "select hash from ctfState where url = ? and challengeID = ?",
            (CTF_URL, id),
        )
        # fetchone() yields a 1-tuple, or None when nothing matched.
        res = cursor.fetchone()
        if res is None:
            # Zero is the "no hash stored yet" sentinel callers test for.
            return 0
        return res[0]
    except Error as e:
        logger.error(e)
    finally:
        if conn:
            conn.close()
|
|
|
|
def getNoteID(dbHash):
    """Return the Joplin note ID stored alongside a description hash.

    Args:
        dbHash: hash value to search for among the rows of ``CTF_URL``.

    Returns:
        The ``noteID`` of the first matching row, or ``None`` when no row
        matches or the query fails (failures are logged).
    """
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        # Parameter binding keeps the values out of the SQL text.
        cursor = conn.execute(
            "select noteID from ctfState where url = ? and hash = ?",
            (CTF_URL, dbHash),
        )
        row = cursor.fetchone()
        if row is None:
            # Previously this case raised TypeError on ``None[0]``, which the
            # sqlite3-only except clause below did not catch.
            logger.info('No note stored for hash {}'.format(dbHash))
            return None
        # We get back a tuple from the db
        return row[0]
    except Error as e:
        logger.error(e)
    finally:
        if conn:
            conn.close()
|
|
|
|
|
|
def updateDB(challengeID, noteID, hash):
    """Insert a state row for a challenge of ``CTF_URL``.

    Args:
        challengeID: ID of the challenge on the CTF platform.
        noteID: ID of the Joplin note created for the challenge.
        hash: hash of the challenge description.

    Returns:
        None. Database errors (including primary-key conflicts) are logged
        and not re-raised.
    """
    sql = "insert into ctfState (url, challengeID, noteID, hash) values (?, ?, ?, ?)"
    params = (CTF_URL, challengeID, noteID, hash)
    conn = None
    try:
        conn = sqlite3.connect('state.db')
        logger.debug("%s %r", sql, params)
        # Values are bound as parameters so a quote in any of them cannot
        # break or alter the statement.
        conn.execute(sql, params)
        conn.commit()
    except Error as e:
        logger.error(e)
    finally:
        if conn:
            conn.close()
|
|
|
|
### ------------------
|
|
|
|
### ------------------
|
|
# Misc functions
|
|
|
|
# Converts 2021-11-18T19:00:24+00:00 to epoch milliseconds (1637262024000.0)
|
|
def toEpoch(dt):
    """Convert an ISO-8601 timestamp string to epoch milliseconds.

    Example: ``'2021-11-18T19:00:24+00:00'`` -> ``1637262024000.0``.

    The UTC offset is parsed with ``%z`` instead of being hard-coded to
    ``+00:00``, so timestamps carrying another offset are converted
    correctly. ``%z`` accepts the ``+HH:MM`` form on Python 3.7 and later.

    Args:
        dt: timestamp formatted as ``YYYY-MM-DDTHH:MM:SS+HH:MM``.

    Returns:
        float: milliseconds since 1970-01-01T00:00:00 UTC.

    Raises:
        ValueError: if ``dt`` does not match the expected format.
    """
    parsed = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S%z')
    # timestamp() on an aware datetime is already relative to the UTC epoch,
    # so neither a local-timezone conversion nor the deprecated
    # datetime.utcfromtimestamp() is needed.
    return parsed.timestamp() * 1000.0
|
|
|
|
# Converts 2021-11-18T19:00:24+00:00 to November 18, 2021 2:00 PM
|
|
def toLocal(dt):
    """Format an ISO-8601 timestamp string as local time in ``toZone``.

    Example (with ``toZone`` set to America/New_York):
    ``'2021-11-18T19:00:24+00:00'`` -> ``'November 18, 2021 2:00 PM'``.

    Args:
        dt: timestamp formatted as ``YYYY-MM-DDTHH:MM:SS+HH:MM``.

    Returns:
        str: ``'<Month> <DD>, <YYYY> <H>:<MM> <AM|PM>'`` with an unpadded
        12-hour clock hour.

    Raises:
        ValueError: if ``dt`` does not match the expected format.
    """
    # %z parses the offset carried by the string (the '+HH:MM' form needs
    # Python 3.7+) instead of assuming a literal '+00:00'.
    newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S%z').astimezone(toZone)
    # '%-I' (unpadded hour) is a glibc extension and is not available on
    # every platform, so the 12-hour value is computed by hand.
    hour12 = newDT.hour % 12 or 12
    return "{} {}:{}".format(
        newDT.strftime("%B %d, %Y"),
        hour12,
        newDT.strftime("%M %p"),
    )
|
|
|
|
def getChallenges():
    """Fetch the challenge listing from ``<CTF_URL>/api/v1/challenges``.

    Returns:
        The decoded JSON document when the API answers with status 200.
        Any other status is logged and ``None`` is returned.
    """
    # NOTE(review): this request is sent without HEADER - confirm the
    # endpoint is readable without authentication.
    response = requests.get("{}/api/v1/challenges".format(CTF_URL))
    if response.status_code != 200:
        logger.info("API returned a status code of {}.".format(response.status_code))
        return None

    payload = json.loads(response.text)
    logger.debug(payload['data'])
    return payload
|
|
|
|
# Returns a list of solved challenge ID's
|
|
def getSolvedChallenges():
    """Fetch the IDs of the challenges solved by the authenticated team.

    Queries ``<CTF_URL>/api/v1/teams/me/solves`` using ``HEADER``.

    Returns:
        list: the ``challenge_id`` of every entry under ``data`` when the
        API answers with status 200. Any other status is logged and
        ``None`` is returned.
    """
    response = requests.get(
        "{}/api/v1/teams/me/solves".format(CTF_URL),
        headers=HEADER,
    )
    if response.status_code != 200:
        logger.info("API returned a status code of {}.".format(response.status_code))
        return None

    solves = json.loads(response.text)
    logger.debug(solves['data'])

    solvedIDs = []
    for solve in solves['data']:
        solvedIDs.append(solve['challenge_id'])
    return solvedIDs
|
|
|
|
## Loop vs list comprehension
|
|
## >>> for id in range(len(solvedChallenges['data'])):
|
|
## ... print(solvedChallenges['data'][id]['challenge_id'])
|
|
## ...
|
|
##
|
|
## [i['challenge_id'] for i in solvedChallenges['data']]
|
|
# https://stackoverflow.com/questions/39189272/python-list-comprehension-and-json-parsing
|
|
|
|
|
|
def getChallengeDetails(challengeID):
    """Fetch one challenge from ``<CTF_URL>/api/v1/challenges/<challengeID>``.

    Args:
        challengeID: ID of the challenge to retrieve.

    Returns:
        The decoded JSON document when the API answers with status 200.
        Any other status is logged and ``None`` is returned.
    """
    # NOTE(review): this request is sent without HEADER - confirm the
    # endpoint is readable without authentication.
    endpoint = "{}/api/v1/challenges/{}".format(CTF_URL, challengeID)
    response = requests.get(endpoint)
    if response.status_code != 200:
        logger.info('Could not get challenge details for {}'.format(challengeID))
        return None

    details = json.loads(response.text)
    logger.debug(details['data'])
    return details
|
|
|
|
|
|
def publishNote(challengeDetails):
    """Create a Joplin note describing a challenge.

    The note is posted to the Joplin Data API on ``localhost:41184`` and is
    placed in the notebook identified by ``NOTEBOOK_ID``.

    Args:
        challengeDetails: decoded challenge document; its ``data`` mapping
            must provide ``name``, ``category`` and ``description``, and may
            provide ``initial``, ``value`` and ``connection_info``.

    Returns:
        The ID of the created note when Joplin answers with status 200,
        otherwise ``None`` (the failure is logged).
    """
    data = challengeDetails['data']
    name = data['name']
    category = data['category'].capitalize()

    # Some CTF's use decaying point values for challenges, so the initial
    # value is preferred. Fall back to the current value when the challenge
    # does not report an initial one, instead of failing with KeyError.
    points = data.get('initial')
    if points is None:
        points = data.get('value')

    # The connection info may be absent from the response.
    link = data.get('connection_info')

    # Format note
    bodyLines = [
        "# Challenge Details",
        "",
        "Value: {} points".format(points),
        "Category: {}".format(category),
        "Description: {}".format(data['description']),
    ]
    if link:
        bodyLines.append("Link: {}".format(link))
    bodyLines.extend(["", "# Solution"])
    noteBody = "\n".join(bodyLines)

    noteDetails = {"title": name, "parent_id": NOTEBOOK_ID, "body": noteBody}

    # Push to api
    r = requests.post("http://localhost:41184/notes?token={}".format(JOPLIN_TOKEN), json=noteDetails)
    if r.status_code != 200:
        logger.info('Could not create note in Joplin')
        return None

    noteID = r.json()['id']
    logger.debug("Details of new note are: {}".format(noteDetails))
    return noteID
|
|
|
|
def tagSolvedChallenge(noteID):
    """Attach the tag identified by ``SOLVED_ID`` to a Joplin note.

    Args:
        noteID: ID of the note to tag. A falsy value is logged and skipped,
            because there is no note to attach the tag to.

    Returns:
        None. A non-200 answer from Joplin is logged instead of being
        silently ignored.
    """
    if not noteID:
        logger.info('Cannot tag a solved challenge without a note ID')
        return

    data = {"id": noteID}
    r = requests.post("http://localhost:41184/tags/{}/notes?token={}".format(SOLVED_ID, JOPLIN_TOKEN), json=data)
    if r.status_code != 200:
        logger.info('Could not tag note {} as solved (status code {})'.format(noteID, r.status_code))
|
|
|
|
### ------------------
|
|
|
|
|
|
|
|
def main():
    """Sync CTF challenges into Joplin notes and tag the solved ones.

    For every challenge returned by the platform:

    * the description is hashed so later changes can be detected;
    * a note is published and a state row stored when the challenge has no
      stored hash yet;
    * otherwise the stored hash is compared with the fresh one and the
      outcome is logged;
    * the challenge's note is tagged when the challenge has been solved.

    Failed API or database calls, which the helpers signal by returning
    ``None``, skip the affected challenge instead of raising.
    """
    dbCheck()

    allChallenges = getChallenges()
    if not allChallenges:
        logger.info('No challenges could be retrieved; nothing to do.')
        return

    # getSolvedChallenges() returns None when the API call fails; treat that
    # as "nothing solved" so membership tests below stay valid.
    solvedChallenges = set(getSolvedChallenges() or [])

    # NOTE: getStoredID() is not used here; per-challenge state is looked up
    # with getStoredHash() instead.

    for challenge in allChallenges.get('data') or []:
        nextID = challenge['id']
        challengeDetails = getChallengeDetails(nextID)
        if not challengeDetails:
            # The failure has already been logged by getChallengeDetails().
            continue

        # Hash the description to detect if it changes
        apiHash = hashlib.sha256(str(challengeDetails['data']['description']).encode('utf-8')).hexdigest()
        logger.debug("Hash of challenge from api is: {}".format(apiHash))

        # Check if the current loop iteration needs to write state
        dbHash = getStoredHash(nextID)
        logger.debug("DB hash is {}".format(dbHash))

        if dbHash is None:
            # The database lookup failed; without a reliable stored state we
            # can neither compare hashes nor find the note.
            logger.info('Could not read stored state for challenge {}'.format(nextID))
            continue

        if dbHash == 0:
            # First time this challenge is seen: create its note and keep the
            # returned ID. Looking the note up by the sentinel hash 0 would
            # find no row.
            noteID = publishNote(challengeDetails)
            if noteID is None:
                continue
            updateDB(nextID, noteID, apiHash)
        else:
            if dbHash != apiHash:
                logger.info('Challenge details have changed on the remote end')
            else:
                logger.info('Challenge details match what is currently stored.')
            # NOTE(review): notes are looked up by description hash only, so
            # two challenges with identical descriptions map to one note.
            noteID = getNoteID(dbHash)

        # TODO: Determine if this is the best place to add tags
        if nextID in solvedChallenges:
            if noteID is None:
                logger.info('No note found for solved challenge id {}'.format(nextID))
                continue
            logger.debug("Tagging challenge id {} as solved.".format(nextID))
            tagSolvedChallenge(noteID)


if __name__ == "__main__":
    main()
|