# Copyright 2022-2023 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The pyjoulescope_driver wrapper to implement the v0 API."""
from pyjoulescope_driver import Driver
from .device import Device
from .js220 import DeviceJs220
from .js110 import DeviceJs110
import atexit
import logging
from typing import List
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)
class DriverWrapper:
    """Singleton to wrap pyjoulescope_driver.Driver.

    The first construction creates the underlying ``Driver``, registers an
    ``atexit`` finalizer, subscribes to the ``@/!add`` and ``@/!remove``
    topics, and fills :attr:`devices` from the driver's current device paths.
    Every later construction returns that same instance.

    Attributes:
        driver: The wrapped ``Driver`` instance, or None after finalization.
        devices: Mapping of device path string to the device object created
            for that path.
    """

    __singleton__ = None

    def __new__(cls, *args, **kwds):
        # Look in this class's own __dict__ (not through inheritance) so the
        # singleton is tracked per class.
        s = cls.__dict__.get("__singleton__")
        if s is not None:
            return s
        s = object.__new__(cls)
        cls.__singleton__ = s
        s._initialize(*args, **kwds)
        return s

    def _initialize(self):
        """Create the driver and start tracking attached devices."""
        self.driver = Driver()
        self.driver.log_level = 'INFO'
        atexit.register(self._finalize)
        self.devices = {}
        self.driver.subscribe('@/!add', 'pub', self._on_device_add)
        self.driver.subscribe('@/!remove', 'pub', self._on_device_remove)
        # Register the devices that are already known to the driver.
        for d in self.driver.device_paths():
            self._on_device_add('@/!add', d)

    def _finalize(self):
        """Close every tracked device, then finalize the driver.

        Safe to call more than once: after the first call the driver
        reference is None and the device map is empty, so later calls
        do nothing.
        """
        while self.devices:
            _, device = self.devices.popitem()
            device.close()
        d, self.driver = self.driver, None
        if d is not None:
            d.finalize()

    def _on_device_add(self, topic, value):
        """Handle a device-added notification.

        :param topic: The notification topic (unused).
        :param value: The device path string.
        """
        if value in self.devices:
            return
        if 'js220' in value:
            cls = DeviceJs220
        elif 'js110' in value:
            cls = DeviceJs110
        else:
            _log.info('Unsupported device: %s', value)
            return
        self.devices[value] = cls(self.driver, value)

    def _on_device_remove(self, topic, value):
        """Handle a device-removed notification.

        :param topic: The notification topic (unused).
        :param value: The device path string.
        """
        d = self.devices.pop(value, None)
        if d is not None:
            d.close()

    def scan(self, name: str = None, config=None):
        """Return the tracked devices that match ``name``.

        :param name: The case-insensitive device class to select.
            None (default) and 'joulescope' select devices whose path does
            not have '&' as its third character.  'bootloader' selects the
            devices whose path does.  Any other name selects nothing.
        :param config: The value assigned to the ``config`` attribute of
            every returned device.
        :return: The list of matching devices, sorted by their ``str()``
            value.  An unrecognized ``name`` returns an empty list rather
            than raising.
        """
        kind = 'joulescope' if name is None else name.lower()
        if kind == 'joulescope':
            want_bootloader = False
        elif kind == 'bootloader':
            want_bootloader = True
        else:
            # Unknown device class: nothing can match.
            return []
        devices = [d for d in self.devices.values()
                   if (d.device_path[2] == '&') == want_bootloader]
        devices.sort(key=str)
        for d in devices:
            d.config = config
        return devices
[docs]
def scan(name: str = None, config=None) -> List[Device]:
    """Scan for connected devices.

    :param name: The case-insensitive device name to scan.
        None (default) is equivalent to 'Joulescope'.
    :param config: The configuration for the :class:`Device`.
    :return: The list of :class:`Device` instances reported by the shared
        :class:`DriverWrapper`.  Use :func:`scan_for_changes` to preserve
        instances obtained from an earlier scan.
    :raises: None intentionally; this function only delegates to
        :meth:`DriverWrapper.scan`.
    """
    return DriverWrapper().scan(name, config)
