Fix for logs manually appended to each other

This commit is contained in:
itismadness
2019-06-08 10:53:54 -12:00
parent 0cf27512ed
commit a4760b83c1
8 changed files with 136 additions and 97 deletions

4
.gitignore vendored
View File

@@ -2,4 +2,6 @@
build/
dist/
*.egg-info/
.eggs/
.eggs/
.pytest_cache/
__pycache__/

13
Pipfile
View File

@@ -1,13 +0,0 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[dev-packages]
[packages]
chardet = "*"
pprp = "*"
[requires]
python_version = "3.6"

View File

@@ -1,16 +1,27 @@
#!/usr/bin/env python3
import argparse
import contextlib
import json
from pathlib import Path
import re
import pprp
CHECKSUM_MIN_VERSION = ('V1.0', 'beta', '1')
EAC_KEY = '9378716cf13e4265ae55338e940b376184da389e50647726b35f6f341ee3efd9'
def eac_checksum(text):
class Log:
def __init__(self, text):
self.text = text
self.unsigned_text = self.text
self.version = None
self.modified = False
self.old_checksum = None
self.checksum = None
def eac_checksum(log):
text = log.unsigned_text
# Ignore newlines
text = text.replace('\r', '').replace('\n', '')
@@ -19,8 +30,9 @@ def eac_checksum(text):
# Setup Rijndael-256 with a 256-bit blocksize
cipher = pprp.crypto_3.rijndael(
# Probably SHA256('super secret password') but it doesn't actually matter
key=bytes.fromhex('9378716cf13e4265ae55338e940b376184da389e50647726b35f6f341ee3efd9'),
# Probably SHA256('super secret password') but it doesn't
# actually matter
key=bytes.fromhex(EAC_KEY),
block_size=256 // 8
)
@@ -28,7 +40,7 @@ def eac_checksum(text):
plaintext = text.encode('utf-16-le')
# The IV is all zeroes so we don't have to handle it
signature = b'\x00' * 32
checksum = b'\x00' * 32
# Process it block-by-block
for i in range(0, len(plaintext), 32):
@@ -36,39 +48,37 @@ def eac_checksum(text):
plaintext_block = plaintext[i:i + 32].ljust(32, b'\x00')
# CBC mode (XOR the previous ciphertext block into the plaintext)
cbc_plaintext = bytes(a ^ b for a, b in zip(signature, plaintext_block))
cbc_plaintext = bytes(
a ^ b for a, b in zip(checksum, plaintext_block)
)
# New signature is the ciphertext.
signature = cipher.encrypt(cbc_plaintext)
# New checksum is the ciphertext.
checksum = cipher.encrypt(cbc_plaintext)
# Textual signature is just the hex representation
return signature.hex().upper()
# Textual checksum is just the hex representation
log.checksum = checksum.hex().upper()
def extract_info(text):
if len(text) == 0:
return text, None, None
def extract_info(log):
if len(log.text) == 0:
return log
version = None
for line in text.splitlines():
for line in log.text.splitlines():
if line.startswith('Exact Audio Copy'):
version = tuple(line.split()[3:6])
log.version = tuple(line.split()[3:6])
elif re.match(r'[a-zA-Z]', line):
break
match = re.search('\n\n==== (.*) ([A-Z0-9]+) ====', text)
match = re.search('\n\n==== (.*) ([A-Z0-9]+) ====', log.text)
if match:
text, signature_parts = re.split('\n\n==== {}'.format(match.group(1)), text)
signature = signature_parts.split()[0].strip()
else:
signature = None
return text, version, signature
search = '\n\n==== {}'.format(match.group(1))
log.unsigned_text, checksum_parts = re.split(search, log.text)
log.old_checksum = checksum_parts.split()[0].strip()
def eac_verify(text):
unsigned_text, version, old_signature = extract_info(text)
return unsigned_text, version, old_signature, eac_checksum(unsigned_text)
def eac_verify(log):
extract_info(log)
eac_checksum(log)
def get_logs(data):
@@ -85,35 +95,64 @@ def get_logs(data):
# Null bytes screw it up
if '\x00' in text:
text = text[:text.index('\x00')]
# EAC crashes if there are more than 2^14 bytes in a line
if any(len(l) + 1 > 2**13 for l in text.split('\n')):
raise RuntimeError('EAC cannot handle lines longer than 2^13 chars')
return [x.strip() for x in re.split(r'[^-]-{60}[^-]', text)]
splits = re.split('(\n\n==== .* [A-Z0-9]+ ====)', text)
logs = []
for split in splits:
if split.strip() != '':
logs.append(split)
if len(logs) > 1:
length = len(logs) - 1 if len(logs) % 2 == 1 else len(logs)
return_logs = []
for i in range(0, length, 2):
log = Log(logs[i] + logs[i+1])
if i > 0:
(log.text, matches) = re.subn(
r'[^-]-{60}[^-]',
'',
log.text,
1
)
if matches == 0:
log.modified = True
return_logs.append(log)
for i in range(length, len(logs)):
return_logs.append(Log(logs[i]))
else:
return_logs = [Log(logs[0])]
return return_logs
def check_checksum(arg_file, arg_json):
def check_checksum(arg_file):
if not isinstance(arg_file, Path):
arg_file = Path(arg_file)
output = []
if not arg_file.exists():
if not arg_json:
print('Could not find logfile to examine.')
output.append({
'status': 'ERROR',
'message': 'Could not find logfile to examine.'
})
return output
try:
with arg_file.open('rb') as open_file:
logs = get_logs(open_file.read())
for log in logs:
data, version, old_signature, actual_signature = eac_verify(log)
eac_verify(log)
if version is None or old_signature is None:
if log.version is None or log.old_checksum is None:
message = 'Log entry has no checksum!'
status = "NO"
elif old_signature != actual_signature:
elif log.modified or log.old_checksum != log.checksum:
message = 'Log entry was modified, checksum incorrect!'
status = "BAD"
else:
@@ -135,7 +174,9 @@ def check_checksum(arg_file, arg_json):
def main():
parser = argparse.ArgumentParser(description='Verifies and resigns EAC logs')
parser = argparse.ArgumentParser(
description='Verifies and resigns EAC logs'
)
parser.add_argument('--json', action='store_true', help='Output as JSON')
parser.add_argument('file', type=Path, help='input log file')
@@ -145,12 +186,13 @@ def main():
if not args.json:
print('Log Integrity Checker (C) 2010 by Andre Wiethoff')
print('')
output = check_checksum(args.file, args.json)
output = check_checksum(args.file)
if args.json:
print(json.dumps(output))
else:
for i in range(len(output)):
print('{:d}. {:s}'.format(i+1, output[i]['message']))
if __name__ == '__main__':
main()

