Source code for bushel.directory.document

"""
The :mod:`bushel.directory.document` module provides base classes and utility
methods for handling documents that implement the Tor directory protocol
version 3 meta format (§1.2 [dir-spec]_).

For specific document types, see:

.. toctree::
   :maxdepth: 2

   detached_signatures.rst
   network_status.rst
   server_descriptor.rst
   extra_info_descriptor.rst
"""

import base64
import collections
import datetime
import enum
import functools
import logging
import re
import textwrap

import nacl.signing
import nacl.encoding

from bushel.document import BaseDocument

LOG = logging.getLogger('bushel')

def parse_timestamp(item, argindex=0):
    """
    Parses a timestamp from a directory document's item using the common
    format from [dir-spec]_. This format is not defined explicitly but is
    used with many keywords including ``valid-after``, ``fresh-until``, and
    ``valid-until``.

    .. note:: Due to the way the tokenizer works, timestamps are parsed as
              two arguments split by whitespace. This function takes this
              into account when parsing the timestamp.

    Most items will have the timestamp as the first argument on the keyword
    line. At the time of writing, there are no keywords defined that expect
    timestamps at other indexes. Should this be required though, *argindex*
    may be used to parse a timestamp from a later argument.

    :param DirectoryDocumentItem item: the directory document item
    :param int argindex: zero-indexed index of date portion of timestamp,
                         the time portion is expected in ``argindex+1``

    :returns: the parsed timestamp
    :rtype: ~datetime.datetime
    """
    # The date and the time arrive as two adjacent arguments; glue them back
    # together with a single space before handing them to strptime.
    date_part = item.arguments[argindex]
    time_part = item.arguments[argindex + 1]
    return datetime.datetime.strptime(
        "{} {}".format(date_part, time_part), "%Y-%m-%d %H:%M:%S")
def decode_object_data(lines):
    """
    Decodes the base64 encoded data found within directory document objects.

    The lines are concatenated without any separator and the result is
    decoded as a single base64 string.

    :param list(str) lines: the lines as found in a directory document
                            object, not including newlines or the begin/end
                            lines

    :returns: the decoded data
    :rtype: bytes
    """
    joined = "".join(lines)
    return base64.b64decode(joined)
def encode_object_data(data):
    """
    Encodes bytes using base64 and wraps the lines at 64 characters.

    :param bytes data: the data to be encoded

    :returns: the line-wrapped base64 encoded data as a list of strings, one
              string per line (an empty list for empty input)
    :rtype: list(str)
    """
    encoded = base64.b64encode(data).decode("ascii")
    # Base64 text contains neither whitespace nor hyphens, so wrapping it is
    # the same as cutting it into consecutive 64 character pieces.
    return [encoded[start:start + 64] for start in range(0, len(encoded), 64)]
def expect_arguments(minargs, maxargs, strictmax=False):
    """
    Builds a decorator that checks the number of arguments of an item before
    the decorated parser method is called.

    The decorated callable must accept ``(self, item)`` where *item* has
    ``keyword`` and ``arguments`` attributes.

    :param int minargs: minimum number of arguments the item must have
    :param int maxargs: maximum number of arguments the item is expected to
                        have
    :param bool strictmax: if true, exceeding *maxargs* raises an error; if
                           false, a warning is logged and parsing continues

    :returns: a decorator for parser methods

    :raises RuntimeError: from the decorated method, when the item has fewer
        than *minargs* arguments, or more than *maxargs* arguments while
        *strictmax* is set
    """
    def expect_arguments_decorator(parser_func):
        # functools.wraps keeps the parser's name and docstring intact so the
        # decorated methods remain identifiable in tracebacks and docs.
        @functools.wraps(parser_func)
        def function_wrapper(self, item):
            found = len(item.arguments)
            if found < minargs:
                raise RuntimeError(
                    "Incorrect number of arguments found for "
                    f"{item.keyword} item in document. Expected at "
                    f"least {minargs} but found {found}")
            if found > maxargs:
                msg = (
                    f"Found additional arguments for {item.keyword} item in "
                    f"document. Expected no more than {maxargs} but found "
                    f"{found}.")
                if strictmax:
                    raise RuntimeError(msg)
                # Extra arguments are tolerated by default: warn and carry on.
                LOG.warning(msg)
            return parser_func(self, item)
        return function_wrapper
    return expect_arguments_decorator
