# This is a compatible server for the TEF6686 firmware, but using the I2C control mode instead
# This fully works with FM-DX-Webserver (no plugins)
# Released under the Unlicense (however both FM-DX-Webserver and the firmware are under the tyranny of GPL-3)
import hashlib
import hmac
import os
import secrets
import select
import socket
import string
import time

import libtimer
from protocol import I2CPCClient
from tef import TEF6686

HOST = os.getenv("HOST") or '0.0.0.0'
PORT = int(os.getenv("PORT") or 0) or 7373
DEVICE = os.getenv("DEV") or "COM6"

# Frequencies below are expressed in the same unit passed to FM_Tune_To
# (client kHz value divided by 10, e.g. 9500 for a client "T95000").
FREQ_NOT_ALLOWED_RANGE = [9482, 9518]  # inclusive [low, high]; falsy disables
FREQ_NOT_ALLOWED_LIST = [10900]        # individual frequencies; falsy disables
PASSWORD = os.getenv("PW") or "test"

CLOCK = 12000000  # DP-666
SALT_LENGTH = 16
SS_UPDATE_INTERVAL = 0.125   # seconds between signal-status reports
RDS_UPDATE_INTERVAL = 0.086  # seconds between RDS polls
POLL_INTERVAL = 0.015        # seconds to wait for client data per loop turn


def generate_salt(length=SALT_LENGTH) -> str:
    """Return a random salt of *length* lowercase ASCII letters."""
    return "".join(secrets.choice(string.ascii_lowercase) for _ in range(length))


def compute_hash(salt: str, password: str) -> bytes:
    """Return the hex SHA-1 digest of ``salt + password`` as ASCII bytes."""
    # NOTE(review): SHA-1 is presumably dictated by the client protocol;
    # confirm before swapping in a stronger hash.
    return hashlib.sha1((salt + password).encode()).hexdigest().encode()


def init_tef():
    """Open the I2C bridge on DEVICE and bring the tuner to its start-up state.

    Returns the configured ``TEF6686`` instance, tuned to 9500 with RDS,
    channel equalizer and multipath suppression enabled.
    """
    p = I2CPCClient(DEVICE)
    tef = TEF6686(p)
    tef.init(clock=CLOCK)
    tef.AUDIO_Set_Mute(False)
    tef.AUDIO_Set_Volume(40)
    tef.FM_Tune_To(1, 9500)
    tef.FM_Set_RDS(1)
    tef.FM_Set_ChannelEqualizer(True)
    tef.FM_Set_MphSuppression(True)
    tef.APPL_Set_OperationMode(True)
    return tef


def authenticate(conn: socket.socket) -> bool:
    """Run the salted-hash challenge on *conn*.

    Sends a fresh salt terminated by a newline, then reads until a received
    chunk (stripped of surrounding whitespace) equals the expected hash.

    Returns True on a match, False if the peer closes the connection first.
    """
    salt = generate_salt()
    conn.sendall(salt.encode() + b"\n")
    expected_hash = compute_hash(salt, PASSWORD)
    while True:
        data = conn.recv(1024)
        if not data:
            return False
        # Constant-time comparison so response timing leaks nothing about the hash.
        if hmac.compare_digest(data.strip(), expected_hash):
            return True


def _is_freq_blocked(freq: int) -> bool:
    """Return True if *freq* lies in the blocked range or the blocked list."""
    if FREQ_NOT_ALLOWED_RANGE and FREQ_NOT_ALLOWED_RANGE[0] <= freq <= FREQ_NOT_ALLOWED_RANGE[1]:
        return True
    return bool(FREQ_NOT_ALLOWED_LIST) and freq in FREQ_NOT_ALLOWED_LIST


def _tune_and_mute(tef: TEF6686, freq: int) -> None:
    """Tune to *freq* (mode 1) and mute the audio iff the frequency is blocked."""
    tef.FM_Tune_To(1, freq)
    tef.AUDIO_Set_Mute(_is_freq_blocked(freq))


