mypy: Make zerver/lib/ccache.py support python 3.

This commit is contained in:
Tim Abbott
2017-02-10 18:57:14 -08:00
parent 923f4ddd80
commit 650469ead6
2 changed files with 32 additions and 32 deletions

View File

@@ -38,7 +38,6 @@ exclude_py2 = [] # type: List[str]
exclude_py3 = """ exclude_py3 = """
bots/process_ccache bots/process_ccache
zerver/lib/ccache.py
""".split() """.split()
parser = argparse.ArgumentParser(description="Run mypy on files tracked by git.") parser = argparse.ArgumentParser(description="Run mypy on files tracked by git.")

View File

@@ -27,6 +27,7 @@ from typing import Any, Dict, Optional, Text
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE. # SOFTWARE.
from zerver.lib.str_utils import force_bytes
import base64 import base64
import struct import struct
import six import six
@@ -38,22 +39,22 @@ import six
# limiting MIT Kerberos's exposure to malformed ccaches, encode it # limiting MIT Kerberos's exposure to malformed ccaches, encode it
# ourselves. To that end, here's the laziest DER encoder ever. # ourselves. To that end, here's the laziest DER encoder ever.
def der_encode_length(length): def der_encode_length(length):
# type: (int) -> str # type: (int) -> bytes
if length <= 127: if length <= 127:
return chr(length) return force_bytes(chr(length))
out = "" out = b""
while length > 0: while length > 0:
out = chr(length & 0xff) + out out = force_bytes(chr(length & 0xff)) + out
length >>= 8 length >>= 8
out = chr(len(out) | 0x80) + out out = force_bytes(chr(len(out) | 0x80)) + out
return out return out
def der_encode_tlv(tag, value): def der_encode_tlv(tag, value):
# type: (int, str) -> str # type: (int, bytes) -> bytes
return chr(tag) + der_encode_length(len(value)) + value return force_bytes(chr(tag)) + der_encode_length(len(value)) + value
def der_encode_integer_value(val): def der_encode_integer_value(val):
# type: (int) -> str # type: (int) -> bytes
if not isinstance(val, six.integer_types): if not isinstance(val, six.integer_types):
raise TypeError("int") raise TypeError("int")
# base 256, MSB first, two's complement, minimum number of octets # base 256, MSB first, two's complement, minimum number of octets
@@ -64,45 +65,45 @@ def der_encode_integer_value(val):
# Special-case to avoid an empty encoding. # Special-case to avoid an empty encoding.
if val == 0: if val == 0:
return "\x00" return b"\x00"
sign = 0 # What you would get if you sign-extended the current high bit. sign = 0 # What you would get if you sign-extended the current high bit.
out = "" out = b""
# We can stop once sign-extension matches the remaining value. # We can stop once sign-extension matches the remaining value.
while val != sign: while val != sign:
byte = val & 0xff byte = val & 0xff
out = chr(byte) + out out = force_bytes(chr(byte)) + out
sign = -1 if byte & 0x80 == 0x80 else 0 sign = -1 if byte & 0x80 == 0x80 else 0
val >>= 8 val >>= 8
return out return out
def der_encode_integer(val): def der_encode_integer(val):
# type: (int) -> str # type: (int) -> bytes
return der_encode_tlv(0x02, der_encode_integer_value(val)) return der_encode_tlv(0x02, der_encode_integer_value(val))
def der_encode_int32(val): def der_encode_int32(val):
# type: (int) -> str # type: (int) -> bytes
if val < -2147483648 or val > 2147483647: if val < -2147483648 or val > 2147483647:
raise ValueError("Bad value") raise ValueError("Bad value")
return der_encode_integer(val) return der_encode_integer(val)
def der_encode_uint32(val): def der_encode_uint32(val):
# type: (int) -> str # type: (int) -> bytes
if val < 0 or val > 4294967295: if val < 0 or val > 4294967295:
raise ValueError("Bad value") raise ValueError("Bad value")
return der_encode_integer(val) return der_encode_integer(val)
def der_encode_string(val): def der_encode_string(val):
# type: (Text) -> str # type: (Text) -> bytes
if not isinstance(val, Text): if not isinstance(val, Text):
raise TypeError("unicode") raise TypeError("unicode")
return der_encode_tlv(0x1b, val.encode("utf-8")) return der_encode_tlv(0x1b, val.encode("utf-8"))
def der_encode_octet_string(val): def der_encode_octet_string(val):
# type: (str) -> str # type: (bytes) -> bytes
if not isinstance(val, str): if not isinstance(val, bytes):
raise TypeError("str") raise TypeError("bytes")
return der_encode_tlv(0x04, val) return der_encode_tlv(0x04, val)
def der_encode_sequence(tlvs, tagged=True): def der_encode_sequence(tlvs, tagged=True):
# type: (List[str], Optional[bool]) -> str # type: (List[bytes], Optional[bool]) -> bytes
body = [] body = []
for i, tlv in enumerate(tlvs): for i, tlv in enumerate(tlvs):
# Missing optional elements represented as None. # Missing optional elements represented as None.
@@ -112,10 +113,10 @@ def der_encode_sequence(tlvs, tagged=True):
# Assume kerberos-style explicit tagging of components. # Assume kerberos-style explicit tagging of components.
tlv = der_encode_tlv(0xa0 | i, tlv) tlv = der_encode_tlv(0xa0 | i, tlv)
body.append(tlv) body.append(tlv)
return der_encode_tlv(0x30, "".join(body)) return der_encode_tlv(0x30, b"".join(body))
def der_encode_ticket(tkt): def der_encode_ticket(tkt):
# type: (Dict[str, Any]) -> str # type: (Dict[str, Any]) -> bytes
return der_encode_tlv( return der_encode_tlv(
0x61, # Ticket 0x61, # Ticket
der_encode_sequence( der_encode_sequence(
@@ -138,17 +139,17 @@ def der_encode_ticket(tkt):
# http://www.gnu.org/software/shishi/manual/html_node/The-Credential-Cache-Binary-File-Format.html # http://www.gnu.org/software/shishi/manual/html_node/The-Credential-Cache-Binary-File-Format.html
def ccache_counted_octet_string(data): def ccache_counted_octet_string(data):
# type: (str) -> bytes # type: (bytes) -> bytes
if not isinstance(data, str): if not isinstance(data, bytes):
raise TypeError("str") raise TypeError("bytes")
return struct.pack("!I", len(data)) + data return struct.pack("!I", len(data)) + data
def ccache_principal(name, realm): def ccache_principal(name, realm):
# type: (Dict[str, str], str) -> str # type: (Dict[str, str], str) -> bytes
header = struct.pack("!II", name["nameType"], len(name["nameString"])) header = struct.pack("!II", name["nameType"], len(name["nameString"]))
return (header + ccache_counted_octet_string(realm.encode("utf-8")) + return (header + ccache_counted_octet_string(force_bytes(realm)) +
"".join(ccache_counted_octet_string(c.encode("utf-8")) b"".join(ccache_counted_octet_string(force_bytes(c))
for c in name["nameString"])) for c in name["nameString"]))
def ccache_key(key): def ccache_key(key):
# type: (Dict[str, str]) -> bytes # type: (Dict[str, str]) -> bytes
@@ -164,7 +165,7 @@ def flags_to_uint32(flags):
return ret return ret
def ccache_credential(cred): def ccache_credential(cred):
# type: (Dict[str, Any]) -> str # type: (Dict[str, Any]) -> bytes
out = ccache_principal(cred["cname"], cred["crealm"]) out = ccache_principal(cred["cname"], cred["crealm"])
out += ccache_principal(cred["sname"], cred["srealm"]) out += ccache_principal(cred["sname"], cred["srealm"])
out += ccache_key(cred["key"]) out += ccache_key(cred["key"])
@@ -179,11 +180,11 @@ def ccache_credential(cred):
out += struct.pack("!II", 0, 0) out += struct.pack("!II", 0, 0)
out += ccache_counted_octet_string(der_encode_ticket(cred["ticket"])) out += ccache_counted_octet_string(der_encode_ticket(cred["ticket"]))
# No second_ticket. # No second_ticket.
out += ccache_counted_octet_string("") out += ccache_counted_octet_string(b"")
return out return out
def make_ccache(cred): def make_ccache(cred):
# type: (Dict[str, Any]) -> str # type: (Dict[str, Any]) -> bytes
# Do we need a DeltaTime header? The ccache I get just puts zero # Do we need a DeltaTime header? The ccache I get just puts zero
# in there, so do the same. # in there, so do the same.
out = struct.pack("!HHHHII", out = struct.pack("!HHHHII",