diff --git a/.gitignore b/.gitignore index f70d73c..c0f11a7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ build/ dist/ *.egg-info/ -.eggs/ \ No newline at end of file +.eggs/ +.pytest_cache/ +__pycache__/ diff --git a/Pipfile b/Pipfile deleted file mode 100644 index bf7f6e4..0000000 --- a/Pipfile +++ /dev/null @@ -1,13 +0,0 @@ -[[source]] -url = "https://pypi.org/simple" -verify_ssl = true -name = "pypi" - -[dev-packages] - -[packages] -chardet = "*" -pprp = "*" - -[requires] -python_version = "3.6" diff --git a/eac_logchecker.py b/eac_logchecker.py index 4a2c281..892208c 100755 --- a/eac_logchecker.py +++ b/eac_logchecker.py @@ -1,16 +1,27 @@ #!/usr/bin/env python3 import argparse -import contextlib import json from pathlib import Path import re import pprp CHECKSUM_MIN_VERSION = ('V1.0', 'beta', '1') +EAC_KEY = '9378716cf13e4265ae55338e940b376184da389e50647726b35f6f341ee3efd9' -def eac_checksum(text): +class Log: + def __init__(self, text): + self.text = text + self.unsigned_text = self.text + self.version = None + self.modified = False + self.old_checksum = None + self.checksum = None + + +def eac_checksum(log): + text = log.unsigned_text # Ignore newlines text = text.replace('\r', '').replace('\n', '') @@ -19,8 +30,9 @@ def eac_checksum(text): # Setup Rijndael-256 with a 256-bit blocksize cipher = pprp.crypto_3.rijndael( - # Probably SHA256('super secret password') but it doesn't actually matter - key=bytes.fromhex('9378716cf13e4265ae55338e940b376184da389e50647726b35f6f341ee3efd9'), + # Probably SHA256('super secret password') but it doesn't + # actually matter + key=bytes.fromhex(EAC_KEY), block_size=256 // 8 ) @@ -28,7 +40,7 @@ def eac_checksum(text): plaintext = text.encode('utf-16-le') # The IV is all zeroes so we don't have to handle it - signature = b'\x00' * 32 + checksum = b'\x00' * 32 # Process it block-by-block for i in range(0, len(plaintext), 32): @@ -36,39 +48,37 @@ def eac_checksum(text): plaintext_block = plaintext[i:i + 32].ljust(32, b'\x00') # CBC mode (XOR the previous ciphertext block into the plaintext) - cbc_plaintext = bytes(a ^ b for a, b in zip(signature, plaintext_block)) + cbc_plaintext = bytes( + a ^ b for a, b in zip(checksum, plaintext_block) + ) - # New signature is the ciphertext. - signature = cipher.encrypt(cbc_plaintext) + # New checksum is the ciphertext. + checksum = cipher.encrypt(cbc_plaintext) - # Textual signature is just the hex representation - return signature.hex().upper() + # Textual checksum is just the hex representation + log.checksum = checksum.hex().upper() -def extract_info(text): - if len(text) == 0: - return text, None, None +def extract_info(log): + if len(log.text) == 0: + return log - version = None - for line in text.splitlines(): + for line in log.text.splitlines(): if line.startswith('Exact Audio Copy'): - version = tuple(line.split()[3:6]) + log.version = tuple(line.split()[3:6]) elif re.match(r'[a-zA-Z]', line): break - match = re.search('\n\n==== (.*) ([A-Z0-9]+) ====', text) + match = re.search('\n\n==== (.*) ([A-Z0-9]+) ====', log.text) if match: - text, signature_parts = re.split('\n\n==== {}'.format(match.group(1)), text) - signature = signature_parts.split()[0].strip() - else: - signature = None - - return text, version, signature + search = '\n\n==== {}'.format(match.group(1)) + log.unsigned_text, checksum_parts = re.split(search, log.text) + log.old_checksum = checksum_parts.split()[0].strip() -def eac_verify(text): - unsigned_text, version, old_signature = extract_info(text) - return unsigned_text, version, old_signature, eac_checksum(unsigned_text) +def eac_verify(log): + extract_info(log) + eac_checksum(log) def get_logs(data): @@ -85,35 +95,64 @@ def get_logs(data): # Null bytes screw it up if '\x00' in text: text = text[:text.index('\x00')] - + # EAC crashes if there are more than 2^14 bytes in a line if any(len(l) + 1 > 2**13 for l in text.split('\n')): raise RuntimeError('EAC cannot handle lines longer than 2^13 chars') - return [x.strip() for x in re.split(r'[^-]-{60}[^-]', text)] + splits = re.split('(\n\n==== .* [A-Z0-9]+ ====)', text) + logs = [] + for split in splits: + if split.strip() != '': + logs.append(split) + + if len(logs) > 1: + length = len(logs) - 1 if len(logs) % 2 == 1 else len(logs) + return_logs = [] + for i in range(0, length, 2): + log = Log(logs[i] + logs[i+1]) + if i > 0: + (log.text, matches) = re.subn( + r'[^-]-{60}[^-]', + '', + log.text, + 1 + ) + if matches == 0: + log.modified = True + return_logs.append(log) + for i in range(length, len(logs)): + return_logs.append(Log(logs[i])) + else: + return_logs = [Log(logs[0])] + + return return_logs -def check_checksum(arg_file, arg_json): +def check_checksum(arg_file): if not isinstance(arg_file, Path): arg_file = Path(arg_file) - + output = [] if not arg_file.exists(): - if not arg_json: - print('Could not find logfile to examine.') + output.append({ + 'status': 'ERROR', + 'message': 'Could not find logfile to examine.' + }) + return output try: with arg_file.open('rb') as open_file: logs = get_logs(open_file.read()) for log in logs: - data, version, old_signature, actual_signature = eac_verify(log) + eac_verify(log) - if version is None or old_signature is None: + if log.version is None or log.old_checksum is None: message = 'Log entry has no checksum!' status = "NO" - elif old_signature != actual_signature: + elif log.modified or log.old_checksum != log.checksum: message = 'Log entry was modified, checksum incorrect!' status = "BAD" else: @@ -135,7 +174,9 @@ def check_checksum(arg_file, arg_json): def main(): - parser = argparse.ArgumentParser(description='Verifies and resigns EAC logs') + parser = argparse.ArgumentParser( + description='Verifies and resigns EAC logs' + ) parser.add_argument('--json', action='store_true', help='Output as JSON') parser.add_argument('file', type=Path, help='input log file') @@ -145,12 +186,13 @@ def main(): if not args.json: print('Log Integrity Checker (C) 2010 by Andre Wiethoff') print('') - output = check_checksum(args.file, args.json) + output = check_checksum(args.file) if args.json: print(json.dumps(output)) else: for i in range(len(output)): print('{:d}. {:s}'.format(i+1, output[i]['message'])) + if __name__ == '__main__': main() diff --git a/logs/27.log b/logs/27.log new file mode 100644 index 0000000..46cff3e Binary files /dev/null and b/logs/27.log differ diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..a788fc7 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[aliases] +test=pytest + +[tool:pytest] +addopts = --verbose \ No newline at end of file diff --git a/setup.py b/setup.py index c428424..ea6130a 100644 --- a/setup.py +++ b/setup.py @@ -82,6 +82,8 @@ setup( ] }, install_requires=['pprp'], + setup_requires=['pytest-runner'], + tests_require=['pytest'], license='MIT', classifiers=[ 'Development Status :: 4 - Beta', diff --git a/test.py b/test.py deleted file mode 100755 index d2ffee6..0000000 --- a/test.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python3 - -from pathlib import Path -import unittest - -import eac_logchecker - -TESTS = [ - (Path('logs/01.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry is fine!', 'status': 'OK'}]), - (Path('logs/02.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]), - (Path('logs/03.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/04.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/05.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry is fine!', 'status': 'OK'}]), - (Path('logs/06.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/07.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/08.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}]), - (Path('logs/09.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/10.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/11.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/12.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]), - (Path('logs/13.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]), - (Path('logs/14.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/15.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}]), - (Path('logs/16.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}]), - (Path('logs/17.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/18.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]), - (Path('logs/19.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/20.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/21.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/22.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/23.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/24.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/25.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry has no checksum!', 'status': 'NO'}]), - (Path('logs/26.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}]) -] - -class TestLogchecker(unittest.TestCase): - def test_logs(self): - for log_file, expected in TESTS: - with self.subTest(log=str(log_file)): - actual = eac_logchecker.check_checksum(log_file, True) - self.assertEqual(expected, actual) - - -if __name__ == "__main__": - unittest.main() diff --git a/test_eac_logchecker.py b/test_eac_logchecker.py new file mode 100755 index 0000000..e6d91e2 --- /dev/null +++ b/test_eac_logchecker.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +from pathlib import Path +import pytest + +import eac_logchecker + +LOG_GOOD = {'message': 'Log entry is fine!', 'status': 'OK'} +LOG_NO = {'message': 'Log entry has no checksum!', 'status': 'NO'} +LOG_BAD = { + 'message': 'Log entry was modified, checksum incorrect!', + 'status': 'BAD' +} + + +@pytest.mark.parametrize("log_path, log_statuses", [ + (Path('logs/01.log'), [LOG_GOOD, LOG_GOOD]), + (Path('logs/02.log'), [LOG_GOOD]), + (Path('logs/03.log'), [LOG_NO]), + (Path('logs/04.log'), [LOG_GOOD, LOG_NO]), + (Path('logs/05.log'), [LOG_GOOD, LOG_GOOD]), + (Path('logs/06.log'), [LOG_NO]), + (Path('logs/07.log'), [LOG_NO]), + (Path('logs/08.log'), [LOG_BAD]), + (Path('logs/09.log'), [LOG_NO]), + (Path('logs/10.log'), [LOG_NO]), + (Path('logs/11.log'), [LOG_NO]), + (Path('logs/12.log'), [LOG_GOOD]), + (Path('logs/13.log'), [LOG_GOOD]), + (Path('logs/14.log'), [LOG_NO]), + (Path('logs/15.log'), [LOG_BAD]), + (Path('logs/16.log'), [LOG_BAD]), + (Path('logs/17.log'), [LOG_NO]), + (Path('logs/18.log'), [LOG_GOOD]), + (Path('logs/19.log'), [LOG_NO]), + (Path('logs/20.log'), [LOG_NO]), + (Path('logs/21.log'), [LOG_NO]), + (Path('logs/22.log'), [LOG_NO]), + (Path('logs/23.log'), [LOG_NO]), + (Path('logs/24.log'), [LOG_NO]), + (Path('logs/25.log'), [LOG_GOOD, LOG_NO]), + (Path('logs/26.log'), [LOG_BAD]), + (Path('logs/27.log'), [LOG_GOOD, LOG_BAD, LOG_BAD, LOG_BAD]) +]) +def test_log(log_path, log_statuses): + actual = eac_logchecker.check_checksum(log_path) + assert log_statuses == actual