Source code for joulescope.v1.device

# Copyright 2022-2023 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from joulescope.parameters_v1 import PARAMETERS, PARAMETERS_DICT, name_to_value, value_to_name
from .stream_buffer import StreamBuffer
from joulescope.view import View
import copy
import logging
import numpy as np
import queue
import time


[docs] class Device: def __init__(self, driver, device_path): self.config = None self._driver = driver while device_path.endswith('/'): device_path = device_path[:-1] self._log = logging.getLogger(__name__ + '.' + device_path.replace('/', '.')) self._path = device_path self.is_open = False self._stream_cbk_objs = [] self._stream_cbk_objs_add = [] self._stop_fn = None self._input_sampling_frequency = 0 self._output_sampling_frequency = 0 self._statistics_callbacks = [] self._statistics_offsets = [] self._is_streaming = False self._stream_topics = [] self._buffer_duration = 30 self.stream_buffer = None self._on_stats_cbk = self._on_stats # hold reference for unsub self._on_stream_cbk = self._on_stream # hold reference for unsub self._parameters = {} self._parameter_set_queue = [] for p in PARAMETERS: if p.default is not None: self._parameters[p.name] = name_to_value(p.name, p.default) def __str__(self): _, model, serial_number = self._path.split('/') return f'{model.upper()}-{serial_number}' @property def device_path(self): return self._path @property def usb_device(self): return self._path @property def input_sampling_frequency(self): """The original input sampling frequency.""" return self._input_sampling_frequency @property def output_sampling_frequency(self): """The output sampling frequency.""" return self._output_sampling_frequency @output_sampling_frequency.setter def output_sampling_frequency(self, value): self._output_sampling_frequency = value if self.stream_buffer is not None: self.stream_buffer.output_sampling_frequency = self._output_sampling_frequency @property def sampling_frequency(self): """The output sampling frequency.""" return self.output_sampling_frequency @property def buffer_duration(self): """The stream buffer duration.""" return self._buffer_duration @buffer_duration.setter def buffer_duration(self, value): self._buffer_duration = value if self.stream_buffer is not None: self.stream_buffer.buffer_duration = self._buffer_duration @property def statistics_callback(self): """Get the registered statistics callback.""" cbks = self._statistics_callbacks if len(cbks): return cbks[0] else: return None @statistics_callback.setter def statistics_callback(self, cbk): """Set the statistics callback. :param cbk: The callable(data) where data is a statistics data structure. See the `statistics documentation <statistics.html>`_ for details on the data format. This function will be called from the USB processing thread. Any calls back into self MUST BE resynchronized. """ for unregister_cbk in list(self._statistics_callbacks): self.statistics_callback_unregister(unregister_cbk) self.statistics_callback_register(cbk)
[docs] def statistics_callback_register(self, cbk, source=None): """Register a statistics callback. :param cbk: The callable(data) where data is a statistics data structure. See the `statistics documentation <statistics.html>`_ for details on the data format. This function will be called from the USB processing thread. Any calls back into self MUST BE resynchronized. :param source: The statistics source where the computation is performed. Ignored, always use sensor-side statistics for the JS220. WARNING: calling :meth:`statistics_callback` after calling this method may result in unusual behavior. Do not mix these API calls. """ if cbk is None: return if not callable(cbk): self._log.warning('Requested callback is not callable') return if not len(self._statistics_callbacks): self._statistics_start() self._statistics_callbacks.append(cbk)
[docs] def statistics_callback_unregister(self, cbk, source=None): """Unregister a statistics callback. :param cbk: The callback previously provided to :meth:`statistics_callback_register`. :param source: The callback source. """ try: self._statistics_callbacks.remove(cbk) except ValueError: self._log.warning('statistics_callback_unregister but callback not registered.') if not len(self._statistics_callbacks): self._statistics_stop()
def _on_stats(self, topic, value): period = 1 / 2e6 s_start, s_stop = [x * period for x in value['time']['samples']['value']] if not len(self._statistics_offsets): duration = s_start charge = value['accumulators']['charge']['value'] energy = value['accumulators']['energy']['value'] offsets = [duration, charge, energy] self._statistics_offsets = [duration, charge, energy] duration, charge, energy = self._statistics_offsets value['time']['range'] = { 'value': [s_start - duration, s_stop - duration], 'units': 's' } value['time']['delta'] = {'value': s_stop - s_start, 'units': 's'} value['accumulators']['charge']['value'] -= charge value['accumulators']['energy']['value'] -= energy for k in value['signals'].values(): k['µ'] = k['avg'] k['σ2'] = {'value': k['std']['value'] ** 2, 'units': k['std']['units']} if 'integral' in k: k['∫'] = k['integral'] for cbk in self._statistics_callbacks: cbk(value) def _statistics_start(self): if self.is_open: if 'js110' in self.device_path and self.config == 'off': self.subscribe('s/sstats/value', 'pub', self._on_stats_cbk) else: self.publish('s/stats/ctrl', 1) self.subscribe('s/stats/value', 'pub', self._on_stats_cbk) def _statistics_stop(self): if self.is_open: if 'js110' in self.device_path and self.config == 'off': self.unsubscribe('s/sstats/value', self._on_stats_cbk) else: self.unsubscribe('s/stats/value', self._on_stats_cbk) self.publish('s/stats/ctrl', 0)
[docs] def statistics_accumulators_clear(self): """Clear the charge and energy accumulators.""" self._statistics_offsets.clear()
[docs] def view_factory(self): """Construct a new View into the device's data. :return: A View-compatible instance. """ if self.stream_buffer is None: raise RuntimeError('view_factory, but no stream buffer') view = View(self.stream_buffer, self.calibration) view.on_close = lambda: self.stream_process_unregister(view) self.stream_process_register(view) return view
[docs] def parameters(self, name=None): """Get the list of :class:`joulescope.parameter.Parameter` instances. :param name: The optional name of the parameter to retrieve. None (default) returns a list of all parameters. :return: The list of all parameters. If name is provided, then just return that single parameters. """ if name is not None: for p in PARAMETERS: if p.name == name: return copy.deepcopy(p) return None return copy.deepcopy(PARAMETERS)
[docs] def parameter_set(self, name, value): """Set a parameter value. :param name: The parameter name :param value: The new parameter value :raise KeyError: if name not found. :raise ValueError: if value is not allowed """ raise NotImplementedError()
[docs] def parameter_get(self, name, dtype=None): """Get a parameter value. :param name: The parameter name. :param dtype: The data type for the parameter. None (default) attempts to convert the value to the enum string. 'actual' returns the value in its actual type used by the driver. :raise KeyError: if name not found. """ if name == 'current_ranging': pnames = ['type', 'samples_pre', 'samples_window', 'samples_post'] values = [str(self.parameter_get('current_ranging_' + p)) for p in pnames] return '_'.join(values) p = PARAMETERS_DICT[name] if p.path == 'info': return self._parameter_get_info(name) value = self._parameters[name] if dtype == 'actual': return value try: return value_to_name(name, value) except KeyError: return value
def _topic_make(self, topic): if topic[0] != '/': topic = '/' + topic return self._path + topic
[docs] def publish(self, topic, value, timeout=None): """Publish to the underlying joulescope_driver instance. :param topic: The publish topic. :param value: The publish value. :param timeout: The timeout in float seconds to wait for this operation to complete. None waits the default amount. 0 does not wait and subscription will occur asynchronously. """ return self._driver.publish(self._topic_make(topic), value, timeout)
[docs] def query(self, topic, timeout=None): """Query the underlying joulescope_driver instance. :param topic: The publish topic to query. :param timeout: The timeout in float seconds to wait for this operation to complete. None waits the default amount. 0 does not wait and subscription will occur asynchronously. :return: The value associated with topic. """ return self._driver.query(self._topic_make(topic), timeout)
[docs] def subscribe(self, topic, flags, fn, timeout=None): """Subscribe to receive topic updates. :param self: The driver instance. :param topic: Subscribe to this topic string. :param flags: The flags or list of flags for this subscription. The flags can be int32 jsdrv_subscribe_flag_e or string mnemonics, which are: - pub: Subscribe to normal values - pub_retain: Subscribe to normal values and immediately publish all matching retained values. With timeout, this function does not return successfully until all retained values have been published. - metadata_req: Subscribe to metadata requests (not normally useful). - metadata_rsp: Subscribe to metadata updates. - metadata_rsp_retain: Subscribe to metadata updates and immediately publish all matching retained metadata values. - query_req: Subscribe to all query requests (not normally useful). - query_rsp: Subscribe to all query responses. - return_code: Subscribe to all return code responses. :param fn: The function to call on each publish. Note that python dynamically constructs bound methods. To unsubscribe a method, provide the exact same bound method instance to unsubscribe. This constrain usually means that the caller needs to hold onto the instance.method value passed to this function. :param timeout: The timeout in float seconds to wait for this operation to complete. None waits the default amount. 0 does not wait and subscription will occur asynchronously. :raise RuntimeError: on subscribe failure. """ return self._driver.subscribe(self._topic_make(topic), flags, fn, timeout)
[docs] def unsubscribe(self, topic, fn, timeout=None): """Unsubscribe a callback to a topic. :param topic: Unsubscribe from this topic string. :param fn: The callback function to unsubscribe. :param timeout: The timeout in float seconds to wait for this operation to complete. None waits the default amount. 0 does not wait and subscription will occur asynchronously. """ return self._driver.unsubscribe(self._topic_make(topic), fn, timeout)
[docs] def unsubscribe_all(self, fn, timeout=None): """Unsubscribe a callback from all topics. :param fn: The callback function to unsubscribe. :param timeout: The timeout in float seconds to wait for this operation to complete. None waits the default amount. 0 does not wait and subscription will occur asynchronously. """ return self._driver.unsubscribe(fn, timeout)
def _config_apply(self, config=None): """Apply a configuration set by scan. :param config: The configuration string. """ pass
[docs] def open(self, event_callback_fn=None, mode=None, timeout=None): """Open this device. :param event_callback_fn: The function(event, message) to call on asynchronous events, mostly to allow robust handling of device errors. "event" is one of the :class:`DeviceEvent` values, and the message is a more detailed description of the event. :param mode: The open mode which is one of: * 'defaults': Reconfigure the device for default operation. * 'restore': Update our state with the current device state. * 'raw': Open the device in raw mode for development or firmware update. * None: equivalent to 'defaults'. :param timeout: The timeout in seconds. None uses the default timeout. """ rc = self._driver.open(self._path, mode, timeout) self.is_open = True self.publish('h/fs', 2000000) while len(self._parameter_set_queue): name, value = self._parameter_set_queue.pop(0) self.parameter_set(name, value) if len(self._statistics_callbacks): self._statistics_start() device = 'js110' if 'js110' in self._path.lower() else 'js220' self.stream_buffer = StreamBuffer(self._buffer_duration, frequency=self._input_sampling_frequency, device=device, output_frequency=self._output_sampling_frequency) self._config_apply(self.config) return rc
[docs] def close(self, timeout=None): """Close this device and release resources. :param timeout: The timeout in seconds. None uses the default timeout. """ if len(self._statistics_callbacks): self._statistics_stop() self.stop() self.is_open = False self.stream_buffer = None return self._driver.close(self._path, timeout)
@property def model(self): return self._path.split('/')[1] @property def serial_number(self): return self._path.split('/')[-1] @property def device_serial_number(self): return self.serial_number @property def calibration(self): return None
[docs] def info(self): """Get the device information structure. :return: The device information structure. """ raise NotImplementedError()
def _stream_process_call(self, method, *args, **kwargs): rv = False b, self._stream_cbk_objs, self._stream_cbk_objs_add = self._stream_cbk_objs + self._stream_cbk_objs_add, [], [] for obj in b: fn = getattr(obj, method, None) if not callable(fn): self._stream_cbk_objs.append(obj) continue if obj.driver_active: try: rv |= bool(fn(*args, **kwargs)) self._stream_cbk_objs.append(obj) except Exception: self._log.exception('%s %s() exception', obj, method) obj.driver_active = False if not obj.driver_active: try: if hasattr(obj, 'close'): obj.close() except Exception: self._log.exception('%s close() exception', obj) return rv def _on_stream(self, topic, value): b = self.stream_buffer _, e1 = b.sample_id_range b.insert(topic, value) e0, e2 = b.sample_id_range if e1 == e2: return False if e0 == e2: return False rv = self._stream_process_call('stream_notify', self.stream_buffer) if rv: self.stop() if self.stream_buffer.is_duration_max or self.stream_buffer.is_contiguous_duration_max: self.stop()
[docs] def start(self, stop_fn=None, duration=None, contiguous_duration=None): """Start data streaming. :param stop_fn: The function(event, message) called when the device stops. The device can stop "automatically" on errors. Call :meth:`stop` to stop from the caller. This function will be called from the USB processing thread. Any calls back into self MUST BE resynchronized. :param duration: The duration in seconds for the capture. :param contiguous_duration: The contiguous duration in seconds for the capture. As opposed to duration, this ensures that the duration has no missing samples. Missing samples usually occur when the device first starts. If streaming was already in progress, it will be restarted. """ self.stop() self.stream_buffer.reset() self.stream_buffer.duration_max = duration self.stream_buffer.contiguous_duration_max = contiguous_duration self._stop_fn = stop_fn for topic, b in zip(self._stream_topics, self.stream_buffer.buffers.values()): if topic is None: b.active = False continue b.active = True self.subscribe(topic + '!data', 'pub', self._on_stream_cbk) self.publish(topic + 'ctrl', 1) self._is_streaming = True self._stream_process_call('start', self.stream_buffer)
[docs] def stop(self): """Stop data streaming. :return: True if stopped. False if was already stopped. This method is always safe to call, even after the device has been stopped or removed. """ if self._is_streaming: self._is_streaming = False for topic in self._stream_topics: if topic is not None: self.unsubscribe(topic + '!data', self._on_stream_cbk, timeout=0) self.publish(topic + 'ctrl', 0, timeout=0) fn, self._stop_fn = self._stop_fn, None if callable(fn): fn(0, '') # status, message self._stream_process_call('stop')
[docs] def read(self, duration=None, contiguous_duration=None, out_format=None, fields=None): """Read data from the device. :param duration: The duration in seconds for the capture. The duration must fit within the stream_buffer. :param contiguous_duration: The contiguous duration in seconds for the capture. As opposed to duration, this ensures that the duration has no missing samples. Missing samples usually occur when the device first starts. The duration must fit within the stream_buffer. :param out_format: The output format which is one of: * calibrated: The Nx2 np.ndarray(float32) with columns current and voltage. * samples_get: The StreamBuffer samples get format. Use the fields parameter to optionally specify the signals to include. * None: equivalent to 'calibrated'. :param fields: The fields for samples_get when out_format=samples_get. If streaming was already in progress, it will be restarted. If neither duration or contiguous duration is specified, the capture will only be stopped by callbacks registered through :meth:`stream_process_register`. """ self._log.info('read(duration=%s, contiguous_duration=%s, out_format=%s)', duration, contiguous_duration, out_format) if out_format not in ['calibrated', 'samples_get', None]: raise ValueError(f'Invalid out_format {out_format}') if duration is None and contiguous_duration is None: raise ValueError('Must specify duration or contiguous_duration') duration_max = len(self.stream_buffer) / self._output_sampling_frequency if contiguous_duration is not None and contiguous_duration > duration_max: raise ValueError(f'contiguous_duration {contiguous_duration} > {duration_max} max seconds') if duration is not None and duration > duration_max: raise ValueError(f'duration {duration} > {duration_max} max seconds') q = queue.Queue() def on_stop(*args, **kwargs): self._log.info('received stop callback: pending stop') q.put(None) self.start(on_stop, duration=duration, contiguous_duration=contiguous_duration) q.get() self.stop() start_id, end_id = self.stream_buffer.sample_id_range self._log.info('read available range %s, %s', start_id, end_id) if contiguous_duration is not None: start_id = end_id - int(contiguous_duration * self._output_sampling_frequency) elif duration is not None: start_id = end_id - int(duration * self._output_sampling_frequency) if start_id < 0: start_id = 0 self._log.info('read actual %s, %s', start_id, end_id) if out_format in ['calibrated', None]: data = self.stream_buffer.samples_get(start_id, end_id, fields=['current', 'voltage']) i = data['signals']['current']['value'] v = data['signals']['voltage']['value'] return np.hstack([np.reshape(i, (-1, 1)), np.reshape(v, (-1, 1))]) else: return self.stream_buffer.samples_get(start_id, end_id, fields=fields)
@property def is_streaming(self): """Check if the device is streaming. :return: True if streaming. False if not streaming. """ return self._is_streaming
[docs] def stream_process_register(self, obj): """Register a stream process object. :param obj: The instance compatible with :class:`StreamProcessApi`. The instance must remain valid until its :meth:`close` is called. Call :meth:`stream_process_unregister` to disconnect the instance. """ if self._is_streaming and hasattr(obj, 'start'): obj.start(self.stream_buffer) obj.driver_active = True self._stream_cbk_objs_add.append(obj)
[docs] def stream_process_unregister(self, obj): """Unregister a stream process object. :param obj: The instance compatible with :class:`StreamProcessApi` that was previously registered using :meth:`stream_process_register`. """ obj.driver_active = False
[docs] def status(self): """Get the current device status. :return: A dict containing status information. """ return { 'driver': { 'return_code': { 'value': 0, } } }
def extio_status(self): """Read the EXTIO GPI value. :return: A dict containing the extio status. Each key is the status item name. The value is itself a dict with the following keys: * name: The status name, which is the same as the top-level key. * value: The actual value * units: The units, if applicable. * format: The recommended formatting string (optional). """ return {} def __enter__(self): """Device context manager, automatically open.""" self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): """Device context manager, automatically close.""" self.close() def _query_gpi_value(self): gpi_value = None def on_gpi_value(topic, value): nonlocal gpi_value gpi_value = value self.subscribe('s/gpi/+/!value', 'pub', on_gpi_value) self.publish('s/gpi/+/!req', 0) t_start = time.time() while gpi_value is None: if time.time() - t_start > 1.0: raise RuntimeError('_query_gpi_value timed out') time.sleep(0.001) self.unsubscribe('s/gpi/+/!value', on_gpi_value) return gpi_value
[docs] def extio_status(self): """Read the EXTIO GPI value. :return: A dict containing the extio status. Each key is the status item name. The value is itself a dict with the following keys: * name: The status name, which is the same as the top-level key. * value: The actual value * units: The units, if applicable. * format: The recommended formatting string (optional). The most interesting key is "gpi_value" which returns the present general purpose input signal values. The remaining keys simply copy parameter settings for convenience. """ gpi_value = self._query_gpi_value() status = { 'flags': { 'value': 0, 'units': ''}, 'trigger_source': { 'value': self.parameter_get('trigger_source'), 'units': ''}, 'current_lsb': { 'value': self.parameter_get('current_lsb'), 'units': ''}, 'voltage_lsb': { 'value': self.parameter_get('voltage_lsb'), 'units': ''}, 'gpo0': { 'value': self.parameter_get('gpo0'), 'units': ''}, 'gpo1': { 'value': self.parameter_get('gpo1'), 'units': ''}, 'gpi_value': { 'value': gpi_value, 'units': '', }, 'io_voltage': { 'value': self.parameter_get('io_voltage'), 'units': 'mV', }, } for key, value in status.items(): value['name'] = key return status