#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for detokenize."""

import base64
import datetime as dt
import io
import os
from pathlib import Path
import struct
import tempfile
import unittest
from unittest import mock

from pw_tokenizer import database
from pw_tokenizer import detokenize
from pw_tokenizer import elf_reader
from pw_tokenizer import tokens


# This function is not part of this test. It was used to generate the binary
# strings for EMPTY_ELF and ELF_WITH_TOKENIZER_SECTIONS. It takes a path and
# returns a Python byte string suitable for copying into Python source code.
def path_to_byte_string(path):
    with open(path, 'rb') as fd:
        data = fd.read()

    output = []
    indices = iter(range(len(data)))

    while True:
        line = ''

        while len(line) < 70:
            try:
                i = next(indices)
            except StopIteration:
                break

            line += repr(data[i:i + 1])[2:-1].replace("'", r'\'')

        if not line:
            return ''.join(output)

        output.append("    b'{}'\n".format(line))


# This is an empty ELF file. It was created from the ELF file for
# tokenize_test.cc with the command:
#
#   arm-none-eabi-objcopy -S --only-section NO_SECTIONS_PLEASE <ELF> <OUTPUT>
#
# The resulting ELF was converted to a Python binary string using the
# path_to_byte_string function above.
EMPTY_ELF = (
    b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
    b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\xe0\x00\x00\x00\x00\x04\x00\x05'
    b'4\x00 \x00\x05\x00(\x00\x02\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
    b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
    b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
    b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x06\x00\x00\x00\x00\x00\x01\x00\x00.shstrtab\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
    b'\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00'
    b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
    b'\x00\x00\x00')

# This is an ELF file with only the pw_tokenizer sections. It was created
# from a tokenize_test binary built for the STM32F429i Discovery board. The
# pw_tokenizer sections were extracted with this command:
#
#   arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
#
ELF_WITH_TOKENIZER_SECTIONS = Path(__file__).parent.joinpath(
    'example_binary_with_tokenized_strings.elf').read_bytes()

TOKENS_IN_ELF = 22

# 0x2e668cd6 is the token of 'Jello, world!' (also used in database_test.py).
JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
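
# Note: the token bytes are just the 32-bit token value packed little-endian;
# e.g. struct.pack('<I', 0x2e668cd6) == JELLO_WORLD_TOKEN. (Illustrative
# comment only; the tests below use the constant directly.)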


class DetokenizeTest(unittest.TestCase):
    """Tests the detokenize.Detokenizer."""
    def test_simple(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(0xcdab,
                                            '%02d %s %c%%',
                                            date_removed=dt.datetime.now())
            ]))
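        # The payload is the little-endian token (0xcdab) followed by the
        # encoded arguments: 0x02 is the zigzag varint for 1 ('01' under
        # %02d), 0x03 prefixes the 3-byte string 'Two', and 0x66 (102)
        # zigzag-decodes to 51, the character '3'; '%%' prints a literal '%'.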
        self.assertEqual(str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')),
                         '01 Two 3%')

    def test_detokenize_extra_data_is_unsuccessful(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(1,
                                            'no args',
                                            date_removed=dt.datetime(1, 1, 1))
            ]))

        result = detok.detokenize(b'\x01\0\0\0\x04args')
        self.assertEqual(len(result.failures), 1)
        string, args, remaining = result.failures[0]
        self.assertEqual('no args', string)
        self.assertFalse(args)
        self.assertEqual(b'\x04args', remaining)
        self.assertEqual('no args', str(result))

    def test_detokenize_missing_data_is_unsuccessful(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(2,
                                            '%s',
                                            date_removed=dt.datetime(1, 1, 1))
            ]))

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertEqual('%s', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertEqual('%s', str(result))

    def test_detokenize_missing_data_with_errors_is_unsuccessful(self):
        detok = detokenize.Detokenizer(tokens.Database([
            tokens.TokenizedStringEntry(2,
                                        '%s',
                                        date_removed=dt.datetime(1, 1, 1))
        ]),
                                       show_errors=True)

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertIn('%s MISSING', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertIn('%s MISSING', str(result))

    def test_unparsed_data(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(1,
                                            'no args',
                                            date_removed=dt.datetime(
                                                100, 1, 1)),
            ]))
        result = detok.detokenize(b'\x01\0\0\0o_o')
        self.assertFalse(result.ok())
        self.assertEqual('no args', str(result))
        self.assertIn('o_o', repr(result))
        self.assertIn('decoding failed', result.error_message())

    def test_empty_db(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn('unknown token',
                      detok.detokenize(b'1234').error_message())
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))

        self.assertEqual('$' + base64.b64encode(b'1234').decode(),
                         str(detok.detokenize(b'1234')))

        self.assertIsNone(detok.detokenize(b'').token)

    def test_empty_db_show_errors(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn('unknown token',
                      detok.detokenize(b'1234').error_message())
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
        self.assertIn('unknown token', str(detok.detokenize(b'1234')))

        self.assertIsNone(detok.detokenize(b'').token)

    def test_missing_token_show_errors(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

        self.assertIn('missing token', detok.detokenize(b'1').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'1')))
        self.assertIn('missing token', repr(detok.detokenize(b'1')))

        self.assertIn('missing token',
                      detok.detokenize(b'123').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'123')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

    def test_missing_token(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertEqual('$', str(detok.detokenize(b'')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

        self.assertIn('missing token', detok.detokenize(b'1').error_message())
        self.assertEqual('$' + base64.b64encode(b'1').decode(),
                         str(detok.detokenize(b'1')))
        self.assertIn('missing token', repr(detok.detokenize(b'1')))

        self.assertIn('missing token',
                      detok.detokenize(b'123').error_message())
        self.assertEqual('$' + base64.b64encode(b'123').decode(),
                         str(detok.detokenize(b'123')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

    def test_decode_from_elf_data(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))

        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        self.assertEqual(str(detok.detokenize(JELLO_WORLD_TOKEN)),
                         'Jello, world!')

        undecoded_args = detok.detokenize(JELLO_WORLD_TOKEN + b'some junk')
        self.assertFalse(undecoded_args.ok())
        self.assertEqual(str(undecoded_args), 'Jello, world!')

        self.assertTrue(detok.detokenize(b'\0\0\0\0').ok())
        self.assertEqual(str(detok.detokenize(b'\0\0\0\0')), '')

    def test_decode_from_elf_file(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        elf = tempfile.NamedTemporaryFile('wb', delete=False)
        try:
            elf.write(ELF_WITH_TOKENIZER_SECTIONS)
            elf.close()

            # Open ELF by file object
            with open(elf.name, 'rb') as fd:
                detok = detokenize.Detokenizer(fd)

            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))

            # Open ELF by path
            detok = detokenize.Detokenizer(elf.name)
            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))

            # Open ELF by elf_reader.Elf
            with open(elf.name, 'rb') as fd:
                detok = detokenize.Detokenizer(elf_reader.Elf(fd))

            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))
        finally:
            os.unlink(elf.name)

    def test_decode_from_csv_file(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        csv_database = str(detok.database)
        self.assertEqual(len(csv_database.splitlines()), TOKENS_IN_ELF)

        csv_file = tempfile.NamedTemporaryFile('w', delete=False)
        try:
            csv_file.write(csv_database)
            csv_file.close()

            # Open CSV by path
            detok = detokenize.Detokenizer(csv_file.name)
            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))

            # Open CSV by file object
            with open(csv_file.name) as fd:
                detok = detokenize.Detokenizer(fd)

            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))
        finally:
            os.unlink(csv_file.name)

    def test_create_detokenizer_with_token_database(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        detok = detokenize.Detokenizer(detok.database)
        self.assertEqual(expected_tokens,
                         frozenset(detok.database.token_to_entries.keys()))


class DetokenizeWithCollisions(unittest.TestCase):
    """Tests collision resolution."""
    def setUp(self):
        super().setUp()
        token = 0xbaad

        # Database with several conflicting tokens.
        self.detok = detokenize.Detokenizer(tokens.Database([
            tokens.TokenizedStringEntry(
                token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)),
            tokens.TokenizedStringEntry(token, 'newer'),
            tokens.TokenizedStringEntry(
                token, 'A: %d', date_removed=dt.datetime(30, 5, 9)),
            tokens.TokenizedStringEntry(
                token, 'B: %c', date_removed=dt.datetime(30, 5, 10)),
            tokens.TokenizedStringEntry(token, 'C: %s'),
            tokens.TokenizedStringEntry(token, '%d%u'),
            tokens.TokenizedStringEntry(token, '%s%u %d'),
            tokens.TokenizedStringEntry(1, '%s'),
            tokens.TokenizedStringEntry(1, '%d'),
            tokens.TokenizedStringEntry(2, 'Three %s %s %s'),
            tokens.TokenizedStringEntry(2, 'Five %d %d %d %d %s'),
        ]))  # yapf: disable

    def test_collision_no_args_favors_most_recently_present(self):
        no_args = self.detok.detokenize(b'\xad\xba\0\0')
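        # All seven database entries for token 0xbaad match; only the two
        # argument-free strings ('REMOVED' and 'newer') decode successfully.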
        self.assertFalse(no_args.ok())
        self.assertEqual(len(no_args.successes), 2)
        self.assertEqual(len(no_args.failures), 5)
        self.assertEqual(len(no_args.matches()), 7)
        self.assertEqual(str(no_args), 'newer')
        self.assertEqual(len(no_args.best_result()[1]), 0)
        self.assertEqual(no_args.best_result()[0], 'newer')

    def test_collision_one_integer_arg_favors_most_recently_present(self):
        multiple_correct = self.detok.detokenize(b'\xad\xba\0\0\x7a')
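        # The extra byte 0x7a zigzag-decodes to 61, which '%c' renders as
        # '='. Both 'A: %d' and 'B: %c' decode successfully; 'B: %c' is
        # preferred because it was present in the database more recently.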
        self.assertFalse(multiple_correct.ok())
        self.assertIn('ERROR', repr(multiple_correct))
        self.assertEqual(len(multiple_correct.successes), 2)
        self.assertEqual(len(multiple_correct.failures), 5)
        self.assertEqual(len(multiple_correct.matches()), 7)
        self.assertEqual(str(multiple_correct), 'B: =')

    def test_collision_one_integer_arg_favor_successful_decode(self):
        # One string decodes successfully, since the arg is out of range for
        # %c.
        int_arg = self.detok.detokenize(b'\xad\xba\0\0\xfe\xff\xff\xff\x0f')
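        # b'\xfe\xff\xff\xff\x0f' is the zigzag varint encoding of
        # 2147483647 (INT32_MAX), which is valid for %d but not for %c.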
        self.assertTrue(int_arg.ok())
        self.assertEqual(str(int_arg), 'A: 2147483647')

    def test_collision_one_string_arg_favors_successful_decode(self):
        # One string decodes successfully, since decoding the argument as an
        # integer does not decode all the data.
        string_arg = self.detok.detokenize(b'\xad\xba\0\0\x02Hi')
        self.assertTrue(string_arg.ok())
        self.assertEqual(str(string_arg), 'C: Hi')

    def test_collision_one_string_arg_favors_decoding_all_data(self):
        result = self.detok.detokenize(b'\1\0\0\0\x83hi')
        self.assertEqual(len(result.failures), 2)
        # Should resolve to the string since %d would leave one byte behind.
        self.assertEqual(str(result), '%s')

    def test_collision_multiple_args_favors_decoding_more_arguments(self):
        result = self.detok.detokenize(b'\2\0\0\0\1\2\1\4\5')
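        # 'Five %d %d %d %d %s' decodes four integers (-1, 1, -1, 2) before
        # running out of data for %s; 'Three %s %s %s' decodes only two
        # one-byte strings. The match that decodes more arguments is ranked
        # first.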
        self.assertEqual(len(result.matches()), 2)
        self.assertEqual(result.matches()[0][0], 'Five -1 1 -1 2 %s')
        self.assertEqual(result.matches()[1][0], 'Three \2 \4 %s')

    def test_collision_multiple_args_favors_decoding_all_arguments(self):
        unambiguous = self.detok.detokenize(b'\xad\xba\0\0\x01#\x00\x01')
        self.assertTrue(unambiguous.ok())
        self.assertEqual(len(unambiguous.matches()), 7)
        self.assertEqual('#0 -1', str(unambiguous))
        self.assertIn('#0 -1', repr(unambiguous))


@mock.patch('os.path.getmtime')
class AutoUpdatingDetokenizerTest(unittest.TestCase):
    """Tests the AutoUpdatingDetokenizer class."""
    def test_update(self, mock_getmtime):
        """Tests the update command."""

        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        self.assertEqual(len(db), TOKENS_IN_ELF)

        the_time = [100]
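
        # The mocked os.path.getmtime() returns an ever-decreasing time, so
        # every poll sees a new modification time and reloads the database.
        # Nonexistent paths raise FileNotFoundError, as the real function
        # does.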
        def move_back_time_if_file_exists(path):
            if os.path.exists(path):
                the_time[0] -= 1
                return the_time[0]

            raise FileNotFoundError

        mock_getmtime.side_effect = move_back_time_if_file_exists

        file = tempfile.NamedTemporaryFile('wb', delete=False)
        try:
            file.close()

            detok = detokenize.AutoUpdatingDetokenizer(file.name,
                                                       min_poll_period_s=0)
            self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

            with open(file.name, 'wb') as fd:
                tokens.write_binary(db, fd)

            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        finally:
            os.unlink(file.name)

        # The database stays around if the file is deleted.
        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

    def test_no_update_if_time_is_same(self, mock_getmtime):
        mock_getmtime.return_value = 100

        file = tempfile.NamedTemporaryFile('wb', delete=False)
        try:
            tokens.write_csv(
                database.load_token_database(
                    io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)), file)
            file.close()

            detok = detokenize.AutoUpdatingDetokenizer(file,
                                                       min_poll_period_s=0)
            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

            # Empty the database, but keep the mock modified time the same.
            with open(file.name, 'wb'):
                pass

            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

            # Move back time so the now-empty file is reloaded.
            mock_getmtime.return_value = 50
            self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        finally:
            os.unlink(file.name)


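# Sample transform for the PrefixedMessageDecoder tests below: shifts every
# byte up by one, making transformed regions easy to spot in the output.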
def _next_char(message: bytes) -> bytes:
    return bytes(b + 1 for b in message)


class PrefixedMessageDecoderTest(unittest.TestCase):
    def setUp(self):
        super().setUp()
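        # '$' marks the start of a message; 'abcdefg' is the set of
        # characters a message body may contain.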
        self.decode = detokenize.PrefixedMessageDecoder('$', 'abcdefg')

    def test_transform_single_message(self):
        self.assertEqual(
            b'%bcde',
            b''.join(self.decode.transform(io.BytesIO(b'$abcd'), _next_char)))

    def test_transform_message_amidst_other_only_affects_message(self):
        self.assertEqual(
            b'%%WHAT?%bcd%WHY? is this %ok %', b''.join(
                self.decode.transform(
                    io.BytesIO(b'$$WHAT?$abc$WHY? is this $ok $'),
                    _next_char)))

    def test_transform_empty_message(self):
        self.assertEqual(
            b'%1%',
            b''.join(self.decode.transform(io.BytesIO(b'$1$'), _next_char)))

    def test_transform_sequential_messages(self):
        self.assertEqual(
            b'%bcd%efghh', b''.join(
                self.decode.transform(io.BytesIO(b'$abc$defgh'), _next_char)))


class DetokenizeBase64(unittest.TestCase):
    """Tests detokenizing Base64 messages."""

    JELLO = b'$' + base64.b64encode(JELLO_WORLD_TOKEN)

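    # Each RECURSION string embeds the Base64 token of another tokenized
    # string, so detokenizing it exercises recursive detokenization.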
    RECURSION_STRING = f'The secret message is "{JELLO.decode()}"'
    RECURSION = b'$' + base64.b64encode(
        struct.pack('I', tokens.default_hash(RECURSION_STRING)))

    RECURSION_STRING_2 = f"'{RECURSION.decode()}', said the spy."
    RECURSION_2 = b'$' + base64.b64encode(
        struct.pack('I', tokens.default_hash(RECURSION_STRING_2)))

    TEST_CASES = (
        (b'', b''),
        (b'nothing here', b'nothing here'),
        (JELLO, b'Jello, world!'),
        (JELLO + b'a', b'Jello, world!a'),
        (JELLO + b'abc', b'Jello, world!abc'),
        (JELLO + b'abc=', b'Jello, world!abc='),
        (b'$a' + JELLO + b'a', b'$aJello, world!a'),
        (b'Hello ' + JELLO + b'?', b'Hello Jello, world!?'),
        (b'$' + JELLO, b'$Jello, world!'),
        (JELLO + JELLO, b'Jello, world!Jello, world!'),
        (JELLO + b'$' + JELLO, b'Jello, world!$Jello, world!'),
        (JELLO + b'$a' + JELLO + b'bcd', b'Jello, world!$aJello, world!bcd'),
        (b'$3141', b'$3141'),
        (JELLO + b'$3141', b'Jello, world!$3141'),
        (RECURSION, b'The secret message is "Jello, world!"'),
        (RECURSION_2,
         b'\'The secret message is "Jello, world!"\', said the spy.'),
    )

    def setUp(self):
        super().setUp()
        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        db.add(
            tokens.TokenizedStringEntry(tokens.default_hash(s), s)
            for s in [self.RECURSION_STRING, self.RECURSION_STRING_2])
        self.detok = detokenize.Detokenizer(db)

    def test_detokenize_base64_live(self):
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            detokenize.detokenize_base64_live(self.detok, io.BytesIO(data),
                                              output, '$')

            self.assertEqual(expected, output.getvalue())

    def test_detokenize_base64_to_file(self):
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            detokenize.detokenize_base64_to_file(self.detok, data, output, '$')

            self.assertEqual(expected, output.getvalue())

    def test_detokenize_base64(self):
        for data, expected in self.TEST_CASES:
            self.assertEqual(
                expected, detokenize.detokenize_base64(self.detok, data, b'$'))


class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
    """Tests that infinite Base64 token recursion resolves."""
    def setUp(self):
        super().setUp()
        self.detok = detokenize.Detokenizer(
            tokens.Database([
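                # Each string is itself a Base64 token message: token 0
                # expands to itself, and tokens 1, 2, and 3 form a cycle
                # (1 -> 2 -> 3 -> 2 -> ...).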
                tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
                tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
                tokens.TokenizedStringEntry(2, '$AwAAAA=='),  # token for 3
                tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
            ]))

    def test_detokenize_self_recursion(self):
        for depth in range(5):
            self.assertEqual(
                detokenize.detokenize_base64(self.detok,
                                             b'This one is deep: $AAAAAA==',
                                             recursion=depth),
                b'This one is deep: $AAAAAA==')

    def test_detokenize_self_recursion_default(self):
        self.assertEqual(
            detokenize.detokenize_base64(self.detok,
                                         b'This one is deep: $AAAAAA=='),
            b'This one is deep: $AAAAAA==')

    def test_detokenize_cyclic_recursion_even(self):
        self.assertEqual(
            detokenize.detokenize_base64(self.detok,
                                         b'I said "$AQAAAA=="',
                                         recursion=2), b'I said "$AgAAAA=="')

    def test_detokenize_cyclic_recursion_odd(self):
        self.assertEqual(
            detokenize.detokenize_base64(self.detok,
                                         b'I said "$AQAAAA=="',
                                         recursion=3), b'I said "$AwAAAA=="')


if __name__ == '__main__':
    unittest.main()