1
0
Files
TEF6686_Driver/xrd.py
2026-02-22 10:30:42 +01:00

269 lines
9.9 KiB
Python

# This is a server compatible with the TEF6686 firmware, but it uses the I2C control mode instead
# This fully works with FM-DX-Webserver (no plugins)
# Released under the Unlicense (however both FM-DX-Webserver and the firmware are under the tyranny of GPL-3)
import socket
import hashlib
import secrets
import string
import libtimer
import time
from tef import TEF6686
from protocol import I2CPCClient
import os
from functools import wraps
from typing import Callable
# Listening endpoint. An unset or empty environment variable falls back to the
# default on the right-hand side.
HOST = os.environ.get("HOST") or "0.0.0.0"
PORT = int(os.environ.get("PORT") or 0) or 7373

# Device name handed to I2CPCClient (overridable through the DEV variable).
DEVICE = os.environ.get("DEV") or "COM6"

# (lower, upper) inclusive ranges that freq_allowed() rejects; empty = nothing blocked.
FREQ_NOT_ALLOWED_RANGE = []

# Password mixed with the per-connection salt during authentication.
PASSWORD = os.environ.get("PW") or "test"

CLOCK = 12000000  # DP-666

# Number of random lowercase letters in the authentication salt.
SALT_LENGTH = 16

# Minimum timer readings between two signal-status / RDS pushes
# (presumably seconds -- depends on libtimer.Timer.get_time()).
SS_UPDATE_INTERVAL = 0.115
RDS_UPDATE_INTERVAL = 0.086
def freq_allowed(freq: int):
    """Return True unless *freq* falls inside a blocked range.

    Every entry of FREQ_NOT_ALLOWED_RANGE is a ``(lower, upper)`` pair whose
    bounds are both inclusive. An empty list blocks nothing.
    """
    return not any(low <= freq <= high for (low, high) in FREQ_NOT_ALLOWED_RANGE)
def init_tef():
    """Open the link to DEVICE and put the tuner into its start-up configuration.

    Returns:
        The configured TEF6686 instance.
    """
    tuner = TEF6686(I2CPCClient(DEVICE))
    tuner.init(clock=CLOCK)

    # Audio path: unmuted, fixed volume.
    tuner.AUDIO_Set_Mute(False)
    tuner.AUDIO_Set_Volume(40)

    # Same frequency value that run_server() uses as the initial 'last_tune'.
    tuner.FM_Tune_To(1, 9500)
    tuner.FM_Set_RDS(1)

    # Both on, matching the initial 'last_eqims' (0b11) in run_server().
    tuner.FM_Set_ChannelEqualizer(True)
    tuner.FM_Set_MphSuppression(True)

    # Same mode the "X" command selects; the "x" command switches it to False.
    tuner.APPL_Set_OperationMode(True)
    return tuner
def authenticate(conn: socket.socket):
    """Run the salted-hash handshake on a freshly accepted connection.

    A random salt is sent to the client, which must answer with the
    hexadecimal SHA-1 digest of ``salt + PASSWORD``.

    Each connection gets exactly one attempt: the first complete answer
    (a full line, or at least as many bytes as the digest is long) is
    compared and the result returned. Answers split over several TCP
    segments are reassembled first.

    Args:
        conn: Blocking socket connected to the client.

    Returns:
        True when the client's answer matches, False when it does not or
        when the connection is closed or fails during the handshake.
    """
    salt = "".join(secrets.choice(string.ascii_lowercase) for _ in range(SALT_LENGTH))
    expected_hash = hashlib.sha1((salt + PASSWORD).encode()).hexdigest().encode()
    received = b""
    try:
        conn.sendall(salt.encode() + b"\n")
        while True:
            chunk = conn.recv(1024)
            if not chunk:
                return False
            received += chunk
            # Leading blank lines / whitespace never belong to the answer.
            candidate = received.lstrip()
            if b"\n" in candidate or len(candidate) >= len(expected_hash):
                break
    except ConnectionError:
        # A client vanishing mid-handshake is a failed login, not a server error.
        return False
    answer = candidate.split(b"\n", 1)[0].strip()
    # Constant-time comparison so response timing does not leak digest prefixes.
    return secrets.compare_digest(answer, expected_hash)