class DirectoryCertificateExtension(
        collections.namedtuple('DirectoryCertificateExtension',
                               'type flags data')):
    """
    A Tor Ed25519 certificate extension as specified by [cert-spec]_.

    .. graphviz::

       digraph g {
         rankdir=LR;
         certificate [label="Certificate",shape="box"];
         extension [label="Extension",shape="box",style="filled",fillcolor="yellow"];
         certificate->extension [label="has zero or more"];
       }

    The fields are positional, in the order ``(type, flags, data)``.

    :var int type: extension type
    :var int flags: extension flags
    :var bytes data: extension data

    .. seealso:: These will be found in :class:`DirectoryCertificate` s.
    """
class DirectoryCertificate:
    """
    A Tor Ed25519 certificate as specified by [cert-spec]_. It is not the
    only certificate format that Tor uses. Typically these are found as the
    data contained within :class:`DirectoryDocumentObject` s.

    .. graphviz::

       digraph g {
         rankdir=LR;
         certificate [label="Certificate",shape="box",style="filled",fillcolor="yellow"];
         extension [label="Extension",shape="box"];
         certificate->extension [label="has zero or more"];
       }

    :param bytes raw_content: raw certificate contents

    :var bytes raw_content: raw certificate contents
    :var int version: version of the certificate format (currently always 1)
    :var int cert_type: type of certificate
    :var ~datetime.datetime expiration_date: expiration date of certificate
        (naive, in UTC)
    :var int cert_key_type: type of certified key
    :var bytes certified_key: an Ed25519 public key if cert_key_type is 1, or
        a SHA256 hash of some other key type depending on the value of
        cert_key_type
    :var int n_extensions: declared number of extensions
    :var list(DirectoryCertificateExtension) extensions: parsed extensions
    :var bytes signature: certificate signature

    All fields other than *raw_content* are ``None`` until
    :meth:`~DirectoryCertificate.parse` has been called.
    """

    # Layout constants, in bytes. The fixed header is version (1), type (1),
    # expiration (4), key type (1), certified key (32) and extension count (1).
    _HEADER_LENGTH = 40
    _EXTENSION_HEADER_LENGTH = 4  # length (2) + type (1) + flags (1)
    _SIGNATURE_LENGTH = 64

    # Extension handling.
    _EXT_SIGNED_WITH_ED25519_KEY = 4
    _EXT_FLAG_AFFECTS_VALIDATION = 0x01
    _KNOWN_EXTENSION_TYPES = frozenset({_EXT_SIGNED_WITH_ED25519_KEY})

    # Expiration dates are stored as hours since this (naive, UTC) instant.
    _EPOCH = datetime.datetime(1970, 1, 1)

    def __init__(self, raw_content):
        self.raw_content = raw_content
        self.version = None
        self.cert_type = None
        self.expiration_date = None
        self.cert_key_type = None
        self.certified_key = None
        self.n_extensions = None
        self.extensions = None
        self.signature = None

    def parse(self):
        """
        Parses the certificate to make the fields available via instance
        attributes. This does not validate or verify the certificate, but
        must be called before making calls to
        :meth:`~DirectoryCertificate.is_valid` or
        :meth:`~DirectoryCertificate.verify`.

        :raises RuntimeError: if the raw content is too short, an extension
            runs past the end of the content, the expiration date cannot be
            represented, or the data following the extensions is not exactly
            one 64 byte signature
        """
        raw = self.raw_content
        minimum_length = self._HEADER_LENGTH + self._SIGNATURE_LENGTH
        if len(raw) < minimum_length:
            raise RuntimeError(
                "Certificate is too short to be parsed. Expected at least "
                f"{minimum_length} bytes but found {len(raw)}.")
        self.version = raw[0]
        self.cert_type = raw[1]
        expiration_hours = int.from_bytes(raw[2:6], "big")
        try:
            # Pure arithmetic on a naive UTC epoch gives the same result as
            # the deprecated utcfromtimestamp() without platform limits.
            self.expiration_date = self._EPOCH + datetime.timedelta(
                hours=expiration_hours)
        except OverflowError as err:
            raise RuntimeError(
                "Certificate expiration date is out of range "
                f"({expiration_hours} hours since the epoch).") from err
        self.cert_key_type = raw[6]
        self.certified_key = raw[7:39]
        self.n_extensions = raw[39]
        index = self._parse_extensions()
        remaining = len(raw) - index
        if remaining != self._SIGNATURE_LENGTH:
            raise RuntimeError(
                "Certificate is malformed. Expected a "
                f"{self._SIGNATURE_LENGTH} byte signature after the "
                f"extensions but found {remaining} bytes.")
        self.signature = raw[index:]

    def _parse_extensions(self):
        """
        Parses the declared number of extensions into ``self.extensions``.

        :returns: index of the first byte following the last extension
        :rtype: int

        :raises RuntimeError: if an extension header or its data would run
            past the end of the raw content
        """
        raw = self.raw_content
        self.extensions = []
        index = self._HEADER_LENGTH
        for _ in range(self.n_extensions):
            data_start = index + self._EXTENSION_HEADER_LENGTH
            if data_start > len(raw):
                raise RuntimeError(
                    "Certificate is truncated inside an extension header.")
            length = int.from_bytes(raw[index:index + 2], "big")
            kind = raw[index + 2]
            flags = raw[index + 3]
            data_end = data_start + length
            if data_end > len(raw):
                raise RuntimeError(
                    "Certificate is truncated inside extension data.")
            self.extensions.append(
                DirectoryCertificateExtension(kind, flags,
                                              raw[data_start:data_end]))
            index = data_end
        return index

    def is_valid(self):
        """
        Checks that the certificate is valid. This is the counterpart to
        :meth:`~DirectoryCertificate.verify` that checks that the certificate
        data conforms to the specification. The two checks performed are:

        * expiration date is not passed
        * there are no extensions that affect validation that we do not
          understand

        .. note:: In the Tor Metrics use case, we need to check that
                  certificates were valid at the time they were expected to
                  be valid, but the current API does not support this.

        :returns: ``True`` if both checks pass
        :rtype: bool

        :raises RuntimeError: if the certificate has not been parsed, has
            expired, or carries an unrecognised extension that is flagged as
            affecting validation
        """
        if self.expiration_date is None or self.extensions is None:
            raise RuntimeError("Attempted to validate a certificate that "
                               "has not been parsed.")
        # TODO: Need to check based on provided time, not just now
        # expiration_date is naive UTC, so compare against a naive UTC "now".
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        if self.expiration_date < now:
            raise RuntimeError("Attempted to validate a certificate but it "
                               "has expired.")
        for extension in self.extensions:
            # Unknown extensions are only a problem when they declare that
            # they affect validation.
            affects_validation = (
                extension.flags & self._EXT_FLAG_AFFECTS_VALIDATION)
            if (affects_validation
                    and extension.type not in self._KNOWN_EXTENSION_TYPES):
                raise RuntimeError("Certificate has unknown extensions that "
                                   "affect validation, so cannot validate.")
        return True

    def verify(self, verify_key_data=None):
        """
        Verify the certificate using the verification key. Optionally provide
        key material, otherwise the key found in the
        "signed-with-ed25519-key" (type 4) extension will be used.

        This only verifies the signature. To validate the certificate data
        the separate :meth:`DirectoryCertificate.is_valid` method must be
        used.

        .. warning:: This verifies the raw data that the object was
                     initialized with, the fields may have been played with
                     since parsing and the parser may also have unknown bugs.

        :param bytes verify_key_data: an Ed25519 verification key

        :returns: ``True`` if the signature check did not raise
        :rtype: bool

        :raises RuntimeError: if the certificate has not been parsed, or no
            key was provided and none is embedded in the certificate
        """
        if self.signature is None or self.extensions is None:
            raise RuntimeError("Attempted to verify a certificate that has "
                               "not been parsed.")
        if not verify_key_data:
            # Fall back to the first embedded signing key, if there is one.
            verify_key_data = next(
                (extension.data for extension in self.extensions
                 if extension.type == self._EXT_SIGNED_WITH_ED25519_KEY),
                None)
        if not verify_key_data:
            raise RuntimeError(
                "No verification key was provided and the certificate does "
                "not contain a signed-with-ed25519-key extension.")
        verify_key = nacl.signing.VerifyKey(verify_key_data,
                                            nacl.encoding.RawEncoder)
        # Everything before the trailing signature is the signed message.
        verify_key.verify(self.raw_content[:-self._SIGNATURE_LENGTH],
                          self.signature)
        return True
