mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 22:13:26 +00:00
There seems to have been a confusion between two different uses of the word “optional”: • An optional parameter may be omitted and replaced with a default value. • An Optional type has None as a possible value. Sometimes an optional parameter has a default value of None, or None is otherwise a meaningful value to provide, in which case it makes sense for the optional parameter to have an Optional type. But in other cases, optional parameters should not have Optional type. Fix them. Signed-off-by: Anders Kaseorg <anders@zulip.com>
190 lines
7.5 KiB
Python
190 lines
7.5 KiB
Python
import base64
|
|
import struct
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
# This file is adapted from samples/shellinabox/ssh-krb-wrapper in
|
|
# https://github.com/davidben/webathena, which has the following
|
|
# license:
|
|
#
|
|
# Copyright (c) 2013 David Benjamin and Alan Huang
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person
|
|
# obtaining a copy of this software and associated documentation files
|
|
# (the "Software"), to deal in the Software without restriction,
|
|
# including without limitation the rights to use, copy, modify, merge,
|
|
# publish, distribute, sublicense, and/or sell copies of the Software,
|
|
# and to permit persons to whom the Software is furnished to do so,
|
|
# subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
|
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
|
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
|
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
|
|
|
|
def force_bytes(s: Union[str, bytes], encoding: str='utf-8') -> bytes:
    """Return s as a binary string.

    bytes input is handed back unchanged; str input is encoded with
    `encoding`.  Any other type raises TypeError.
    """
    if isinstance(s, bytes):
        return s
    if isinstance(s, str):
        return s.encode(encoding)
    raise TypeError("force_bytes expects a string type")
|
|
|
|
# Some DER encoding stuff. Bleh. This is because the ccache contains a
|
|
# DER-encoded krb5 Ticket structure, whereas Webathena deserializes
|
|
# into the various fields. Re-encoding in the client would be easy as
|
|
# there is already an ASN.1 implementation, but in the interest of
|
|
# limiting MIT Kerberos's exposure to malformed ccaches, encode it
|
|
# ourselves. To that end, here's the laziest DER encoder ever.
|
|
def der_encode_length(length: int) -> bytes:
    """Encode a DER length field.

    Lengths up to 127 use the single-octet short form.  Larger lengths
    use the long form: a leading octet with the high bit set whose low
    bits give the number of length octets, followed by the length
    itself in big-endian order.
    """
    if length <= 127:
        return struct.pack('!B', length)

    # Peel off the low octet each round, so each new octet goes in front.
    octets = []
    remaining = length
    while remaining > 0:
        octets.insert(0, struct.pack('!B', remaining & 0xff))
        remaining >>= 8
    payload = b"".join(octets)
    return struct.pack('!B', len(payload) | 0x80) + payload
|
|
|
|
def der_encode_tlv(tag: int, value: bytes) -> bytes:
    """Build a tag-length-value triple.

    The result is the single tag octet, the DER-encoded length of
    `value`, and then `value` itself.
    """
    header = struct.pack('!B', tag) + der_encode_length(len(value))
    return header + value
|
|
|
|
def der_encode_integer_value(val: int) -> bytes:
    """Return the content octets of a DER INTEGER for val.

    The encoding is base 256, most significant octet first, two's
    complement, using the minimum number of octets.  Edge cases:
      * 0 and -1 are 0x00 and 0xFF, not the empty string.
      * 255 is 0x00 0xFF, not 0xFF
      * -256 is 0xFF 0x00, not 0x00

    Raises TypeError if val is not an int.
    """
    if not isinstance(val, int):
        raise TypeError("int")

    # Zero would otherwise fall straight through the loop and produce
    # an empty encoding.
    if val == 0:
        return b"\x00"

    octets = []      # collected least-significant first
    extension = 0    # what sign-extending the current high bit would give
    remaining = val
    # Stop once sign-extension alone reproduces what is left of the value.
    while remaining != extension:
        low = remaining & 0xff
        octets.append(low)
        extension = -1 if low & 0x80 else 0
        remaining >>= 8
    return bytes(reversed(octets))
