You've already forked TEF6686_Driver
217 lines
7.8 KiB
Python
217 lines
7.8 KiB
Python
# This is a compatible server for the TEF6686 firmware, but instead using the i2c control mode
|
|
# This fully works with FM-DX-Webserver (no plugins)
|
|
|
|
import socket
|
|
import hashlib
|
|
import secrets
|
|
import string
|
|
import libtimer
|
|
import time
|
|
from tef import TEF6686
|
|
from protocol import I2CPCClient
|
|
|
|
HOST = '0.0.0.0'
|
|
PORT = 7373
|
|
PASSWORD = "test"
|
|
CLOCK = 12000000 # DP-666
|
|
|
|
SALT_LENGTH = 16
|
|
SS_UPDATE_INTERVAL = 0.125
|
|
RDS_UPDATE_INTERVAL = 0.086
|
|
|
|
def generate_salt(length=SALT_LENGTH):
|
|
return "".join(secrets.choice(string.ascii_lowercase) for _ in range(length))
|
|
|
|
def compute_hash(salt, password):
|
|
return hashlib.sha1((salt + password).encode()).hexdigest().encode()
|
|
|
|
def init_tef():
|
|
p = I2CPCClient("COM6")
|
|
tef = TEF6686(p)
|
|
tef.init(clock=CLOCK)
|
|
tef.AUDIO_Set_Mute(False)
|
|
tef.AUDIO_Set_Volume(60)
|
|
tef.FM_Tune_To(1, 9500)
|
|
tef.FM_Set_RDS(1)
|
|
tef.FM_Set_ChannelEqualizer(True)
|
|
tef.FM_Set_MphSuppression(True)
|
|
tef.APPL_Set_OperationMode(True)
|
|
return tef
|
|
|
|
def authenticate(conn):
|
|
salt = generate_salt()
|
|
conn.sendall(salt.encode() + b"\n")
|
|
expected_hash = compute_hash(salt, PASSWORD)
|
|
|
|
while True:
|
|
data = conn.recv(1024)
|
|
if not data: return False
|
|
if data.strip() == expected_hash.strip(): return True
|
|
|
|
def process_command(tef: TEF6686, data: bytes, state: dict, conn: socket.socket):
|
|
out = b""
|
|
for cmd in data.splitlines():
|
|
if cmd.startswith(b"T"):
|
|
freq = int(cmd.decode().removeprefix("T").strip()) // 10
|
|
state['last_tune'] = freq
|
|
tef.FM_Tune_To(1, freq)
|
|
out += f"T{freq*10}\n".encode()
|
|
elif cmd.startswith(b"G"):
|
|
eqims = int(cmd.decode().removeprefix("G").strip(), 2)
|
|
state['last_eqims'] = eqims
|
|
tef.FM_Set_ChannelEqualizer((eqims & 1) == 1)
|
|
tef.FM_Set_MphSuppression((eqims & 2) == 2)
|
|
out += f"G{bin(eqims).removeprefix('0b').zfill(2)}\n".encode()
|
|
elif cmd.startswith(b"B"):
|
|
mono = bool(int(cmd.decode().removeprefix("B").strip(), 2))
|
|
state['forced_mono'] = mono
|
|
tef.FM_Set_Stereo_Min(2 if mono else 0)
|
|
out += f"B{int(mono)}\n".encode()
|
|
elif cmd.startswith(b"D"):
|
|
deemp = int(cmd.decode().removeprefix("D").strip())
|
|
dtime = 500 if deemp == 0 else (750 if deemp == 1 else 0)
|
|
state['deemp'] = deemp
|
|
tef.FM_Set_Deemphasis(dtime)
|
|
out += f"D{deemp}\n".encode()
|
|
elif cmd.startswith(b"x"):
|
|
out += b"OK\n"
|
|
tef.APPL_Set_OperationMode(False)
|
|
elif cmd.startswith(b"X"):
|
|
tef.APPL_Set_OperationMode(True)
|
|
elif cmd.startswith(b"W"):
|
|
bw = int(cmd.decode().removeprefix("W").strip())
|
|
auto = (bw == 0)
|
|
tef.FM_Set_Bandwidth(auto, 2360 if auto else (bw // 100))
|
|
state['bw'] = bw
|
|
out += f"W{bw}\n".encode()
|
|
elif cmd.startswith(b"?"):
|
|
out += b"XRD Python driver\n"
|
|
elif cmd.startswith(b"S"):
|
|
cmd = cmd[1:]
|
|
if cmd != b"":
|
|
arg = int(cmd.decode()[1:].strip())
|
|
match cmd[0]:
|
|
case 97: state['scan_start'] = (arg + 5) // 10
|
|
case 98: state['scan_stop'] = (arg + 5) // 10
|
|
case 99: state['scan_step'] = (arg + 5) // 10
|
|
case 119: state['scan_bw'] = arg
|
|
else:
|
|
tef.FM_Set_Bandwidth((state["scan_bw"] == 0), state["scan_bw"])
|
|
conn.sendall(b"U")
|
|
for freq in range(state["scan_start"], state["scan_stop"] + state["scan_step"], state["scan_step"]):
|
|
tef.FM_Tune_To(2, freq)
|
|
time.sleep(0.0055)
|
|
conn.sendall(str(freq * 10).encode())
|
|
conn.sendall(b" = ")
|
|
_, level, *_ = tef.FM_Get_Quality_Data()
|
|
if level is None: continue
|
|
conn.sendall(str((level // 10) + 10).encode())
|
|
conn.sendall(b", ")
|
|
conn.sendall(b"\n")
|
|
|
|
tef.FM_Tune_To(1, state["last_tune"])
|
|
tef.FM_Set_Bandwidth((state["bw"] == 0), 2360 if (state["bw"] == 0) else (state["bw"] // 100))
|
|
|
|
return out
|
|
|
|
def send_signal_status(tef: TEF6686, conn: socket.socket, state):
|
|
_, _, stereo, _ = tef.FM_Get_Processing_Status()
|
|
stereo = stereo if stereo is not None else 1000
|
|
|
|
status, level, usn, wam, _, bandwidth, _ = tef.FM_Get_Quality_Data()
|
|
if status is None or level is None or wam is None or usn is None or bandwidth is None: return
|
|
|
|
level = level / 10
|
|
|
|
data = b"S"
|
|
if state['forced_mono']: data += b"M"
|
|
elif stereo < 500: data += b"s"
|
|
else: data += b"m"
|
|
|
|
data += f"{level},{wam//10},{usn//10},{bandwidth}\n\n".encode()
|
|
conn.sendall(data)
|
|
|
|
def send_rds_data(tef, conn):
|
|
status, A, B, C, D, dec_error = tef.FM_Get_RDS_Data__decoder()
|
|
if None in (status, A, B, C, D, dec_error): return
|
|
|
|
if (status & (1 << 9) == 0) or (status & (1 << 15) == 0): return
|
|
|
|
err = 0
|
|
err |= ((dec_error >> 8) & 0x30) >> 4
|
|
err |= ((dec_error >> 8) & 0xC)
|
|
err |= ((dec_error >> 8) & 3) << 4
|
|
|
|
data = b"R"
|
|
data += f"{B:04X}{C:04X}{D:04X}{err:02X}".encode()
|
|
|
|
pi_error = dec_error >> 14
|
|
if pi_error < 3:
|
|
data += b"\nP" + f"{A:04X}".encode()
|
|
data += b"?" * pi_error + b"\n"
|
|
elif status & (1 << 12):
|
|
pi_error = (dec_error >> 10) & 0b11
|
|
if pi_error < 3:
|
|
data += b"\nP" + f"{C:04X}".encode()
|
|
data += b"?" * pi_error + b"\n"
|
|
|
|
conn.sendall(data)
|
|
|
|
def run_server():
|
|
tef = init_tef()
|
|
state = {
|
|
'last_tune': 9500,
|
|
'last_eqims': 0b11,
|
|
'forced_mono': False,
|
|
'deemp': 0,
|
|
'bw': 0,
|
|
'scan_start': 87500,
|
|
'scan_stop': 108000,
|
|
'scan_step': 100,
|
|
'scan_bw': 0,
|
|
}
|
|
|
|
try:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
|
s.bind((HOST, PORT))
|
|
s.listen()
|
|
print(f"Server listening on {HOST}:{PORT}")
|
|
|
|
while True:
|
|
conn, addr = s.accept()
|
|
with conn:
|
|
print(f"Connected by {addr}")
|
|
if not authenticate(conn):
|
|
print("Authentication failed.")
|
|
continue
|
|
|
|
# Send initial state
|
|
conn.sendall(f"T{state['last_tune']*10}\n".encode())
|
|
conn.sendall(f"G{bin(state['last_eqims']).removeprefix('0b').zfill(2)}\n".encode())
|
|
conn.sendall(f"B{int(state['forced_mono'])}\n".encode())
|
|
conn.sendall(f"D{state['deemp']}\n".encode())
|
|
conn.sendall(f"W{state['bw']}\n".encode())
|
|
conn.setblocking(False)
|
|
|
|
ss_timer = libtimer.Timer()
|
|
rds_timer = libtimer.Timer()
|
|
|
|
while True:
|
|
try:
|
|
data = conn.recv(1024)
|
|
if not data: break
|
|
resp = process_command(tef, data, state, conn)
|
|
if resp: conn.sendall(resp)
|
|
except ConnectionResetError: break
|
|
except ConnectionAbortedError: break
|
|
except BlockingIOError:
|
|
if ss_timer.get_time() > SS_UPDATE_INTERVAL:
|
|
send_signal_status(tef, conn, state)
|
|
ss_timer.reset()
|
|
if rds_timer.get_time() > RDS_UPDATE_INTERVAL:
|
|
send_rds_data(tef, conn)
|
|
rds_timer.reset()
|
|
time.sleep(0.01)
|
|
finally: tef.close()
|
|
|
|
if __name__ == "__main__": run_server() |