def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket):
out = b""
for cmd in data.splitlines():
if cmd.startswith(b"T"):
freq = int(cmd.decode().removeprefix("T").strip()) // 10
if freq < 6500 or freq > 10800: continue
state['last_tune'] = freq
tef.FM_Tune_To(1, freq)
if not freq_allowed(freq): tef.AUDIO_Set_Mute(True)
else: tef.AUDIO_Set_Mute(False)
out += f"T{freq*10}\n".encode()
elif cmd.startswith(b"G"):
eqims = int(cmd.decode().removeprefix("G").strip(), 2)
state['last_eqims'] = eqims
tef.FM_Set_ChannelEqualizer((eqims & 1) == 1)
tef.FM_Set_MphSuppression((eqims & 2) == 2)
out += f"G{bin(eqims).removeprefix('0b').zfill(2)}\n".encode()
elif cmd.startswith(b"B"):
mono = bool(int(cmd.decode().removeprefix("B").strip(), 2))
state['forced_mono'] = mono
tef.FM_Set_Stereo_Min(2 if mono else 0)
out += f"B{int(mono)}\n".encode()
elif cmd.startswith(b"D"):
deemp = int(cmd.decode().removeprefix("D").strip())
dtime = 500 if deemp == 0 else (750 if deemp == 1 else 0)
state['deemp'] = deemp
tef.FM_Set_Deemphasis(dtime)
out += f"D{deemp}\n".encode()
elif cmd.startswith(b"x"):
out += b"OK\n"
tef.APPL_Set_OperationMode(False)
tef.FM_Tune_To(1, state["last_tune"])
if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True)
else: tef.AUDIO_Set_Mute(False)
elif cmd.startswith(b"X"):
tef.APPL_Set_OperationMode(True)
elif cmd.startswith(b"W"):
bw = int(cmd.decode().removeprefix("W").strip())
auto = (bw == 0)
tef.FM_Set_Bandwidth(auto, 2360 if auto else (bw // 100))
state['bw'] = bw
out += f"W{bw}\n".encode()
elif cmd.startswith(b"?"):
out += b"XRD Python driver\n"
elif cmd.startswith(b"S"):
cmd = cmd[1:]
if cmd != b"":
arg = int(cmd.decode()[1:].strip())
match cmd[0]:
case 97: state['scan_start'] = (arg + 5) // 10
case 98: state['scan_stop'] = (arg + 5) // 10
case 99: state['scan_step'] = (arg + 5) // 10
case 119: state['scan_bw'] = arg
else:
tef.FM_Set_Bandwidth((state["scan_bw"] == 0), state["scan_bw"])
conn.sendall(b"U")
for freq in range(state["scan_start"], state["scan_stop"] + state["scan_step"], state["scan_step"]):
tef.FM_Tune_To(2, freq) # Auto mutes, less commands sent
time.sleep(0.0064)
if freq_allowed(freq):
conn.sendall(f"{freq * 10} = -11.25, ".encode())
continue
_, level, *_ = d if (d := tef.FM_Get_Quality_Data()) else (None, None)
if level is None: continue
conn.sendall(str(freq * 10).encode() + b" = " + str((level / 10) + 11.25).encode() + b", ")
conn.sendall(b"\n")
tef.FM_Tune_To(1, state["last_tune"])
if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True)
else: tef.AUDIO_Set_Mute(False)
tef.FM_Set_Bandwidth((state["bw"] == 0), 2360 if (state["bw"] == 0) else (state["bw"] // 100))
return out
# Registry filled by @periodic: one (function, interval, timer) entry per
# decorated function, in decoration order.
PERIODIC_FUNCTIONS: list[tuple[Callable, float, libtimer.Timer]] = []


def periodic(t: float):
    """Decorator factory that registers a function for run_periodic().

    Args:
        t: Timer reading that must be exceeded between two invocations
           (presumably seconds -- depends on libtimer.Timer.get_time()).

    Returns:
        A decorator. It records the undecorated function in
        PERIODIC_FUNCTIONS with a fresh timer and hands back a thin
        pass-through wrapper.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        PERIODIC_FUNCTIONS.append((func, t, libtimer.Timer()))
        return wrapper

    return decorator
def reset_periodic():
    """Restart the timer of every registered periodic function."""
    for entry in PERIODIC_FUNCTIONS:
        entry[2].reset()
def run_periodic(*args, **kwargs):
    """Call each registered function whose timer reading exceeds its interval.

    All arguments are forwarded unchanged to every function that fires; the
    function's timer is reset right after the call returns.
    """
    for (callback, interval, timer) in PERIODIC_FUNCTIONS:
        elapsed = timer.get_time()
        if elapsed > interval:
            callback(*args, **kwargs)
            timer.reset()
@periodic(SS_UPDATE_INTERVAL)
def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict):
    """Push one "S..." signal-status line to the client.

    The line is ``S`` + a mode letter + ``level,wam,usn,bandwidth`` followed
    by an empty line. On a blocked frequency a fixed placeholder is sent and
    the tuner is not queried. Nothing is sent when no quality data is
    available.
    """
    if not freq_allowed(state["last_tune"]):
        conn.sendall(b"Sm11.25,0,0,0\n\n")
        return

    processing = tef.FM_Get_Processing_Status()
    # Without a reading, fall back to a value that selects the "m" letter below.
    _, _, stereo, _ = processing if processing else (None, None, 1000, None)

    quality = tef.FM_Get_Quality_Data()
    if quality is None:
        return
    _, level, usn, wam, _, bandwidth, *_ = quality

    # Mode letter: "M" when mono is forced, otherwise "s"/"m" from the reading.
    if state['forced_mono']:
        mode = b"M"
    elif stereo < 350:
        mode = b"s"
    else:
        mode = b"m"

    readings = f"{level / 10 + 11.25},{wam//10},{usn//10},{bandwidth}\n\n"
    conn.sendall(b"S" + mode + readings.encode())
@periodic(RDS_UPDATE_INTERVAL)
def send_rds_data(tef: TEF6686, conn: socket.socket, state: dict):
    """Forward the most recent RDS decoder output to the client.

    Depending on the status word, this sends an ``R`` line (blocks B, C, D
    plus a packed error byte) and/or a ``P`` line (a block value followed by
    one ``?`` per error level). Nothing is queried on a blocked frequency.
    """
    if not freq_allowed(state["last_tune"]):
        return
    res = tef.FM_Get_RDS_Data__decoder()
    if res is None:
        return
    status, A, B, C, D, dec_error = res
    # Explicit None checks also narrow the types for static checkers.
    if any(field is None for field in (status, A, B, C, D, dec_error)):
        return
    # Status bits 9 and 15 must both be set, otherwise there is nothing to send.
    if not (status & (1 << 9)) or not (status & (1 << 15)):
        return

    out = b""
    if not status & (1 << 13):
        # Repack the 2-bit error fields of the upper byte for the "R" line:
        # bits 5-4 -> 1-0, bits 3-2 stay in place, bits 1-0 -> 5-4.
        upper = dec_error >> 8
        err = ((upper & 0x30) >> 4) | (upper & 0xC) | ((upper & 3) << 4)
        out = f"R{B:04X}{C:04X}{D:04X}{err:02X}\n".encode()
        pi_value, pi_error = A, (dec_error >> 14) & 0b11
    elif status & (1 << 12):
        pi_value, pi_error = C, (dec_error >> 10) & 0b11
    else:
        pi_value, pi_error = None, 3  # no "P" line in this case

    if pi_error < 3:
        out += f"P{pi_value:04X}".encode() + b"?" * pi_error + b"\n"
    conn.sendall(out)
def run_server():
    """Initialise the tuner and serve clients one at a time, forever.

    For each accepted connection the client is authenticated, the current
    session state is sent, and the socket is switched to non-blocking mode.
    Incoming data is passed to process_command(); whenever nothing is
    pending, the periodic status/RDS senders get a chance to run.

    A connection error at any point of a session (handshake, command
    handling or a periodic send) ends that session only; the server goes
    back to accepting the next client.
    """
    with init_tef() as tef:
        ident = tef.APPL_Get_Identification()
        device, hw_version, sw_version = ident if ident else (None, None, None)
        if device and hw_version and sw_version:
            variant = device & 127
            hw_major = hw_version >> 8
            hw_minor = hw_version & 127
            sw_major = sw_version >> 8
            variant_str = "TEF6686"
            if variant == 1:
                variant_str = "TEF6687"
            elif variant == 9:
                variant_str = "TEF6688"
            elif variant == 3:
                variant_str = "TEF6689"
            print(f"{variant_str} (V{hw_major}{hw_minor}{sw_major})")

        # Session state shared across connections. All frequencies use the
        # unit FM_Tune_To() is called with (9500 at start-up, 6500..10800
        # accepted by the "T" command), so the scan defaults must too.
        state = {
            'last_tune': 9500,
            'last_eqims': 0b11,
            'forced_mono': False,
            'deemp': 0,
            'bw': 0,
            'scan_start': 8750,
            'scan_stop': 10800,
            'scan_step': 10,
            'scan_bw': 0,
        }

        with socket.socket() as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((HOST, PORT))
            s.listen()
            print(f"Server listening on {HOST}:{PORT}")
            while True:
                conn, addr = s.accept()
                with conn:
                    reset_periodic()
                    print(f"Connected by {addr}")
                    try:
                        if not authenticate(conn):
                            print("Authentication failed.")
                            continue

                        # Send initial state
                        initial = (
                            f"T{state['last_tune']*10}\n"
                            f"G{bin(state['last_eqims']).removeprefix('0b').zfill(2)}\n"
                            f"B{int(state['forced_mono'])}\n"
                            f"D{state['deemp']}\n"
                            f"W{state['bw']}\n"
                            "M0\n"
                        )
                        conn.sendall(initial.encode())

                        # NOTE(review): sendall() on a non-blocking socket can
                        # raise BlockingIOError when the send buffer is full.
                        conn.setblocking(False)
                        while True:
                            try:
                                data = conn.recv(1024)
                                if not data:
                                    break
                                resp = process_command(tef, data, state, conn)
                                if resp:
                                    conn.sendall(resp)
                            except BlockingIOError:
                                # Nothing to read right now: do the periodic
                                # pushes, then yield the CPU briefly.
                                run_periodic(tef, conn, state)
                                time.sleep(0.015)
                    except ConnectionError:
                        # Reset / abort / broken pipe: drop this client only.
                        pass


if __name__ == "__main__":
    run_server()