From 596df82d2844c15579ff8962b9b0fdc42888251a Mon Sep 17 00:00:00 2001 From: KubaPro010 Date: Sun, 22 Feb 2026 10:30:42 +0100 Subject: [PATCH] some internal changes --- xrd.py | 78 ++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 32 deletions(-) diff --git a/xrd.py b/xrd.py index c048742..2356030 100644 --- a/xrd.py +++ b/xrd.py @@ -11,26 +11,27 @@ import time from tef import TEF6686 from protocol import I2CPCClient import os +from functools import wraps +from typing import Callable HOST = os.getenv("HOST") or '0.0.0.0' PORT = int(os.getenv("PORT") or 0) or 7373 DEVICE = os.getenv("DEV") or "COM6" -FREQ_NOT_ALLOWED_RANGE = [9482, 9518] -FREQ_NOT_ALLOWED_LIST = [10090] +FREQ_NOT_ALLOWED_RANGE = [] PASSWORD = os.getenv("PW") or "test" CLOCK = 12000000 # DP-666 SALT_LENGTH = 16 -SS_UPDATE_INTERVAL = 0.125 +SS_UPDATE_INTERVAL = 0.115 RDS_UPDATE_INTERVAL = 0.086 -def generate_salt(length=SALT_LENGTH) -> str: - return "".join(secrets.choice(string.ascii_lowercase) for _ in range(length)) - -def compute_hash(salt: str, password: str): - return hashlib.sha1((salt + password).encode()).hexdigest().encode() +def freq_allowed(freq: int): + if not FREQ_NOT_ALLOWED_RANGE: return True + for (lower, upper) in FREQ_NOT_ALLOWED_RANGE: + if freq >= lower and freq <= upper: return False + return True def init_tef(): p = I2CPCClient(DEVICE) @@ -46,9 +47,9 @@ def init_tef(): return tef def authenticate(conn: socket.socket): - salt = generate_salt() + salt = "".join(secrets.choice(string.ascii_lowercase) for _ in range(SALT_LENGTH)) conn.sendall(salt.encode() + b"\n") - expected_hash = compute_hash(salt, PASSWORD) + expected_hash = hashlib.sha1((salt + PASSWORD).encode()).hexdigest().encode() while True: data = conn.recv(1024) @@ -60,11 +61,10 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) for cmd in data.splitlines(): if cmd.startswith(b"T"): freq = int(cmd.decode().removeprefix("T").strip()) // 10 - if freq < 6500: continue - if freq > 10800: continue + if freq < 6500 or freq > 10800: continue state['last_tune'] = freq tef.FM_Tune_To(1, freq) - if (FREQ_NOT_ALLOWED_RANGE and freq >= FREQ_NOT_ALLOWED_RANGE[0] and freq <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and freq in FREQ_NOT_ALLOWED_LIST): tef.AUDIO_Set_Mute(True) + if not freq_allowed(freq): tef.AUDIO_Set_Mute(True) else: tef.AUDIO_Set_Mute(False) out += f"T{freq*10}\n".encode() elif cmd.startswith(b"G"): @@ -88,7 +88,7 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) out += b"OK\n" tef.APPL_Set_OperationMode(False) tef.FM_Tune_To(1, state["last_tune"]) - if (FREQ_NOT_ALLOWED_RANGE and state["last_tune"] >= FREQ_NOT_ALLOWED_RANGE[0] and state["last_tune"] <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and state["last_tune"] in FREQ_NOT_ALLOWED_LIST): tef.AUDIO_Set_Mute(True) + if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True) else: tef.AUDIO_Set_Mute(False) elif cmd.startswith(b"X"): tef.APPL_Set_OperationMode(True) @@ -115,8 +115,8 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) for freq in range(state["scan_start"], state["scan_stop"] + state["scan_step"], state["scan_step"]): tef.FM_Tune_To(2, freq) # Auto mutes, less commands sent time.sleep(0.0064) - if (FREQ_NOT_ALLOWED_RANGE and freq >= FREQ_NOT_ALLOWED_RANGE[0] and freq <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and freq in FREQ_NOT_ALLOWED_LIST): - conn.sendall(f"{freq * 10} = 0, ".encode()) + if freq_allowed(freq): + conn.sendall(f"{freq * 10} = -11.25, ".encode()) continue _, level, *_ = d if (d := tef.FM_Get_Quality_Data()) else (None, None) if level is None: continue @@ -124,14 +124,34 @@ def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) conn.sendall(b"\n") tef.FM_Tune_To(1, state["last_tune"]) - if (FREQ_NOT_ALLOWED_RANGE and freq >= FREQ_NOT_ALLOWED_RANGE[0] and freq <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and freq in FREQ_NOT_ALLOWED_LIST): tef.AUDIO_Set_Mute(True) + if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True) else: tef.AUDIO_Set_Mute(False) tef.FM_Set_Bandwidth((state["bw"] == 0), 2360 if (state["bw"] == 0) else (state["bw"] // 100)) return out +PERIODIC_FUNCTIONS: list[tuple[Callable, float, libtimer.Timer]] = [] + +def periodic(t: float): + def decorator(func): + PERIODIC_FUNCTIONS.append((func, t, libtimer.Timer())) + @wraps(func) + def wrapper(*args, **kwargs): return func(*args, **kwargs) + return wrapper + return decorator + +def reset_periodic(): + for (_, _, timer) in PERIODIC_FUNCTIONS: timer.reset() + +def run_periodic(*args, **kwargs): + for (func, t, timer) in PERIODIC_FUNCTIONS: + if timer.get_time() > t: + func(*args, **kwargs) + timer.reset() + +@periodic(SS_UPDATE_INTERVAL) def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict): - if (FREQ_NOT_ALLOWED_RANGE and state["last_tune"] >= FREQ_NOT_ALLOWED_RANGE[0] and state["last_tune"] <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and state["last_tune"] in FREQ_NOT_ALLOWED_LIST): + if not freq_allowed(state["last_tune"]): conn.sendall(b"Sm11.25,0,0,0\n\n") return _, _, stereo, _ = d if (d := tef.FM_Get_Processing_Status()) else (None, None, 1000, None) @@ -150,8 +170,9 @@ def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict): data += f"{level + 11.25},{wam//10},{usn//10},{bandwidth}\n\n".encode() conn.sendall(data) +@periodic(RDS_UPDATE_INTERVAL) def send_rds_data(tef: TEF6686, conn: socket.socket, state: dict): - if (FREQ_NOT_ALLOWED_RANGE and state["last_tune"] >= FREQ_NOT_ALLOWED_RANGE[0] and state["last_tune"] <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST or state["last_tune"] in FREQ_NOT_ALLOWED_LIST): return + if not freq_allowed(state["last_tune"]): return res = tef.FM_Get_RDS_Data__decoder() if res is None: return @@ -193,10 +214,10 @@ def run_server(): variant_str = "TEF6686" if variant == 1: variant_str = "TEF6687" - if variant == 9: variant_str = "TEF6688" - if variant == 3: variant_str = "TEF6689" + elif variant == 9: variant_str = "TEF6688" + elif variant == 3: variant_str = "TEF6689" - print(f"Got {variant_str} (V{hw_major}{hw_minor}{sw_major})") + print(f"{variant_str} (V{hw_major}{hw_minor}{sw_major})") state = { 'last_tune': 9500, @@ -210,7 +231,7 @@ def run_server(): 'scan_bw': 0, } - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + with socket.socket() as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((HOST, PORT)) s.listen() @@ -219,6 +240,7 @@ def run_server(): while True: conn, addr = s.accept() with conn: + reset_periodic() print(f"Connected by {addr}") if not authenticate(conn): print("Authentication failed.") @@ -233,9 +255,6 @@ def run_server(): conn.sendall(f"M0\n".encode()) conn.setblocking(False) - ss_timer = libtimer.Timer() - rds_timer = libtimer.Timer() - while True: try: if not (data := conn.recv(1024)): break @@ -244,12 +263,7 @@ def run_server(): except ConnectionResetError: break except ConnectionAbortedError: break except BlockingIOError: - if ss_timer.get_time() > SS_UPDATE_INTERVAL: - send_signal_status(tef, conn, state) - ss_timer.reset() - if rds_timer.get_time() > RDS_UPDATE_INTERVAL: - send_rds_data(tef, conn, state) - rds_timer.reset() + run_periodic() time.sleep(0.015) if __name__ == "__main__": run_server() \ No newline at end of file