You've already forked TEF6686_Driver
104 lines
3.8 KiB
Python
104 lines
3.8 KiB
Python
import serial
|
|
import struct
|
|
import time
|
|
from warnings import warn
|
|
|
|
def crc8(data: bytes) -> int:
    """Return the CRC-8 checksum of *data*.

    The checksum is computed bit by bit, most-significant bit first, with
    generator polynomial 0x07 and an initial register value of 0x00. The
    result is a single byte value in the range 0..255; empty input yields 0.
    """
    polynomial = 0x07
    register = 0x00
    for octet in data:
        register ^= octet
        for _bit in range(8):
            # Remember the bit about to be shifted out; if it was set, the
            # polynomial is folded into the (already shifted) register.
            top_bit_set = register & 0x80
            register = (register << 1) & 0xFF
            if top_bit_set:
                register ^= polynomial
    return register
|
|
|
|
class I2CPCClient:
|
|
def __init__(self, port, baudrate=115200, timeout=1, wake_bauds: list | None = None):
|
|
if wake_bauds is None: wake_bauds = [921_600, 115_200, 9600, 19200, 576_000]
|
|
self.ser = serial.Serial(port=port, baudrate=baudrate, timeout=timeout)
|
|
|
|
def wake(baud: int | None):
|
|
if baud: self.ser.baudrate = baud
|
|
self.ser.write(b"~/")
|
|
self.ser.write(b"\x00"*128)
|
|
self.ser.flush()
|
|
self.ser.write(b"~/")
|
|
self.ser.flush()
|
|
start = time.monotonic()
|
|
while not ((d := self.ser.read_all()) and b"\xff" in d):
|
|
time.sleep(0.01)
|
|
if ((time.monotonic()) - start) > 2: return False
|
|
return True
|
|
if not wake(None):
|
|
for baud in wake_bauds:
|
|
if wake(baud):
|
|
warn(f"Initial baud was changed to {baud}, due to the device being in it")
|
|
break
|
|
else: raise ConnectionError("Could not wake")
|
|
|
|
def _send_packet(self, payload: bytes, crc: bool = True):
|
|
if len(payload) > 0x7fff: raise ValueError("Payload too large")
|
|
|
|
length_data = len(payload) | (0x8000 if crc else 0)
|
|
data = struct.pack(">H", length_data) + payload
|
|
if crc: data += bytes([crc8(data)])
|
|
self.ser.write(data)
|
|
|
|
# Read response length
|
|
resp_len_raw = self.ser.read(2)
|
|
if not resp_len_raw: raise TimeoutError("No response")
|
|
|
|
resp_len, *_ = struct.unpack(">H", resp_len_raw)
|
|
resp_has_crc = bool(resp_len & 0x8000)
|
|
resp_len &= 0x7FFF
|
|
|
|
total_to_read = resp_len + (1 if resp_has_crc else 0)
|
|
|
|
response = self.ser.read(total_to_read)
|
|
if response[0] == 0xff and len(response) > 1: raise Exception(f"Error from device: {response[1]}")
|
|
if len(response) != total_to_read: raise TimeoutError("Incomplete response")
|
|
|
|
if resp_has_crc:
|
|
received_crc = response[-1]
|
|
body = resp_len_raw + response[:-1]
|
|
if crc8(body) != received_crc: raise ValueError("CRC mismatch")
|
|
response = response[:-1]
|
|
return response[1:]
|
|
|
|
def set_clock(self, clock_hz: int):
|
|
payload = bytes([0]) + struct.pack(">I", clock_hz)
|
|
return self._send_packet(payload)
|
|
|
|
def write_i2c(self, addr: int, data: bytes):
|
|
payload = bytes([1, addr]) + data
|
|
return self._send_packet(payload, False)
|
|
|
|
def write_read_i2c(self, addr: int, write_data: bytes, read_len: int):
|
|
payload = bytes([2, addr, len(write_data)]) + write_data + bytes([read_len])
|
|
return self._send_packet(payload, False)
|
|
|
|
def write_eeprom(self, addr: int, data: bytes):
|
|
payload = bytes([7]) + struct.pack(">H", addr) + data
|
|
return self._send_packet(payload, False)
|
|
def read_eeprom(self, addr: int, len: int):
|
|
payload = bytes([8]) + struct.pack(">H", addr) + bytes([len])
|
|
return self._send_packet(payload, False)
|
|
|
|
def version(self): return self._send_packet(bytes([4]))
|
|
|
|
def quit(self): return self._send_packet(bytes([3]))
|
|
|
|
def reboot(self): return self._send_packet(bytes([5]))
|
|
|
|
def get_persistence_address(self): return self._send_packet(bytes([0xfe]))
|
|
|
|
def set_baudrate(self, baud: int):
|
|
payload = bytes([6]) + struct.pack(">I", baud)
|
|
out = self._send_packet(payload)
|
|
self.ser.baudrate = baud
|
|
return out
|
|
|
|
def close(self): self.ser.close()
|
|
|
|
def __enter__(self): return self
|
|
def __exit__(self, exc_type, exc_val, exc_tb): self.close() |