Script module implementation

This commit is contained in:
Erki 2025-09-22 21:45:51 +03:00
parent 28af5b1175
commit f5253f089a
3 changed files with 141 additions and 60 deletions

85
enclave_shelly/script.py Normal file
View File

@ -0,0 +1,85 @@
import json
class Script:
def __init__(self, data: dict, shelly: "ShellyBase"):
self.id = data["id"]
self.name = data["name"]
self.enable = data["enable"]
self.running = data["running"]
self.shelly = shelly
def __repr__(self) -> str:
data = {
"id": self.id,
"name": self.name,
"enable": self.enable,
"running": self.running
}
return json.dumps(data)
@staticmethod
async def create_from_id(id: int, shelly: "ShellyBase") -> "Script":
config = await shelly.rpc_request("Script.GetConfig", {"id": id})
status = await shelly.rpc_request("Script.GetStatus", {"id": id})
data = {
"id": id,
"name": config["name"],
"enable": config["enable"],
"running": status["running"]
}
return Script(data, shelly)
async def delete(self):
return await self.shelly.rpc_request("Script.Delete", {"id": self.id})
async def get_config(self):
return await self.shelly.rpc_request("Script.GetConfig", {"id": self.id})
async def set_config(self, config: dict):
return await self.shelly.rpc_request("Script.SetConfig", {"id": self.id, "config": config})
async def get_status(self):
return await self.shelly.rpc_request("Script.GetStatus", {"id": self.id})
async def get_code(self) -> str:
to_read = 256
offset = 0
remaining = to_read
request_data = {
"id": self.id,
"offset": offset,
"len": to_read
}
code = ""
while offset < remaining:
response = await self.shelly.rpc_request("Script.GetCode", request_data)
offset += to_read
remaining = response["left"]
code += response["data"]
return code
async def put_code(self, code: str):
to_put = 256
offset = 0
request_data = {
"id": self.id,
"append": False,
}
while offset < len(code):
request_data["code"] = code[offset:(offset + to_put)]
await self.shelly.rpc_request("Script.PutCode", request_data)
offset += to_put
request_data["append"] = True

View File

@ -1,5 +1,7 @@
import aiohttp
import asyncio
from typing import List
from .script import Script
class ShellyBase:
def __init__(self, base_url: str, **kwargs):
@ -23,73 +25,46 @@ class ShellyBase:
}
async def get_device_info(self):
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Shelly.GetDeviceInfo")
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
return await self.rpc_request("Shelly.GetDeviceInfo")
async def get_status(self):
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Shelly.GetStatus")
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
return await self.rpc_request("Shelly.GetStatus")
async def get_config(self):
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Shelly.GetConfig")
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
return await self.rpc_request("Shelly.GetConfig")
async def reboot(self, delay_ms: int = 1000):
async with aiohttp.ClientSession() as session:
url = self._rpc_url()
data = self._compose_rpc_data("Shelly.Reboot", {"delay_ms": delay_ms})
return await self.rpc_request("Shelly.Reboot", {"delay_ms": delay_ms})
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
async def get_scripts(self) -> List[Script]:
script_list = await self.rpc_request("Script.List")
scripts: List[Script] = []
for script_data in script_list["scripts"]:
scripts.append(Script(script_data, self))
async def get_script_list(self):
return scripts
async def get_script(self, id_or_name: int|str) -> Script:
scripts = await self.get_scripts()
for script in scripts:
if type(id_or_name) == int and script.id == id_or_name:
return script
elif type(id_or_name) == str and script.name == id_or_name:
return script
raise RuntimeError(f"No script with id or name {id_or_name} found.")
async def create_script(self, name: str) -> Script:
response = await self.rpc_request("Script.Create", {"name": name})
return await Script.create_from_id(response["id"], self)
async def delete_script(self, id: int) -> dict:
return await self.rpc_request("Script.Delete", {"id": id})
async def rpc_request(self, method: str, args: dict = None) -> dict:
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Script.List")
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
async def get_script_status(self, id: int):
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Script.GetStatus", {"id": id})
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
async def get_script_config(self, id: int):
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Script.GetConfig", {"id": id})
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
async def create_script(self, name: str):
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Script.Create", {"name": name})
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)
async def delete_script(self, id: int):
async with aiohttp.ClientSession() as session:
url = self._rpc_url
data = self._compose_rpc_data("Script.Delete", {"id": id})
data = self._compose_rpc_data(method, args)
async with session.post(url, json=data) as resp:
return await self._validate_response(resp)

View File

@ -20,19 +20,40 @@ async def main():
delete_parser.add_argument("id", type=int, help="ID of the script.")
delete_parser.set_defaults(command="delete")
get_code = subparsers.add_parser("code", help="Get the code of a script.")
get_code.add_argument("id", type=int, help="ID of the script.")
get_code.set_defaults(command="code")
upload_code = subparsers.add_parser("upload", help="Upload a file to the script.")
upload_code.add_argument("id", type=int, help="ID of the script.")
upload_code.add_argument("path", type=str, help="Path to the file to upload.")
upload_code.set_defaults(command="upload")
args = parser.parse_args()
shelly = ShellyBase(args.url)
if args.command == "list":
result = await shelly.get_script_list()
print(json.dumps(result, indent=4))
scripts = await shelly.get_scripts()
print(scripts)
elif args.command == "create":
result = await shelly.create_script(args.name)
print(json.dumps(result, indent=4))
script = await shelly.create_script(args.name)
print(script)
elif args.command == "delete":
result = await shelly.delete_script(args.id)
print(json.dumps(result, indent=4))
elif args.command == "code":
script = await shelly.get_script(args.id)
code = await script.get_code()
print(code)
elif args.command == "upload":
script = await shelly.get_script(args.id)
with open(args.path) as f:
code = f.read()
await script.put_code(code)
print("Upload successful.")
if __name__ == "__main__":
asyncio.run(main())