|
|
|
|
def der_encode_integer(val: int) -> bytes:
    """Encode val as a complete DER INTEGER (tag 0x02, length, contents)."""
    contents = der_encode_integer_value(val)
    return der_encode_tlv(0x02, contents)
|
|
def der_encode_int32(val: int) -> bytes:
    """Encode val as a DER INTEGER after checking the signed 32-bit range.

    Raises ValueError if val is below -2**31 or above 2**31 - 1.
    """
    lowest = -2147483648
    highest = 2147483647
    if val < lowest or val > highest:
        raise ValueError("Bad value")
    return der_encode_integer(val)
|
|
def der_encode_uint32(val: int) -> bytes:
    """Encode val as a DER INTEGER after checking the unsigned 32-bit range.

    Raises ValueError if val is negative or above 2**32 - 1.
    """
    highest = 4294967295
    if val < 0 or val > highest:
        raise ValueError("Bad value")
    return der_encode_integer(val)
|
|
|
|
def der_encode_string(val: str) -> bytes:
    """Encode a str as a TLV with tag 0x1b, using its UTF-8 bytes as contents.

    Raises TypeError if val is not a str.
    """
    if isinstance(val, str):
        return der_encode_tlv(0x1b, val.encode("utf-8"))
    raise TypeError("unicode")
|
|
|
|
def der_encode_octet_string(val: bytes) -> bytes:
    """Encode bytes as a DER OCTET STRING (tag 0x04).

    Raises TypeError if val is not bytes.
    """
    if isinstance(val, bytes):
        return der_encode_tlv(0x04, val)
    raise TypeError("bytes")
|
|
|
|
def der_encode_sequence(tlvs: List[Optional[bytes]], tagged: bool=True) -> bytes:
    """Encode already-encoded elements as a DER SEQUENCE (tag 0x30).

    Missing optional elements are represented as None and are left out
    of the output.  When `tagged` is true, each remaining element is
    wrapped in a TLV whose tag is 0xa0 OR'ed with the element's position
    in `tlvs` (kerberos-style explicit tagging of components).  Positions
    count None entries too, so later elements keep their tag number.
    """
    body = [
        der_encode_tlv(0xa0 | position, element) if tagged else element
        for position, element in enumerate(tlvs)
        if element is not None
    ]
    return der_encode_tlv(0x30, b"".join(body))
|
|
|
|
def der_encode_ticket(tkt: Dict[str, Any]) -> bytes:
    """DER-encode a ticket given as a dict of its deserialized fields.

    Reads tkt["realm"], tkt["sname"]["nameType"],
    tkt["sname"]["nameString"], tkt["encPart"]["etype"],
    tkt["encPart"]["cipher"] (base64 text) and, when present,
    tkt["encPart"]["kvno"].  The result is a tagged SEQUENCE wrapped in
    tag 0x61.
    """
    tkt_vno = der_encode_integer(5)  # tktVno
    realm = der_encode_string(tkt["realm"])

    # PrincipalName
    name_type = der_encode_int32(tkt["sname"]["nameType"])
    name_string = der_encode_sequence(
        [der_encode_string(component)
         for component in tkt["sname"]["nameString"]],
        tagged=False)
    sname = der_encode_sequence([name_type, name_string])

    # EncryptedData; kvno is optional and stays None when absent so the
    # cipher keeps its positional tag.
    etype = der_encode_int32(tkt["encPart"]["etype"])
    kvno = None
    if "kvno" in tkt["encPart"]:
        kvno = der_encode_uint32(tkt["encPart"]["kvno"])
    cipher = der_encode_octet_string(
        base64.b64decode(tkt["encPart"]["cipher"]))
    enc_part = der_encode_sequence([etype, kvno, cipher])

    return der_encode_tlv(
        0x61,  # Ticket
        der_encode_sequence([tkt_vno, realm, sname, enc_part]))
|
|
|
|
# Kerberos ccache writing code. Using format documentation from here:
|
|
# https://www.gnu.org/software/shishi/manual/html_node/The-Credential-Cache-Binary-File-Format.html
|
|
|
|
def ccache_counted_octet_string(data: bytes) -> bytes:
    """Prefix data with its length as a big-endian unsigned 32-bit integer.

    Raises TypeError if data is not bytes.
    """
    if isinstance(data, bytes):
        return struct.pack("!I", len(data)) + data
    raise TypeError("bytes")
