mirror of
https://github.com/noctalia-dev/noctalia-shell.git
synced 2026-05-11 17:08:27 +08:00
Merge pull request #1681 from turannul/pr/bt-py-bridge
Increase bluetooth pair timeout
This commit is contained in:
Executable → Regular
+26
-56
@@ -8,21 +8,21 @@ import select
|
||||
import errno
|
||||
|
||||
|
||||
def log(msg):
|
||||
sys.stderr.write(f"[bluetooth-connect] {msg}\n")
|
||||
def log(msg) -> None:
|
||||
sys.stderr.write(f"[pair] {msg}\n")
|
||||
|
||||
|
||||
def main():
|
||||
def pair_fast():
|
||||
if len(sys.argv) < 5:
|
||||
log("Usage: bluetooth-connect.py <addr> <pairWaitSeconds> <attempts> <intervalSec>")
|
||||
log("Usage: bluetooth-pair.py <addr> <pairWaitSeconds> <attempts> <intervalSec>")
|
||||
sys.exit(2)
|
||||
|
||||
addr = sys.argv[1]
|
||||
# We won't use pair_wait_seconds in the same way, but we'll respect the timeout logic.
|
||||
pair_wait_seconds = float(sys.argv[2])
|
||||
if pair_wait_seconds < 15:
|
||||
log(f"Warning: pairWaitSeconds ({pair_wait_seconds}) is too short. Enforcing 15s minimum.")
|
||||
pair_wait_seconds = 15.0
|
||||
if pair_wait_seconds < 30:
|
||||
log(f"Warning: pairWaitSeconds ({pair_wait_seconds}s) is too short. Enforcing 45s minimum.")
|
||||
pair_wait_seconds = 45.0
|
||||
|
||||
attempts = int(sys.argv[3])
|
||||
interval_sec = float(sys.argv[4])
|
||||
@@ -32,27 +32,27 @@ def main():
|
||||
log(f"Invalid Bluetooth address: '{addr}'")
|
||||
sys.exit(2)
|
||||
|
||||
# Master/Slave PTY for interactive control
|
||||
master_fd, slave_fd = pty.openpty()
|
||||
# m/s PTY for interactive control
|
||||
mfd, sfd = pty.openpty()
|
||||
|
||||
# Start bluetoothctl
|
||||
proc = subprocess.Popen(['bluetoothctl'], stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, close_fds=True, text=True)
|
||||
subprocess.Popen(['bluetoothctl'], stdin=sfd, stdout=sfd, stderr=sfd, close_fds=True, text=True)
|
||||
|
||||
os.close(slave_fd)
|
||||
os.close(sfd)
|
||||
|
||||
def send_command(cmd):
|
||||
log(f"Sending: {cmd}")
|
||||
os.write(master_fd, (cmd + "\n").encode('utf-8'))
|
||||
log(f"Sending cmd: {cmd}")
|
||||
os.write(mfd, (cmd + "\n").encode('utf-8'))
|
||||
|
||||
def read_output(timeout=1.0):
|
||||
# Reads available output from master_fd
|
||||
# Reads available output from mfd
|
||||
output = b""
|
||||
end_time = time.time() + timeout
|
||||
while time.time() < end_time:
|
||||
r, _, _ = select.select([master_fd], [], [], 0.1)
|
||||
if master_fd in r:
|
||||
r, _, _ = select.select([mfd], [], [], 0.1)
|
||||
if mfd in r:
|
||||
try:
|
||||
data = os.read(master_fd, 1024)
|
||||
data = os.read(mfd, 1024)
|
||||
if not data:
|
||||
break
|
||||
output += data
|
||||
@@ -66,52 +66,23 @@ def main():
|
||||
|
||||
log("Initializing bluetoothctl...")
|
||||
time.sleep(1) # Wait for startup
|
||||
initial_out = read_output(timeout=1)
|
||||
# initial_out = read_output(timeout=1)
|
||||
# print(initial_out) # Debug
|
||||
|
||||
send_command("agent on")
|
||||
send_command("default-agent")
|
||||
send_command("power on")
|
||||
time.sleep(1)
|
||||
# send_command("power on") # If we are pairing bluetooth is already powered on
|
||||
time.sleep(0.5)
|
||||
|
||||
# Clean start
|
||||
log(f"Removing {addr} to start fresh...")
|
||||
send_command(f"remove {addr}")
|
||||
time.sleep(2)
|
||||
|
||||
# Scan and wait for discovery
|
||||
log("Scanning for device (30s timeout)...")
|
||||
send_command("scan on")
|
||||
found = False
|
||||
scan_start = time.time()
|
||||
while time.time() - scan_start < 30: # Wait up to 30s to find it
|
||||
out = read_output(timeout=0.5)
|
||||
if out:
|
||||
print(out, end='')
|
||||
# Split lines to handle mixed output safely
|
||||
for line in out.splitlines():
|
||||
if addr in line:
|
||||
if "[DEL]" in line:
|
||||
continue
|
||||
if "[NEW]" in line or "[CHG]" in line:
|
||||
if "not available" not in line:
|
||||
log(f"Device {addr} discovered!")
|
||||
found = True
|
||||
break
|
||||
if found:
|
||||
break
|
||||
|
||||
if not found:
|
||||
log("Device not found in scan. Trying to pair anyway...")
|
||||
|
||||
# Pair
|
||||
# Pair directly since the device is already discovered in the UI/Panel (Removed previous scan/wait part)
|
||||
log(f"Attempting to pair with {addr}...")
|
||||
send_command(f"pair {addr}")
|
||||
|
||||
# Loop to watch for confirmation or success
|
||||
start_time = time.time()
|
||||
paired = False
|
||||
|
||||
log("Waiting for pairing logic...")
|
||||
log("Waiting for pairing sequence start...")
|
||||
while time.time() - start_time < pair_wait_seconds:
|
||||
out = read_output(timeout=0.5)
|
||||
if out:
|
||||
@@ -137,7 +108,7 @@ def main():
|
||||
log("Device requested PIN/Passkey. Waiting for user input...")
|
||||
print("[PIN_REQ]")
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
try:
|
||||
# Read PIN from stdin (blocking)
|
||||
user_pin = sys.stdin.readline().strip()
|
||||
@@ -152,7 +123,6 @@ def main():
|
||||
break
|
||||
|
||||
# Just Works (JW) is implicit (no prompt)
|
||||
|
||||
if "Pairing successful" in out or "Paired: yes" in out or "Bonded: yes" in out:
|
||||
paired = True
|
||||
log("Pairing successful detected in stream.")
|
||||
@@ -192,7 +162,7 @@ def main():
|
||||
time.sleep(1)
|
||||
out = read_output(timeout=1)
|
||||
if "Connected: yes" in out:
|
||||
log("Connected successfully.")
|
||||
log("Connected successfully, we are done here.")
|
||||
connected = True
|
||||
break
|
||||
else:
|
||||
@@ -213,4 +183,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
pair_fast()
|
||||
Reference in New Issue
Block a user