Merge pull request #1411 from turannul/pr/bluetooth-rework

Bluetooth rework
This commit is contained in:
Lemmy
2026-01-18 14:11:30 -05:00
committed by GitHub
4 changed files with 387 additions and 4 deletions
+216
View File
@@ -0,0 +1,216 @@
#!/usr/bin/env python3
import sys
import time
import subprocess
import pty
import os
import select
import errno
def log(msg):
sys.stderr.write(f"[bluetooth-connect] {msg}\n")
def main():
if len(sys.argv) < 5:
log("Usage: bluetooth-connect.sh <addr> <pairWaitSeconds> <attempts> <intervalSec>")
sys.exit(2)
addr = sys.argv[1]
# We won't use pair_wait_seconds in the same way, but we'll respect the timeout logic.
pair_wait_seconds = float(sys.argv[2])
if pair_wait_seconds < 15:
log(f"Warning: pairWaitSeconds ({pair_wait_seconds}) is too short. Enforcing 15s minimum.")
pair_wait_seconds = 15.0
attempts = int(sys.argv[3])
interval_sec = float(sys.argv[4])
if not addr or len(addr) < 17:
# Basic MAC address length check
log(f"Invalid Bluetooth address: '{addr}'")
sys.exit(2)
# Master/Slave PTY for interactive control
master_fd, slave_fd = pty.openpty()
# Start bluetoothctl
proc = subprocess.Popen(['bluetoothctl'], stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, close_fds=True, text=True)
os.close(slave_fd)
def send_command(cmd):
log(f"Sending: {cmd}")
os.write(master_fd, (cmd + "\n").encode('utf-8'))
def read_output(timeout=1.0):
# Reads available output from master_fd
output = b""
end_time = time.time() + timeout
while time.time() < end_time:
r, _, _ = select.select([master_fd], [], [], 0.1)
if master_fd in r:
try:
data = os.read(master_fd, 1024)
if not data:
break
output += data
except OSError as e:
if e.errno == errno.EIO:
break
raise
else:
pass
return output.decode('utf-8', errors='replace')
log("Initializing bluetoothctl...")
time.sleep(1) # Wait for startup
initial_out = read_output(timeout=1)
# print(initial_out) # Debug
send_command("agent on")
send_command("default-agent")
send_command("power on")
time.sleep(1)
# Clean start
log(f"Removing {addr} to start fresh...")
send_command(f"remove {addr}")
time.sleep(2)
# Scan and wait for discovery
log("Scanning for device (30s timeout)...")
send_command("scan on")
found = False
scan_start = time.time()
while time.time() - scan_start < 30: # Wait up to 30s to find it
out = read_output(timeout=0.5)
if out:
print(out, end='')
# Split lines to handle mixed output safely
for line in out.splitlines():
if addr in line:
if "[DEL]" in line:
continue
if "[NEW]" in line or "[CHG]" in line:
if "not available" not in line:
log(f"Device {addr} discovered!")
found = True
break
if found:
break
if not found:
log("Device not found in scan. Trying to pair anyway...")
# Pair
send_command(f"pair {addr}")
# Loop to watch for confirmation or success
start_time = time.time()
paired = False
log("Waiting for pairing logic...")
while time.time() - start_time < pair_wait_seconds:
out = read_output(timeout=0.5)
if out:
print(out, end='')
# Numberic Comparison (NC) 1 of 4 - Tested pairing with my iPhone.
if "Confirm passkey" in out or "yes/no" in out or "Request confirmation" in out:
log("Detected passkey prompt. Sending 'yes'.")
send_command("yes")
# Authorization Request
if "Authorize service" in out or "Request authorization" in out:
log("Detected authorization request. Sending 'yes'.")
send_command("yes")
# Passkey Display (User needs to type this on the remote device, e.g. Keyboard)
if "Passkey:" in out:
for line in out.splitlines():
if "Passkey:" in line:
log(f"ACTION REQUIRED: {line.strip()} (Type this on the device)")
# Interactive PIN/Passkey Entry (Device displays code, User must enter on PC)
if "Enter passkey" in out or "Enter PIN code" in out:
log("Device requested PIN/Passkey. Waiting for user input...")
print("[PIN_REQ]")
sys.stdout.flush()
try:
# Read PIN from stdin (blocking)
user_pin = sys.stdin.readline().strip()
if user_pin:
log(f"Sending PIN: {user_pin}")
send_command(user_pin)
else:
log("Empty PIN received. Aborting.")
break
except Exception as e:
log(f"Error reading stdin: {e}")
break
# Just Works (JW) is implicit (no prompt)
if "Pairing successful" in out or "Paired: yes" in out or "Bonded: yes" in out:
paired = True
log("Pairing successful detected in stream.")
break
if "Failed to pair" in out:
log("Pairing failed explicitly.")
break
if "Already joined" in out or "Already exists" in out:
paired = True
log("Device already paired.")
break
# Double check pairing status via info command if not sure
if not paired:
send_command(f"info {addr}")
time.sleep(1)
out = read_output(timeout=1)
if "Paired: yes" in out:
paired = True
if paired:
log("Device is paired. Trusting...")
send_command(f"trust {addr}")
time.sleep(1)
log("Connecting...")
connected = False
for i in range(attempts):
send_command(f"connect {addr}")
# Wait a bit for connection
time.sleep(interval_sec)
# Check status
send_command(f"info {addr}")
time.sleep(1)
out = read_output(timeout=1)
if "Connected: yes" in out:
log("Connected successfully.")
connected = True
break
else:
log(f"Connection attempt {i + 1}/{attempts} failed. Retrying...")
if connected:
send_command("quit")
sys.exit(0)
else:
log("Failed to connect after all attempts.")
send_command("quit")
sys.exit(1)
else:
log("Failed to pair within timeout.")
send_command("quit")
sys.exit(1)
if __name__ == "__main__":
main()
+28
View File
@@ -0,0 +1,28 @@
#!/usr/bin/env python3
import sys
import time
print("Mocking Bluetooth Pairing...", flush=True)
time.sleep(1)
print("Simulating PIN request from device...", flush=True)
print("[PIN_REQ]", flush=True)
try:
print("Waiting for PIN input...", flush=True)
pin = sys.stdin.readline().strip()
with open("/tmp/pin_test.log", "w") as f:
f.write(f"SUCCESS: Received PIN from UI: '{pin}'\n")
print(f"Received PIN: {pin}", flush=True)
time.sleep(1)
print("Pairing successful", flush=True)
print("Connected successfully.", flush=True)
sys.exit(0)
except Exception as e:
with open("/tmp/pin_test_error.log", "w") as f:
f.write(f"ERROR: {e}\n")
sys.exit(1)
@@ -383,5 +383,95 @@ SmartPanel {
}
}
}
// PIN Authentication Overlay
Rectangle {
id: pinOverlay
anchors.fill: parent
color: Color.mSurface
visible: BluetoothService.pinRequired
// Trap all input
MouseArea {
anchors.fill: parent
acceptedButtons: Qt.AllButtons
onClicked: mouse => mouse.accepted = true
onWheel: wheel => wheel.accepted = true
}
ColumnLayout {
anchors.centerIn: parent
width: parent.width * 0.85
spacing: Style.marginL
NIcon {
icon: "lock"
pointSize: 48
color: Color.mPrimary
Layout.alignment: Qt.AlignHCenter
}
NText {
text: I18n.tr("common.authentication-required")
pointSize: Style.fontSizeXL
font.weight: Style.fontWeightBold
color: Color.mOnSurface
horizontalAlignment: Text.AlignHCenter
}
NText {
text: I18n.tr("bluetooth.panel.pin-instructions")
pointSize: Style.fontSizeM
color: Color.mOnSurfaceVariant
wrapMode: Text.WordWrap
horizontalAlignment: Text.AlignHCenter
Layout.fillWidth: true
}
NTextInput {
id: pinInput
Layout.fillWidth: true
placeholderText: "123456"
inputIconName: "key"
// Clear text when overlay appears
onVisibleChanged: {
if (visible) {
text = "";
inputItem.forceActiveFocus();
}
}
// Submit on Enter
inputItem.onAccepted: {
if (text.length > 0) {
BluetoothService.submitPin(text);
text = "";
}
}
}
RowLayout {
Layout.alignment: Qt.AlignHCenter
spacing: Style.marginM
NButton {
text: I18n.tr("common.cancel")
icon: "x"
onClicked: BluetoothService.cancelPairing()
}
NButton {
text: I18n.tr("common.confirm")
icon: "check"
backgroundColor: Color.mPrimary
textColor: Color.mOnPrimary
enabled: pinInput.text.length > 0
onClicked: {
BluetoothService.submitPin(pinInput.text);
pinInput.text = "";
}
}
}
}
}
}
}
+53 -4
View File
@@ -516,6 +516,48 @@ Singleton {
}
}
// Interaction state
property bool pinRequired: false
function submitPin(pin) {
if (pairingProcess.running) {
pairingProcess.write(pin + "\n");
root.pinRequired = false;
}
}
function cancelPairing() {
if (pairingProcess.running) {
pairingProcess.kill();
}
root.pinRequired = false;
}
// Interactive pairing process
Process {
id: pairingProcess
stdout: SplitParser {
onRead: data => {
var chunk = data;
if (chunk.indexOf("[PIN_REQ]") !== -1) {
root.pinRequired = true;
Logger.i("Bluetooth", "PIN required for pairing");
ToastService.showNotice(I18n.tr("common.bluetooth"), I18n.tr("bluetooth.panel.pin-required"), "lock");
}
}
}
stderr: StdioCollector {}
onExited: {
root.pinRequired = false;
Logger.i("Bluetooth", "Pairing process exited.");
// Restore discovery if we paused it
if (root._discoveryWasRunning) {
root.setScanActive(true, 0);
}
root._discoveryWasRunning = false;
}
}
// Pair using bluetoothctl which registers its own BlueZ agent internally.
function pairWithBluetoothctl(device) {
if (!device) {
@@ -528,6 +570,12 @@ Singleton {
}
Logger.i("Bluetooth", "pairWithBluetoothctl", addr);
// Stop any previous pairing attempt
if (pairingProcess.running) {
pairingProcess.kill();
}
root.pinRequired = false;
// Compute bounded waits from tunables
const pairWait = Math.max(5, Number(root.pairWaitSeconds) | 0);
@@ -539,10 +587,11 @@ Singleton {
const totalPauseMs = (pairWait * 1000) + (attempts * intervalSec * 1000) + 2000;
_pauseDiscoveryFor(totalPauseMs);
// Prefer external dev script for pairing/connecting; executed detached
const scriptPath = Quickshell.shellDir + "/Scripts/network/bluetooth-connect.sh";
// Use bash explicitly to avoid relying on executable bit in all environments
btExec(["bash", scriptPath, String(addr), String(pairWait), String(attempts), String(intervalSec)]);
// TEST MODE: Mock script
const scriptPath = Quickshell.shellDir + "/Bin/test-pin-mock.py";
pairingProcess.command = ["python3", scriptPath, String(addr), String(pairWait), String(attempts), String(intervalSec)];
pairingProcess.running = true;
}
// Helper to run bluetoothctl and scripts with consistent error logging