|
|
|
|
def ccache_principal(name: Dict[str, Any], realm: str) -> bytes:
    """Serialize a principal for the ccache file.

    `name` must provide "nameType" (packed as an unsigned 32-bit
    integer) and "nameString" (a sequence of name components, each str
    or bytes).  The values are therefore not all strings, which is why
    the mapping is typed Dict[str, Any].

    Layout: name type, number of components, the realm as a counted
    octet string, then every component as a counted octet string.
    """
    components = name["nameString"]
    header = struct.pack("!II", name["nameType"], len(components))
    encoded_realm = ccache_counted_octet_string(force_bytes(realm))
    encoded_components = b"".join(
        ccache_counted_octet_string(force_bytes(component))
        for component in components)
    return header + encoded_realm + encoded_components
|
|
|
|
def ccache_key(key: Dict[str, Any]) -> bytes:
    """Serialize a key for the ccache file.

    `key` must provide "keytype" (packed as an unsigned 16-bit integer)
    and "keyvalue" (base64 text).  The key type is an integer, which is
    why the mapping is typed Dict[str, Any] rather than Dict[str, str].

    Layout: the key type followed by the decoded key value as a counted
    octet string.
    """
    key_type = struct.pack("!H", key["keytype"])
    key_value = ccache_counted_octet_string(base64.b64decode(key["keyvalue"]))
    return key_type + key_value
|
|
|
|
def flags_to_uint32(flags: List[str]) -> int:
    """Pack a list of flags into an integer bit field.

    Entry i sets bit (31 - i) when it is truthy, so the first entry maps
    to the most significant bit of a 32-bit word.
    """
    # Each position contributes a distinct power of two, so adding them
    # gives the same result as OR-ing them together.
    return sum(1 << (31 - position)
               for position, flag in enumerate(flags)
               if flag)
|
|
|
|
def ccache_credential(cred: Dict[str, Any]) -> bytes:
    """Serialize one credential entry for the ccache file.

    Reads "cname"/"crealm", "sname"/"srealm", "key", "authtime",
    "endtime", "flags" and "ticket" from cred, plus the optional
    "starttime" (defaults to authtime) and "renewTill" (defaults to 0).
    Each time value is integer-divided by 1000 before being packed as an
    unsigned 32-bit integer (presumably milliseconds to seconds; confirm
    against the caller).
    """
    auth_time = cred["authtime"]
    parts = [
        ccache_principal(cred["cname"], cred["crealm"]),
        ccache_principal(cred["sname"], cred["srealm"]),
        ccache_key(cred["key"]),
        struct.pack("!IIII",
                    auth_time // 1000,
                    cred.get("starttime", auth_time) // 1000,
                    cred["endtime"] // 1000,
                    cred.get("renewTill", 0) // 1000),
        # A single zero octet; presumably the is_skey field of the
        # ccache credential layout (confirm against the format docs).
        struct.pack("!B", 0),
        struct.pack("!I", flags_to_uint32(cred["flags"])),
        # TODO: Care about addrs or authdata? Former is "caddr" key.
        struct.pack("!II", 0, 0),
        ccache_counted_octet_string(der_encode_ticket(cred["ticket"])),
        # No second_ticket.
        ccache_counted_octet_string(b""),
    ]
    return b"".join(parts)
|
|
|
|
def make_ccache(cred: Dict[str, Any]) -> bytes:
    """Build the bytes of a ccache file holding the single credential cred.

    The output is the file header, then the principal built from
    cred["cname"] and cred["crealm"], then the serialized credential.
    """
    # Do we need a DeltaTime header? The ccache I get just puts zero
    # in there, so do the same.
    header = struct.pack(
        "!HHHHII",
        0x0504,  # file_format_version
        12,      # headerlen
        1,       # tag (DeltaTime)
        8,       # taglen (two uint32_ts)
        0, 0,    # time_offset / usec_offset
    )
    principal = ccache_principal(cred["cname"], cred["crealm"])
    return header + principal + ccache_credential(cred)
|