import struct
import time
from warnings import warn

import serial


def crc8(data: bytes) -> int:
    """Return the 8-bit CRC of *data*.

    Bitwise, MSB-first CRC with polynomial 0x07 and initial value 0x00;
    no final XOR is applied.
    """
    crc = 0x00
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) & 0xFF) ^ 0x07
            else:
                crc = (crc << 1) & 0xFF
    return crc


class I2CPCClient:
    """Serial client that exchanges length-prefixed command packets with a device.

    Every request and response is framed as a big-endian 16-bit header whose
    low 15 bits hold the payload length and whose top bit signals that one
    trailing CRC-8 byte follows the payload (see ``_send_packet``).

    The instance is a context manager; leaving the ``with`` block closes the
    serial port.
    """

    def __init__(self, port, baudrate=115200, timeout=1, wake_bauds: list | None = None):
        """Open *port* and perform the wake handshake.

        Args:
            port: Serial port name passed to ``serial.Serial``.
            baudrate: Baud rate tried first.
            timeout: Read timeout (seconds) passed to ``serial.Serial``.
            wake_bauds: Fallback baud rates tried in order when the device
                does not answer at *baudrate*. Defaults to a built-in list.

        Raises:
            ConnectionError: No baud rate produced a wake acknowledgement.
                The serial port is closed before the error propagates.
        """
        if wake_bauds is None:
            wake_bauds = [921_600, 115_200, 9600, 19200, 576_000]
        self.ser = serial.Serial(port=port, baudrate=baudrate, timeout=timeout)

        def wake(baud: int | None) -> bool:
            """Send the wake sequence; return True if a 0xff byte arrives within ~2 s."""
            if baud:
                self.ser.baudrate = baud
            # Wake sequence: "~/" marker, 128 zero bytes, then the marker again.
            self.ser.write(b"~/")
            self.ser.write(b"\x00" * 128)
            self.ser.flush()
            self.ser.write(b"~/")
            self.ser.flush()
            start = time.monotonic()
            # Poll whatever has been received until a 0xff byte shows up.
            while not ((d := self.ser.read_all()) and b"\xff" in d):
                time.sleep(0.01)
                if (time.monotonic() - start) > 2:
                    return False
            return True

        try:
            if not wake(None):
                for baud in wake_bauds:
                    if wake(baud):
                        warn(f"Initial baud was changed to {baud}, due to the device being in it")
                        break
                else:
                    raise ConnectionError("Could not wake")
        except BaseException:
            # The constructor is failing, so the caller never gets an object
            # to close: release the port here instead of leaking it.
            self.ser.close()
            raise

    def _send_packet(self, payload: bytes, crc: bool = True) -> bytes:
        """Send one framed request and return the response body.

        Args:
            payload: Raw command bytes (at most 0x7fff bytes).
            crc: When true, set the header's top bit and append a CRC-8
                computed over header + payload.

        Returns:
            The response payload with its first byte removed.
            NOTE(review): the first byte is presumably a status/opcode
            byte - confirm against the device protocol.

        Raises:
            ValueError: *payload* is too large, or the response CRC does
                not match.
            TimeoutError: The response header or body was missing or short.
            Exception: The response started with 0xff; the following byte
                is reported as the device error code.
        """
        if len(payload) > 0x7fff:
            raise ValueError("Payload too large")
        length_data = len(payload) | (0x8000 if crc else 0)
        data = struct.pack(">H", length_data) + payload
        if crc:
            data += bytes([crc8(data)])
        self.ser.write(data)

        # Read response length
        resp_len_raw = self.ser.read(2)
        if not resp_len_raw:
            raise TimeoutError("No response")
        if len(resp_len_raw) != 2:
            # A lone header byte cannot be unpacked; report it as a timeout
            # rather than letting struct.error escape.
            raise TimeoutError("Incomplete response")
        (resp_len,) = struct.unpack(">H", resp_len_raw)
        resp_has_crc = bool(resp_len & 0x8000)
        resp_len &= 0x7FFF
        total_to_read = resp_len + (1 if resp_has_crc else 0)
        response = self.ser.read(total_to_read)

        # Check the length before indexing so an empty (timed-out) read falls
        # through to the "Incomplete response" error instead of IndexError.
        if len(response) > 1 and response[0] == 0xff:
            raise Exception(f"Error from device: {response[1]}")
        if len(response) != total_to_read:
            raise TimeoutError("Incomplete response")
        if resp_has_crc:
            received_crc = response[-1]
            # The CRC covers the two header bytes plus the payload.
            body = resp_len_raw + response[:-1]
            if crc8(body) != received_crc:
                raise ValueError("CRC mismatch")
            response = response[:-1]
        return response[1:]

    def set_clock(self, clock_hz: int) -> bytes:
        """Send command 0 with *clock_hz* encoded as a big-endian uint32."""
        payload = bytes([0]) + struct.pack(">I", clock_hz)
        return self._send_packet(payload)

    def write_i2c(self, addr: int, data: bytes) -> bytes:
        """Send command 1 (one-byte *addr* followed by *data*), without CRC."""
        payload = bytes([1, addr]) + data
        return self._send_packet(payload, False)

    def write_read_i2c(self, addr: int, write_data: bytes, read_len: int) -> bytes:
        """Send command 2, without CRC.

        The payload is: *addr*, the length of *write_data*, *write_data*
        itself, then *read_len*; each scalar occupies a single byte.
        """
        payload = bytes([2, addr, len(write_data)]) + write_data + bytes([read_len])
        return self._send_packet(payload, False)

    def write_eeprom(self, addr: int, data: bytes) -> bytes:
        """Send command 7 (big-endian uint16 *addr* followed by *data*), without CRC."""
        payload = bytes([7]) + struct.pack(">H", addr) + data
        return self._send_packet(payload, False)

    def read_eeprom(self, addr: int, len: int) -> bytes:
        """Send command 8 (big-endian uint16 *addr*, one-byte *len*), without CRC."""
        # `len` shadows the builtin, but it is part of the keyword interface
        # and therefore kept; the builtin is not needed in this method.
        payload = bytes([8]) + struct.pack(">H", addr) + bytes([len])
        return self._send_packet(payload, False)

    def version(self) -> bytes:
        """Send command 4 and return the response body."""
        return self._send_packet(bytes([4]))

    def quit(self) -> bytes:
        """Send command 3 and return the response body."""
        return self._send_packet(bytes([3]))

    def reboot(self) -> bytes:
        """Send command 5 and return the response body."""
        return self._send_packet(bytes([5]))

    def get_persistence_address(self) -> bytes:
        """Send command 0xfe and return the response body."""
        return self._send_packet(bytes([0xfe]))

    def set_baudrate(self, baud: int) -> bytes:
        """Send command 6 with *baud* as a big-endian uint32, then switch the port.

        The local port is reconfigured only after the response to the command
        has been received at the current baud rate.
        """
        payload = bytes([6]) + struct.pack(">I", baud)
        out = self._send_packet(payload)
        self.ser.baudrate = baud
        return out

    def close(self):
        """Close the underlying serial port."""
        self.ser.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()