# This is a compatible server for the TEF6686 firmware, but instead using the i2c control mode # This fully works with FM-DX-Webserver (no plugins) # Released under the Unlicense (however both FM-DX-Webserver and the firmware are under the tyranny of GPL-3) import socket import hashlib import secrets import string import libtimer import time from tef import TEF6686 from protocol import I2CPCClient import os from functools import wraps from typing import Callable INITIAL_FREQ = 9500 INITIAL_EQ = False INITIAL_IMS = True HOST = os.getenv("HOST") or '0.0.0.0' PORT = int(os.getenv("PORT") or 0) or 7373 DEVICE = os.getenv("DEV") or "COM6" FREQ_NOT_ALLOWED_RANGE = [] PASSWORD = os.getenv("PW") or "test" CLOCK = 12000000 # DP-666 SALT_LENGTH = 16 SS_UPDATE_INTERVAL = 0.115 RDS_UPDATE_INTERVAL = 0.086 def freq_allowed(freq: int): if not FREQ_NOT_ALLOWED_RANGE: return True for (lower, upper) in FREQ_NOT_ALLOWED_RANGE: if freq >= lower and freq <= upper: return False return True def init_tef(): p = I2CPCClient(DEVICE, int(os.getenv("BAUD") or 0) or 115200) tef = TEF6686(p) tef.init(clock=CLOCK) tef.AUDIO_Set_Mute(False) tef.AUDIO_Set_Volume(30) tef.FM_Tune_To(1, INITIAL_FREQ) tef.FM_Set_RDS(1) tef.FM_Set_ChannelEqualizer(INITIAL_EQ) tef.FM_Set_MphSuppression(INITIAL_IMS) tef.FM_Set_Stereo_Max(False) # Disables stereo blend??!!??!? tef.APPL_Set_OperationMode(True) # Turn off return tef def authenticate(conn: socket.socket): salt = "".join(secrets.choice(string.ascii_lowercase) for _ in range(SALT_LENGTH)) conn.sendall(salt.encode() + b"\n") expected_hash = hashlib.sha1((salt + PASSWORD).encode()).hexdigest().encode() while True: data = conn.recv(1024) if not data: return False if data.strip() == expected_hash.strip(): return True def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket): out = b"" for cmd in data.splitlines(): if cmd.startswith(b"T"): freq = int(cmd.decode().removeprefix("T").strip()) // 10 if freq < 6500 or freq > 10800: continue tef.FM_Tune_To(1, freq) if not freq_allowed(freq): tef.AUDIO_Set_Mute(True) else: tef.AUDIO_Set_Mute(False) state['last_tune'] = freq out += f"T{freq*10}\n".encode() elif cmd.startswith(b"G"): eqims = int(cmd.decode().removeprefix("G").strip(), 2) tef.FM_Set_ChannelEqualizer((eqims & 1) == 1) tef.FM_Set_MphSuppression((eqims & 2) == 2) out += f"G{bin(eqims).removeprefix('0b').zfill(2)}\n".encode() state['last_eqims'] = eqims elif cmd.startswith(b"B"): mono = bool(int(cmd.decode().removeprefix("B").strip(), 2)) tef.FM_Set_Stereo_Min(2 if mono else 0) out += f"B{int(mono)}\n".encode() state['forced_mono'] = mono elif cmd.startswith(b"D"): deemp = int(cmd.decode().removeprefix("D").strip()) dtime = 500 if deemp == 0 else (750 if deemp == 1 else 0) tef.FM_Set_Deemphasis(dtime) out += f"D{deemp}\n".encode() state['deemp'] = deemp elif cmd.startswith(b"x"): out += b"OK\n" tef.APPL_Set_OperationMode(False) # Enable tef.FM_Tune_To(1, state["last_tune"]) if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True) else: tef.AUDIO_Set_Mute(False) elif cmd.startswith(b"X"): tef.APPL_Set_OperationMode(True) # turn off elif cmd.startswith(b"W"): bw = int(cmd.decode().removeprefix("W").strip()) auto = (bw == 0) tef.FM_Set_Bandwidth(auto, 2360 if auto else (bw // 100)) state['bw'] = bw out += f"W{bw}\n".encode() elif cmd.startswith(b"?"): out += b"XRD Python driver\n" elif cmd.startswith(b"S"): cmd = cmd[1:] if cmd != b"": arg = int(cmd.decode()[1:].strip()) match cmd[0]: case 97: state['scan_start'] = (arg + 5) // 10 case 98: state['scan_stop'] = (arg + 5) // 10 case 99: state['scan_step'] = (arg + 5) // 10 case 119: state['scan_bw'] = arg else: start = True tef.FM_Set_Bandwidth((state["scan_bw"] == 0), state["scan_bw"]) conn.sendall(b"U") for freq in range(state["scan_start"], state["scan_stop"] + state["scan_step"], state["scan_step"]): if not start: conn.sendall(b", ") # Prevent trailing comma, because the FM-DX-Webserver spectrum plugin treats us as actual firmware and throws as harder api, without it we're treated as a module start = False tef.FM_Tune_To(2, freq) # Auto mutes, less commands sent time.sleep(0.0067) # sick seven if not freq_allowed(freq): conn.sendall(f"{freq * 10} = 11.25".encode()) continue _, level, *_ = d if (d := tef.FM_Get_Quality_Data()) else (None, None) if level is None: continue conn.sendall(str(freq * 10).encode() + b" = " + str((level / 10) + 11.25).encode()) conn.sendall(b"\n") tef.FM_Tune_To(1, state["last_tune"]) if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True) else: tef.AUDIO_Set_Mute(False) tef.FM_Set_Bandwidth((state["bw"] == 0), 2360 if (state["bw"] == 0) else (state["bw"] // 100)) return out PERIODIC_FUNCTIONS: list[tuple[Callable, float, libtimer.Timer]] = [] def periodic(t: float): def decorator(func): PERIODIC_FUNCTIONS.append((func, t, libtimer.Timer())) @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return decorator def reset_periodic(): for (_, _, timer) in PERIODIC_FUNCTIONS: timer.reset() def run_periodic(*args, **kwargs): for (func, t, timer) in PERIODIC_FUNCTIONS: if timer.get_time() > t: func(*args, **kwargs) timer.reset() @periodic(SS_UPDATE_INTERVAL) def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict): if not freq_allowed(state["last_tune"]): conn.sendall(b"Sm11.25,0,0,0\n\n") return stereo_pilot, _ = d if (d := tef.FM_Get_Signal_Status()) else (None, None) res = tef.FM_Get_Quality_Data() if res is None: return _, level, usn, wam, _, bandwidth, *_ = res level = level / 10 data = b"S" if state['forced_mono']: data += b"M" elif stereo_pilot: data += b"s" else: data += b"m" data += f"{level + 11.25},{wam//10},{usn//10},{bandwidth}\n\n".encode() conn.sendall(data) @periodic(RDS_UPDATE_INTERVAL) def send_rds_data(tef: TEF6686, conn: socket.socket, state: dict): if not freq_allowed(state["last_tune"]): return res = tef.FM_Get_RDS_Data__decoder() if res is None: return status, A, B, C, D, dec_error = res dec_error >>= 8 if (status & (1 << 9) == 0) or (status & (1 << 15) == 0): return data = b"" a_error = dec_error >> 6 c_error = (dec_error >> 2) & 0b11 if (status & (1 << 13) == 0): if (status & (1 << 12) == 0): # Type A, PI in A data = f"R{A:04X}{B:04X}{C:04X}".encode() elif (status >> 12) & 1: # Type B, PI in A and C pi = A if c_error < a_error: pi = C data = f"R{pi:04X}{B:04X}{pi:04X}".encode() data += f"{D:04X}{dec_error:02X}\n".encode() else: data = f"P{A:04X}{"?" * a_error}\n".encode() # No group data, only PI if not data.strip(): return conn.sendall(data) def run_server(): with init_tef() as tef: device, hw_version, sw_version = d if (d := tef.APPL_Get_Identification()) else (None, None, None) if device and hw_version and sw_version: variant = device & 127 hw_major = hw_version >> 8 hw_minor = hw_version & 127 sw_major = sw_version >> 8 variant_str = "TEF6686" if variant == 1: variant_str = "TEF6687" elif variant == 9: variant_str = "TEF6688" elif variant == 3: variant_str = "TEF6689" print(f"{variant_str} (V{hw_major}{hw_minor}{sw_major})") state = { 'last_tune': INITIAL_FREQ, 'last_eqims': (INITIAL_EQ << 1) | INITIAL_IMS, 'forced_mono': False, 'deemp': 0, 'bw': 0, 'scan_start': 87500, 'scan_stop': 108000, 'scan_step': 100, 'scan_bw': 0, } with socket.socket() as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((HOST, PORT)) s.listen() print(f"Server listening on {HOST}:{PORT}") while True: conn, addr = s.accept() with conn: reset_periodic() print(f"Connected by {addr}") if not authenticate(conn): print("Authentication failed.") continue # Send initial state conn.sendall(f"T{state['last_tune']*10}\n".encode()) conn.sendall(f"G{bin(state['last_eqims']).removeprefix('0b').zfill(2)}\n".encode()) conn.sendall(f"B{int(state['forced_mono'])}\n".encode()) conn.sendall(f"D{state['deemp']}\n".encode()) conn.sendall(f"W{state['bw']}\n".encode()) conn.sendall(f"M0\n".encode()) conn.setblocking(False) while True: try: if not (data := conn.recv(1024)): break resp = process_command(tef, data, state, conn) if resp: conn.sendall(resp) except ConnectionResetError: break except ConnectionAbortedError: break except BlockingIOError: run_periodic(tef, conn, state) time.sleep(0.025) if __name__ == "__main__": run_server()