# Copyright Β© 2019 Damir JeliΔ <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import annotations
from datetime import datetime, timedelta
from enum import Enum
from itertools import zip_longest
from typing import List, Optional, Tuple
from uuid import uuid4
import olm
from ..api import Api
from ..event_builders import ToDeviceMessage
from ..events import KeyVerificationEvent, KeyVerificationStart
from ..exceptions import LocalProtocolError
from .device import OlmDevice
class SasState(Enum):
"""Short Authentication String enum.
This enum tracks the current state of our verification process.
"""
created = 0
started = 1
accepted = 2
key_received = 3
mac_received = 4
canceled = 5
[docs]
class Sas(olm.Sas):
"""Matrix Short Authentication String class.
This class implements a state machine to handle device verification using
short authentication strings.
Attributes:
we_started_it (bool): Is true if the verification process was started
by us, otherwise false.
sas_accepted (bool): Is true if we accepted that the short
authentication string matches on both devices.
verified_devices(List[str]): The list of device ids that were verified
during the verification process.
Args:
own_user (str): The user id of our own user.
own_device (str): The device id of our own user.
own_fp_key (str): The fingerprint key of our own device that will
be verified by the other client.
other_olm_device (OlmDevice): The OlmDevice which we would like to
verify.
transaction_id (str, optional): A string that will uniquely identify
this verification process. A random and unique string will be
generated if one isn't provided.
short_auth_string (List[str], optional): A list of valid short
authentication methods that the client would like to allow for this
authentication session. By default the 'emoji' and 'decimal'
methods are allowed.
"""
_sas_method_v1 = "m.sas.v1"
_key_agreement_v1 = "curve25519"
_key_agreement_v2 = "curve25519-hkdf-sha256"
_key_agreeemnt_protocols = [_key_agreement_v1, _key_agreement_v2]
_hash_v1 = "sha256"
_mac_normal = "hkdf-hmac-sha256"
_mac_old = "hmac-sha256"
_mac_v1 = [_mac_normal, _mac_old]
_strings_v1 = ["emoji", "decimal"]
_user_cancel_error = ("m.user", "Canceled by user")
_timeout_error = ("m.timeout", "Timed out")
_txid_error = ("m.unknown_transaction", "Unknown transaction")
_unknown_method_error = ("m.unknown_method", "Unknown method")
_unexpected_message_error = ("m.unexpected_message", "Unexpected message")
_key_mismatch_error = ("m.key_mismatch", "Key mismatch")
_user_mismatch_error = ("m.user_error", "User mismatch")
_invalid_message_error = ("m.invalid_message", "Invalid message")
_commitment_mismatch_error = (
"m.mismatched_commitment",
"Mismatched commitment",
)
_sas_mismatch_error = (
"m.mismatched_sas",
"Mismatched short authentication string",
)
_max_age = timedelta(minutes=5)
_max_event_timeout = timedelta(minutes=1)
emoji = [
("πΆ", "Dog"),
("π±", "Cat"),
("π¦", "Lion"),
("π", "Horse"),
("π¦", "Unicorn"),
("π·", "Pig"),
("π", "Elephant"),
("π°", "Rabbit"),
("πΌ", "Panda"),
("π", "Rooster"),
("π§", "Penguin"),
("π’", "Turtle"),
("π", "Fish"),
("π", "Octopus"),
("π¦", "Butterfly"),
("π·", "Flower"),
("π³", "Tree"),
("π΅", "Cactus"),
("π", "Mushroom"),
("π", "Globe"),
("π", "Moon"),
("βοΈ", "Cloud"),
("π₯", "Fire"),
("π", "Banana"),
("π", "Apple"),
("π", "Strawberry"),
("π½", "Corn"),
("π", "Pizza"),
("π", "Cake"),
("β€οΈ", "Heart"),
("π", "Smiley"),
("π€", "Robot"),
("π©", "Hat"),
("π", "Glasses"),
("π§", "Wrench"),
("π
", "Santa"),
("π", "Thumbs up"),
("βοΈ", "Umbrella"),
("β", "Hourglass"),
("β°", "Clock"),
("π", "Gift"),
("π‘", "Light Bulb"),
("π", "Book"),
("βοΈ", "Pencil"),
("π", "Paperclip"),
("βοΈ", "Scissors"),
("π", "Lock"),
("π", "Key"),
("π¨", "Hammer"),
("βοΈ", "Telephone"),
("π", "Flag"),
("π", "Train"),
("π²", "Bicycle"),
("βοΈ", "Airplane"),
("π", "Rocket"),
("π", "Trophy"),
("β½", "Ball"),
("πΈ", "Guitar"),
("πΊ", "Trumpet"),
("π", "Bell"),
("β", "Anchor"),
("π§", "Headphones"),
("π", "Folder"),
("π", "Pin"),
]
def __init__(
self,
own_user: str,
own_device: str,
own_fp_key: str,
other_olm_device: OlmDevice,
transaction_id: Optional[str] = None,
short_auth_string: Optional[List[str]] = None,
mac_methods: Optional[List[str]] = None,
):
self.own_user = own_user
self.own_device = own_device
self.own_fp_key = own_fp_key
self.other_olm_device = other_olm_device
self.transaction_id = transaction_id or str(uuid4())
self.short_auth_string = short_auth_string or ["emoji", "decimal"]
self.mac_methods = mac_methods or Sas._mac_v1
self.chosen_mac_method = ""
self.key_agreement_protocols = Sas._key_agreeemnt_protocols
self.chosen_key_agreement: Optional[str] = None
self.state = SasState.created
self.we_started_it = True
self.sas_accepted = False
self.commitment = None
self.cancel_reason = ""
self.cancel_code = ""
self.their_sas_key: Optional[str] = None
self.verified_devices: List[str] = []
self.creation_time = datetime.now()
self._last_event_time = self.creation_time
super().__init__()
[docs]
@classmethod
def from_key_verification_start(
cls,
own_user: str,
own_device: str,
own_fp_key: str,
other_olm_device: OlmDevice,
event: KeyVerificationStart,
) -> Sas:
"""Create a SAS object from a KeyVerificationStart event.
Args:
own_user (str): The user id of our own user.
own_device (str): The device id of our own user.
own_fp_key (str): The fingerprint key of our own device that will
be verified by the other client.
other_olm_device (OlmDevice): The Olm device of the other user that
should be verified.
event (KeyVerificationStart): The event that we received from the
other device to start the key verification process.
"""
obj = cls(
own_user,
own_device,
own_fp_key,
other_olm_device,
event.transaction_id,
event.short_authentication_string,
event.message_authentication_codes,
)
obj.we_started_it = False
obj.state = SasState.started
string_content = Api.to_canonical_json(event.source["content"])
obj.commitment = olm.sha256(obj.pubkey + string_content)
obj.key_agreement_protocols = event.key_agreement_protocols
if (
Sas._sas_method_v1 != event.method
or (
Sas._key_agreement_v1 not in event.key_agreement_protocols
and Sas._key_agreement_v2 not in event.key_agreement_protocols
)
or Sas._hash_v1 not in event.hashes
or (
Sas._mac_normal not in event.message_authentication_codes
and Sas._mac_old not in event.message_authentication_codes
)
or (
"emoji" not in event.short_authentication_string
and "decimal" not in event.short_authentication_string
)
):
obj.state = SasState.canceled
obj.cancel_code, obj.cancel_reason = obj._unknown_method_error
return obj
@property
def canceled(self) -> bool:
"""Is the verification request canceled."""
return self.state == SasState.canceled
@property
def timed_out(self) -> bool:
"""Did the verification process time out."""
if self.verified or self.canceled:
return False
now = datetime.now()
if (
now - self.creation_time >= self._max_age
or now - self._last_event_time >= self._max_event_timeout
):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._timeout_error
return True
return False
@property
def verified(self) -> bool:
"""Is the device verified and the request done."""
return self.state == SasState.mac_received and self.sas_accepted
def set_their_pubkey(self, pubkey: str):
self.their_sas_key = pubkey
super().set_their_pubkey(pubkey)
[docs]
def accept_sas(self):
"""Accept the short authentication string."""
if self.state == SasState.canceled:
raise LocalProtocolError(
"Key verification process was canceled "
"can't accept short authentication "
"string"
)
if not self.other_key_set:
raise LocalProtocolError(
"Other public key isn't set yet, can't "
"generate nor accept a short "
"authentication string."
)
self.sas_accepted = True
[docs]
def reject_sas(self):
"""Reject the authentication string."""
if not self.other_key_set:
raise LocalProtocolError(
"Other public key isn't set yet, can't "
"generate nor reject a short "
"authentication string."
)
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._sas_mismatch_error
[docs]
def cancel(self):
"""Cancel the authentication process."""
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._user_cancel_error
def _check_commitment(self, key: str):
assert self.commitment
calculated_commitment = olm.sha256(
key + Api.to_canonical_json(self.start_verification().content)
)
return self.commitment == calculated_commitment
def _grouper(self, iterable, n, fillvalue=None):
"""Collect data into fixed-length chunks or blocks."""
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
@property
def _extra_info_v1(self) -> str:
device = self.other_olm_device
tx_id = self.transaction_id
our_info = f"{self.own_user}{self.own_device}"
their_info = f"{device.user_id}{device.device_id}"
if self.we_started_it:
return f"MATRIX_KEY_VERIFICATION_SAS{our_info}{their_info}{tx_id}"
else:
return f"MATRIX_KEY_VERIFICATION_SAS{their_info}{our_info}{tx_id}"
@property
def _extra_info_v2(self) -> str:
device = self.other_olm_device
tx_id = self.transaction_id
assert self.their_sas_key
our_info = f"{self.own_user}|{self.own_device}|{self.pubkey}"
their_info = f"{device.user_id}|{device.device_id}|{self.their_sas_key}"
if self.we_started_it:
return f"MATRIX_KEY_VERIFICATION_SAS|{our_info}|{their_info}|{tx_id}"
else:
return f"MATRIX_KEY_VERIFICATION_SAS|{their_info}|{our_info}|{tx_id}"
@property
def _extra_info(self) -> str:
if self.chosen_key_agreement == Sas._key_agreement_v1:
return self._extra_info_v1
elif self.chosen_key_agreement == Sas._key_agreement_v2:
return self._extra_info_v2
raise ValueError(f"Unknown key agreement protocol {self.chosen_key_agreement}")
[docs]
def get_emoji(self) -> List[Tuple[str, str]]:
"""Get the emoji short authentication string.
Returns a list of tuples that contain the emoji and the description of
the emoji of the short authentication string.
"""
return self._generate_emoji(self._extra_info)
[docs]
def get_decimals(self) -> Tuple[int, ...]:
"""Get the decimal short authentication string.
Returns a tuple that contains three 4 digit integer numbers that
represent the short authentication string.
"""
return self._generate_decimals(self._extra_info)
def _generate_emoji(self, extra_info: str) -> List[Tuple[str, str]]:
"""Create a list of emojies from our shared secret."""
generated_bytes = self.generate_bytes(extra_info, 6)
number = "".join([format(x, "08b") for x in bytes(generated_bytes)])
return [
self.emoji[int(x, 2)]
for x in map("".join, list(self._grouper(number[:42], 6)))
]
def _generate_decimals(self, extra_info: str) -> Tuple[int, ...]:
"""Create a decimal number from our shared secret."""
generated_bytes = self.generate_bytes(extra_info, 5)
number = "".join([format(x, "08b") for x in bytes(generated_bytes)])
return tuple(
int(x, 2) + 1000 for x in map("".join, list(self._grouper(number[:-1], 13)))
)
[docs]
def start_verification(self) -> ToDeviceMessage:
"""Create a content dictionary to start the verification."""
if not self.we_started_it:
raise LocalProtocolError(
"Verification was not started by us, "
"can't send start verification message."
)
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, "
"can't send start verification message."
)
content = {
"from_device": self.own_device,
"method": self._sas_method_v1,
"transaction_id": self.transaction_id,
"key_agreement_protocols": Sas._key_agreeemnt_protocols,
"hashes": [self._hash_v1],
"message_authentication_codes": self._mac_v1,
"short_authentication_string": self._strings_v1,
}
message = ToDeviceMessage(
"m.key.verification.start",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
[docs]
def accept_verification(self) -> ToDeviceMessage:
"""Create a content dictionary to accept the verification offer."""
if self.we_started_it:
raise LocalProtocolError(
"Verification was started by us, can't " "accept offer."
)
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, can't " "accept offer."
)
sas_methods = []
if "emoji" in self.short_auth_string:
sas_methods.append("emoji")
if "decimal" in self.short_auth_string:
sas_methods.append("decimal")
if self._mac_normal in self.mac_methods:
self.chosen_mac_method = self._mac_normal
else:
self.chosen_mac_method = self._mac_old
if Sas._key_agreement_v2 in self.key_agreement_protocols:
self.chosen_key_agreement = Sas._key_agreement_v2
else:
self.chosen_key_agreement = Sas._key_agreement_v1
content = {
"transaction_id": self.transaction_id,
"key_agreement_protocol": self.chosen_key_agreement,
"hash": self._hash_v1,
"message_authentication_code": self.chosen_mac_method,
"short_authentication_string": sas_methods,
"commitment": self.commitment,
}
message = ToDeviceMessage(
"m.key.verification.accept",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
[docs]
def share_key(self) -> ToDeviceMessage:
"""Create a dictionary containing our public key."""
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, can't " "share our public key."
)
content = {"transaction_id": self.transaction_id, "key": self.pubkey}
message = ToDeviceMessage(
"m.key.verification.key",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
[docs]
def get_mac(self) -> ToDeviceMessage:
"""Create a dictionary containing our MAC."""
if not self.sas_accepted:
raise LocalProtocolError("SAS string wasn't yet accepted")
if self.state == SasState.canceled:
raise LocalProtocolError(
"SAS verification was canceled, can't " "generate MAC."
)
key_id = f"ed25519:{self.own_device}"
assert self.chosen_mac_method
if self.chosen_mac_method == self._mac_normal:
calculate_mac = self.calculate_mac
elif self.chosen_mac_method == self._mac_old:
calculate_mac = self.calculate_mac_long_kdf
info = (
"MATRIX_KEY_VERIFICATION_MAC"
f"{self.own_user}{self.own_device}"
f"{self.other_olm_device.user_id}{self.other_olm_device.id}{self.transaction_id}"
)
mac = {key_id: calculate_mac(self.own_fp_key, info + key_id)}
content = {
"mac": mac,
"keys": calculate_mac(key_id, info + "KEY_IDS"),
"transaction_id": self.transaction_id,
}
message = ToDeviceMessage(
"m.key.verification.mac",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
[docs]
def get_cancellation(self) -> ToDeviceMessage:
"""Create a dictionary containing our verification cancellation."""
if self.state != SasState.canceled:
raise LocalProtocolError("Sas process isn't canceled.")
assert self.cancel_code
assert self.cancel_reason
content = {
"code": self.cancel_code,
"reason": self.cancel_reason,
"transaction_id": self.transaction_id,
}
message = ToDeviceMessage(
"m.key.verification.cancel",
self.other_olm_device.user_id,
self.other_olm_device.id,
content,
)
return message
def _event_ok(self, event: KeyVerificationEvent):
if self.state == SasState.canceled:
return False
if event.transaction_id != self.transaction_id:
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._txid_error
return False
if self.other_olm_device.user_id != event.sender:
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._user_mismatch_error
return False
return True
[docs]
def receive_accept_event(self, event):
"""Receive a KeyVerificationAccept event."""
if not self._event_ok(event):
return
if self.state != SasState.created:
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = Sas._unexpected_message_error
return
if (
event.key_agreement_protocol not in Sas._key_agreeemnt_protocols
or event.hash != Sas._hash_v1
or event.message_authentication_code not in Sas._mac_v1
or (
"emoji" not in event.short_authentication_string
and "decimal" not in event.short_authentication_string
)
):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = Sas._unknown_method_error
return
self.commitment = event.commitment
self.chosen_mac_method = event.message_authentication_code
self.chosen_key_agreement = event.key_agreement_protocol
self.short_auth_string = event.short_authentication_string
self.state = SasState.accepted
[docs]
def receive_key_event(self, event):
"""Receive a KeyVerificationKey event."""
if self.other_key_set or (
(self.state != SasState.started) and (self.state != SasState.accepted)
):
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = self._unexpected_message_error
return
if not self._event_ok(event):
return
if self.we_started_it:
if not self._check_commitment(event.key):
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = self._commitment_mismatch_error
return
self.set_their_pubkey(event.key)
self.state = SasState.key_received
[docs]
def receive_mac_event(self, event):
"""Receive a KeyVerificationMac event.
Args:
event (KeyVerificationMac): The MAC event that was received for
this SAS session.
"""
if self.verified:
return
if not self._event_ok(event):
return
if self.state != SasState.key_received:
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = Sas._unexpected_message_error
return
info = (
f"MATRIX_KEY_VERIFICATION_MAC{self.other_olm_device.user_id}{self.other_olm_device.id}"
f"{self.own_user}{self.own_device}{self.transaction_id}"
)
key_ids = ",".join(sorted(event.mac.keys()))
assert self.chosen_mac_method
if self.chosen_mac_method == self._mac_normal:
calculate_mac = self.calculate_mac
elif self.chosen_mac_method == self._mac_old:
calculate_mac = self.calculate_mac_long_kdf
if event.keys != calculate_mac(key_ids, info + "KEY_IDS"):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
return
for key_id, key_mac in event.mac.items():
try:
key_type, device_id = key_id.split(":", 2)
except ValueError:
self.state = SasState.canceled
(
self.cancel_code,
self.cancel_reason,
) = self._invalid_message_error
return
if key_type != "ed25519":
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
return
if device_id != self.other_olm_device.id:
continue
other_fp_key = self.other_olm_device.ed25519
if key_mac != calculate_mac(other_fp_key, info + key_id):
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
return
self.verified_devices.append(device_id)
if not self.verified_devices:
self.state = SasState.canceled
self.cancel_code, self.cancel_reason = self._key_mismatch_error
self.state = SasState.mac_received