From 8e11d6239db1801e984c1e6ad010dce2102dcb6b Mon Sep 17 00:00:00 2001 From: KubaPro010 Date: Thu, 19 Feb 2026 23:12:44 +0100 Subject: [PATCH] crc in protocol --- protocol.py | 50 +++++++++++++++++++++++++++++++++++++++++--------- test.py | 1 + 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/protocol.py b/protocol.py index 187e648..2fdb9f0 100644 --- a/protocol.py +++ b/protocol.py @@ -2,26 +2,56 @@ import serial import struct import time +def crc8(data: bytes) -> int: + crc = 0x00 + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x80: crc = ((crc << 1) & 0xFF) ^ 0x07 + else: crc = (crc << 1) & 0xFF + return crc + class I2CPCClient: def __init__(self, port, baudrate=115200, timeout=1): self.ser = serial.Serial(port=port, baudrate=baudrate, timeout=timeout) - self.ser.write(b"~/") - while not ((d := self.ser.read_all()) and (b"\x01" in d or b"\xff" in d)): time.sleep(0.01) - def _send_packet(self, payload: bytes): - self.ser.write(bytes([len(payload)]) + payload) + # Gurantees resync + self.ser.write(b"~/") + self.ser.write(b"\x00"*64) + self.ser.flush() + self.ser.write(b"~/") + self.ser.flush() + while not ((d := self.ser.read_all()) and b"\xff" in d): time.sleep(0.01) + + def _send_packet(self, payload: bytes, crc: bool = True): + if len(payload) > 127: raise ValueError("Payload too large") + + length_byte = len(payload) | (0x80 if crc else 0x00) + data = bytes([length_byte]) + payload + if crc: data += bytes([crc8(data)]) + self.ser.write(data) # Read response length resp_len_raw = self.ser.read(1) - if not resp_len_raw: - raise TimeoutError("Error") + if not resp_len_raw: raise TimeoutError("No response") resp_len = resp_len_raw[0] - response = self.ser.read(resp_len) + resp_has_crc = bool(resp_len & 0x80) + resp_len &= 0x7F - if len(response) != resp_len: raise TimeoutError("Incomplete response") + total_to_read = resp_len + (1 if resp_has_crc else 0) - return response + response = self.ser.read(total_to_read) + if len(response) != total_to_read: raise TimeoutError("Incomplete response") + + if resp_has_crc: + received_crc = response[-1] + body = resp_len_raw + response[:-1] + if crc8(body) != received_crc: raise ValueError("CRC mismatch") + response = response[:-1] + + return response + def set_clock(self, clock_hz: int): payload = bytes([0]) + struct.pack(">I", clock_hz) @@ -48,6 +78,8 @@ class I2CPCClient: def reboot(self): return self._send_packet(bytes([5])) + def get_persistence_address(self): return self._send_packet(bytes([0xfe])) + def set_baudrate(self, baud: int): payload = bytes([6]) + struct.pack(">I", baud) out = self._send_packet(payload) diff --git a/test.py b/test.py index 1bfd951..a45195a 100644 --- a/test.py +++ b/test.py @@ -2,6 +2,7 @@ from tef import TEF6686 from protocol import I2CPCClient p = I2CPCClient("COM17") +print(p._send_packet(bytes([4]))) tef = TEF6686(p) tef.init() tef.FM_Tune_To(1, 9500)