Initial save
This commit is contained in:
commit
99a9f9b57e
11
.gitignore
vendored
Normal file
11
.gitignore
vendored
Normal file
@ -0,0 +1,11 @@
|
||||
__pycache__/
|
||||
bin/
|
||||
include/
|
||||
lib/
|
||||
.vscode/
|
||||
lib64
|
||||
share/
|
||||
pyvenv.cfg
|
||||
output.log
|
||||
.env
|
||||
state.db
|
257
collectChallenges.py
Normal file
257
collectChallenges.py
Normal file
@ -0,0 +1,257 @@
|
||||
import os
|
||||
import logging
|
||||
import argparse
|
||||
from dotenv import load_dotenv
|
||||
import requests
|
||||
import json
|
||||
import sqlite3
|
||||
import hashlib
|
||||
from sqlite3 import Error
|
||||
from datetime import datetime
|
||||
from dateutil import tz
|
||||
|
||||
|
||||
### ------------------
|
||||
# Vars and inital setup
|
||||
|
||||
# Hard-code from/to timezones
|
||||
fromZone = tz.gettz('UTC')
|
||||
toZone = tz.gettz('America/New_York')
|
||||
# These are currently not used
|
||||
|
||||
# load_dotenv will look for a .env file and if it finds one it will load the environment variables from it
|
||||
# Ex: TOKEN=123
|
||||
load_dotenv()
|
||||
JOPLIN_TOKEN = os.getenv("JOPLIN_TOKEN")
|
||||
NOTEBOOK_ID = os.getenv("NOTEBOOK_ID")
|
||||
SOLVED_ID = os.getenv("SOLVED_ID")
|
||||
CTF_TOKEN = os.getenv("CTF_TOKEN")
|
||||
CTF_URL = os.getenv("CTF_URL")
|
||||
|
||||
# Argparse
|
||||
parser = argparse.ArgumentParser(prog=__file__, description='Collects CTF Challenge details and stores them in Joplin')
|
||||
parser.add_argument('-v', '--verbose', action='store_true')
|
||||
args = parser.parse_args()
|
||||
|
||||
# Configure logger
|
||||
logging.basicConfig(filename="output.log", format='%(asctime)s %(message)s', filemode='a')
|
||||
logger=logging.getLogger()
|
||||
if args.verbose:
|
||||
logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# Setup requests header for CTF platform
|
||||
HEADER = {"content-type": "application/json",'Authorization':'Token {}'.format(CTF_TOKEN)}
|
||||
|
||||
### ------------------
|
||||
|
||||
### ------------------
|
||||
# DB Functions
|
||||
|
||||
def dbCheck():
|
||||
if os.path.isfile('state.db'):
|
||||
logger.debug('DB file exists')
|
||||
else:
|
||||
logger.debug('Creating empty DB file')
|
||||
conn = None
|
||||
try:
|
||||
conn = sqlite3.connect('state.db')
|
||||
conn.execute('CREATE TABLE IF NOT EXISTS ctfState (Timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, url text, challengeID integer, noteID text, hash text, PRIMARY KEY (url, challengeID, hash))')
|
||||
conn.commit()
|
||||
except Error as e:
|
||||
logger.info(e)
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
# Returns challenge ID's from the database as a list
|
||||
def getStoredID():
|
||||
conn = None
|
||||
try:
|
||||
conn = sqlite3.connect('state.db')
|
||||
cursor = conn.execute("select challengeID from ctfState where url = '{}'".format(CTF_URL))
|
||||
return [r[0] for r in cursor.fetchall()]
|
||||
except Error as e:
|
||||
logger.info(e)
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
def getStoredHash(id):
|
||||
conn = None
|
||||
try:
|
||||
conn = sqlite3.connect('state.db')
|
||||
sql = "select hash from ctfState where url = '{}' and challengeID = '{}'".format(CTF_URL, id)
|
||||
cursor = conn.execute(sql)
|
||||
# Return back zero if no hash is present
|
||||
res = cursor.fetchone()
|
||||
if res is not None:
|
||||
# We get back a tuple from the db
|
||||
storedHash = res[0]
|
||||
else:
|
||||
storedHash = 0
|
||||
return storedHash
|
||||
except Error as e:
|
||||
logger.info(e)
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
def getNoteID(dbHash):
|
||||
conn = None
|
||||
try:
|
||||
conn = sqlite3.connect('state.db')
|
||||
sql = "select noteID from ctfState where url = '{}' and hash = '{}'".format(CTF_URL, dbHash)
|
||||
cursor = conn.execute(sql)
|
||||
# We get back a tuple from the db
|
||||
noteID = cursor.fetchone()[0]
|
||||
return noteID
|
||||
except Error as e:
|
||||
logger.info(e)
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
|
||||
def updateDB(challengeID, noteID, hash):
|
||||
conn = None
|
||||
try:
|
||||
conn = sqlite3.connect('state.db')
|
||||
sql = "insert into ctfState (url, challengeId, noteID, hash) values ('{}', '{}', '{}', '{}')".format(CTF_URL, challengeID, noteID, hash)
|
||||
logger.debug(sql)
|
||||
cur = conn.cursor()
|
||||
cur.execute(sql)
|
||||
conn.commit()
|
||||
except Error as e:
|
||||
logger.info(e)
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
### ------------------
|
||||
|
||||
### ------------------
|
||||
# Misc functions
|
||||
|
||||
# Converts 2021-11-18T19:00:24+00:00 to epoch milliseconds (1637262024000.0)
|
||||
def toEpoch(dt):
|
||||
epoch = datetime.utcfromtimestamp(0).replace(tzinfo=fromZone)
|
||||
newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S+00:00').replace(tzinfo=fromZone).astimezone(toZone)
|
||||
return (newDT - epoch).total_seconds() * 1000.0
|
||||
|
||||
# Converts 2021-11-18T19:00:24+00:00 to November 18, 2021 2:00 PM
|
||||
def toLocal(dt):
|
||||
newDT = datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S+00:00').replace(tzinfo=fromZone).astimezone(toZone)
|
||||
return newDT.strftime("%B %d, %Y %-I:%M %p")
|
||||
|
||||
def getChallenges():
|
||||
r = requests.get("{}/api/v1/challenges".format(CTF_URL))
|
||||
if r.status_code == 200:
|
||||
allChallenges = json.loads(r.text)
|
||||
logger.debug(allChallenges['data'])
|
||||
return allChallenges
|
||||
else:
|
||||
logger.info("API returned a status code of {}.".format(r.status_code))
|
||||
|
||||
# Returns a list of solved challenge ID's
|
||||
def getSolvedChallenges():
|
||||
r = requests.get("{}/api/v1/teams/me/solves".format(CTF_URL), headers=HEADER)
|
||||
if r.status_code == 200:
|
||||
solvedChallenges = json.loads(r.text)
|
||||
logger.debug(solvedChallenges['data'])
|
||||
solvedChallenges = [i['challenge_id'] for i in solvedChallenges['data']]
|
||||
return solvedChallenges
|
||||
else:
|
||||
logger.info("API returned a status code of {}.".format(r.status_code))
|
||||
|
||||
## Loop vs list compreshension
|
||||
## >>> for id in range(len(solvedChallenges['data'])):
|
||||
## ... print(solvedChallenges['data'][id]['challenge_id'])
|
||||
## ...
|
||||
##
|
||||
## [i['challenge_id'] for i in solvedChallenges['data']]
|
||||
# https://stackoverflow.com/questions/39189272/python-list-comprehension-and-json-parsing
|
||||
|
||||
|
||||
def getChallengeDetails(challengeID):
|
||||
r = requests.get("{}/api/v1/challenges/{}".format(CTF_URL, challengeID))
|
||||
if r.status_code == 200:
|
||||
challengeDetails = json.loads(r.text)
|
||||
logger.debug(challengeDetails['data'])
|
||||
return challengeDetails
|
||||
else:
|
||||
logger.info('Could not get challenge details for {}'.format(challengeID))
|
||||
|
||||
|
||||
def publishNote(challengeDetails):
|
||||
# Format note
|
||||
name = challengeDetails['data']['name']
|
||||
category = challengeDetails['data']['category'].capitalize()
|
||||
# Some CTF's use decaying point values for challenges
|
||||
#points = challengeDetails['data']['value']
|
||||
points = challengeDetails['data']['initial']
|
||||
link = challengeDetails['data']['connection_info']
|
||||
noteBody = "# Challenge Details"
|
||||
noteBody += "\n\nValue: {} points".format(points)
|
||||
noteBody += "\nCategory: {}".format(category)
|
||||
noteBody += "\nDescription: {}".format(challengeDetails['data']['description'])
|
||||
if link:
|
||||
noteBody += "\nLink: {}".format(link)
|
||||
noteBody += "\n\n# Solution"
|
||||
noteDetails = { "title": name, "parent_id": NOTEBOOK_ID, "body": noteBody}
|
||||
# Push to api
|
||||
r = requests.post("http://localhost:41184/notes?token={}".format(JOPLIN_TOKEN), json = noteDetails)
|
||||
if r.status_code == 200:
|
||||
noteID = r.json()['id']
|
||||
logger.debug("Details of new note are: {}".format(noteDetails))
|
||||
return noteID
|
||||
else:
|
||||
logger.info('Could not create note in Joplin')
|
||||
|
||||
def tagSolvedChallenge(noteID):
|
||||
data = {"id": noteID}
|
||||
r = requests.post("http://localhost:41184/tags/{}/notes?token={}".format(SOLVED_ID, JOPLIN_TOKEN), json = data)
|
||||
|
||||
### ------------------
|
||||
|
||||
|
||||
|
||||
def main():
|
||||
dbCheck()
|
||||
|
||||
allChallenges = getChallenges()
|
||||
solvedChallenges = getSolvedChallenges()
|
||||
|
||||
# List of challenge ID's already processed
|
||||
#knownIDList = getStoredID(CTF_URL)
|
||||
#logger.debug("knownIDList from db is {}".format(knownIDList))
|
||||
# Unused, can be removed/refactored
|
||||
|
||||
for i in range(len(allChallenges['data'])):
|
||||
nextID = allChallenges['data'][i]['id']
|
||||
challengeDetails = getChallengeDetails(nextID)
|
||||
# Hash the description to detect if it changes
|
||||
apiHash = hashlib.sha256(str(challengeDetails['data']['description']).encode('utf-8')).hexdigest()
|
||||
logger.debug("Hash of challenge from api is: {}".format(apiHash))
|
||||
|
||||
# Check if the current loop iteration needs to write state
|
||||
dbHash = getStoredHash(nextID)
|
||||
logger.debug("DB hash is {}".format(dbHash))
|
||||
|
||||
if dbHash == 0:
|
||||
noteID = publishNote(challengeDetails)
|
||||
updateDB( nextID, noteID, apiHash)
|
||||
elif dbHash != apiHash:
|
||||
logger.info('Challenge details have changed on the remote end')
|
||||
else:
|
||||
logger.info('Challenge details match what is currently stored.')
|
||||
# TODO: Determine if this is the best place to add tags
|
||||
if nextID in solvedChallenges:
|
||||
logger.debug("Tagging challenge id {} as solved.".format(nextID))
|
||||
noteID = getNoteID(dbHash)
|
||||
tagSolvedChallenge(noteID)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
8
requirements.txt
Normal file
8
requirements.txt
Normal file
@ -0,0 +1,8 @@
|
||||
certifi==2023.7.22
|
||||
charset-normalizer==3.3.1
|
||||
idna==3.4
|
||||
python-dateutil==2.8.2
|
||||
python-dotenv==1.0.0
|
||||
requests==2.31.0
|
||||
six==1.16.0
|
||||
urllib3==2.0.7
|
Loading…
Reference in New Issue
Block a user