BIN
logs/27.log Normal file

Binary file not shown.

5
setup.cfg Normal file
View File

@@ -0,0 +1,5 @@
[aliases]
test=pytest
[tool:pytest]
addopts = --verbose

View File

@@ -82,6 +82,8 @@ setup(
]
},
install_requires=['pprp'],
setup_requires=['pytest-runner'],
tests_require=['pytest'],
license='MIT',
classifiers=[
'Development Status :: 4 - Beta',

46
test.py
View File

@@ -1,46 +0,0 @@
#!/usr/bin/env python3
from pathlib import Path
import unittest
import eac_logchecker
TESTS = [
(Path('logs/01.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry is fine!', 'status': 'OK'}]),
(Path('logs/02.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]),
(Path('logs/03.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/04.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/05.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry is fine!', 'status': 'OK'}]),
(Path('logs/06.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/07.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/08.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}]),
(Path('logs/09.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/10.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/11.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/12.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]),
(Path('logs/13.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]),
(Path('logs/14.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/15.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}]),
(Path('logs/16.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}]),
(Path('logs/17.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/18.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}]),
(Path('logs/19.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/20.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/21.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/22.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/23.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/24.log'), [{'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/25.log'), [{'message': 'Log entry is fine!', 'status': 'OK'}, {'message': 'Log entry has no checksum!', 'status': 'NO'}]),
(Path('logs/26.log'), [{'message': 'Log entry was modified, checksum incorrect!', 'status': 'BAD'}])
]
class TestLogchecker(unittest.TestCase):
def test_logs(self):
for log_file, expected in TESTS:
with self.subTest(log=str(log_file)):
actual = eac_logchecker.check_checksum(log_file, True)
self.assertEqual(expected, actual)
if __name__ == "__main__":
unittest.main()

47
test_eac_logchecker.py Executable file
View File

@@ -0,0 +1,47 @@
#!/usr/bin/env python3
from pathlib import Path
import pytest
import eac_logchecker
LOG_GOOD = {'message': 'Log entry is fine!', 'status': 'OK'}
LOG_NO = {'message': 'Log entry has no checksum!', 'status': 'NO'}
LOG_BAD = {
'message': 'Log entry was modified, checksum incorrect!',
'status': 'BAD'
}
@pytest.mark.parametrize("log_path, log_statuses", [
(Path('logs/01.log'), [LOG_GOOD, LOG_GOOD]),
(Path('logs/02.log'), [LOG_GOOD]),
(Path('logs/03.log'), [LOG_NO]),
(Path('logs/04.log'), [LOG_GOOD, LOG_NO]),
(Path('logs/05.log'), [LOG_GOOD, LOG_GOOD]),
(Path('logs/06.log'), [LOG_NO]),
(Path('logs/07.log'), [LOG_NO]),
(Path('logs/08.log'), [LOG_BAD]),
(Path('logs/09.log'), [LOG_NO]),
(Path('logs/10.log'), [LOG_NO]),
(Path('logs/11.log'), [LOG_NO]),
(Path('logs/12.log'), [LOG_GOOD]),
(Path('logs/13.log'), [LOG_GOOD]),
(Path('logs/14.log'), [LOG_NO]),
(Path('logs/15.log'), [LOG_BAD]),
(Path('logs/16.log'), [LOG_BAD]),
(Path('logs/17.log'), [LOG_NO]),
(Path('logs/18.log'), [LOG_GOOD]),
(Path('logs/19.log'), [LOG_NO]),
(Path('logs/20.log'), [LOG_NO]),
(Path('logs/21.log'), [LOG_NO]),
(Path('logs/22.log'), [LOG_NO]),
(Path('logs/23.log'), [LOG_NO]),
(Path('logs/24.log'), [LOG_NO]),
(Path('logs/25.log'), [LOG_GOOD, LOG_NO]),
(Path('logs/26.log'), [LOG_BAD]),
(Path('logs/27.log'), [LOG_GOOD, LOG_BAD, LOG_BAD, LOG_BAD])
])
def test_log(log_path, log_statuses):
actual = eac_logchecker.check_checksum(log_path)
assert log_statuses == actual