def _parse_int_arg(cmd: bytes, base: int = 10) -> int | None:
    """Parse the integer following the one-byte command letter of *cmd*.

    Returns None when the argument is missing or malformed, so bad client
    input is skipped instead of raising out of the server loop.
    """
    try:
        return int(cmd[1:].decode().strip(), base)
    except ValueError:  # also covers UnicodeDecodeError
        return None


def _run_scan(tef: TEF6686, state: dict, conn: socket.socket) -> None:
    """Sweep scan_start..scan_stop and stream ``freq = level, `` pairs to *conn*.

    Output is ``U`` followed by the pairs and a final newline. Blocked
    frequencies are reported with level 0 and never measured. Afterwards the
    listening frequency, mute state and bandwidth are restored from *state*.
    """
    # NOTE(review): scan_bw is passed through unscaled, unlike the "W" command
    # which divides by 100 — confirm the unit the client sends with "Sw".
    tef.FM_Set_Bandwidth((state["scan_bw"] == 0), state["scan_bw"])
    conn.sendall(b"U")
    step = state["scan_step"]
    for freq in range(state["scan_start"], state["scan_stop"] + step, step):
        tef.FM_Tune_To(2, freq)  # Auto mutes, less commands sent
        time.sleep(0.0064)
        if _is_freq_blocked(freq):
            conn.sendall(f"{freq * 10} = 0, ".encode())
            continue
        quality = tef.FM_Get_Quality_Data()
        level = quality[1] if quality else None
        if level is None:
            continue
        conn.sendall(f"{freq * 10} = {(level / 10) + 11.25}, ".encode())
    conn.sendall(b"\n")

    # Restore the pre-scan state. The mute decision depends on the frequency
    # we return to, not on the last frequency that was scanned.
    _tune_and_mute(tef, state["last_tune"])
    bw = state["bw"]
    tef.FM_Set_Bandwidth(bw == 0, 2360 if bw == 0 else bw // 100)


def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket) -> bytes:
    """Execute every newline-separated client command contained in *data*.

    Commands handled (first byte selects the command):
      T<kHz>   tune; accepted only when kHz // 10 is within 6500..10800
      G<bits>  binary flags: bit 0 channel equalizer, bit 1 multipath suppression
      B<0|1>   forced mono
      D<n>     de-emphasis: 0 -> 500, 1 -> 750, anything else -> 0
      W<bw>    bandwidth, 0 = automatic
      x / X    operation mode off (then retune) / on
      ?        identification string
      Sa/Sb/Sc/Sw<n>  scan start / stop / step / bandwidth; bare S runs a scan

    *state* is updated in place. Scan results are written directly to *conn*;
    all other replies are accumulated and returned for the caller to send.
    Commands with a malformed numeric argument are ignored.
    """
    out = b""
    for cmd in data.splitlines():
        if cmd.startswith(b"T"):
            khz = _parse_int_arg(cmd)
            if khz is None:
                continue
            freq = khz // 10
            if freq < 6500 or freq > 10800:
                continue
            state['last_tune'] = freq
            _tune_and_mute(tef, freq)
            out += f"T{freq*10}\n".encode()
        elif cmd.startswith(b"G"):
            eqims = _parse_int_arg(cmd, 2)
            if eqims is None:
                continue
            eqims &= 0b11  # only the two defined flag bits are meaningful
            state['last_eqims'] = eqims
            tef.FM_Set_ChannelEqualizer((eqims & 1) == 1)
            tef.FM_Set_MphSuppression((eqims & 2) == 2)
            out += f"G{eqims:02b}\n".encode()
        elif cmd.startswith(b"B"):
            flag = _parse_int_arg(cmd, 2)
            if flag is None:
                continue
            mono = bool(flag)
            state['forced_mono'] = mono
            tef.FM_Set_Stereo_Min(2 if mono else 0)
            out += f"B{int(mono)}\n".encode()
        elif cmd.startswith(b"D"):
            deemp = _parse_int_arg(cmd)
            if deemp is None:
                continue
            dtime = 500 if deemp == 0 else (750 if deemp == 1 else 0)
            state['deemp'] = deemp
            tef.FM_Set_Deemphasis(dtime)
            out += f"D{deemp}\n".encode()
        elif cmd.startswith(b"x"):
            out += b"OK\n"
            tef.APPL_Set_OperationMode(False)
            _tune_and_mute(tef, state["last_tune"])
        elif cmd.startswith(b"X"):
            tef.APPL_Set_OperationMode(True)
        elif cmd.startswith(b"W"):
            bw = _parse_int_arg(cmd)
            if bw is None:
                continue
            auto = (bw == 0)
            tef.FM_Set_Bandwidth(auto, 2360 if auto else (bw // 100))
            state['bw'] = bw
            out += f"W{bw}\n".encode()
        elif cmd.startswith(b"?"):
            out += b"XRD Python driver\n"
        elif cmd.startswith(b"S"):
            sub = cmd[1:]
            if not sub:
                _run_scan(tef, state, conn)
                continue
            arg = _parse_int_arg(sub)
            if arg is None:
                continue
            key = sub[:1]
            if key == b"a":
                state['scan_start'] = (arg + 5) // 10
            elif key == b"b":
                state['scan_stop'] = (arg + 5) // 10
            elif key == b"c":
                step = (arg + 5) // 10
                # A zero step would make range() raise during the scan.
                if step >= 1:
                    state['scan_step'] = step
            elif key == b"w":
                state['scan_bw'] = arg
    return out


def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict) -> None:
    """Send one ``S<mode><level>,<wam>,<usn>,<bandwidth>`` status report.

    <mode> is ``M`` when mono is forced, ``s`` when the stereo value read
    from the tuner is below 350, otherwise ``m``. On a blocked frequency a
    fixed placeholder report is sent and the tuner is not queried. Nothing
    is sent when the tuner returns no quality data.
    """
    if _is_freq_blocked(state["last_tune"]):
        conn.sendall(b"Sm11.25,0,0,0\n\n")
        return

    # A missing processing status falls back to 1000, i.e. reported as "m".
    _, _, stereo, _ = d if (d := tef.FM_Get_Processing_Status()) else (None, None, 1000, None)
    res = tef.FM_Get_Quality_Data()
    if res is None:
        return
    _, level, usn, wam, _, bandwidth, *_ = res
    level = level / 10

    if state['forced_mono']:
        mode = b"M"
    elif stereo < 350:
        mode = b"s"
    else:
        mode = b"m"
    conn.sendall(mode.join((b"S", f"{level + 11.25},{wam//10},{usn//10},{bandwidth}\n\n".encode())))


def send_rds_data(tef: TEF6686, conn: socket.socket, state: dict) -> None:
    """Poll the RDS decoder and forward any new group to *conn*.

    Emits ``R<B><C><D><err>`` (hex) for group data, followed by ``P<PI>``
    plus one ``?`` per PI error level when that level is below 3. Nothing is
    sent on a blocked frequency or when status bits 9 or 15 are clear.
    """
    if _is_freq_blocked(state["last_tune"]):
        return
    res = tef.FM_Get_RDS_Data__decoder()
    if res is None:
        return
    status, A, B, C, D, dec_error = res
    # Explicit None checks narrow the optional fields for the type checker.
    if status is None or A is None or B is None or C is None or D is None or dec_error is None:
        return
    if (status & (1 << 9) == 0) or (status & (1 << 15) == 0):
        return

    data = b""
    if (status & (1 << 13) == 0):
        # Rearrange three 2-bit fields of dec_error's upper byte:
        # bits 5-4 -> 1-0, bits 3-2 stay in place, bits 1-0 -> 5-4.
        hi = dec_error >> 8
        err = ((hi & 0x30) >> 4) | (hi & 0xC) | ((hi & 3) << 4)
        data = b"R" + f"{B:04X}{C:04X}{D:04X}{err:02X}\n".encode()

        # NOTE(review): the original indentation was lost; the bit-12 branch is
        # read here as the fallback to block C when block A's error level is
        # too high — confirm against the upstream source.
        pi_error = (dec_error >> 14) & 0b11
        if pi_error < 3:
            data += b"P" + f"{A:04X}".encode()
            data += b"?" * pi_error + b"\n"
        elif status & (1 << 12):
            pi_error = (dec_error >> 10) & 0b11
            if pi_error < 3:
                data += b"P" + f"{C:04X}".encode()
                data += b"?" * pi_error + b"\n"
    if data:
        conn.sendall(data)


def _send_initial_state(conn: socket.socket, state: dict) -> None:
    """Send the current tuner settings to a freshly authenticated client."""
    conn.sendall(f"T{state['last_tune']*10}\n".encode())
    conn.sendall(f"G{state['last_eqims']:02b}\n".encode())
    conn.sendall(f"B{int(state['forced_mono'])}\n".encode())
    conn.sendall(f"D{state['deemp']}\n".encode())
    conn.sendall(f"W{state['bw']}\n".encode())
    conn.sendall(b"M0\n")


def _serve_client(tef: TEF6686, conn: socket.socket, state: dict) -> None:
    """Handle one authenticated client until it closes the connection.

    The socket stays in blocking mode and ``select`` polls it for input, so
    ``sendall`` can never fail with BlockingIOError. Periodic status and RDS
    reports are sent only on turns where no client data is pending.
    """
    _send_initial_state(conn, state)
    ss_timer = libtimer.Timer()
    rds_timer = libtimer.Timer()
    while True:
        readable, _, _ = select.select([conn], [], [], POLL_INTERVAL)
        if readable:
            data = conn.recv(1024)
            if not data:
                return
            resp = process_command(tef, data, state, conn)
            if resp:
                conn.sendall(resp)
            continue
        if ss_timer.get_time() > SS_UPDATE_INTERVAL:
            send_signal_status(tef, conn, state)
            ss_timer.reset()
        if rds_timer.get_time() > RDS_UPDATE_INTERVAL:
            send_rds_data(tef, conn, state)
            rds_timer.reset()


def run_server():
    """Initialise the tuner and serve clients on HOST:PORT, one at a time.

    Tuner settings live in a single ``state`` dict that survives across
    client connections. A dropped connection ends that client's session only.
    """
    with init_tef() as tef:
        device, hw_version, sw_version = d if (d := tef.APPL_Get_Identification()) else (None, None, None)
        if device and hw_version and sw_version:
            variant = device & 127
            hw_major = hw_version >> 8
            hw_minor = hw_version & 127
            sw_major = sw_version >> 8
            variant_str = {1: "TEF6687", 9: "TEF6688", 3: "TEF6689"}.get(variant, "TEF6686")
            print(f"Got {variant_str} (V{hw_major}{hw_minor}{sw_major})")

        state = {
            'last_tune': 9500,
            'last_eqims': 0b11,
            'forced_mono': False,
            'deemp': 0,
            'bw': 0,
            # Scan bounds use the same unit as last_tune (client kHz // 10),
            # matching what the Sa/Sb/Sc handlers store.
            'scan_start': 8750,
            'scan_stop': 10800,
            'scan_step': 10,
            'scan_bw': 0,
        }

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((HOST, PORT))
            s.listen()
            print(f"Server listening on {HOST}:{PORT}")
            while True:
                conn, addr = s.accept()
                with conn:
                    print(f"Connected by {addr}")
                    try:
                        if not authenticate(conn):
                            print("Authentication failed.")
                            continue
                        _serve_client(tef, conn, state)
                    except ConnectionError as exc:
                        # Covers reset, abort and broken pipe, during auth or
                        # any send: drop this client and keep the server up.
                        print(f"Connection lost: {exc}")


if __name__ == "__main__":
    run_server()