diff --git a/tools/run-mypy b/tools/run-mypy index b2b25e1981..4a967ca8d4 100755 --- a/tools/run-mypy +++ b/tools/run-mypy @@ -38,7 +38,6 @@ exclude_py2 = [] # type: List[str] exclude_py3 = """ bots/process_ccache -zerver/lib/ccache.py """.split() parser = argparse.ArgumentParser(description="Run mypy on files tracked by git.") diff --git a/zerver/lib/ccache.py b/zerver/lib/ccache.py index bcf2b9e484..985fa57036 100644 --- a/zerver/lib/ccache.py +++ b/zerver/lib/ccache.py @@ -27,6 +27,7 @@ from typing import Any, Dict, Optional, Text # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from zerver.lib.str_utils import force_bytes import base64 import struct import six @@ -38,22 +39,22 @@ import six # limiting MIT Kerberos's exposure to malformed ccaches, encode it # ourselves. To that end, here's the laziest DER encoder ever. def der_encode_length(length): - # type: (int) -> str + # type: (int) -> bytes if length <= 127: - return chr(length) - out = "" + return force_bytes(chr(length)) + out = b"" while length > 0: - out = chr(length & 0xff) + out + out = force_bytes(chr(length & 0xff)) + out length >>= 8 - out = chr(len(out) | 0x80) + out + out = force_bytes(chr(len(out) | 0x80)) + out return out def der_encode_tlv(tag, value): - # type: (int, str) -> str - return chr(tag) + der_encode_length(len(value)) + value + # type: (int, bytes) -> bytes + return force_bytes(chr(tag)) + der_encode_length(len(value)) + value def der_encode_integer_value(val): - # type: (int) -> str + # type: (int) -> bytes if not isinstance(val, six.integer_types): raise TypeError("int") # base 256, MSB first, two's complement, minimum number of octets @@ -64,45 +65,45 @@ def der_encode_integer_value(val): # Special-case to avoid an empty encoding. if val == 0: - return "\x00" + return b"\x00" sign = 0 # What you would get if you sign-extended the current high bit. - out = "" + out = b"" # We can stop once sign-extension matches the remaining value. while val != sign: byte = val & 0xff - out = chr(byte) + out + out = force_bytes(chr(byte)) + out sign = -1 if byte & 0x80 == 0x80 else 0 val >>= 8 return out def der_encode_integer(val): - # type: (int) -> str + # type: (int) -> bytes return der_encode_tlv(0x02, der_encode_integer_value(val)) def der_encode_int32(val): - # type: (int) -> str + # type: (int) -> bytes if val < -2147483648 or val > 2147483647: raise ValueError("Bad value") return der_encode_integer(val) def der_encode_uint32(val): - # type: (int) -> str + # type: (int) -> bytes if val < 0 or val > 4294967295: raise ValueError("Bad value") return der_encode_integer(val) def der_encode_string(val): - # type: (Text) -> str + # type: (Text) -> bytes if not isinstance(val, Text): raise TypeError("unicode") return der_encode_tlv(0x1b, val.encode("utf-8")) def der_encode_octet_string(val): - # type: (str) -> str - if not isinstance(val, str): - raise TypeError("str") + # type: (bytes) -> bytes + if not isinstance(val, bytes): + raise TypeError("bytes") return der_encode_tlv(0x04, val) def der_encode_sequence(tlvs, tagged=True): - # type: (List[str], Optional[bool]) -> str + # type: (List[bytes], Optional[bool]) -> bytes body = [] for i, tlv in enumerate(tlvs): # Missing optional elements represented as None. @@ -112,10 +113,10 @@ def der_encode_sequence(tlvs, tagged=True): # Assume kerberos-style explicit tagging of components. tlv = der_encode_tlv(0xa0 | i, tlv) body.append(tlv) - return der_encode_tlv(0x30, "".join(body)) + return der_encode_tlv(0x30, b"".join(body)) def der_encode_ticket(tkt): - # type: (Dict[str, Any]) -> str + # type: (Dict[str, Any]) -> bytes return der_encode_tlv( 0x61, # Ticket der_encode_sequence( @@ -138,17 +139,17 @@ def der_encode_ticket(tkt): # http://www.gnu.org/software/shishi/manual/html_node/The-Credential-Cache-Binary-File-Format.html def ccache_counted_octet_string(data): - # type: (str) -> bytes - if not isinstance(data, str): - raise TypeError("str") + # type: (bytes) -> bytes + if not isinstance(data, bytes): + raise TypeError("bytes") return struct.pack("!I", len(data)) + data def ccache_principal(name, realm): - # type: (Dict[str, str], str) -> str + # type: (Dict[str, str], str) -> bytes header = struct.pack("!II", name["nameType"], len(name["nameString"])) - return (header + ccache_counted_octet_string(realm.encode("utf-8")) + - "".join(ccache_counted_octet_string(c.encode("utf-8")) - for c in name["nameString"])) + return (header + ccache_counted_octet_string(force_bytes(realm)) + + b"".join(ccache_counted_octet_string(force_bytes(c)) + for c in name["nameString"])) def ccache_key(key): # type: (Dict[str, str]) -> bytes @@ -164,7 +165,7 @@ def flags_to_uint32(flags): return ret def ccache_credential(cred): - # type: (Dict[str, Any]) -> str + # type: (Dict[str, Any]) -> bytes out = ccache_principal(cred["cname"], cred["crealm"]) out += ccache_principal(cred["sname"], cred["srealm"]) out += ccache_key(cred["key"]) @@ -179,11 +180,11 @@ def ccache_credential(cred): out += struct.pack("!II", 0, 0) out += ccache_counted_octet_string(der_encode_ticket(cred["ticket"])) # No second_ticket. - out += ccache_counted_octet_string("") + out += ccache_counted_octet_string(b"") return out def make_ccache(cred): - # type: (Dict[str, Any]) -> str + # type: (Dict[str, Any]) -> bytes # Do we need a DeltaTime header? The ccache I get just puts zero # in there, so do the same. out = struct.pack("!HHHHII",