1
0
Files
TEF6686_Driver/xrd.py
2026-02-21 22:48:28 +01:00

255 lines
11 KiB
Python

# This is a compatible server for the TEF6686 firmware, but instead using the i2c control mode
# This fully works with FM-DX-Webserver (no plugins)
# Released under the Unlicense (however both FM-DX-Webserver and the firmware are under the tyranny of GPL-3)
import socket
import hashlib
import secrets
import string
import libtimer
import time
from tef import TEF6686
from protocol import I2CPCClient
import os
HOST = os.getenv("HOST") or '0.0.0.0'
PORT = int(os.getenv("PORT") or 0) or 7373
DEVICE = os.getenv("DEV") or "COM6"
FREQ_NOT_ALLOWED_RANGE = [9482, 9518]
FREQ_NOT_ALLOWED_LIST = [100900]
PASSWORD = os.getenv("PW") or "test"
CLOCK = 12000000 # DP-666
SALT_LENGTH = 16
SS_UPDATE_INTERVAL = 0.125
RDS_UPDATE_INTERVAL = 0.086
def generate_salt(length=SALT_LENGTH) -> str:
return "".join(secrets.choice(string.ascii_lowercase) for _ in range(length))
def compute_hash(salt: str, password: str):
return hashlib.sha1((salt + password).encode()).hexdigest().encode()
def init_tef():
p = I2CPCClient(DEVICE)
tef = TEF6686(p)
tef.init(clock=CLOCK)
tef.AUDIO_Set_Mute(False)
tef.AUDIO_Set_Volume(40)
tef.FM_Tune_To(1, 9500)
tef.FM_Set_RDS(1)
tef.FM_Set_ChannelEqualizer(True)
tef.FM_Set_MphSuppression(True)
tef.APPL_Set_OperationMode(True)
return tef
def authenticate(conn: socket.socket):
salt = generate_salt()
conn.sendall(salt.encode() + b"\n")
expected_hash = compute_hash(salt, PASSWORD)
while True:
data = conn.recv(1024)
if not data: return False
if data.strip() == expected_hash.strip(): return True
def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket):
out = b""
for cmd in data.splitlines():
if cmd.startswith(b"T"):
freq = int(cmd.decode().removeprefix("T").strip()) // 10
if freq < 6500: continue
if freq > 10800: continue
state['last_tune'] = freq
tef.FM_Tune_To(1, freq)
if (FREQ_NOT_ALLOWED_RANGE and freq >= FREQ_NOT_ALLOWED_RANGE[0] and freq <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and freq in FREQ_NOT_ALLOWED_LIST): tef.AUDIO_Set_Mute(True)
else: tef.AUDIO_Set_Mute(False)
out += f"T{freq*10}\n".encode()
elif cmd.startswith(b"G"):
eqims = int(cmd.decode().removeprefix("G").strip(), 2)
state['last_eqims'] = eqims
tef.FM_Set_ChannelEqualizer((eqims & 1) == 1)
tef.FM_Set_MphSuppression((eqims & 2) == 2)
out += f"G{bin(eqims).removeprefix('0b').zfill(2)}\n".encode()
elif cmd.startswith(b"B"):
mono = bool(int(cmd.decode().removeprefix("B").strip(), 2))
state['forced_mono'] = mono
tef.FM_Set_Stereo_Min(2 if mono else 0)
out += f"B{int(mono)}\n".encode()
elif cmd.startswith(b"D"):
deemp = int(cmd.decode().removeprefix("D").strip())
dtime = 500 if deemp == 0 else (750 if deemp == 1 else 0)
state['deemp'] = deemp
tef.FM_Set_Deemphasis(dtime)
out += f"D{deemp}\n".encode()
elif cmd.startswith(b"x"):
out += b"OK\n"
tef.APPL_Set_OperationMode(False)
tef.FM_Tune_To(1, state["last_tune"])
if (FREQ_NOT_ALLOWED_RANGE and state["last_tune"] >= FREQ_NOT_ALLOWED_RANGE[0] and state["last_tune"] <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and state["last_tune"] in FREQ_NOT_ALLOWED_LIST): tef.AUDIO_Set_Mute(True)
else: tef.AUDIO_Set_Mute(False)
elif cmd.startswith(b"X"):
tef.APPL_Set_OperationMode(True)
elif cmd.startswith(b"W"):
bw = int(cmd.decode().removeprefix("W").strip())
auto = (bw == 0)
tef.FM_Set_Bandwidth(auto, 2360 if auto else (bw // 100))
state['bw'] = bw
out += f"W{bw}\n".encode()
elif cmd.startswith(b"?"):
out += b"XRD Python driver\n"
elif cmd.startswith(b"S"):
cmd = cmd[1:]
if cmd != b"":
arg = int(cmd.decode()[1:].strip())
match cmd[0]:
case 97: state['scan_start'] = (arg + 5) // 10
case 98: state['scan_stop'] = (arg + 5) // 10
case 99: state['scan_step'] = (arg + 5) // 10
case 119: state['scan_bw'] = arg
else:
tef.FM_Set_Bandwidth((state["scan_bw"] == 0), state["scan_bw"])
conn.sendall(b"U")
for freq in range(state["scan_start"], state["scan_stop"] + state["scan_step"], state["scan_step"]):
tef.FM_Tune_To(2, freq) # Auto mutes, less commands sent
time.sleep(0.0064)
if (FREQ_NOT_ALLOWED_RANGE and freq >= FREQ_NOT_ALLOWED_RANGE[0] and freq <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and freq in FREQ_NOT_ALLOWED_LIST):
conn.sendall(f"{freq * 10} = 0, ".encode())
continue
_, level, *_ = d if (d := tef.FM_Get_Quality_Data()) else (None, None)
if level is None: continue
conn.sendall(str(freq * 10).encode() + b" = " + str((level / 10) + 11.25).encode() + b", ")
conn.sendall(b"\n")
tef.FM_Tune_To(1, state["last_tune"])
if (FREQ_NOT_ALLOWED_RANGE and freq >= FREQ_NOT_ALLOWED_RANGE[0] and freq <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and freq in FREQ_NOT_ALLOWED_LIST): tef.AUDIO_Set_Mute(True)
else: tef.AUDIO_Set_Mute(False)
tef.FM_Set_Bandwidth((state["bw"] == 0), 2360 if (state["bw"] == 0) else (state["bw"] // 100))
return out
def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict):
if (FREQ_NOT_ALLOWED_RANGE and state["last_tune"] >= FREQ_NOT_ALLOWED_RANGE[0] and state["last_tune"] <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST and state["last_tune"] in FREQ_NOT_ALLOWED_LIST):
conn.sendall(b"Sm11.25,0,0,0\n\n")
return
_, _, stereo, _ = d if (d := tef.FM_Get_Processing_Status()) else (None, None, 1000, None)
res = tef.FM_Get_Quality_Data()
if res is None: return
_, level, usn, wam, _, bandwidth, *_ = res
level = level / 10
data = b"S"
if state['forced_mono']: data += b"M"
elif stereo < 350: data += b"s"
else: data += b"m"
data += f"{level + 11.25},{wam//10},{usn//10},{bandwidth}\n\n".encode()
conn.sendall(data)
def send_rds_data(tef: TEF6686, conn: socket.socket, state: dict):
if (FREQ_NOT_ALLOWED_RANGE and state["last_tune"] >= FREQ_NOT_ALLOWED_RANGE[0] and state["last_tune"] <= FREQ_NOT_ALLOWED_RANGE[1]) or (FREQ_NOT_ALLOWED_LIST or state["last_tune"] in FREQ_NOT_ALLOWED_LIST): return
res = tef.FM_Get_RDS_Data__decoder()
if res is None: return
status, A, B, C, D, dec_error = res
if status is None or A is None or B is None or C is None or D is None or dec_error is None: return # Fucking hate pyright
if (status & (1 << 9) == 0) or (status & (1 << 15) == 0): return
data = b""
if (status & (1 << 13) == 0):
err = 0
err |= ((dec_error >> 8) & 0x30) >> 4
err |= ((dec_error >> 8) & 0xC)
err |= ((dec_error >> 8) & 3) << 4
data = b"R"
data += f"{B:04X}{C:04X}{D:04X}{err:02X}\n".encode()
pi_error = (dec_error >> 14) & 0b11
if pi_error < 3:
data += b"P" + f"{A:04X}".encode()
data += b"?" * pi_error + b"\n"
elif status & (1 << 12):
pi_error = (dec_error >> 10) & 0b11
if pi_error < 3:
data += b"P" + f"{C:04X}".encode()
data += b"?" * pi_error + b"\n"
conn.sendall(data)
def run_server():
with init_tef() as tef:
device, hw_version, sw_version = d if (d := tef.APPL_Get_Identification()) else (None, None, None)
if device and hw_version and sw_version:
variant = device & 127
hw_major = hw_version >> 8
hw_minor = hw_version & 127
sw_major = sw_version >> 8
variant_str = "TEF6686"
if variant == 1: variant_str = "TEF6687"
if variant == 9: variant_str = "TEF6688"
if variant == 3: variant_str = "TEF6689"
print(f"Got {variant_str} (V{hw_major}{hw_minor}{sw_major})")
state = {
'last_tune': 9500,
'last_eqims': 0b11,
'forced_mono': False,
'deemp': 0,
'bw': 0,
'scan_start': 87500,
'scan_stop': 108000,
'scan_step': 100,
'scan_bw': 0,
}
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((HOST, PORT))
s.listen()
print(f"Server listening on {HOST}:{PORT}")
while True:
conn, addr = s.accept()
with conn:
print(f"Connected by {addr}")
if not authenticate(conn):
print("Authentication failed.")
continue
# Send initial state
conn.sendall(f"T{state['last_tune']*10}\n".encode())
conn.sendall(f"G{bin(state['last_eqims']).removeprefix('0b').zfill(2)}\n".encode())
conn.sendall(f"B{int(state['forced_mono'])}\n".encode())
conn.sendall(f"D{state['deemp']}\n".encode())
conn.sendall(f"W{state['bw']}\n".encode())
conn.sendall(f"M0\n".encode())
conn.setblocking(False)
ss_timer = libtimer.Timer()
rds_timer = libtimer.Timer()
while True:
try:
if not (data := conn.recv(1024)): break
resp = process_command(tef, data, state, conn)
if resp: conn.sendall(resp)
except ConnectionResetError: break
except ConnectionAbortedError: break
except BlockingIOError:
if ss_timer.get_time() > SS_UPDATE_INTERVAL:
send_signal_status(tef, conn, state)
ss_timer.reset()
if rds_timer.get_time() > RDS_UPDATE_INTERVAL:
send_rds_data(tef, conn, state)
rds_timer.reset()
time.sleep(0.015)
if __name__ == "__main__": run_server()