Tweak for Web Workflow with sub paths
This commit is contained in:
parent
01cceec6bf
commit
6cc6a1df39
2 changed files with 50 additions and 18 deletions
|
|
@ -18,7 +18,7 @@ import sys
|
|||
import tempfile
|
||||
import zipfile
|
||||
from subprocess import check_output
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import urlparse, urljoin
|
||||
|
||||
import appdirs
|
||||
import click
|
||||
|
|
@ -264,9 +264,9 @@ class Module:
|
|||
"""
|
||||
self.name = name
|
||||
self.backend = backend
|
||||
self.path = os.path.join(backend.library_path, name)
|
||||
url = urlparse(self.path)
|
||||
if url.scheme == "http":
|
||||
self.path = urljoin(backend.library_path,name, allow_fragments=False) if isinstance(backend,WebBackend) else os.path.join(backend.library_path, name)
|
||||
url = urlparse(self.path,allow_fragments=False)
|
||||
if str(url.scheme).lower() in ("http", "https"):
|
||||
if url.path.endswith(".py") or url.path.endswith(".mpy"):
|
||||
self.file = os.path.basename(url.path)
|
||||
self.name = (
|
||||
|
|
@ -274,7 +274,7 @@ class Module:
|
|||
)
|
||||
else:
|
||||
self.file = None
|
||||
self.name = os.path.basename(url.path[:-1])
|
||||
self.name = os.path.basename(url.path if url.path[:-1]=='/' else url.path[:-1])
|
||||
else:
|
||||
if os.path.isfile(self.path):
|
||||
# Single file module.
|
||||
|
|
@ -608,6 +608,12 @@ def find_modules(backend, bundles_list):
|
|||
if not path.endswith(os.sep)
|
||||
else path[:-1].split(os.sep)[-1] + os.sep
|
||||
)
|
||||
print("old path module name: ", module_name)
|
||||
module_name = name if not path.contains(os.sep) else module_name # should probably check for os.sep and use previous version if found
|
||||
print("new path module name: ", module_name)
|
||||
print("Name: ", name)
|
||||
print("Path: ", path)
|
||||
print("Module_name: ", module_name)
|
||||
m = Module(
|
||||
module_name,
|
||||
backend,
|
||||
|
|
@ -618,6 +624,7 @@ def find_modules(backend, bundles_list):
|
|||
bundle,
|
||||
compatibility,
|
||||
)
|
||||
print("Module: ", m)
|
||||
result.append(m)
|
||||
return result
|
||||
except Exception as ex:
|
||||
|
|
@ -1225,13 +1232,16 @@ def install(ctx, modules, pyext, requirement, auto, auto_file): # pragma: no co
|
|||
elif auto or auto_file:
|
||||
if auto_file is None:
|
||||
auto_file = "code.py"
|
||||
print(f"Auto file: {auto_file}")
|
||||
# pass a local file with "./" or "../"
|
||||
is_relative = auto_file.split(os.sep)[0] in [os.path.curdir, os.path.pardir]
|
||||
is_relative = not isinstance(ctx.obj["backend"], WebBackend) or auto_file.split(os.sep)[0] in [os.path.curdir, os.path.pardir]
|
||||
print("is rel: ", is_relative, "is abs: ", os.path.isabs(auto_file))
|
||||
if not os.path.isabs(auto_file) and not is_relative:
|
||||
auto_file = ctx.obj["backend"].get_file_path(auto_file or "code.py")
|
||||
print(f"Auto file: {auto_file}")
|
||||
|
||||
auto_file_path = ctx.obj["backend"].get_auto_file_path(auto_file)
|
||||
|
||||
print(f"Auto file path: {auto_file_path}")
|
||||
if not os.path.isfile(auto_file_path):
|
||||
click.secho(f"Auto file not found: {auto_file}", fg="red")
|
||||
sys.exit(1)
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import os
|
|||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import urlparse, urljoin
|
||||
import click
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
|
@ -114,7 +114,7 @@ class Backend:
|
|||
click.echo("'{}' is already installed.".format(name))
|
||||
return
|
||||
|
||||
library_path = os.path.join(device_path, self.LIB_DIR_PATH)
|
||||
library_path = os.path.join(device_path, self.LIB_DIR_PATH) if not isinstance(self,WebBackend) else urljoin(device_path,self.LIB_DIR_PATH)
|
||||
|
||||
# Create the library directory first.
|
||||
self._create_library_directory(device_path, library_path)
|
||||
|
|
@ -197,12 +197,13 @@ class WebBackend(Backend):
|
|||
Install file to device using web workflow.
|
||||
:param source source file.
|
||||
"""
|
||||
|
||||
file_name = source.split(os.path.sep)
|
||||
file_name = file_name[-2] if file_name[-1] == "" else file_name[-1]
|
||||
target = (
|
||||
self.device_location
|
||||
+ "/"
|
||||
+ self.LIB_DIR_PATH
|
||||
+ source.split(os.path.sep)[-1]
|
||||
+ file_name
|
||||
)
|
||||
url = urlparse(target)
|
||||
auth = HTTPBasicAuth("", url.password)
|
||||
|
|
@ -218,11 +219,13 @@ class WebBackend(Backend):
|
|||
Install directory to device using web workflow.
|
||||
:param source source directory.
|
||||
"""
|
||||
mod_name = source.split(os.path.sep)
|
||||
mod_name = mod_name[-2] if mod_name[-1] == "" else mod_name[-1]
|
||||
target = (
|
||||
self.device_location
|
||||
+ "/"
|
||||
+ self.LIB_DIR_PATH
|
||||
+ source.split(os.path.sep)[-1]
|
||||
+ mod_name
|
||||
)
|
||||
url = urlparse(target)
|
||||
auth = HTTPBasicAuth("", url.password)
|
||||
|
|
@ -230,7 +233,7 @@ class WebBackend(Backend):
|
|||
print(f"source: {source}")
|
||||
|
||||
# Create the top level directory.
|
||||
r = requests.put(target + ("/" if not target.endswith("/") else ""), auth=auth)
|
||||
r = requests.put((target + "/" if target[:-1]!="/" else target), auth=auth)
|
||||
print(f"resp {r.content}")
|
||||
r.raise_for_status()
|
||||
|
||||
|
|
@ -241,12 +244,16 @@ class WebBackend(Backend):
|
|||
rel_path = ""
|
||||
for name in files:
|
||||
with open(os.path.join(root, name), "rb") as fp:
|
||||
path_to_create = urljoin( urljoin(target , rel_path + "/", allow_fragments=False) , name, allow_fragments=False) if rel_path != "" else urljoin(target , name, allow_fragments=False)
|
||||
# print(f"file_path_to_create: {path_to_create}")
|
||||
r = requests.put(
|
||||
target + rel_path + "/" + name, fp.read(), auth=auth
|
||||
path_to_create, fp.read(), auth=auth
|
||||
)
|
||||
r.raise_for_status()
|
||||
for name in dirs:
|
||||
r = requests.put(target + rel_path + "/" + name, auth=auth)
|
||||
path_to_create = urljoin( urljoin(target , rel_path + "/", allow_fragments=False) , name, allow_fragments=False) if rel_path != "" else urljoin(target , name, allow_fragments=False)
|
||||
# print(f"dir_path_to_create: {path_to_create}")
|
||||
r = requests.put(path_to_create, auth=auth)
|
||||
r.raise_for_status()
|
||||
|
||||
def get_circuitpython_version(self):
|
||||
|
|
@ -312,7 +319,10 @@ class WebBackend(Backend):
|
|||
:param url: URL of the device.
|
||||
"""
|
||||
for dm in directory_mods:
|
||||
dm_url = url + dm + "/"
|
||||
if not str(urlparse(dm).scheme).lower() in ("http", "https"):
|
||||
dm_url = url + dm + "/"
|
||||
else:
|
||||
dm_url = dm
|
||||
r = requests.get(dm_url, auth=auth, headers={"Accept": "application/json"})
|
||||
r.raise_for_status()
|
||||
mpy = False
|
||||
|
|
@ -353,7 +363,10 @@ class WebBackend(Backend):
|
|||
:param url: URL of the device.
|
||||
"""
|
||||
for sfm in single_file_mods:
|
||||
sfm_url = url + sfm
|
||||
if not str(urlparse(sfm).scheme).lower() in ("http", "https"):
|
||||
sfm_url = url + sfm
|
||||
else:
|
||||
sfm_url = sfm
|
||||
r = requests.get(sfm_url, auth=auth)
|
||||
r.raise_for_status()
|
||||
idx = sfm.rfind(".")
|
||||
|
|
@ -468,7 +481,7 @@ class WebBackend(Backend):
|
|||
"""
|
||||
retuns the full path on the device to a given file name.
|
||||
"""
|
||||
return os.path.join(self.device_location, "fs", filename)
|
||||
return urljoin( urljoin(self.device_location, "fs/", allow_fragments=False), filename, allow_fragments=False)
|
||||
|
||||
def is_device_present(self):
|
||||
"""
|
||||
|
|
@ -480,6 +493,15 @@ class WebBackend(Backend):
|
|||
except requests.exceptions.ConnectionError:
|
||||
return False
|
||||
|
||||
def get_device_versions(self):
|
||||
"""
|
||||
Returns a dictionary of metadata from modules on the connected device.
|
||||
|
||||
:param str device_url: URL for the device.
|
||||
:return: A dictionary of metadata about the modules available on the
|
||||
connected device.
|
||||
"""
|
||||
return self.get_modules(urljoin(self.device_location, self.LIB_DIR_PATH))
|
||||
|
||||
class DiskBackend(Backend):
|
||||
"""
|
||||
|
|
|
|||
Loading…
Reference in a new issue