You've already forked TEF6686_Driver
278 lines
10 KiB
Python
278 lines
10 KiB
Python
# This is a compatible server for the TEF6686 firmware, but instead using the i2c control mode
|
|
# This fully works with FM-DX-Webserver (no plugins)
|
|
# Released under the Unlicense (however both FM-DX-Webserver and the firmware are under the tyranny of GPL-3)
|
|
|
|
import socket
|
|
import hashlib
|
|
import secrets
|
|
import string
|
|
import libtimer
|
|
import time
|
|
from tef import TEF6686
|
|
from protocol import I2CPCClient
|
|
import os
|
|
from functools import wraps
|
|
from typing import Callable
|
|
|
|
INITIAL_FREQ = 9500
|
|
INITIAL_EQ = False
|
|
INITIAL_IMS = True
|
|
|
|
HOST = os.getenv("HOST") or '0.0.0.0'
|
|
PORT = int(os.getenv("PORT") or 0) or 7373
|
|
DEVICE = os.getenv("DEV") or "COM6"
|
|
|
|
FREQ_NOT_ALLOWED_RANGE = []
|
|
|
|
PASSWORD = os.getenv("PW") or "test"
|
|
CLOCK = 12000000 # DP-666
|
|
|
|
SALT_LENGTH = 16
|
|
SS_UPDATE_INTERVAL = 0.115
|
|
RDS_UPDATE_INTERVAL = 0.086
|
|
|
|
def freq_allowed(freq: int):
|
|
if not FREQ_NOT_ALLOWED_RANGE: return True
|
|
for (lower, upper) in FREQ_NOT_ALLOWED_RANGE:
|
|
if freq >= lower and freq <= upper: return False
|
|
return True
|
|
|
|
def init_tef():
|
|
p = I2CPCClient(DEVICE)
|
|
tef = TEF6686(p)
|
|
tef.init(clock=CLOCK)
|
|
tef.AUDIO_Set_Mute(False)
|
|
tef.AUDIO_Set_Volume(45)
|
|
tef.FM_Tune_To(1, INITIAL_FREQ)
|
|
tef.FM_Set_RDS(1)
|
|
tef.FM_Set_ChannelEqualizer(INITIAL_EQ)
|
|
tef.FM_Set_MphSuppression(INITIAL_IMS)
|
|
tef.FM_Set_Stereo_Max(False) # Disables stereo blend??!!??!?
|
|
tef.APPL_Set_OperationMode(True) # Turn off
|
|
return tef
|
|
|
|
def authenticate(conn: socket.socket):
|
|
salt = "".join(secrets.choice(string.ascii_lowercase) for _ in range(SALT_LENGTH))
|
|
conn.sendall(salt.encode() + b"\n")
|
|
expected_hash = hashlib.sha1((salt + PASSWORD).encode()).hexdigest().encode()
|
|
|
|
while True:
|
|
data = conn.recv(1024)
|
|
if not data: return False
|
|
if data.strip() == expected_hash.strip(): return True
|
|
|
|
def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket):
|
|
out = b""
|
|
for cmd in data.splitlines():
|
|
if cmd.startswith(b"T"):
|
|
freq = int(cmd.decode().removeprefix("T").strip()) // 10
|
|
if freq < 6500 or freq > 10800: continue
|
|
tef.FM_Tune_To(1, freq)
|
|
if not freq_allowed(freq): tef.AUDIO_Set_Mute(True)
|
|
else: tef.AUDIO_Set_Mute(False)
|
|
state['last_tune'] = freq
|
|
out += f"T{freq*10}\n".encode()
|
|
elif cmd.startswith(b"G"):
|
|
eqims = int(cmd.decode().removeprefix("G").strip(), 2)
|
|
tef.FM_Set_ChannelEqualizer((eqims & 1) == 1)
|
|
tef.FM_Set_MphSuppression((eqims & 2) == 2)
|
|
out += f"G{bin(eqims).removeprefix('0b').zfill(2)}\n".encode()
|
|
state['last_eqims'] = eqims
|
|
elif cmd.startswith(b"B"):
|
|
mono = bool(int(cmd.decode().removeprefix("B").strip(), 2))
|
|
tef.FM_Set_Stereo_Min(2 if mono else 0)
|
|
out += f"B{int(mono)}\n".encode()
|
|
state['forced_mono'] = mono
|
|
elif cmd.startswith(b"D"):
|
|
deemp = int(cmd.decode().removeprefix("D").strip())
|
|
dtime = 500 if deemp == 0 else (750 if deemp == 1 else 0)
|
|
tef.FM_Set_Deemphasis(dtime)
|
|
out += f"D{deemp}\n".encode()
|
|
state['deemp'] = deemp
|
|
elif cmd.startswith(b"x"):
|
|
out += b"OK\n"
|
|
tef.APPL_Set_OperationMode(False) # Enable
|
|
tef.FM_Tune_To(1, state["last_tune"])
|
|
if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True)
|
|
else: tef.AUDIO_Set_Mute(False)
|
|
elif cmd.startswith(b"X"):
|
|
tef.APPL_Set_OperationMode(True) # turn off
|
|
elif cmd.startswith(b"W"):
|
|
bw = int(cmd.decode().removeprefix("W").strip())
|
|
auto = (bw == 0)
|
|
tef.FM_Set_Bandwidth(auto, 2360 if auto else (bw // 100))
|
|
state['bw'] = bw
|
|
out += f"W{bw}\n".encode()
|
|
elif cmd.startswith(b"?"):
|
|
out += b"XRD Python driver\n"
|
|
elif cmd.startswith(b"S"):
|
|
cmd = cmd[1:]
|
|
if cmd != b"":
|
|
arg = int(cmd.decode()[1:].strip())
|
|
match cmd[0]:
|
|
case 97: state['scan_start'] = (arg + 5) // 10
|
|
case 98: state['scan_stop'] = (arg + 5) // 10
|
|
case 99: state['scan_step'] = (arg + 5) // 10
|
|
case 119: state['scan_bw'] = arg
|
|
else:
|
|
start = True
|
|
|
|
tef.FM_Set_Bandwidth((state["scan_bw"] == 0), state["scan_bw"])
|
|
conn.sendall(b"U")
|
|
for freq in range(state["scan_start"], state["scan_stop"] + state["scan_step"], state["scan_step"]):
|
|
if not start: conn.sendall(b", ") # Prevent trailing comma, because the FM-DX-Webserver spectrum plugin treats us as actual firmware and throws as harder api, without it we're treated as a module
|
|
start = False
|
|
|
|
tef.FM_Tune_To(2, freq) # Auto mutes, less commands sent
|
|
time.sleep(0.0064)
|
|
if not freq_allowed(freq):
|
|
conn.sendall(f"{freq * 10} = 11.25".encode())
|
|
continue
|
|
_, level, *_ = d if (d := tef.FM_Get_Quality_Data()) else (None, None)
|
|
if level is None: continue
|
|
conn.sendall(str(freq * 10).encode() + b" = " + str((level / 10) + 11.25).encode())
|
|
conn.sendall(b"\n")
|
|
|
|
tef.FM_Tune_To(1, state["last_tune"])
|
|
if not freq_allowed(state["last_tune"]): tef.AUDIO_Set_Mute(True)
|
|
else: tef.AUDIO_Set_Mute(False)
|
|
tef.FM_Set_Bandwidth((state["bw"] == 0), 2360 if (state["bw"] == 0) else (state["bw"] // 100))
|
|
|
|
return out
|
|
|
|
PERIODIC_FUNCTIONS: list[tuple[Callable, float, libtimer.Timer]] = []
|
|
|
|
def periodic(t: float):
|
|
def decorator(func):
|
|
PERIODIC_FUNCTIONS.append((func, t, libtimer.Timer()))
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs): return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
def reset_periodic():
|
|
for (_, _, timer) in PERIODIC_FUNCTIONS: timer.reset()
|
|
|
|
def run_periodic(*args, **kwargs):
|
|
for (func, t, timer) in PERIODIC_FUNCTIONS:
|
|
if timer.get_time() > t:
|
|
func(*args, **kwargs)
|
|
timer.reset()
|
|
|
|
@periodic(SS_UPDATE_INTERVAL)
|
|
def send_signal_status(tef: TEF6686, conn: socket.socket, state: dict):
|
|
if not freq_allowed(state["last_tune"]):
|
|
conn.sendall(b"Sm11.25,0,0,0\n\n")
|
|
return
|
|
_, _, stereo, _ = d if (d := tef.FM_Get_Processing_Status()) else (None, None, 1000, None)
|
|
|
|
res = tef.FM_Get_Quality_Data()
|
|
if res is None: return
|
|
_, level, usn, wam, _, bandwidth, *_ = res
|
|
|
|
level = level / 10
|
|
|
|
data = b"S"
|
|
if state['forced_mono']: data += b"M"
|
|
elif stereo < 350: data += b"s"
|
|
else: data += b"m"
|
|
|
|
data += f"{level + 11.25},{wam//10},{usn//10},{bandwidth}\n\n".encode()
|
|
conn.sendall(data)
|
|
|
|
@periodic(RDS_UPDATE_INTERVAL)
|
|
def send_rds_data(tef: TEF6686, conn: socket.socket, state: dict):
|
|
if not freq_allowed(state["last_tune"]): return
|
|
|
|
res = tef.FM_Get_RDS_Data__decoder()
|
|
if res is None: return
|
|
status, A, B, C, D, dec_error = res
|
|
|
|
if (status & (1 << 9) == 0) or (status & (1 << 15) == 0): return
|
|
|
|
data = b""
|
|
if (status & (1 << 13) == 0):
|
|
err = 0
|
|
err |= ((dec_error >> 8) & 0x30) >> 4
|
|
err |= ((dec_error >> 8) & 0xC)
|
|
err |= ((dec_error >> 8) & 3) << 4
|
|
|
|
data = b"R"
|
|
data += f"{B:04X}{C:04X}{D:04X}{err:02X}\n".encode()
|
|
|
|
pi_error = (dec_error >> 14) & 0b11
|
|
if pi_error < 3:
|
|
data += b"P" + f"{A:04X}".encode()
|
|
data += b"?" * pi_error + b"\n"
|
|
elif status & (1 << 12):
|
|
pi_error = (dec_error >> 10) & 0b11
|
|
if pi_error < 3:
|
|
data += b"P" + f"{C:04X}".encode()
|
|
data += b"?" * pi_error + b"\n"
|
|
|
|
conn.sendall(data)
|
|
|
|
def run_server():
|
|
with init_tef() as tef:
|
|
device, hw_version, sw_version = d if (d := tef.APPL_Get_Identification()) else (None, None, None)
|
|
if device and hw_version and sw_version:
|
|
variant = device & 127
|
|
hw_major = hw_version >> 8
|
|
hw_minor = hw_version & 127
|
|
sw_major = sw_version >> 8
|
|
|
|
variant_str = "TEF6686"
|
|
if variant == 1: variant_str = "TEF6687"
|
|
elif variant == 9: variant_str = "TEF6688"
|
|
elif variant == 3: variant_str = "TEF6689"
|
|
|
|
print(f"{variant_str} (V{hw_major}{hw_minor}{sw_major})")
|
|
|
|
state = {
|
|
'last_tune': INITIAL_FREQ,
|
|
'last_eqims': (INITIAL_EQ << 1) | INITIAL_IMS,
|
|
'forced_mono': False,
|
|
'deemp': 0,
|
|
'bw': 0,
|
|
'scan_start': 87500,
|
|
'scan_stop': 108000,
|
|
'scan_step': 100,
|
|
'scan_bw': 0,
|
|
}
|
|
|
|
with socket.socket() as s:
|
|
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
s.bind((HOST, PORT))
|
|
s.listen()
|
|
print(f"Server listening on {HOST}:{PORT}")
|
|
|
|
while True:
|
|
conn, addr = s.accept()
|
|
with conn:
|
|
reset_periodic()
|
|
print(f"Connected by {addr}")
|
|
if not authenticate(conn):
|
|
print("Authentication failed.")
|
|
continue
|
|
|
|
# Send initial state
|
|
conn.sendall(f"T{state['last_tune']*10}\n".encode())
|
|
conn.sendall(f"G{bin(state['last_eqims']).removeprefix('0b').zfill(2)}\n".encode())
|
|
conn.sendall(f"B{int(state['forced_mono'])}\n".encode())
|
|
conn.sendall(f"D{state['deemp']}\n".encode())
|
|
conn.sendall(f"W{state['bw']}\n".encode())
|
|
conn.sendall(f"M0\n".encode())
|
|
conn.setblocking(False)
|
|
|
|
while True:
|
|
try:
|
|
if not (data := conn.recv(1024)): break
|
|
resp = process_command(tef, data, state, conn)
|
|
if resp: conn.sendall(resp)
|
|
except ConnectionResetError: break
|
|
except ConnectionAbortedError: break
|
|
except BlockingIOError:
|
|
run_periodic(tef, conn, state)
|
|
time.sleep(0.015)
|
|
|
|
if __name__ == "__main__": run_server() |