mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-04-28 09:43:03 +00:00
201 lines
8 KiB
Python
201 lines
8 KiB
Python
# Based on the original rnsh program by Aaron Heise (@acehoss)
|
|
# https://github.com/acehoss/rnsh - MIT License - Copyright (c) 2023 Aaron Heise
|
|
# This version of rnsh is included in RNS under the Reticulum License
|
|
#
|
|
# Reticulum License
|
|
#
|
|
# Copyright (c) 2016-2026 Mark Qvist
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# - The Software shall not be used in any kind of system which includes amongst
|
|
# its functions the ability to purposefully do harm to human beings.
|
|
#
|
|
# - The Software shall not be used, directly or indirectly, in the creation of
|
|
# an artificial intelligence, machine learning or language model training
|
|
# dataset, including but not limited to any use that contributes to the
|
|
# training or development of such a model or algorithm.
|
|
#
|
|
# - The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
# SOFTWARE.
|
|
|
|
import asyncio
|
|
import threading
|
|
import time
|
|
import RNS.Utilities.rnsh.exception as exception
|
|
from typing import Callable
|
|
from contextlib import AbstractContextManager
|
|
import types
|
|
import typing
|
|
|
|
|
|
class RetryStatus:
|
|
def __init__(self, tag: any, try_limit: int, wait_delay: float, retry_callback: Callable[[any, int], any],
|
|
timeout_callback: Callable[[any, int], None], tries: int = 1):
|
|
|
|
self.tag = tag
|
|
self.try_limit = try_limit
|
|
self.tries = tries
|
|
self.wait_delay = wait_delay
|
|
self.retry_callback = retry_callback
|
|
self.timeout_callback = timeout_callback
|
|
self.try_time = time.time()
|
|
self.completed = False
|
|
|
|
@property
|
|
def ready(self):
|
|
ready = time.time() > self.try_time + self.wait_delay
|
|
RNS.log(f"ready check {self.tag} try_time {self.try_time} wait_delay {self.wait_delay} " +
|
|
f"next_try {self.try_time + self.wait_delay} now {time.time()} " +
|
|
f"exceeded {time.time() - self.try_time - self.wait_delay} ready {ready}", RNS.LOG_DEBUG)
|
|
return ready
|
|
|
|
@property
|
|
def timed_out(self):
|
|
return self.ready and self.tries >= self.try_limit
|
|
|
|
def timeout(self):
|
|
self.completed = True
|
|
self.timeout_callback(self.tag, self.tries)
|
|
|
|
def retry(self) -> any:
|
|
self.tries = self.tries + 1
|
|
self.try_time = time.time()
|
|
return self.retry_callback(self.tag, self.tries)
|
|
|
|
|
|
class RetryThread(AbstractContextManager):
|
|
def __init__(self, loop_period: float = 0.25, name: str = "retry thread"):
|
|
self._loop_period = loop_period
|
|
self._statuses: list[RetryStatus] = []
|
|
self._tag_counter = 0
|
|
self._lock = threading.RLock()
|
|
self._run = True
|
|
self._finished: asyncio.Future = None
|
|
self._thread = threading.Thread(name=name, target=self._thread_run, daemon=True)
|
|
self._thread.start()
|
|
|
|
def is_alive(self):
|
|
return self._thread.is_alive()
|
|
|
|
def close(self, loop: asyncio.AbstractEventLoop = None) -> asyncio.Future:
|
|
RNS.log("Stopping timer thread", RNS.LOG_DEBUG)
|
|
if loop is None:
|
|
self._run = False
|
|
self._thread.join()
|
|
return None
|
|
else:
|
|
self._finished = loop.create_future()
|
|
return self._finished
|
|
|
|
def wait(self, timeout: float = None):
|
|
if timeout:
|
|
timeout = timeout + time.time()
|
|
|
|
while timeout is None or time.time() < timeout:
|
|
with self._lock:
|
|
task_count = len(self._statuses)
|
|
if task_count == 0:
|
|
return
|
|
time.sleep(0.1)
|
|
|
|
|
|
def _thread_run(self):
|
|
while self._run and self._finished is None:
|
|
time.sleep(self._loop_period)
|
|
ready: list[RetryStatus] = []
|
|
prune: list[RetryStatus] = []
|
|
with self._lock: ready.extend(list(filter(lambda s: s.ready, self._statuses)))
|
|
|
|
for retry in ready:
|
|
try:
|
|
if not retry.completed:
|
|
if retry.timed_out:
|
|
RNS.log(f"Timed out {retry.tag} after {retry.try_limit} tries", RNS.LOG_DEBUG)
|
|
retry.timeout()
|
|
prune.append(retry)
|
|
elif retry.ready:
|
|
RNS.log(f"Retrying {retry.tag}, try {retry.tries + 1}/{retry.try_limit}", RNS.LOG_DEBUG)
|
|
should_continue = retry.retry()
|
|
if not should_continue: self.complete(retry.tag)
|
|
|
|
except Exception as e:
|
|
RNS.log(f"Error processing retry id {retry.tag}: {e}", RNS.LOG_ERROR)
|
|
prune.append(retry)
|
|
|
|
with self._lock:
|
|
for retry in prune:
|
|
RNS.log(f"pruned retry {retry.tag}, retry count {retry.tries}/{retry.try_limit}", RNS.LOG_DEBUG)
|
|
with exception.permit(SystemExit): self._statuses.remove(retry)
|
|
|
|
if self._finished is not None: self._finished.set_result(None)
|
|
|
|
def _get_next_tag(self):
|
|
self._tag_counter += 1
|
|
return self._tag_counter
|
|
|
|
def has_tag(self, tag: any) -> bool:
|
|
with self._lock: return next(filter(lambda s: s.tag == tag, self._statuses), None) is not None
|
|
|
|
def begin(self, try_limit: int, wait_delay: float, try_callback: Callable[[any, int], any],
|
|
timeout_callback: Callable[[any, int], None]) -> any:
|
|
|
|
RNS.log(f"Running first try", RNS.LOG_DEBUG)
|
|
tag = try_callback(None, 1)
|
|
RNS.log(f"First try got id {tag}", RNS.LOG_DEBUG)
|
|
|
|
if not tag:
|
|
RNS.log(f"Callback returned None/False/0, considering complete.", RNS.LOG_DEBUG)
|
|
return None
|
|
|
|
with self._lock:
|
|
if tag is None: tag = self._get_next_tag()
|
|
self.complete(tag)
|
|
|
|
self._statuses.append(RetryStatus(tag=tag,
|
|
tries=1,
|
|
try_limit=try_limit,
|
|
wait_delay=wait_delay,
|
|
retry_callback=try_callback,
|
|
timeout_callback=timeout_callback))
|
|
|
|
RNS.log(f"Added retry timer for {tag}", RNS.LOG_DEBUG)
|
|
return tag
|
|
|
|
def complete(self, tag: any):
|
|
assert tag is not None
|
|
with self._lock:
|
|
status = next(filter(lambda l: l.tag == tag, self._statuses), None)
|
|
if status is not None:
|
|
status.completed = True
|
|
self._statuses.remove(status)
|
|
RNS.log(f"completed {tag}", RNS.LOG_DEBUG)
|
|
return
|
|
|
|
RNS.log(f"status not found to complete {tag}", RNS.LOG_DEBUG)
|
|
|
|
def complete_all(self):
|
|
with self._lock:
|
|
for status in self._statuses:
|
|
status.completed = True
|
|
RNS.log(f"completed {status.tag}", RNS.LOG_DEBUG)
|
|
|
|
self._statuses.clear()
|
|
|
|
def __exit__(self, __exc_type: typing.Type[BaseException], __exc_value: BaseException,
|
|
__traceback: types.TracebackType) -> bool:
|
|
self.close()
|
|
return False
|