|
| 1 | +# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA <http://www.arduino.cc> |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: MPL-2.0 |
| 4 | + |
| 5 | +import base64 |
| 6 | +import time |
| 7 | +import hashlib |
| 8 | +import hmac |
| 9 | +import secrets |
| 10 | +import struct |
| 11 | +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 |
| 12 | + |
| 13 | +from arduino.app_utils.logger import Logger |
| 14 | + |
| 15 | +logger = Logger("BPPCodec") |
| 16 | + |
| 17 | +BPP_VERSION = 0x00 |
| 18 | +MODE_NONE = 0x00 |
| 19 | +MODE_SIGN = 0x01 |
| 20 | +MODE_ENC = 0x02 |
| 21 | + |
| 22 | +# Big-endian header: [Version:1] [Mode:1] [Timestamp:8] [Random:4] |
| 23 | +HEADER_FORMAT = ">BBQL" |
| 24 | +HEADER_SIZE = struct.calcsize(HEADER_FORMAT) # 14 bytes |
| 25 | +WINDOW_US = 10_000_000 # 10s in µs |
| 26 | + |
| 27 | + |
| 28 | +class ReplayProtection: |
| 29 | + """ |
| 30 | + Manages the sliding window replay protection and the temporary cache storing |
| 31 | + the IVs already seen within the validity window. |
| 32 | + """ |
| 33 | + |
| 34 | + def __init__(self, window_us: int = WINDOW_US): |
| 35 | + self.window_us = window_us |
| 36 | + self.cache: dict[bytes, int] = {} # IV -> Expiration timestamp |
| 37 | + |
| 38 | + def check_and_update(self, iv: bytes, timestamp_us: int) -> bool: |
| 39 | + """ |
| 40 | + Determines if the message is valid by assessing replay attack conditions: |
| 41 | + timestamp out of validity window and IV reuse. |
| 42 | + """ |
| 43 | + now = time.time_ns() // 1_000 |
| 44 | + |
| 45 | + # Check time window |
| 46 | + if abs(now - timestamp_us) > self.window_us: |
| 47 | + logger.warning(f"Message outside validity window. Drift: {(now - timestamp_us) / 1000}ms") |
| 48 | + return False |
| 49 | + |
| 50 | + # Check IV reuse |
| 51 | + if iv in self.cache: |
| 52 | + logger.warning("IV reuse detected") |
| 53 | + return False |
| 54 | + |
| 55 | + # Prune old entries if cache grows too large |
| 56 | + if len(self.cache) > 1000: |
| 57 | + self._prune(now) |
| 58 | + |
| 59 | + self.cache[iv] = now + self.window_us |
| 60 | + |
| 61 | + return True |
| 62 | + |
| 63 | + def _prune(self, now: int): |
| 64 | + # Remove expired entries |
| 65 | + expired_keys = [k for k, v in self.cache.items() if now > v] |
| 66 | + for k in expired_keys: |
| 67 | + del self.cache[k] |
| 68 | + |
| 69 | + |
| 70 | +class BPPCodec: |
| 71 | + """ |
| 72 | + Binary Peripheral Protocol (BPP) Codec. |
| 73 | + Implements a secure container format for peripherals and allows to encode and |
| 74 | + decode payloads. |
| 75 | + This codec is intended to be used with message-based protocols, i.e. with builtin |
| 76 | + message boundaries (e.g., WebSocket). If used with stream-based protocols (e.g., |
| 77 | + TCP, BLE, UART), it must be wrapped in BPPStreamCodec. |
| 78 | +
|
| 79 | + The protocol supports three security modes: |
| 80 | + - Mode 0: No Security; |
| 81 | + - Mode 1: HMAC-SHA256 Signing, useful for authentication and data integrity; |
| 82 | + - Mode 2: ChaCha20-Poly1305 Encryption and Signing, providing confidentiality, |
| 83 | + authentication and data integrity. |
| 84 | +
|
| 85 | + The binary format is as follows: |
| 86 | +
|
| 87 | + [Version (1)] [Mode (1)] [Timestamp (8)] [Random (4)] [Payload (Var)] [AuthTag/Sig (16/32)] |
| 88 | +
|
| 89 | + - Version: Protocol version (currently 0x01). |
| 90 | + - Mode: Security mode (0x00: None, 0x01: HMAC-SHA256, 0x02: ChaCha20-Poly1305). |
| 91 | + - Timestamp: Microsecond-precision timestamp (Unix epoch). |
| 92 | + - Random: 32-bit random value for uniqueness. |
| 93 | + - Payload: Actual data being transmitted. |
| 94 | + - AuthTag/Sig: HMAC signature (32 bytes for Mode 1) or AuthTag (16 bytes for Mode 2). |
| 95 | +
|
| 96 | + Text-safe encoding/decoding via Base64URL are also provided. |
| 97 | + """ |
| 98 | + |
| 99 | + def __init__(self, secret: str = "", enable_encryption: bool = False): |
| 100 | + """ |
| 101 | + Initialize codec. |
| 102 | +
|
| 103 | + Args: |
| 104 | + secret: Pre-shared secret. Default: empty (no security). |
| 105 | + enable_encryption: If True, uses ChaCha20-Poly1305. If False, uses HMAC-SHA256 if |
| 106 | + secret is provided. Default: False. |
| 107 | + """ |
| 108 | + self.secret = secret.encode() if secret else b"" |
| 109 | + self.enable_encryption = enable_encryption and bool(secret) |
| 110 | + self.cc_cipher = None |
| 111 | + |
| 112 | + if self.enable_encryption: |
| 113 | + # Derive 32-byte key for ChaCha20 |
| 114 | + key = hashlib.sha256(self.secret).digest() |
| 115 | + self.cc_cipher = ChaCha20Poly1305(key) |
| 116 | + |
| 117 | + self.replay_protection = ReplayProtection() |
| 118 | + |
| 119 | + def encode(self, data: bytes) -> bytes: |
| 120 | + """ |
| 121 | + Packs data into a BPP message and returns its bytes. |
| 122 | +
|
| 123 | + Args: |
| 124 | + data: The payload to encode. |
| 125 | + Returns: |
| 126 | + The complete BPP message (bytes). |
| 127 | + """ |
| 128 | + # Assemble the header |
| 129 | + mode = MODE_ENC if self.enable_encryption else (MODE_SIGN if self.secret else MODE_NONE) |
| 130 | + timestamp_us = time.time_ns() // 1_000 |
| 131 | + random_val = secrets.randbits(32) |
| 132 | + header = struct.pack(HEADER_FORMAT, BPP_VERSION, mode, timestamp_us, random_val) |
| 133 | + |
| 134 | + if mode == MODE_ENC and self.cc_cipher: |
| 135 | + # Encrypt with ChaCha20-Poly1305, use header as AAD |
| 136 | + # Note: cryptography lib appends the 16-byte Poly1305 AuthTag automatically |
| 137 | + iv = header[2:] # Last 12 bytes of header (Timestamp + Random) |
| 138 | + encrypted_payload = self.cc_cipher.encrypt(iv, data, header) |
| 139 | + return header + encrypted_payload |
| 140 | + |
| 141 | + elif mode == MODE_SIGN and self.secret: |
| 142 | + # HMAC Signature |
| 143 | + msg_to_sign = header + data |
| 144 | + signature = hmac.new(self.secret, msg_to_sign, hashlib.sha256).digest() |
| 145 | + return header + data + signature |
| 146 | + |
| 147 | + else: |
| 148 | + # No Security |
| 149 | + return header + data |
| 150 | + |
| 151 | + def decode(self, message: bytes) -> bytes | None: |
| 152 | + """ |
| 153 | + Unpacks a BPP message and returns its payload. |
| 154 | +
|
| 155 | + Args: |
| 156 | + message: The complete BPP message to decode. |
| 157 | + Returns: |
| 158 | + The decoded payload (bytes) if valid, else None. |
| 159 | + """ |
| 160 | + if len(message) < HEADER_SIZE: |
| 161 | + logger.warning("Message too short for header") |
| 162 | + return None |
| 163 | + |
| 164 | + try: |
| 165 | + ver, mode, timestamp_us, random_val = struct.unpack(HEADER_FORMAT, message[:HEADER_SIZE]) |
| 166 | + except struct.error: |
| 167 | + logger.warning("Header parsing failed") |
| 168 | + return None |
| 169 | + |
| 170 | + if ver != BPP_VERSION: |
| 171 | + logger.warning(f"Unsupported version {ver}") |
| 172 | + return None |
| 173 | + |
| 174 | + # Check expected minimum size |
| 175 | + footer_size = 16 if mode == MODE_ENC else (32 if mode == MODE_SIGN else 0) |
| 176 | + min_size = HEADER_SIZE + footer_size |
| 177 | + if len(message) < min_size: |
| 178 | + logger.warning("Message too short (truncated)") |
| 179 | + return None |
| 180 | + |
| 181 | + # Check for downgrade attacks |
| 182 | + if self.enable_encryption: |
| 183 | + if mode != MODE_ENC: |
| 184 | + logger.warning(f"Security mode mismatch: expected Mode 2 (encrypt), but received Mode {mode}.") |
| 185 | + return None |
| 186 | + elif self.secret: |
| 187 | + if mode != MODE_SIGN: |
| 188 | + logger.warning(f"Security mode mismatch: expected Mode 1 (sign), but received Mode {mode}.") |
| 189 | + return None |
| 190 | + else: |
| 191 | + if mode != MODE_NONE: |
| 192 | + logger.warning(f"Security mode mismatch: expected Mode 0 (none), but received Mode {mode}.") |
| 193 | + return None |
| 194 | + |
| 195 | + # Check for replay attacks |
| 196 | + replay_id = message[2:HEADER_SIZE] # Timestamp (8) + Random (4) |
| 197 | + if not self.replay_protection.check_and_update(replay_id, timestamp_us): |
| 198 | + return None |
| 199 | + |
| 200 | + header_bytes = message[:HEADER_SIZE] |
| 201 | + |
| 202 | + # Decrypt/verify |
| 203 | + try: |
| 204 | + if mode == MODE_ENC: |
| 205 | + iv = replay_id |
| 206 | + ciphertext_with_tag = message[HEADER_SIZE:] |
| 207 | + return self.cc_cipher.decrypt(iv, ciphertext_with_tag, header_bytes) |
| 208 | + |
| 209 | + elif mode == MODE_SIGN: |
| 210 | + if len(message) < HEADER_SIZE + 32: |
| 211 | + return None |
| 212 | + |
| 213 | + payload = message[HEADER_SIZE:-32] |
| 214 | + received_sig = message[-32:] |
| 215 | + |
| 216 | + msg_to_verify = header_bytes + payload |
| 217 | + expected_sig = hmac.new(self.secret, msg_to_verify, hashlib.sha256).digest() |
| 218 | + |
| 219 | + if not hmac.compare_digest(received_sig, expected_sig): |
| 220 | + logger.warning("HMAC verification failed") |
| 221 | + return None |
| 222 | + |
| 223 | + return payload |
| 224 | + |
| 225 | + elif mode == MODE_NONE: |
| 226 | + return message[HEADER_SIZE:] |
| 227 | + |
| 228 | + except Exception as e: |
| 229 | + logger.warning(f"Crypto Error: {e}") |
| 230 | + return None |
| 231 | + |
| 232 | + def encode_text(self, data: bytes) -> str: |
| 233 | + """ |
| 234 | + Encodes a text-safe BPP packet to a Base64URL string. |
| 235 | + """ |
| 236 | + binary_packet = self.encode(data) |
| 237 | + return base64.b64encode(binary_packet).decode("ascii") |
| 238 | + |
| 239 | + def decode_text(self, b64_str: str) -> bytes | None: |
| 240 | + """ |
| 241 | + Decodes a text-safe BPP packet from a Base64URL string. |
| 242 | + """ |
| 243 | + try: |
| 244 | + binary_packet = base64.b64decode(b64_str) |
| 245 | + return self.decode(binary_packet) |
| 246 | + |
| 247 | + except Exception as e: |
| 248 | + logger.warning(f"Text decode failed: {e}") |
| 249 | + return None |
0 commit comments