class DirectoryDocumentItem:
    """
    A directory document item as described in the Tor directory protocol meta
    format (§1.2 [dir-spec]_).

    .. graphviz::

       digraph g {
         rankdir=LR;
         document [label="Document",shape="box"];
         item [label="Item",style="filled",fillcolor="yellow",shape="box"];
         object [label="Object",shape="box"];
         document->item [label="has one or more"];
         item->object [label="has zero or more"];
       }

    :param str keyword: the item keyword
    :param list(str) arguments: list of item arguments
    :param list(DirectoryDocumentObject) objects: list of item objects, each
        having a ``keyword`` and decoded ``data``
    :param list(DirectoryDocumentItemError) errors: list of errors found
        during item parsing

    :var str keyword: the item keyword
    :var list(str) arguments: list of item arguments
    :var list(DirectoryDocumentObject) objects: list of item objects, each
        having a ``keyword`` and decoded ``data``
    :var list(DirectoryDocumentItemError) errors: list of errors found during
        item parsing
    """

    def __init__(self, keyword, arguments, objects, errors):
        self.keyword = keyword
        self.arguments = arguments
        self.objects = objects
        self.errors = errors

    def __str__(self):
        # Keyword line: the keyword followed by space separated arguments.
        suffix = (" " + " ".join(self.arguments)) if self.arguments else ""
        rendered = [f"{self.keyword}{suffix}"]
        # Each object is re-encoded between its BEGIN and END lines.
        for obj in self.objects or ():
            rendered.append(f"-----BEGIN {obj.keyword}-----")
            rendered.extend(encode_object_data(obj.data))
            rendered.append(f"-----END {obj.keyword}-----")
        return "\n".join(rendered)