[docs]
def scan_require_one(name: str = None, config=None) -> Device:
    """Scan and return the single matching device.

    :param name: The case-insensitive device name to scan.
        None (default) is equivalent to 'Joulescope'.
    :param config: The configuration for the :class:`Device`.
    :return: The one :class:`Device` found.
    :raise RuntimeError: If the scan finds no device or more than one.
    """
    found = scan(name, config=config)
    count = len(found)
    if count == 0:
        raise RuntimeError("no devices found")
    if count > 1:
        raise RuntimeError("multiple devices found")
    return found[0]
[docs]
def scan_for_changes(name: str = None, devices=None, config=None):
    """Scan and report what changed relative to a previous scan.

    Devices are matched between the previous and the new scan by comparing
    their ``str()`` values.

    :param name: The case-insensitive device name to scan.
        None (default) is equivalent to 'Joulescope'.
    :param devices: The list of existing :class:`Device` instances returned
        by a previous scan.  Pass None or [] if no scan has yet been
        performed.
    :param config: The configuration for the :class:`Device` which is one of
        ['auto', 'ignore', 'off'].  None is equivalent to 'auto'.
    :return: The tuple of lists (devices_now, devices_added, devices_removed).
        "devices_now" holds every currently connected device; when a device
        was already present in "devices", the instance from "devices" is
        used.
        "devices_added" holds the connected devices not found in "devices".
        "devices_removed" holds the entries of "devices" that are no longer
        connected.
    """
    previous = devices if devices is not None else []
    current = scan(name, config=config)

    # Map each identity string to the FIRST previous instance carrying it.
    known = {}
    for dev in previous:
        known.setdefault(str(dev), dev)
    current_ids = {str(dev) for dev in current}

    devices_now = []
    devices_added = []
    for dev in current:
        key = str(dev)
        if key in known:
            # Already known: hand back the caller's existing instance.
            devices_now.append(known[key])
        else:
            devices_added.append(dev)
            devices_now.append(dev)

    devices_removed = [dev for dev in previous if str(dev) not in current_ids]

    _log.info('scan_for_changes %d devices: %d added, %d removed',
              len(devices_now), len(devices_added), len(devices_removed))
    return devices_now, devices_added, devices_removed
class DeviceNotify:
    """Relay device insertion and removal notifications to a callback."""

    def __init__(self, cbk):
        """Start device insertion/removal notification.

        The callback is invoked once immediately as ``cbk(True, None)``
        before the driver subscriptions are made.

        :param cbk: The function called on device insertion or removal.  The
            arguments are (inserted, info).  "inserted" is True on insertion
            and False on removal.  "info" is the value delivered with the
            notification.  In general, the application should rescan for
            relevant devices.
        """
        self._cbk = cbk
        self._driver = None
        # Hold one bound-method object per handler so that subscribe() and
        # unsubscribe() are given the very same callable.
        self._on_add_fn = self._on_add
        self._on_remove_fn = self._on_remove
        self._cbk(True, None)
        self.open()

    def _on_add(self, topic, value):
        """Report an insertion to the callback; ``topic`` is unused."""
        self._cbk(True, value)

    def _on_remove(self, topic, value):
        """Report a removal to the callback; ``topic`` is unused."""
        self._cbk(False, value)

    def open(self):
        """(Re)subscribe to the driver's add and remove topics.

        Any existing subscription is closed first.
        """
        self.close()
        self._driver = DriverWrapper()
        drv = self._driver.driver
        handlers = (
            ('@/!add', self._on_add_fn),
            ('@/!remove', self._on_remove_fn),
        )
        for topic, fn in handlers:
            drv.subscribe(topic, 'pub', fn)

    def close(self):
        """Unsubscribe from the driver topics; no-op when not open."""
        wrapper = self._driver
        if wrapper is None:
            return
        drv = wrapper.driver
        self._driver = None
        handlers = (
            ('@/!add', self._on_add_fn),
            ('@/!remove', self._on_remove_fn),
        )
        for topic, fn in handlers:
            drv.unsubscribe(topic, fn)