From 7daa4d6e9439c3e1cb1621c5aafca79696f129d4 Mon Sep 17 00:00:00 2001 From: KubaPro010 Date: Sat, 21 Feb 2026 11:32:24 +0100 Subject: [PATCH] minor changes in functionality --- base_tef.py | 43 ++++---- patches/__init__.py | 2 + tef_102_patch.py => patches/tef_102_patch.py | 0 tef_205_patch.py => patches/tef_205_patch.py | 0 protocol.py | 5 +- tef.py | 6 +- test.py | 28 +++-- xrd.py | 106 +++++++++++-------- 8 files changed, 107 insertions(+), 83 deletions(-) create mode 100644 patches/__init__.py rename tef_102_patch.py => patches/tef_102_patch.py (100%) rename tef_205_patch.py => patches/tef_205_patch.py (100%) diff --git a/base_tef.py b/base_tef.py index ac96d46..8a555c8 100644 --- a/base_tef.py +++ b/base_tef.py @@ -1,6 +1,5 @@ from protocol import I2CPCClient, time -from tef_102_patch import * -from tef_205_patch import * +from patches import * from typing import Callable, TypeVar, ParamSpec, Concatenate from collections.abc import Callable as ABCCallable @@ -8,36 +7,36 @@ from collections.abc import Callable as ABCCallable P = ParamSpec("P") T = TypeVar("T") -ADDRESS = 0x64 class BaseTEF668X: - def __init__(self, p: I2CPCClient) -> None: + def __init__(self, p: I2CPCClient, address: int = 0x64) -> None: self.p = p p.set_baudrate(921600) self.p.set_clock(100_000) + self.address = address def close(self): self.p.set_baudrate(115200) self.p.close() def _reset(self): - self.p.write_i2c(ADDRESS, b"\x1e\x5a\x01\x5a\x5a") + self.p.write_i2c(self.address, b"\x1e\x5a\x01\x5a\x5a") def _write_firmware(self, patch: bytes | list[int], patch_lut: bytes | list[int]): self.p.set_clock(400_000) - self.p.write_i2c(ADDRESS, b"\x1c\x00\x00") - self.p.write_i2c(ADDRESS, b"\x1c\x00\x74") + self.p.write_i2c(self.address, b"\x1c\x00\x00") + self.p.write_i2c(self.address, b"\x1c\x00\x74") def send_patch(_patch: bytes): - for i in range(0, len(_patch), 64): + for i in range(0, len(_patch), 64): # More data in a single step is less data sent over serial data = _patch[i:i+64] - if self.p.write_i2c(ADDRESS, b"\x1b" + data)[0] != 0: raise Exception + if self.p.write_i2c(self.address, b"\x1b" + data)[0] != 0: raise Exception send_patch(bytes(patch)) - self.p.write_i2c(ADDRESS, b"\x1c\x00\x00") - self.p.write_i2c(ADDRESS, b"\x1c\x00\x75") + self.p.write_i2c(self.address, b"\x1c\x00\x00") + self.p.write_i2c(self.address, b"\x1c\x00\x75") send_patch(bytes(patch_lut)) - self.p.write_i2c(ADDRESS, b"\x1c\x00\x00") + self.p.write_i2c(self.address, b"\x1c\x00\x00") self.p.set_clock(100_000) def APPL_Get_Operation_Status(self): - data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2) + data = self.p.write_read_i2c(self.address, b"\x40\x80\x01", 2) while data[0] != 0: - data = self.p.write_read_i2c(ADDRESS, b"\x40\x80\x01", 2) + data = self.p.write_read_i2c(self.address, b"\x40\x80\x01", 2) time.sleep(0.01) return data[-1] @@ -45,8 +44,8 @@ class BaseTEF668X: def _base_command_wrapper(func: Callable[Concatenate["BaseTEF668X", P], tuple[bytes, int | None, ABCCallable[[bytes], T] | None]]) -> Callable[Concatenate["BaseTEF668X", P], bytes | T]: def inner(self: "BaseTEF668X", *args: P.args, **kwargs: P.kwargs ) -> bytes | T: data, read_bytes, out_parser = func(self, *args, **kwargs) - if read_bytes: data = self.p.write_read_i2c(ADDRESS, data, read_bytes) - else: data = self.p.write_i2c(ADDRESS, data) + if read_bytes: data = self.p.write_read_i2c(self.address, data, read_bytes) + else: data = self.p.write_i2c(self.address, data) if out_parser: return out_parser(data) return data return inner @@ -60,10 +59,14 @@ class BaseTEF668X: def init(self, patch = tef_102_patch, patch_lut = tef_102_patch_lut, clock: int = 9216000): self._reset() - while self.APPL_Get_Operation_Status() != 0: time.sleep(0.01) + while self.APPL_Get_Operation_Status() != 0: time.sleep(0.025) # TODO: timeouts self._write_firmware(patch, patch_lut) - self.p.write_i2c(ADDRESS, b"\x14\x00\x01") - while self.APPL_Get_Operation_Status() != 1: time.sleep(0.01) + self.p.write_i2c(self.address, b"\x14\x00\x01") + while self.APPL_Get_Operation_Status() != 1: time.sleep(0.025) if clock != 9216000: self.APPL_Set_ReferenceClock(clock, False) self.APPL_Activate() - while self.APPL_Get_Operation_Status() != 2: time.sleep(0.01) \ No newline at end of file + while self.APPL_Get_Operation_Status() != 2: time.sleep(0.025) + + def __enter__(self): return self + + def __exit__(self, exc_type, exc_val, exc_tb): self.close() \ No newline at end of file diff --git a/patches/__init__.py b/patches/__init__.py new file mode 100644 index 0000000..a5ab0c1 --- /dev/null +++ b/patches/__init__.py @@ -0,0 +1,2 @@ +from tef_102_patch import * +from tef_205_patch import * \ No newline at end of file diff --git a/tef_102_patch.py b/patches/tef_102_patch.py similarity index 100% rename from tef_102_patch.py rename to patches/tef_102_patch.py diff --git a/tef_205_patch.py b/patches/tef_205_patch.py similarity index 100% rename from tef_205_patch.py rename to patches/tef_205_patch.py diff --git a/protocol.py b/protocol.py index eaab92f..bd7e16f 100644 --- a/protocol.py +++ b/protocol.py @@ -90,4 +90,7 @@ class I2CPCClient: self.ser.baudrate = baud return out - def close(self): self.ser.close() \ No newline at end of file + def close(self): self.ser.close() + + def __enter__(self): return self + def __exit__(self, exc_type, exc_val, exc_tb): self.close() \ No newline at end of file diff --git a/tef.py b/tef.py index af42f86..7f3555f 100644 --- a/tef.py +++ b/tef.py @@ -1,5 +1,5 @@ from __future__ import annotations -from base_tef import BaseTEF668X, ADDRESS, Concatenate, Callable, ParamSpec +from base_tef import BaseTEF668X, Concatenate, Callable, ParamSpec from typing import overload, TypeVar from functools import wraps @@ -16,8 +16,8 @@ def _command_wrapper(func: Callable[Concatenate[TEF6686, P], tuple[bytes, int | def inner(self: TEF6686, *args: P.args, **kwargs: P.kwargs) -> bytes | T: data, read_bytes, out_parser = func(self, *args, **kwargs) - if read_bytes: data = self.p.write_read_i2c(ADDRESS, data, read_bytes) - else: data = self.p.write_i2c(ADDRESS, data) + if read_bytes: data = self.p.write_read_i2c(self.address, data, read_bytes) + else: data = self.p.write_i2c(self.address, data) if out_parser: return out_parser(data) return data return inner diff --git a/test.py b/test.py index 118c37a..bb5f2e9 100644 --- a/test.py +++ b/test.py @@ -1,24 +1,20 @@ from tef import TEF6686 from protocol import I2CPCClient +import time p = I2CPCClient("COM17") -print(p._send_packet(bytes([4]))) -tef = TEF6686(p) -tef.init(clock=12000000) -tef.AM_Tune_To(1, 225) -tef.AUDIO_Set_Mute(False) -tef.AUDIO_Set_Volume(70) -tef.FM_Set_MphSuppression(True) -tef.FM_Set_ChannelEqualizer(True) -# tef.FM_Set_Specials(1) -tef.FM_Set_Bandwidth(True) -import time -time.sleep(0.032) -try: +with TEF6686(p) as tef: + tef.init(clock=12000000) + tef.AM_Tune_To(1, 225) + tef.AUDIO_Set_Mute(False) + tef.AUDIO_Set_Volume(70) + tef.FM_Set_MphSuppression(True) + tef.FM_Set_ChannelEqualizer(True) + # tef.FM_Set_Specials(1) + tef.FM_Set_Bandwidth(True) + time.sleep(0.032) while True: status, level, usn, wam, offset, bandwidth, modulation = tef.FM_Get_Quality_Data() if not status or not level or not usn or not wam or not offset or not bandwidth or not modulation: continue print(f"{level / 10} dbμV | {usn / 10}% USN | {wam / 10}% WAM | {offset / 10} kHz offset | {bandwidth / 10} kHz bandwidth | {modulation / 10}% modulation") - time.sleep(0.3) -finally: - tef.close() \ No newline at end of file + time.sleep(0.3) \ No newline at end of file diff --git a/xrd.py b/xrd.py index 2f8625e..f32cf73 100644 --- a/xrd.py +++ b/xrd.py @@ -1,5 +1,6 @@ # This is a compatible server for the TEF6686 firmware, but instead using the i2c control mode # This fully works with FM-DX-Webserver (no plugins) +# Released under the Unlicense (however both FM-DX-Webserver and the firmware are under the tyranny of GPL-3) import socket import hashlib @@ -9,28 +10,31 @@ import libtimer import time from tef import TEF6686 from protocol import I2CPCClient +import os -HOST = '0.0.0.0' -PORT = 7373 -PASSWORD = "test" +HOST = os.getenv("HOST") or '0.0.0.0' +PORT = int(os.getenv("PORT") or 0) or 7373 +DEVICE = os.getenv("DEV") or "COM6" + +PASSWORD = os.getenv("PW") or "test" CLOCK = 12000000 # DP-666 SALT_LENGTH = 16 SS_UPDATE_INTERVAL = 0.125 RDS_UPDATE_INTERVAL = 0.086 -def generate_salt(length=SALT_LENGTH): +def generate_salt(length=SALT_LENGTH) -> str: return "".join(secrets.choice(string.ascii_lowercase) for _ in range(length)) -def compute_hash(salt, password): +def compute_hash(salt: str, password: str): return hashlib.sha1((salt + password).encode()).hexdigest().encode() def init_tef(): - p = I2CPCClient("COM6") + p = I2CPCClient(DEVICE) tef = TEF6686(p) tef.init(clock=CLOCK) tef.AUDIO_Set_Mute(False) - tef.AUDIO_Set_Volume(60) + tef.AUDIO_Set_Volume(40) tef.FM_Tune_To(1, 9500) tef.FM_Set_RDS(1) tef.FM_Set_ChannelEqualizer(True) @@ -38,7 +42,7 @@ def init_tef(): tef.APPL_Set_OperationMode(True) return tef -def authenticate(conn): +def authenticate(conn: socket.socket): salt = generate_salt() conn.sendall(salt.encode() + b"\n") expected_hash = compute_hash(salt, PASSWORD) @@ -53,6 +57,8 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) for cmd in data.splitlines(): if cmd.startswith(b"T"): freq = int(cmd.decode().removeprefix("T").strip()) // 10 + if freq < 6500: continue + if freq > 10800: continue state['last_tune'] = freq tef.FM_Tune_To(1, freq) out += f"T{freq*10}\n".encode() @@ -76,6 +82,7 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) elif cmd.startswith(b"x"): out += b"OK\n" tef.APPL_Set_OperationMode(False) + tef.FM_Tune_To(1, state["last_tune"]) elif cmd.startswith(b"X"): tef.APPL_Set_OperationMode(True) elif cmd.startswith(b"W"): @@ -99,14 +106,11 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) tef.FM_Set_Bandwidth((state["scan_bw"] == 0), state["scan_bw"]) conn.sendall(b"U") for freq in range(state["scan_start"], state["scan_stop"] + state["scan_step"], state["scan_step"]): - tef.FM_Tune_To(2, freq) - time.sleep(0.0055) - conn.sendall(str(freq * 10).encode()) - conn.sendall(b" = ") + tef.FM_Tune_To(2, freq) # Auto mutes, less commands sent + time.sleep(0.0064) _, level, *_ = tef.FM_Get_Quality_Data() if level is None: continue - conn.sendall(str((level // 10) + 10).encode()) - conn.sendall(b", ") + conn.sendall(str(freq * 10).encode() + b" = " + str(level // 10).encode() + b", ") conn.sendall(b"\n") tef.FM_Tune_To(1, state["last_tune"]) @@ -114,9 +118,9 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) return out -def send_signal_status(tef: TEF6686, conn: socket.socket, state): +def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict): _, _, stereo, _ = tef.FM_Get_Processing_Status() - stereo = stereo if stereo is not None else 1000 + stereo = stereo if stereo is not None else 1000 # Default to Mono status, level, usn, wam, _, bandwidth, _ = tef.FM_Get_Quality_Data() if status is None or level is None or wam is None or usn is None or bandwidth is None: return @@ -125,54 +129,70 @@ def send_signal_status(tef: TEF6686, conn: socket.socket, state): data = b"S" if state['forced_mono']: data += b"M" - elif stereo < 500: data += b"s" + elif stereo < 350: data += b"s" else: data += b"m" data += f"{level},{wam//10},{usn//10},{bandwidth}\n\n".encode() conn.sendall(data) -def send_rds_data(tef, conn): +def send_rds_data(tef: TEF6686, conn: socket.socket): status, A, B, C, D, dec_error = tef.FM_Get_RDS_Data__decoder() - if None in (status, A, B, C, D, dec_error): return + if status is None or A is None or B is None or C is None or D is None or dec_error is None: return # Fucking hate pyright if (status & (1 << 9) == 0) or (status & (1 << 15) == 0): return - err = 0 - err |= ((dec_error >> 8) & 0x30) >> 4 - err |= ((dec_error >> 8) & 0xC) - err |= ((dec_error >> 8) & 3) << 4 + data = b"" + if (status & (1 << 13) == 0): + err = 0 + err |= ((dec_error >> 8) & 0x30) >> 4 + err |= ((dec_error >> 8) & 0xC) + err |= ((dec_error >> 8) & 3) << 4 - data = b"R" - data += f"{B:04X}{C:04X}{D:04X}{err:02X}".encode() + data = b"R" + data += f"{B:04X}{C:04X}{D:04X}{err:02X}\n".encode() - pi_error = dec_error >> 14 + pi_error = (dec_error >> 14) & 0b11 if pi_error < 3: - data += b"\nP" + f"{A:04X}".encode() + data += b"P" + f"{A:04X}".encode() data += b"?" * pi_error + b"\n" elif status & (1 << 12): pi_error = (dec_error >> 10) & 0b11 if pi_error < 3: - data += b"\nP" + f"{C:04X}".encode() + data += b"P" + f"{C:04X}".encode() data += b"?" * pi_error + b"\n" conn.sendall(data) def run_server(): - tef = init_tef() - state = { - 'last_tune': 9500, - 'last_eqims': 0b11, - 'forced_mono': False, - 'deemp': 0, - 'bw': 0, - 'scan_start': 87500, - 'scan_stop': 108000, - 'scan_step': 100, - 'scan_bw': 0, - } + with init_tef() as tef: + device, hw_version, sw_version = d if (d := tef.APPL_Get_Identification()) else (None, None, None) + if device and hw_version and sw_version: + variant = device & 127 + hw_major = hw_version >> 8 + hw_minor = hw_version & 127 + sw_major = sw_version >> 8 + + variant_str = "TEF6686" + if variant == 1: variant_str = "TEF6687" + if variant == 9: variant_str = "TEF6688" + if variant == 3: variant_str = "TEF6689" + + print(f"Got {variant_str} (V{hw_major}{hw_minor}{sw_major})") + + state = { + 'last_tune': 9500, + 'last_eqims': 0b11, + 'forced_mono': False, + 'deemp': 0, + 'bw': 0, + 'scan_start': 87500, + 'scan_stop': 108000, + 'scan_step': 100, + 'scan_bw': 0, + } - try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((HOST, PORT)) s.listen() print(f"Server listening on {HOST}:{PORT}") @@ -191,6 +211,7 @@ def run_server(): conn.sendall(f"B{int(state['forced_mono'])}\n".encode()) conn.sendall(f"D{state['deemp']}\n".encode()) conn.sendall(f"W{state['bw']}\n".encode()) + conn.sendall(f"M0\n".encode()) conn.setblocking(False) ss_timer = libtimer.Timer() @@ -211,7 +232,6 @@ def run_server(): if rds_timer.get_time() > RDS_UPDATE_INTERVAL: send_rds_data(tef, conn) rds_timer.reset() - time.sleep(0.01) - finally: tef.close() + time.sleep(0.015) if __name__ == "__main__": run_server() \ No newline at end of file