#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Updater for MiniDexed
import os
import sys
import tempfile
import zipfile
import requests
import ftplib
import socket
import atexit
import re
import argparse
try :
from zeroconf import ServiceBrowser , ServiceListener , Zeroconf
except ImportError :
print ( " Please install the zeroconf library to use mDNS functionality. " )
print ( " You can install it using: pip install zeroconf " )
sys . exit ( 1 )
class MyListener ( ServiceListener ) :
def __init__ ( self , ip_list , name_list ) :
self . ip_list = ip_list
self . name_list = name_list
def update_service ( self , zc : Zeroconf , type_ : str , name : str ) - > None :
print ( f " Service { name } updated " )
def remove_service ( self , zc : Zeroconf , type_ : str , name : str ) - > None :
print ( f " Service { name } removed " )
def add_service ( self , zc : Zeroconf , type_ : str , name : str ) - > None :
info = zc . get_service_info ( type_ , name )
print ( f " Service { name } added, service info: { info } " )
if info and info . addresses :
# Only add if TXT record contains 'MiniDexed'
txt_records = info . properties
if txt_records :
for k , v in txt_records . items ( ) :
# v may be bytes, decode if needed
val = v . decode ( ) if isinstance ( v , bytes ) else v
if ( b " MiniDexed " in k or b " MiniDexed " in v ) or ( " MiniDexed " in str ( k ) or " MiniDexed " in str ( val ) ) :
ip = socket . inet_ntoa ( info . addresses [ 0 ] )
if ip not in self . ip_list :
self . ip_list . append ( ip )
self . name_list . append ( info . server . rstrip ( ' . ' ) )
break
# Constants
TEMP_DIR = tempfile . gettempdir ( )
# Register cleanup function for temp files
zip_path = None
extract_path = None
def cleanup_temp_files ( ) :
if zip_path and os . path . exists ( zip_path ) :
os . remove ( zip_path )
if extract_path and os . path . exists ( extract_path ) :
for root , dirs , files in os . walk ( extract_path , topdown = False ) :
for name in files :
os . remove ( os . path . join ( root , name ) )
for name in dirs :
os . rmdir ( os . path . join ( root , name ) )
os . rmdir ( extract_path )
print ( " Cleaned up temporary files. " )
atexit . register ( cleanup_temp_files )
# Function to download the latest release
def download_latest_release ( url ) :
response = requests . get ( url , stream = True )
if response . status_code == 200 :
zip_path = os . path . join ( TEMP_DIR , " MiniDexed_latest.zip " )
with open ( zip_path , ' wb ' ) as f :
for chunk in response . iter_content ( chunk_size = 8192 ) :
f . write ( chunk )
return zip_path
return None
# Function to extract the downloaded zip file
def extract_zip ( zip_path ) :
extract_path = os . path . join ( TEMP_DIR , " MiniDexed " )
with zipfile . ZipFile ( zip_path , ' r ' ) as zip_ref :
zip_ref . extractall ( extract_path )
return extract_path
# Function to download the latest release asset using GitHub API
def download_latest_release_github_api ( release_type ) :
"""
release_type : ' latest ' or ' continuous '
Returns : path to downloaded zip or None
"""
import json
headers = { ' Accept ' : ' application/vnd.github.v3+json ' }
repo = ' probonopd/MiniDexed '
if release_type == ' latest ' :
api_url = f ' https://api.github.com/repos/ { repo } /releases/latest '
resp = requests . get ( api_url , headers = headers )
if resp . status_code != 200 :
print ( f " GitHub API error: { resp . status_code } " )
return None
release = resp . json ( )
assets = release . get ( ' assets ' , [ ] )
elif release_type == ' continuous ' :
api_url = f ' https://api.github.com/repos/ { repo } /releases '
resp = requests . get ( api_url , headers = headers )
if resp . status_code != 200 :
print ( f " GitHub API error: { resp . status_code } " )
return None
releases = resp . json ( )
release = next ( ( r for r in releases if ' continuous ' in ( r . get ( ' tag_name ' , ' ' ) + r . get ( ' name ' , ' ' ) ) . lower ( ) ) , None )
if not release :
print ( " No continuous release found. " )
return None
assets = release . get ( ' assets ' , [ ] )
else :
print ( f " Unknown release type: { release_type } " )
return None
asset = next ( ( a for a in assets if a [ ' name ' ] . startswith ( ' MiniDexed ' ) and a [ ' name ' ] . endswith ( ' .zip ' ) ) , None )
if not asset :
print ( " No MiniDexed*.zip asset found in release. " )
return None
url = asset [ ' browser_download_url ' ]
print ( f " Downloading asset: { asset [ ' name ' ] } from { url } " )
resp = requests . get ( url , stream = True )
if resp . status_code == 200 :
zip_path = os . path . join ( TEMP_DIR , asset [ ' name ' ] )
with open ( zip_path , ' wb ' ) as f :
for chunk in resp . iter_content ( chunk_size = 8192 ) :
f . write ( chunk )
return zip_path
print ( f " Failed to download asset: { resp . status_code } " )
return None
if __name__ == " __main__ " :
parser = argparse . ArgumentParser ( description = " MiniDexed Updater " )
parser . add_argument ( " -v " , action = " store_true " , help = " Enable verbose FTP debug output " )
parser . add_argument ( " --ip " , type = str , help = " IP address of the device to upload to (skip mDNS discovery) " )
parser . add_argument ( " --version " , type = int , choices = [ 1 , 2 , 3 ] , help = " Version to upload: 1=Latest official, 2=Continuous, 3=Local build (skip prompt) " )
parser . add_argument ( " --pr " , type = str , help = " Pull request number or URL to fetch build artifacts from PR comment " )
parser . add_argument ( " --github-token " , type = str , help = " GitHub personal access token for downloading PR artifacts (optional, can also use GITHUB_TOKEN env var) " )
args = parser . parse_args ( )
import time
# Check for local kernel*.img files
local_kernel_dir = os . path . join ( os . path . dirname ( __file__ ) , ' src ' )
local_kernel_imgs = [ f for f in os . listdir ( local_kernel_dir ) if f . startswith ( ' kernel ' ) and f . endswith ( ' .img ' ) ]
has_local_build = len ( local_kernel_imgs ) > 0
# Get GitHub token from argument or environment
github_token = args . github_token or os . environ . get ( " GITHUB_TOKEN " )
# Ask user which release to download (numbered choices)
release_options = [
( " Latest official release " , " https://github.com/probonopd/MiniDexed/releases/expanded_assets/latest " ) ,
( " Continuous (experimental) build " , " https://github.com/probonopd/MiniDexed/releases/expanded_assets/continuous " )
]
if has_local_build :
release_options . append ( ( " Local build (from src/) " , None ) )
if args . version :
selected_idx = args . version - 1
if selected_idx < 0 or selected_idx > = len ( release_options ) :
print ( f " Invalid version selection: { args . version } " )
sys . exit ( 1 )
github_url = release_options [ selected_idx ] [ 1 ]
else :
print ( " Which release do you want to update? " )
for idx , ( desc , _ ) in enumerate ( release_options ) :
print ( f " [ { idx + 1 } ] { desc } " )
print ( " [PR] Pull request build (enter PR number or URL) " )
while True :
choice = input ( f " Enter the number of your choice (1- { len ( release_options ) } ) or PR number: " ) . strip ( )
if choice . isdigit ( ) and 1 < = int ( choice ) < = len ( release_options ) :
selected_idx = int ( choice ) - 1
github_url = release_options [ selected_idx ] [ 1 ]
args . pr = None
break
# Accept PR number, #NNN, or PR URL
pr_match = re . match ( r ' ^(#? \ d+|https?://github.com/[^/]+/[^/]+/pull/ \ d+)$ ' , choice )
if pr_match :
args . pr = choice
selected_idx = None
github_url = None
break
print ( " Invalid selection. Please enter a valid number or PR number/URL. " )
# If local build is selected, skip all GitHub/zip logic and do not register cleanup
use_local_build = has_local_build and selected_idx == len ( release_options ) - 1
if args . pr :
# Extract PR number from input (accepts URL, #899, or 899)
import re
pr_input = args . pr . strip ( )
m = re . search ( r ' ( \ d+)$ ' , pr_input )
if not m :
print ( f " Could not parse PR number from: { pr_input } " )
sys . exit ( 1 )
pr_number = m . group ( 1 )
# Fetch PR page HTML
pr_url = f " https://github.com/probonopd/MiniDexed/pull/ { pr_number } "
print ( f " Fetching PR page: { pr_url } " )
resp = requests . get ( pr_url )
if resp . status_code != 200 :
print ( f " Failed to fetch PR page: { resp . status_code } " )
sys . exit ( 1 )
html = resp . text
# Find all 'Build for testing' artifact blocks (look for <p dir="auto">Build for testing: ...</p>)
import html as ihtml
import re
pattern = re . compile ( r ' <p dir= " auto " >Build for testing:(.*?)Use at your own risk \ . ' , re . DOTALL )
matches = pattern . findall ( html )
if not matches :
print ( " No build artifact links found in PR comment. " )
sys . exit ( 1 )
last_block = matches [ - 1 ]
# Find all artifact links in the last block
link_pattern = re . compile ( r ' <a href= " ([^ " ]+) " >([^<]+)</a> ' )
links = link_pattern . findall ( last_block )
if not links :
print ( " No artifact links found in PR comment block. " )
sys . exit ( 1 )
# Download both 32bit and 64bit artifacts if present
artifact_paths = [ ]
for url , name in links :
print ( f " Downloading artifact: { name } from { url } " )
# Try to extract the artifact ID from the URL
m = re . search ( r ' /artifacts/( \ d+) ' , url )
if m and github_token :
artifact_id = m . group ( 1 )
api_url = f " https://api.github.com/repos/probonopd/MiniDexed/actions/artifacts/ { artifact_id } /zip "
headers = {
" Authorization " : f " Bearer { github_token } " ,
" Accept " : " application/vnd.github.v3+json " ,
" User-Agent " : " Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 "
}
resp = requests . get ( api_url , stream = True , headers = headers )
if resp . status_code == 200 :
local_path = os . path . join ( TEMP_DIR , name + " .zip " )
with open ( local_path , ' wb ' ) as f :
for chunk in resp . iter_content ( chunk_size = 8192 ) :
f . write ( chunk )
artifact_paths . append ( local_path )
else :
print ( f " Failed to download artifact { name } via GitHub API: { resp . status_code } " )
print ( f " Request headers: { resp . request . headers } " )
print ( f " Response headers: { resp . headers } " )
print ( f " Response URL: { resp . url } " )
print ( f " Response text (first 500 chars): { resp . text [ : 500 ] } " )
else :
# Fallback to direct link if no artifact ID or no token
headers = { }
if github_token :
headers [ " Authorization " ] = f " Bearer { github_token } "
headers [ " Accept " ] = " application/octet-stream "
headers [ " User-Agent " ] = " Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 "
resp = requests . get ( url , stream = True , headers = headers )
if resp . status_code == 200 :
local_path = os . path . join ( TEMP_DIR , name + " .zip " )
with open ( local_path , ' wb ' ) as f :
for chunk in resp . iter_content ( chunk_size = 8192 ) :
f . write ( chunk )
artifact_paths . append ( local_path )
else :
print ( f " Failed to download artifact { name } : { resp . status_code } " )
print ( f " Request headers: { resp . request . headers } " )
print ( f " Response headers: { resp . headers } " )
print ( f " Response URL: { resp . url } " )
print ( f " Response text (first 500 chars): { resp . text [ : 500 ] } " )
if not github_token :
print ( " You may need to provide a GitHub personal access token using --github-token or the GITHUB_TOKEN environment variable. " )
if not artifact_paths :
print ( " No artifacts downloaded. " )
sys . exit ( 1 )
# Extract all downloaded zips
extract_paths = [ ]
for path in artifact_paths :
ep = extract_zip ( path )
print ( f " Extracted { path } to { ep } " )
extract_paths . append ( ep )
# Use the first extracted path for further logic (or merge as needed)
extract_path = extract_paths [ 0 ] if extract_paths else None
use_local_build = False
elif use_local_build :
# Remove cleanup function if registered
atexit . unregister ( cleanup_temp_files )
print ( " Using local build: src/kernel*.img will be uploaded. " )
extract_path = None
else :
# Use GitHub API instead of HTML parsing
if selected_idx == 0 :
zip_path = download_latest_release_github_api ( ' latest ' )
elif selected_idx == 1 :
zip_path = download_latest_release_github_api ( ' continuous ' )
else :
zip_path = None
if zip_path :
print ( f " Downloaded to: { zip_path } " )
extract_path = extract_zip ( zip_path )
print ( f " Extracted to: { extract_path } " )
else :
print ( " Failed to download the release. " )
sys . exit ( 1 )
# Ask user if they want to update Performances (default no)
if not use_local_build :
update_perf = input ( " Do you want to update the Performances? This will OVERWRITE all existing performances. [y/N]: " ) . strip ( ) . lower ( )
update_performances = update_perf == ' y '
else :
update_performances = False
# Using mDNS to find the IP address of the device(s) that advertise the FTP service "_ftp._tcp."
ip_addresses = [ ]
device_names = [ ]
selected_ip = None
selected_name = None
if args . ip :
selected_ip = args . ip
selected_name = args . ip
else :
zeroconf = Zeroconf ( )
listener = MyListener ( ip_addresses , device_names )
browser = ServiceBrowser ( zeroconf , " _ftp._tcp.local. " , listener )
try :
print ( " Searching for devices... " )
time . sleep ( 10 )
if ip_addresses :
print ( " Devices found: " )
for idx , ( name , ip ) in enumerate ( zip ( device_names , ip_addresses ) ) :
print ( f " [ { idx + 1 } ] { name } ( { ip } ) " )
while True :
selection = input ( f " Enter the number of the device to upload to (1- { len ( ip_addresses ) } ): " ) . strip ( )
if selection . isdigit ( ) and 1 < = int ( selection ) < = len ( ip_addresses ) :
selected_ip = ip_addresses [ int ( selection ) - 1 ]
selected_name = device_names [ int ( selection ) - 1 ]
break
print ( " Invalid selection. Please enter a valid number. " )
else :
print ( " No devices found. " )
sys . exit ( 1 )
finally :
zeroconf . close ( )
print ( " Devices found: " , list ( zip ( device_names , ip_addresses ) ) )
# Log into the selected device and upload the new version of MiniDexed
print ( f " Connecting to { selected_name } ( { selected_ip } )... " )
try :
ftp = ftplib . FTP ( )
if args . v :
ftp . set_debuglevel ( 2 )
ftp . connect ( selected_ip , 21 , timeout = 10 )
ftp . login ( " admin " , " admin " )
ftp . set_pasv ( True )
print ( f " Connected to { selected_ip } (passive mode). " )
# --- Performances update logic ---
if update_performances and not use_local_build :
print ( " Updating Performance: recursively deleting and uploading /SD/performance directory... " )
def ftp_rmdirs ( ftp , path ) :
try :
items = ftp . nlst ( path )
except Exception as e :
print ( f " [WARN] Could not list { path } : { e } " )
return
for item in items :
if item in [ ' . ' , ' .. ' , path ] :
continue
full_path = f " { path } / { item } " if not item . startswith ( path ) else item
try :
# Try to delete as a file first
ftp . delete ( full_path )
print ( f " Deleted file: { full_path } " )
except Exception :
# If not a file, try as a directory
try :
ftp_rmdirs ( ftp , full_path )
ftp . rmd ( full_path )
print ( f " Deleted directory: { full_path } " )
except Exception as e :
print ( f " [WARN] Could not delete { full_path } : { e } " )
try :
ftp_rmdirs ( ftp , ' /SD/performance ' )
try :
ftp . rmd ( ' /SD/performance ' )
print ( " Deleted /SD/performance on device. " )
except Exception as e :
print ( f " [WARN] Could not delete /SD/performance directory itself: { e } " )
except Exception as e :
print ( f " Warning: Could not delete /SD/performance: { e } " )
# Upload extracted performance/ recursively
local_perf = os . path . join ( extract_path , ' performance ' )
def ftp_mkdirs ( ftp , path ) :
try :
ftp . mkd ( path )
except Exception :
pass
def ftp_upload_dir ( ftp , local_dir , remote_dir ) :
ftp_mkdirs ( ftp , remote_dir )
for item in os . listdir ( local_dir ) :
lpath = os . path . join ( local_dir , item )
rpath = f " { remote_dir } / { item } "
if os . path . isdir ( lpath ) :
ftp_upload_dir ( ftp , lpath , rpath )
else :
with open ( lpath , ' rb ' ) as fobj :
ftp . storbinary ( f ' STOR { rpath } ' , fobj )
print ( f " Uploaded { rpath } " )
if os . path . isdir ( local_perf ) :
ftp_upload_dir ( ftp , local_perf , ' /SD/performance ' )
print ( " Uploaded new /SD/performance directory. " )
else :
print ( " No extracted performance/ directory found, skipping upload. " )
# Upload performance.ini if it exists in extract_path
local_perfini = os . path . join ( extract_path , ' performance.ini ' )
if os . path . isfile ( local_perfini ) :
with open ( local_perfini , ' rb ' ) as fobj :
ftp . storbinary ( ' STOR /SD/performance.ini ' , fobj )
print ( " Uploaded /SD/performance.ini. " )
else :
print ( " No extracted performance.ini found, skipping upload. " )
# Upload kernel files
if use_local_build :
for file in local_kernel_imgs :
local_path = os . path . join ( local_kernel_dir , file )
remote_path = f " /SD/ { file } "
# Check if file exists on FTP server
file_exists = False
try :
ftp . cwd ( " /SD " )
if file in ftp . nlst ( ) :
file_exists = True
except Exception as e :
print ( f " Error checking for { file } on FTP server: { e } " )
file_exists = False
if not file_exists :
print ( f " Skipping { file } : does not exist on device. " )
continue
filesize = os . path . getsize ( local_path )
uploaded = [ 0 ]
def progress_callback ( data ) :
uploaded [ 0 ] + = len ( data )
percent = uploaded [ 0 ] * 100 / / filesize
print ( f " \r Uploading { file } : { percent } % " , end = " " , flush = True )
with open ( local_path , ' rb ' ) as f :
ftp . storbinary ( f ' STOR { remote_path } ' , f , 8192 , callback = progress_callback )
print ( f " \n Uploaded { file } to { selected_ip } . " )
else :
for root , dirs , files in os . walk ( extract_path ) :
for file in files :
if file . startswith ( " kernel " ) and file . endswith ( " .img " ) :
local_path = os . path . join ( root , file )
remote_path = f " /SD/ { file } "
# Check if file exists on FTP server
file_exists = False
try :
ftp . cwd ( " /SD " )
if file in ftp . nlst ( ) :
file_exists = True
except Exception as e :
print ( f " Error checking for { file } on FTP server: { e } " )
file_exists = False
if not file_exists :
print ( f " Skipping { file } : does not exist on device. " )
continue
filesize = os . path . getsize ( local_path )
uploaded = [ 0 ]
def progress_callback ( data ) :
uploaded [ 0 ] + = len ( data )
percent = uploaded [ 0 ] * 100 / / filesize
print ( f " \r Uploading { file } : { percent } % " , end = " " , flush = True )
with open ( local_path , ' rb ' ) as f :
ftp . storbinary ( f ' STOR { remote_path } ' , f , 8192 , callback = progress_callback )
print ( f " \n Uploaded { file } to { selected_ip } . " )
except ftplib . all_errors as e :
print ( f " FTP error: { e } " )
ftp = None # Mark ftp as unusable
# Only attempt to send BYE if ftp is connected and has a socket
try :
if ftp is not None and getattr ( ftp , ' sock ' , None ) is not None :
ftp . sendcmd ( " BYE " )
print ( f " Disconnected from { selected_ip } . " )
else :
print ( f " No active FTP connection to disconnect from. " )
except ( * ftplib . all_errors , AttributeError ) as e :
# Handle timeout or already closed connection
if isinstance ( e , TimeoutError ) or ( str ( e ) . strip ( ) . lower ( ) == " timed out " ) :
print ( f " Disconnected from { selected_ip } (timeout after BYE, device likely restarted). " )
else :
print ( f " FTP error after BYE: { e } " )