class DirectoryDocumentItemError(enum.Enum):
    """
    Enumeration of forgivable errors that may be encountered during
    itemization of a directory document.

    ======================= ===========
    Name                    Description
    ======================= ===========
    TRAILING_WHITESPACE     Trailing whitespace on KeywordLines
                            https://bugs.torproject.org/30105
    ======================= ===========
    """

    # The value is the text used when the error is reported.
    TRAILING_WHITESPACE = 'trailing-whitespace'
class DirectoryDocumentItemizer:
    """
    Parses :class:`DirectoryDocumentToken` s into
    :class:`DirectoryDocumentItem` s. By default this is a strict
    implementation of the Tor directory protocol meta format (§1.2
    [dir-spec]_), but this can be relaxed to account for implementation bugs
    in known Tor implementations.

    Items are produced by processing tokens according to a state machine:

    .. graphviz::

       digraph g {
         start [label="START"];
         keyword_line [label="KEYWORD-LINE"];
         keyword_line_ws [label="KEYWORD-LINE-WS"];
         keyword_line_end [label="KEYWORD-LINE-END"];
         object_data [label="OBJECT-DATA"];
         object_data_eol [label="OBJECT-DATA-EOL"];

         start -> keyword_line [label="PRINTABLE"];
         keyword_line -> keyword_line_end [label="NL"];
         keyword_line -> keyword_line_ws [label="WS"];
         keyword_line_ws -> keyword_line [label="PRINTABLE"];
         keyword_line_ws -> keyword_line_end [label="NL", color="red"];
         keyword_line_end -> object_data [label="BEGIN"];
         keyword_line_end -> start [label="EOF"];
         keyword_line_end -> keyword_line [label="PRINTABLE"];
         object_data -> object_data_eol [label="PRINTABLE"];
         object_data_eol -> object_data [label="NL"];
         object_data -> keyword_line_end [label="END"];
       }

    State transitions shown in red would ideally not be needed as they are
    protocol violations, but implementations of the protocol exist that
    produce documents requiring these transitions and we need to be bug
    compatible.

    .. warning:: All printable strings are treated equally right now, so
                 we're not testing for keywords being the restricted set.

    :param allowed_errors: A list of errors that will be considered non-fatal
                           during itemization.
    :type allowed_errors: list(DirectoryDocumentItemError)
    """

    def __init__(self, allowed_errors=None):
        self.state = 'START'
        self.allowed_errors = allowed_errors or []
        self.token = None
        # One handler per state; eat() dispatches on the current state.
        self.token_handlers = {
            'START': self.token_start,
            'KEYWORD-LINE': self.token_keyword_line,
            'KEYWORD-LINE-WS': self.token_keyword_line_ws,
            'KEYWORD-LINE-END': self.token_keyword_line_end,
            'OBJECT-DATA': self.token_object_data,
            'OBJECT-DATA-EOL': self.token_object_data_eol,
        }
        # State of the item currently being assembled.
        self.keyword = None
        self.arguments = []
        self.objects = []
        self.errors = []
        # State of the object currently being assembled.
        self.object_keyword = None
        self.object_data = []

    def reset_item_state(self, next_keyword=None):
        """Starts a fresh item, optionally already knowing its keyword."""
        self.keyword = next_keyword
        # New lists are created so a finished item keeps its own contents.
        self.arguments = []
        self.objects = []
        self.errors = []
        self.reset_object_state()

    def reset_object_state(self):
        """Forgets any partially collected object."""
        self.object_keyword = None
        self.object_data = []

    def item_done(self, next_keyword=None):
        """
        Finishes the current item and returns it. When *next_keyword* is
        given the machine continues on that keyword's line, otherwise it
        returns to the start state.
        """
        finished = self.item()
        self.reset_item_state(next_keyword=next_keyword)
        if next_keyword:
            self.state = 'KEYWORD-LINE'
        else:
            self.state = 'START'
        return finished

    def error(self, error):
        """Records *error* if it is allowed, otherwise raises RuntimeError."""
        if error not in self.allowed_errors:
            raise RuntimeError(f"Encountered a {error.value} error on line "
                               f"{self.token.line} at col {self.token.column}")
        self.errors.append(error)

    def expected_not_found(self, expected):
        """Raises RuntimeError describing the unexpected current token."""
        current = self.token
        raise RuntimeError(f"Expected {expected} on line "
                           f"{current.line} at "
                           f"col {current.column}, but found "
                           f"{current.kind} {current.value}")

    def item(self):
        """Builds an item from the state collected so far."""
        return DirectoryDocumentItem(self.keyword, self.arguments,
                                     self.objects, self.errors)

    def eat(self, token):
        """
        Feeds one token to the state machine.

        :returns: a completed :class:`DirectoryDocumentItem` when this token
                  finished one, otherwise ``None``
        """
        self.token = token
        handler = self.token_handlers[self.state]
        return handler()

    def token_start(self):
        current = self.token
        if current.kind != 'PRINTABLE':
            self.expected_not_found("keyword")
            return
        self.keyword = current.value
        self.state = 'KEYWORD-LINE'

    def token_keyword_line(self):
        kind = self.token.kind
        if kind == 'NL':
            self.state = 'KEYWORD-LINE-END'
            return
        if kind == 'WS':
            self.state = 'KEYWORD-LINE-WS'
            return
        self.expected_not_found("whitespace or newline")

    def token_keyword_line_ws(self):
        current = self.token
        if current.kind == 'NL':
            # Whitespace directly before the newline is a forgivable error.
            self.error(DirectoryDocumentItemError.TRAILING_WHITESPACE)
            self.state = 'KEYWORD-LINE-END'
            return
        if current.kind == 'PRINTABLE':
            self.arguments.append(current.value)
            self.state = 'KEYWORD-LINE'
            return
        self.expected_not_found("argument")

    def token_keyword_line_end(self):
        current = self.token
        if current.kind == 'PRINTABLE':
            # A new keyword ends the current item and starts the next one.
            return self.item_done(next_keyword=current.value)
        if current.kind == 'EOF':
            return self.item_done()
        if current.kind == 'BEGIN':
            self.object_keyword = current.value
            self.state = 'OBJECT-DATA'
            return None
        self.expected_not_found("begin line, keyword or EOF")
        return None

    def token_object_data(self):
        current = self.token
        if current.kind == 'PRINTABLE':
            self.object_data.append(current.value)
            self.state = 'OBJECT-DATA-EOL'
            return
        if current.kind == 'END':
            decoded = decode_object_data(self.object_data)
            self.objects.append(
                DirectoryDocumentObject(self.object_keyword, decoded))
            self.reset_object_state()
            self.state = 'KEYWORD-LINE-END'
            return
        self.expected_not_found("object data or end line")

    def token_object_data_eol(self):
        if self.token.kind != 'NL':
            self.expected_not_found("newline")
            return
        self.state = 'OBJECT-DATA'
class DirectoryDocument(BaseDocument):
    """
    A directory document as described in the Tor directory protocol meta
    format (§1.2 [dir-spec]_).

    .. graphviz::

       digraph g {
         rankdir=LR;
         document [label="Document",shape="box",style="filled",fillcolor="yellow"];
         item [label="Item",shape="box"];
         object [label="Object",shape="box"];
         document->item [label="has one or more"];
         item->object [label="has zero or more"];
       }

    :param bytes raw_content: raw document contents
    """

    # Token kinds in matching priority order. BEGIN and END lines include
    # their trailing newline so they must be tried before PRINTABLE.
    _TOKEN_SPECIFICATION = (
        ('END', r'-----END [A-Za-z0-9- ]+-----\n'),
        ('BEGIN', r'-----BEGIN [A-Za-z0-9- ]+-----\n'),
        ('NL', r'\n'),
        ('PRINTABLE', r'\S+'),
        ('WS', r'[ \t]+'),
        ('MISMATCH', r'.'),
    )
    # Compiled once for the class rather than on every tokenize() call.
    _TOKEN_REGEX = re.compile(
        '|'.join('(?P<%s>%s)' % pair for pair in _TOKEN_SPECIFICATION))

    def __init__(self, raw_content):
        super().__init__(raw_content)
        # Maps item keywords to callables taking the item; filled in by
        # whoever defines the concrete document type.
        self.PARSE_FUNCTIONS = {}

    def parse(self):
        """
        Calls the registered parse function, if any, for every item in the
        document. Items with keywords that have no entry in
        ``PARSE_FUNCTIONS`` are skipped.
        """
        for item in self.items():
            if item.keyword in self.PARSE_FUNCTIONS:
                self.PARSE_FUNCTIONS[item.keyword](item)

    def items(self, allowed_errors=None):
        """
        Itemizes the document.

        :param allowed_errors: errors that will be considered non-fatal
                               during itemization
        :type allowed_errors: list(DirectoryDocumentItemError)

        :returns: iterator for :class:`DirectoryDocumentItem`
        """
        itemizer = DirectoryDocumentItemizer(allowed_errors)
        for token in self.tokenize():
            item = itemizer.eat(token)
            if item:
                yield item

    def tokenize(self):
        """
        Tokenizes the document using the following tokens:

        ================== ======================================= ========
        Kind               Matches on                              Value
        ================== ======================================= ========
        END                ``"-----END " Keyword "-----"``         Keyword
        BEGIN              ``"-----BEGIN " Keyword "-----"``       Keyword
        NL                 The ascii LF character (hex value 0x0a) Raw data
        PRINTABLE          Printing, non-whitespace, UTF-8         Raw data
        WS                 Space or tab                            Raw data
        MISMATCH           Anything else (likely binary nonsense)  Raw data
        ================== ======================================= ========

        A MISMATCH is never yielded; it raises an error instead. After the
        last token an ``EOF`` token with a value of ``None`` is yielded, also
        for an empty document.

        Note that these tokens do not match the non-terminals exactly as they
        are specified in the Tor directory protocol meta format. In
        particular, the PRINTABLE token is used for both keywords and
        arguments (and object data). It is up to whatever is processing these
        tokens to decide if something is valid keyword or argument.

        >>> document_bytes = b'''super-keyword 3
        ... onion-magic
        ... -----BEGIN ONION MAGIC-----
        ... AQQABp6MAT7yJjlcuWLDbr8A5J8YgyDh5SPYkLpj7fmcBaFbKekjAQAgBADKnR/C
        ... -----END ONION MAGIC-----
        ... '''
        >>> for token in DirectoryDocument(document_bytes).tokenize():
        ...     print(token) # doctest: +ELLIPSIS
        DirectoryDocumentToken(kind='PRINTABLE', value='super-keyword', line=1, column=0)
        DirectoryDocumentToken(kind='WS', value=' ', line=1, column=13)
        DirectoryDocumentToken(kind='PRINTABLE', value='3', line=1, column=14)
        DirectoryDocumentToken(kind='NL', value='\\n', line=1, column=15)
        DirectoryDocumentToken(kind='PRINTABLE', value='onion-magic', line=2, column=0)
        DirectoryDocumentToken(kind='NL', value='\\n', line=2, column=11)
        DirectoryDocumentToken(kind='BEGIN', value='ONION MAGIC', line=3, column=0)
        DirectoryDocumentToken(kind='PRINTABLE', value='AQQ...DKnR/C', line=4, column=0)
        DirectoryDocumentToken(kind='NL', value='\\n', line=4, column=64)
        DirectoryDocumentToken(kind='END', value='ONION MAGIC', line=5, column=0)
        DirectoryDocumentToken(kind='EOF', value=None, line=6, column=0)

        :returns: iterator for :class:`DirectoryDocumentToken`

        :raises RuntimeError: when a character is found that matches none of
            the token kinds
        """
        line_num = 1
        line_start = 0
        # End offset of the last token seen. Tracked separately so that the
        # EOF column is defined even when the document yields no tokens.
        position = 0
        text = self.raw_content.decode('utf-8')
        for mo in self._TOKEN_REGEX.finditer(text):
            kind = mo.lastgroup
            value = mo.group()
            column = mo.start() - line_start
            if kind == 'BEGIN':
                # Strip the leading "-----BEGIN " and the trailing "-----\n".
                value = value[11:-6]
            elif kind == 'END':
                # Strip the leading "-----END " and the trailing "-----\n".
                value = value[9:-6]
            elif kind == 'MISMATCH':
                raise RuntimeError(
                    f'{value!r} unexpected on line {line_num} at col {column}')
            yield DirectoryDocumentToken(kind, value, line_num, column)
            position = mo.end()
            if kind in ('NL', 'BEGIN', 'END'):
                # These kinds consume a newline, so a new line starts here.
                line_start = position
                line_num += 1
        yield DirectoryDocumentToken('EOF', None, line_num,
                                     position - line_start)
class DirectoryDocumentToken(
        collections.namedtuple('DirectoryDocumentToken',
                               'kind value line column')):
    """
    A single token, as produced when tokenizing a directory document.

    The fields are positional, in the order ``(kind, value, line, column)``.

    :var str kind: the kind of token
    :var str value: kind-dependent value (``None`` for the ``EOF`` token)
    :var int line: line number
    :var int column: column number
    """
class DirectoryDocumentObject(
        collections.namedtuple('DirectoryDocumentObject', 'keyword data')):
    """
    A directory document object as described in the Tor directory protocol
    meta format (§1.2 [dir-spec]_).

    .. graphviz::

       digraph g {
         rankdir=LR;
         document [label="Document",shape="box"];
         item [label="Item",shape="box"];
         object [label="Object",shape="box",style="filled",fillcolor="yellow"];
         document->item [label="has one or more"];
         item->object [label="has zero or more"];
       }

    The fields are positional, in the order ``(keyword, data)``.

    :var str keyword: object keyword
    :var bytes data: decoded object data
    """