Source code for joulescope.data_recorder

# Copyright 2018 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from joulescope.v0.calibration import Calibration
from joulescope.v0.stream_buffer import reduction_downsample, Statistics, stats_to_api, \
    stats_invalidate, \
    stats_factory, stats_array_factory, stats_array_clear, stats_array_invalidate, \
    STATS_FIELD_NAMES, NP_STATS_NAMES, \
    I_RANGE_MISSING, SUPPRESS_SAMPLES_MAX, RawProcessor, stats_compute
from joulescope.v0 import array_storage
from joulescope import datafile
import json
import numpy as np
import datetime
import logging

log = logging.getLogger(__name__)

DATA_RECORDER_FORMAT_VERSION = '2'
SAMPLES_PER_REDUCTION = 20000   # 100 Hz @ 2 MSPS
REDUCTIONS_PER_TLV = 20         # 5 Hz @ 2 MSPS
TLVS_PER_BLOCK = 5              # 1 Hz @ 2 MSPS
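# Derived sizes at the nominal 2 MSPS input rate (from the constants above):
#   samples per TLV   = 20,000 * 20 = 400,000   (0.2 s of data, 5 Hz TLV rate)
#   samples per block = 400,000 * 5 = 2,000,000 (1.0 s of data, 1 Hz block rate)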

_STATS_VALUES_V1 = 4


_SIGNALS_UNITS = {
    'current': 'A',
    'voltage': 'V',
    'power': 'W',
    'raw_current': 'LSBs',
    'raw_voltage': 'LSBs',
    'raw': '',
    'bits': '',
    'current_range': '',
    'current_lsb': '',
    'voltage_lsb': '',
}


_DOWNSAMPLE_FORMATTERS = {
    0: lambda x: x.astype(dtype=np.float32),
    1: lambda x: x.astype(dtype=np.float32),
    2: lambda x: x.astype(dtype=np.float32),
    3: lambda x: (x * 16).astype(dtype=np.uint8),
    4: lambda x: (x * 15).astype(dtype=np.uint8),
    5: lambda x: (x * 15).astype(dtype=np.uint8),
}


_DOWNSAMPLE_UNFORMATTERS = {
    0: lambda x: x,
    1: lambda x: x,
    2: lambda x: x,
    3: lambda x: x.astype(dtype=np.float32) * (1.0 / 16),
    4: lambda x: x.astype(dtype=np.float32) * (1.0 / 15),
    5: lambda x: x.astype(dtype=np.float32) * (1.0 / 15),
}
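
# Quantization round-trip sketch (illustration only, not part of the module API):
# fields 0-2 (current, voltage, power) are stored as float32 means, while
# current_range (3) and the lsb fields (4-5) are scaled into uint8 on write and
# rescaled on read by the matching unformatter above, e.g.:
#
#   x = np.array([0.0, 0.5, 1.0], dtype=np.float32)   # hypothetical lsb means
#   stored = _DOWNSAMPLE_FORMATTERS[4](x)              # uint8: [0, 7, 15]
#   restored = _DOWNSAMPLE_UNFORMATTERS[4](stored)     # float32: [0.0, 0.467, 1.0]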


def construct_record_filename():
    time_start = datetime.datetime.utcnow()
    timestamp_str = time_start.strftime('%Y%m%d_%H%M%S')
    return f'{timestamp_str}.jls'


class DataRecorder:
    """Record Joulescope data to a file.

    :param filehandle: The file-like object or file name.
    :param calibration: The calibration bytes in datafile format.
        None (default) uses the unit gain calibration.
    :param user_data: Arbitrary JSON-serializable user data that is added
        to the file.
    """

    def __init__(self, filehandle, calibration=None, user_data=None):
        log.info('init')
        if isinstance(filehandle, str):
            self._fh = open(filehandle, 'wb')
            filehandle = self._fh
        else:
            self._fh = None
        self._sampling_frequency = 0
        self._samples_per_tlv = 0
        self._samples_per_block = 0
        self._samples_per_reduction = 0
        self._reductions_per_tlv = 0
        self._reduction = None
        self._sample_id_tlv = None  # sample id for start of next TLV
        self._sample_id_block = None  # sample id for start of current block, None if not started yet
        self._stream_buffer = None  # to ensure same
        self._sb_sample_id_last = None
        self._voltage_range = None
        self._data_buffer = []
        self._sample_buffers = {}
        self._writer = datafile.DataFileWriter(filehandle)
        self._closed = False
        self._total_size = 0
        if user_data is not None:
            b = json.dumps(user_data).encode('utf-8')
            self._writer.append(datafile.TAG_USER_JSON, b)
        if calibration is not None:
            if isinstance(calibration, Calibration):
                calibration = calibration.data
            self._writer.append_subfile('calibration', calibration)

    def _initialize(self, sampling_frequency, data_format):
        self._sampling_frequency = sampling_frequency
        self._samples_per_reduction = SAMPLES_PER_REDUCTION
        self._samples_per_tlv = self._samples_per_reduction * REDUCTIONS_PER_TLV
        self._samples_per_block = self._samples_per_tlv * TLVS_PER_BLOCK

        # dependent vars
        self._reductions_per_tlv = self._samples_per_tlv // self._samples_per_reduction
        reduction_block_size = self._samples_per_block // self._samples_per_reduction

        self._reduction = stats_array_factory(reduction_block_size)
        self._append_configuration(data_format)
        self._writer.collection_start(0, 0)

    def _append_configuration(self, data_format=None):
        data_format = 'none' if data_format is None else str(data_format)
        config = {
            'type': 'config',
            'data_recorder_format_version': DATA_RECORDER_FORMAT_VERSION,
            'sampling_frequency': self._sampling_frequency,
            'samples_per_reduction': self._samples_per_reduction,
            'samples_per_tlv': self._samples_per_tlv,
            'samples_per_block': self._samples_per_block,
            'reduction_fields': ['current', 'voltage', 'power',
                                 'current_range', 'current_lsb', 'voltage_lsb'],
            'data_format': data_format,
            'reduction_format': 'v2',
        }
        cfg_data = json.dumps(config).encode('utf-8')
        self._writer.append(datafile.TAG_META_JSON, cfg_data)

    def _collection_start(self, data=None):
        log.debug('_collection_start()')
        c = self._writer.collection_start(1, 0, data=data)
        c.metadata = {'start_sample_id': self._sample_id_tlv}
        c.on_end = self._collection_end
        self._sample_id_block = self._sample_id_tlv

    def _collection_end(self, collection):
        sample_count = (self._sample_id_tlv - self._sample_id_block)
        r_stop = sample_count // self._samples_per_reduction
        log.debug('_collection_end(%s, %s)', r_stop, len(self._reduction))
        data = self._reduction[:r_stop, :]
        arrays = {0x00: data[:, 0]['length'].astype(np.uint64)}
        for field_idx, field in enumerate(STATS_FIELD_NAMES):
            for stat_idx, stat in enumerate(NP_STATS_NAMES[1:]):
                x_id = ((field_idx & 0x0f) << 4) | ((stat_idx + 1) & 0x0f)
                x = data[:, field_idx][stat]
                arrays[x_id] = x.astype(np.float32)
        value = array_storage.pack(arrays, r_stop)
        value = value.tobytes()
        self._writer.collection_end(collection, value)
        self._sample_id_block = None
        stats_array_clear(self._reduction)

    def stream_notify(self, stream_buffer):
        """Process data from a stream buffer.

        :param stream_buffer: The stream_buffer instance which has:
            * "output_sampling_frequency" member -> float
            * "has_raw" member -> in [True, False]
            * "sample_id_range" member => (start, stop)
            * "voltage_range" member -> in [0, 1]
            * samples_get(start_sample_id, stop_sample_id, dtype)
            * data_get(start_sample_id, stop_sample_id, increment, out)
            * __len__ : maximum number of samples that stream_buffer holds
        """
        finalize = False
        if stream_buffer is None:
            if self._stream_buffer is None:
                return
            finalize = True
            stream_buffer = self._stream_buffer
        sb_start, sb_stop = stream_buffer.sample_id_range
        if self._stream_buffer is None:
            self._stream_buffer = stream_buffer
            data_format = 'raw' if stream_buffer.has_raw else 'float32_v2'
            self._initialize(stream_buffer.output_sampling_frequency, data_format)
            self._sample_id_tlv = sb_stop
            self._sample_id_block = None
            if self._samples_per_tlv > len(stream_buffer):
                raise ValueError('stream_buffer length too small. %s > %s' %
                                 (self._samples_per_tlv, len(stream_buffer)))
        elif self._stream_buffer != stream_buffer:
            raise ValueError('Supports only a single stream_buffer instance')
        if sb_start > self._sample_id_tlv:
            raise ValueError('stream_buffer does not contain sample_id')

        while True:
            finalize_now = False
            sample_id_next = self._sample_id_tlv + self._samples_per_tlv
            if sb_stop < sample_id_next:
                if finalize and self._sample_id_tlv < sb_stop:
                    finalize_now = True
                    sample_id_next = sb_stop
                else:
                    break
            self._voltage_range = stream_buffer.voltage_range
            if self._sample_id_block is None:
                collection_data = {
                    'v_range': self._voltage_range,
                    'sample_id': self._sample_id_tlv,
                }
                collection_data = json.dumps(collection_data).encode('utf-8')
                self._collection_start(data=collection_data)
            log.debug('_process() add tlv %d', self._sample_id_tlv)
            if stream_buffer.has_raw:
                self._append_raw_data(self._sample_id_tlv, sample_id_next)
            else:
                self._append_float_data(self._sample_id_tlv, sample_id_next)
            self._total_size += (sample_id_next - self._sample_id_tlv)

            tlv_offset = (self._sample_id_tlv - self._sample_id_block) // self._samples_per_tlv
            r_start = tlv_offset * self._reductions_per_tlv
            r_stop = r_start + self._reductions_per_tlv
            stream_buffer.data_get(self._sample_id_tlv, sample_id_next,
                                   self._samples_per_reduction,
                                   out=self._reduction[r_start:r_stop, :])
            self._sample_id_tlv = sample_id_next

            if finalize_now or self._sample_id_tlv - self._sample_id_block >= self._samples_per_block:
                self._collection_end(self._writer.collections[-1])

    def _append_raw_data(self, start, stop):
        if self._closed:
            return
        b = self._stream_buffer.samples_get(start, stop, fields='raw')
        data = b.tobytes()
        self._writer.append(datafile.TAG_DATA_BINARY, data, compress=False)

    def _append_float_data(self, start, stop):
        if self._closed:
            return
        data = self._stream_buffer.samples_get(start, stop, fields=STATS_FIELD_NAMES)
        arrays = {}
        for field_idx, field in enumerate(STATS_FIELD_NAMES):
            x_id = (field_idx << 4) | 0x01  # stat_idx=mean
            x = data['signals'][field]['value']
            x = _DOWNSAMPLE_FORMATTERS[field_idx](x)
            arrays[x_id] = x
        data = array_storage.pack(arrays, stop - start)
        data = data.tobytes()
        self._writer.append(datafile.TAG_DATA_BINARY, data, compress=False)

    def _insert_remove_processed_data(self):
        while len(self._data_buffer) and \
                self._data_buffer[0]['time']['sample_id_range']['value'][1] < self._sample_id_tlv:
            self._data_buffer.pop(0)  # fully processed: remove from the front of the buffer

    def _insert_size_pending(self):
        self._insert_remove_processed_data()
        if not len(self._data_buffer):
            return 0
        sz = np.sum([x['time']['samples']['value'] for x in self._data_buffer])
        start_idx = self._data_buffer[0]['time']['sample_id_range']['value'][0]
        sz -= self._sample_id_tlv - start_idx
        return sz

    def _insert_next(self):
        # Will write any pending data up to a full data TLV
        # WARNING: does not check for sufficient data to fill a data TLV
        # since it needs to also handle the final partial.
        if self._sample_id_block is None:
            if not len(self._data_buffer):
                return
            has_raw = 'raw' in self._data_buffer[0]['signals']
            collection_data = {'sample_id': self._sample_id_tlv}
            if has_raw:
                collection_data['v_range'] = self._data_buffer[0]['signals']['raw']['voltage_range']
            collection_data = json.dumps(collection_data).encode('utf-8')
            self._collection_start(data=collection_data)

        offset = 0
        finalize_now = True
        while len(self._data_buffer):
            do_pop = True
            data = self._data_buffer[0]
            s0, s1 = data['time']['sample_id_range']['value']  # samples, not indices
            assert(self._sample_id_tlv + offset >= s0)
            idx0 = self._sample_id_tlv + offset - s0  # input buffer starting index
            idx1 = s1 - s0                            # input buffer ending index
            if s1 > self._sample_id_tlv + self._samples_per_tlv:
                idx1 = self._sample_id_tlv + self._samples_per_tlv - s0
                do_pop = False
            idx_end = offset + idx1 - idx0
            for field, d in self._sample_buffers.items():
                if field == 'raw':
                    self._sample_buffers[field][offset:idx_end, :] = data['signals'][field]['value'][idx0:idx1, :]
                else:
                    self._sample_buffers[field][offset:idx_end] = data['signals'][field]['value'][idx0:idx1]
            offset += idx1 - idx0
            if do_pop:
                self._data_buffer.pop(0)
            if offset >= self._samples_per_tlv:
                finalize_now = False
                break

        # write the data TLV
        if 'raw' in self._sample_buffers:
            data = self._sample_buffers['raw'][:offset, :].tobytes()
            self._writer.append(datafile.TAG_DATA_BINARY, data, compress=False)
        else:
            arrays = {}
            for field_idx, field in enumerate(STATS_FIELD_NAMES):
                x_id = (field_idx << 4) | 0x01  # stat_idx=mean
                x = self._sample_buffers[field][:offset]
                x = _DOWNSAMPLE_FORMATTERS[field_idx](x)
                arrays[x_id] = x
            data = array_storage.pack(arrays, self._samples_per_tlv)
            data = data.tobytes()
            self._writer.append(datafile.TAG_DATA_BINARY, data, compress=False)

        # update reductions
        tlv_offset = (self._sample_id_tlv - self._sample_id_block) // self._samples_per_tlv
        r_start = tlv_offset * self._reductions_per_tlv
        r_stop = r_start + (offset * self._reductions_per_tlv) // self._samples_per_tlv
        for field_idx, field in enumerate(STATS_FIELD_NAMES):
            d = self._sample_buffers[field]
            for idx in range(0, r_stop - r_start):
                r = r_start + idx
                k0 = idx * self._samples_per_reduction
                k1 = k0 + self._samples_per_reduction
                stats_compute(d[k0:k1], self._reduction[r, field_idx:field_idx + 1])

        self._sample_id_tlv += self._samples_per_tlv
        self._total_size += offset
        if finalize_now or self._sample_id_tlv - self._sample_id_block >= self._samples_per_block:
            self._collection_end(self._writer.collections[-1])
        return

    def insert(self, data):
        """Insert sample data.

        :param data: A dict in the :meth:`StreamBuffer.samples_get` format.
        """
        if self._sample_id_tlv is None:  # first call
            data_format = 'raw' if 'raw' in data['signals'] else 'float32_v2'
            self._initialize(data['time']['sampling_frequency']['value'], data_format)
            self._sample_id_tlv = data['time']['sample_id_range']['value'][0]
            self._sample_buffers = {}
            for field in data['signals'].keys():
                if field == 'raw':
                    d = np.empty((self._samples_per_tlv, 2), dtype=np.uint16)
                else:
                    d = np.empty(self._samples_per_tlv, dtype=np.float32)
                self._sample_buffers[field] = d
        if data is not None:
            self._data_buffer.append(data)
        while True:
            if self._insert_size_pending() < self._samples_per_tlv:
                break
            self._insert_next()
        if data is None:
            self._insert_next()  # short data finalize

    def _append_meta(self, footer_user_data=None):
        index = {
            'type': 'footer',
            'size': self._total_size,  # in samples
        }
        data = json.dumps(index).encode('utf-8')
        self._writer.append(datafile.TAG_META_JSON, data)
        if footer_user_data is not None:
            data = json.dumps(footer_user_data).encode('utf-8')
            self._writer.append(datafile.TAG_USER_JSON, data)

    def close(self, footer_user_data=None):
        """Finalize and close the recording."""
        if self._closed:
            return
        if self._sample_id_tlv is None:
            self._append_configuration()
        if self._stream_buffer:
            self.stream_notify(None)
        if self._data_buffer:
            self.insert(None)
        self._closed = True
        while len(self._writer.collections):
            collection = self._writer.collections[-1]
            if len(collection.metadata):
                self._collection_end(collection)
            else:
                self._writer.collection_end()
        self._append_meta(footer_user_data)
        self._writer.finalize()
        if self._fh is not None:
            self._fh.close()
            self._fh = None
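
# Usage sketch (illustration only, not part of this module): stream data from a
# StreamBuffer-compatible source into a .jls file. The buffer-filling step is
# hypothetical; any object that satisfies the interface documented in
# stream_notify() will work.
#
#   recorder = DataRecorder(construct_record_filename(),
#                           user_data={'note': 'example capture'})
#   while capturing:                       # hypothetical loop condition
#       fill(stream_buffer)                # hypothetical: advance the stream buffer
#       recorder.stream_notify(stream_buffer)
#   recorder.close()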


class DataReader:
    """Read Joulescope data from a file."""

    def __init__(self):
        self.calibration = None
        self.config = None
        self.footer = None
        self._fh_close = False
        self._fh = None
        self._f = None  # type: datafile.DataFileReader
        self._data_start_position = 0
        self._voltage_range = 0
        self._sample_cache = None
        self._data_get_handler = None
        self._reduction_handler = None
        self._samples_get_handler = None
        self._statistics_get_handler = None
        self.raw_processor = RawProcessor()
        self.user_data = None
        self.footer_user_data = None

    def __str__(self):
        if self._f is not None:
            return 'DataReader %.2f seconds (%d samples)' % (self.duration, self.footer['size'])

    def close(self):
        """Close the recording file."""
        if self._fh_close:
            self._fh.close()
        self._fh_close = False
        self._fh = None
        self._f = None
        self._sample_cache = None
        self._reduction_cache = None

    def open(self, filehandle):
        """Open a recording file.

        :param filehandle: The seekable filehandle or filename string.
        """
        self.close()
        self.calibration = Calibration()  # default calibration
        self.config = None
        self.footer = None
        self._data_start_position = 0

        if isinstance(filehandle, str):
            log.info('DataReader(%s)', filehandle)
            self._fh = open(filehandle, 'rb')
            self._fh_close = True
        else:
            self._fh = filehandle
            self._fh_close = False
        self._f = datafile.DataFileReader(self._fh)
        while True:
            tag, value = self._f.peek()
            if tag is None:
                if self._data_start_position and self.config is not None:
                    log.warning('Unexpected file truncation, attempting recovery')
                    sample_count = self._sample_count()
                    log.warning('Recovery found %d samples', sample_count)
                    self.footer = {
                        'type': 'footer',
                        'size': sample_count,  # in samples
                    }
                else:
                    raise ValueError('could not read file')
                break
            elif tag == datafile.TAG_SUBFILE:
                name, data = datafile.subfile_split(value)
                if name == 'calibration':
                    self.calibration = Calibration().load(data)
            elif tag == datafile.TAG_COLLECTION_START:
                self._data_start_position = self._f.tell()
            elif tag == datafile.TAG_META_JSON:
                meta = json.loads(value.decode('utf-8'))
                type_ = meta.get('type')
                if type_ == 'config':
                    self._configure(meta)
                elif type_ == 'footer':
                    self.footer = meta
                else:
                    log.warning('Unknown JSON section type=%s', type_)
            elif tag == datafile.TAG_USER_JSON:
                if self.footer is None:
                    self.user_data = json.loads(value.decode('utf-8'))
                else:
                    self.footer_user_data = json.loads(value.decode('utf-8'))
            elif tag == datafile.TAG_END:
                break
            self._f.skip()
        if self.config is None or self.footer is None:
            raise ValueError('could not read file')
        log.info('DataReader with %d samples:\n%s',
                 self.footer['size'], json.dumps(self.config, indent=2))
        if self._data_start_position == 0 and self.footer['size']:
            raise ValueError(f"data not found, but expect {self.footer['size']} samples")
        if int(self.config['data_recorder_format_version']) > int(DATA_RECORDER_FORMAT_VERSION):
            raise ValueError('Invalid file format')
        self.config.setdefault('reduction_fields', ['current', 'voltage', 'power'])
        cal = self.calibration
        self.raw_processor.calibration_set(cal.current_offset, cal.current_gain,
                                           cal.voltage_offset, cal.voltage_gain)
        return self

    def _configure(self, config):
        config.setdefault('data_format', 'raw')
        config.setdefault('reduction_format', 'v1')
        self.config = config
        self._samples_get_handler = getattr(self, '_samples_get_handler_' + config['data_format'])
        self._data_get_handler = getattr(self, '_data_get_handler_' + config['data_format'])
        self._statistics_get_handler = getattr(self, '_statistics_get_handler_' + config['data_format'])
        self._reduction_handler = getattr(self, '_reduction_handler_' + config['reduction_format'])

    @property
    def sample_id_range(self):
        """The sample ID range.

        :return: [start, stop] sample identifiers.
        """
        if self._f is not None:
            s_start = 0
            s_end = int(s_start + self.footer['size'])
            return [s_start, s_end]
        return [0, 0]

    @property
    def sampling_frequency(self):
        """The output sampling frequency."""
        f = 0.0
        if self._f is not None:
            f = float(self.config['sampling_frequency'])
        if f <= 0.0:
            log.warning('Invalid sampling frequency %r, assume 1.0 Hz.', f)
            f = 1.0
        return f

    @property
    def input_sampling_frequency(self):
        """The original input sampling frequency."""
        f = 0.0
        if self._f is None:
            pass
        elif 'input_sampling_frequency' in self.config:
            f = float(self.config['input_sampling_frequency'])
        else:
            f = self.sampling_frequency
        if f <= 0.0:
            log.warning('Invalid input sampling frequency %r, assume 1.0 Hz.', f)
            f = 1.0
        return f

    @property
    def output_sampling_frequency(self):
        """The output sampling frequency."""
        return self.sampling_frequency

    @property
    def reduction_frequency(self):
        """The reduction frequency or 1 if no reduction."""
        f = 0.0
        try:
            if self._f is not None:
                f = self.config['sampling_frequency'] / self.config['samples_per_reduction']
        except Exception:
            log.warning('Could not get reduction frequency.')
        if f <= 0.0:
            log.warning('Invalid reduction frequency %r, assume 1.0 Hz.', f)
            f = 1.0
        return f

    @property
    def duration(self):
        """The data file duration, in seconds."""
        f = self.sampling_frequency
        if f > 0:
            r = self.sample_id_range
            return (r[1] - r[0]) / f
        return 0.0

    @property
    def voltage_range(self):
        """The data file voltage range."""
        return self._voltage_range

    def _sample_count(self):
        """Count the actual samples in the file.

        WARNING: this operation may be slow.  Use the footer when possible.
        """
        samples_per_block = self.config['samples_per_block']
        sample_count = 0
        self._fh.seek(self._data_start_position)
        while True:
            tag, _ = self._f.peek_tag_length()
            if tag is None:
                break
            if tag == datafile.TAG_COLLECTION_START:
                self._f.skip()
                sample_count += samples_per_block
            else:
                self._f.advance()
        return sample_count

    def _validate_range(self, start=None, stop=None, increment=None):
        idx_start, idx_end = self.sample_id_range
        if increment is not None:
            idx_end = ((idx_end + increment - 1) // increment) * increment
        # log.debug('[%d, %d] : [%d, %d]', start, stop, idx_start, idx_end)
        if start == idx_start and start == stop:
            pass  # empty is allowed
        elif not idx_start <= start < idx_end:
            raise ValueError('start out of range: %d <= %d < %d' % (idx_start, start, idx_end))
        elif not idx_start <= stop <= idx_end:
            raise ValueError('stop out of range: %d <= %d <= %d: %s' % (idx_start, stop, idx_end, increment))
        if stop < start:
            stop = start
        return start, stop

    def _sample_tlv(self, sample_idx):
        """Get the sample data entry for the TLV containing sample_idx.

        :param sample_idx: The sample index.
        :return: The dict containing the data.
        """
        if self._sample_cache and self._sample_cache['start'] <= sample_idx < self._sample_cache['stop']:
            # cache hit
            return self._sample_cache

        idx_start, idx_end = self.sample_id_range
        if not idx_start <= sample_idx < idx_end:
            raise ValueError('sample index out of range: %d <= %d < %d' %
                             (idx_start, sample_idx, idx_end))

        if self._sample_cache is not None:
            log.debug('_sample_cache cache miss: %s : %s %s',
                      sample_idx, self._sample_cache['start'], self._sample_cache['stop'])

        # seek
        samples_per_tlv = self.config['samples_per_tlv']
        samples_per_block = self.config['samples_per_block']
        tgt_block = sample_idx // samples_per_block

        if self._sample_cache is not None and sample_idx > self._sample_cache['start']:
            # continue forward
            self._fh.seek(self._sample_cache['tlv_pos'])
            voltage_range = self._sample_cache['voltage_range']
            block_fh_pos = self._sample_cache['block_pos']
            current_sample_idx = self._sample_cache['start']
            block_counter = current_sample_idx // samples_per_block
        else:  # add case for rewind?
            log.debug('_sample_tlv resync to beginning')
            self._fh.seek(self._data_start_position)
            voltage_range = 0
            block_fh_pos = 0
            block_counter = 0
            current_sample_idx = 0
            if self._f.advance() != datafile.TAG_COLLECTION_START:
                raise ValueError('data section must be single collection')

        while True:
            tag, _ = self._f.peek_tag_length()
            if tag is None:
                log.error('sample_tlv not found before end of file: %s > %s',
                          sample_idx, current_sample_idx)
                break
            if tag == datafile.TAG_COLLECTION_START:
                if block_counter < tgt_block:
                    self._f.skip()
                    block_counter += 1
                else:
                    block_fh_pos = self._f.tell()
                    tag, collection_bytes = next(self._f)
                    c = datafile.Collection.decode(collection_bytes)
                    if c.data:
                        collection_start_meta = json.loads(c.data)
                        voltage_range = collection_start_meta.get('v_range', 0)
                        self._voltage_range = voltage_range
                    current_sample_idx = block_counter * samples_per_block
            elif tag == datafile.TAG_COLLECTION_END:
                block_counter += 1
                self._f.advance()
            elif tag == datafile.TAG_DATA_BINARY:
                tlv_stop = current_sample_idx + samples_per_tlv
                if current_sample_idx <= sample_idx < tlv_stop:
                    # found it!
                    tlv_pos = self._f.tell()
                    tag, value = next(self._f)
                    self._sample_cache = {
                        'voltage_range': voltage_range,
                        'start': current_sample_idx,
                        'stop': tlv_stop,
                        'value': value,
                        'tlv_pos': tlv_pos,
                        'block_pos': block_fh_pos,
                    }
                    return self._sample_cache
                else:
                    self._f.advance()
                current_sample_idx = tlv_stop
            else:
                self._f.advance()

    def _raw(self, start=None, stop=None):
        """Get the raw data.

        :param start: The starting sample (must already be validated).
        :param stop: The ending sample (must already be validated).
        :return: The output which is (out_raw, bits, out_cal).
""" x_start, x_stop = self.sample_id_range self._fh.seek(self._data_start_position) self._validate_range(start, stop) length = stop - start if length <= 0: return np.empty((0, 2), dtype=np.uint16) # process extra before & after to handle filtering if start > SUPPRESS_SAMPLES_MAX: sample_idx = start - SUPPRESS_SAMPLES_MAX prefix_count = SUPPRESS_SAMPLES_MAX else: sample_idx = 0 prefix_count = start if stop + SUPPRESS_SAMPLES_MAX <= x_stop: end_idx = stop + SUPPRESS_SAMPLES_MAX else: end_idx = x_stop out_idx = 0 d_raw = np.empty((end_idx - sample_idx, 2), dtype=np.uint16) if self._f.advance() != datafile.TAG_COLLECTION_START: raise ValueError('data section must be single collection') while sample_idx < end_idx: sample_cache = self._sample_tlv(sample_idx) if sample_cache is None: break value = sample_cache['value'] data = np.frombuffer(value, dtype=np.uint16).reshape((-1, 2)) b_start = sample_idx - sample_cache['start'] length = sample_cache['stop'] - sample_cache['start'] - b_start out_remaining = end_idx - sample_idx length = min(length, out_remaining) if length <= 0: break b_stop = b_start + length d = data[b_start:b_stop, :] d_raw[out_idx:(out_idx + length), :] = d out_idx += length sample_idx += length d_raw = d_raw[:out_idx, :] self.raw_processor.reset() self.raw_processor.voltage_range = self._voltage_range d_cal, d_bits = self.raw_processor.process_bulk(d_raw.reshape((-1, ))) j = prefix_count k = min(prefix_count + stop - start, out_idx) return d_raw[j:k, :], d_bits[j:k], d_cal[j:k, :] def _downsampled(self, start, stop, fields): """Get the _downsampled data. :param start: The starting sample (must already be validated). :param stop: The ending sample (must already be validated). :return: The output which is dict of field name to values. """ self._fh.seek(self._data_start_position) start, stop = self._validate_range(start, stop) length = stop - start field_idxs = [] if fields is None: fields = ['current', 'voltage', 'power', 'current_range', 'current_lsb', 'voltage_lsb'] for field in fields: if field not in STATS_FIELD_NAMES: raise ValueError(f'Field {field} not available') idx = (STATS_FIELD_NAMES.index(field) << 4) | 1 field_idxs.append((field, idx)) rv = {} for _, field_idx in field_idxs: rv[field_idx] = np.empty(length, dtype=np.float32) out_idx = 0 while start < stop: sample_cache = self._sample_tlv(start) if sample_cache is None: break v = np.frombuffer(sample_cache['value'], dtype=np.uint8) value, sample_count = array_storage.unpack(v) b_start = start - sample_cache['start'] length = sample_cache['stop'] - sample_cache['start'] - b_start out_remaining = stop - start length = min(length, out_remaining) if length <= 0: break b_stop = b_start + length for _, field_idx in field_idxs: rv[field_idx][out_idx:(out_idx + length)] = value[field_idx][b_start:b_stop] out_idx += length start += length result = {} for field, field_idx in field_idxs: x = rv[field_idx][:out_idx] idx = STATS_FIELD_NAMES.index(field) fn = _DOWNSAMPLE_UNFORMATTERS[idx] result[field] = fn(x) return result def _reduction_tlv(self, reduction_idx): sz = self.config['samples_per_reduction'] incr = self.config['samples_per_block'] // sz tgt_r_idx = reduction_idx if self._reduction_cache and self._reduction_cache['r_start'] <= tgt_r_idx < self._reduction_cache['r_stop']: return self._reduction_cache if self._reduction_cache is not None: log.debug('_reduction_tlv cache miss: %s : %s %s', tgt_r_idx, self._reduction_cache['r_start'], self._reduction_cache['r_stop']) idx_start, idx_end = self.sample_id_range r_start = 
idx_start // sz r_stop = idx_end // sz if not r_start <= tgt_r_idx < r_stop: raise ValueError('reduction index out of range: %d <= %d < %d', r_start, tgt_r_idx, r_stop) if self._reduction_cache is not None and tgt_r_idx > self._reduction_cache['r_start']: # continue forward self._fh.seek(self._reduction_cache['next_collection_pos']) r_idx = self._reduction_cache['r_stop'] else: # add case for rewind? log.debug('_reduction_tlv resync to beginning') self._fh.seek(self._data_start_position) r_idx = 0 if self._f.advance() != datafile.TAG_COLLECTION_START: raise ValueError('data section must be single collection') self._fh.seek(self._data_start_position) if self._f.advance() != datafile.TAG_COLLECTION_START: raise ValueError('data section must be single collection') while True: tag, _ = self._f.peek_tag_length() if tag is None or tag == datafile.TAG_COLLECTION_END: log.error('reduction_tlv not found before end of file: %s > %s', r_stop, r_idx) break elif tag != datafile.TAG_COLLECTION_START: raise ValueError('invalid file format: not collection start') r_idx_next = r_idx + incr if tgt_r_idx >= r_idx_next: self._f.skip() r_idx = r_idx_next continue self._f.collection_goto_end() tag, value = next(self._f) if tag != datafile.TAG_COLLECTION_END: raise ValueError('invalid file format: not collection end') data = self._reduction_handler(value) self._reduction_cache = { 'r_start': r_idx, 'r_stop': r_idx_next, 'buffer': data, 'next_collection_pos': self._f.tell() } return self._reduction_cache def _reduction_handler_v1(self, value): field_count = len(self.config['reduction_fields']) b = np.frombuffer(value, dtype=np.float32).reshape((-1, field_count, _STATS_VALUES_V1)) data = stats_array_factory(len(b)) stats_array_invalidate(data) samples_per_reduction = self.config['samples_per_reduction'] data[:, :]['length'] = samples_per_reduction for idx in range(field_count): data[:, idx]['mean'] = b[:, 0, 0] data[:, idx]['variance'] = (samples_per_reduction - 1) * b[:, 0, 1] * b[:, 0, 1] data[:, idx]['min'] = b[:, 0, 2] data[:, idx]['min'] = b[:, 0, 3] return data def _reduction_handler_v2(self, value): value = np.frombuffer(value, dtype=np.uint8) data, sample_count = array_storage.unpack(value) stats_array = stats_array_factory(sample_count) stats_array_invalidate(stats_array) for key, x in data.items(): field_idx = (key >> 4) & 0x0f stats_idx = key & 0x0f stats_name = NP_STATS_NAMES[stats_idx] stats_array[:, field_idx][stats_name] = x # .astype(dtype=np.float64) for idx in range(1, len(STATS_FIELD_NAMES)): stats_array[:, idx]['length'] = stats_array[:, 0]['length'] return stats_array

    def get_reduction(self, start=None, stop=None, units=None, out=None):
        """Get the fixed reduction with statistics.

        :param start: The starting sample identifier (inclusive).
        :param stop: The ending sample identifier (exclusive).
        :param units: The units for start and stop.
            'seconds' or None is in floating point seconds relative to the view.
            'samples' is in stream buffer sample indices.
        :return: The np.ndarray((N, STATS_FIELD_COUNT), dtype=DTYPE) reduction
            data which normally is memory mapped to the underlying data, but
            will be copied on rollover.
        """
        start, stop = self.normalize_time_arguments(start, stop, units)
        self._fh.seek(self._data_start_position)
        self._validate_range(start, stop)
        sz = self.config['samples_per_reduction']
        if sz < 1:
            sz = 1
        r_start = start // sz
        total_length = (stop - start) // sz
        r_stop = r_start + total_length
        log.info('DataReader.get_reduction(r_start=%r,r_stop=%r)', r_start, r_stop)
        if total_length <= 0:
            return stats_array_factory(0)
        if out is None:
            out = stats_array_factory(total_length)
        elif len(out) < total_length:
            raise ValueError('out too small')

        r_idx = r_start
        out_idx = 0
        while r_idx < r_stop:
            reduction_cache = self._reduction_tlv(r_idx)
            if reduction_cache is None:
                break
            data = reduction_cache['buffer']
            b_start = r_idx - reduction_cache['r_start']
            length = reduction_cache['r_stop'] - reduction_cache['r_start'] - b_start
            out_remaining = r_stop - r_idx
            length = min(length, out_remaining)
            if length <= 0:
                break
            out[out_idx:(out_idx + length), :] = data[b_start:(b_start + length), :]
            out_idx += length
            r_idx += length
        if out_idx != total_length:
            log.warning('DataReader length mismatch: out_idx=%s, length=%s', out_idx, total_length)
            total_length = min(out_idx, total_length)
        return out[:total_length, :]

    def _get_reduction_stats(self, start, stop):
        """Get statistics over the reduction.

        :param start: The starting sample identifier (inclusive).
        :param stop: The ending sample identifier (exclusive).
        :return: The tuple of ((sample_start, sample_stop), :class:`Statistics`).
        """
        # log.debug('_get_reduction_stats(%s, %s)', start, stop)
        s = Statistics()
        sz = self.config['samples_per_reduction']
        incr = self.config['samples_per_block'] // sz
        r_start = start // sz
        if (r_start * sz) < start:
            r_start += 1
        r_stop = stop // sz
        if r_start >= r_stop:  # cannot use the reductions
            s_start = r_start * sz
            return (s_start, s_start), s
        r_idx = r_start
        while r_idx < r_stop:
            reduction_cache = self._reduction_tlv(r_idx)
            if reduction_cache is None:
                break
            data = reduction_cache['buffer']
            b_start = r_idx - reduction_cache['r_start']
            length = reduction_cache['r_stop'] - reduction_cache['r_start'] - b_start
            out_remaining = r_stop - r_idx
            length = min(length, out_remaining)
            if length <= 0:
                break
            r = reduction_downsample(data, b_start, b_start + length, length)
            s.combine(Statistics(stats=r[0, :]))
            r_idx += length
        return (r_start * sz, r_stop * sz), s

    def _data_get_handler_none(self, start, stop, out):
        return None

    def _data_get_handler_raw(self, start, stop, out):
        _, d_bits, d_cal = self._raw(start, stop)
        i, v = d_cal[:, 0], d_cal[:, 1]
        out[:, 0]['mean'] = i
        out[:, 1]['mean'] = v
        out[:, 2]['mean'] = i * v
        out[:, 3]['mean'] = np.bitwise_and(d_bits, 0x0f)
        out[:, 4]['mean'] = np.bitwise_and(np.right_shift(d_bits, 4), 0x01)
        out[:, 5]['mean'] = np.bitwise_and(np.right_shift(d_bits, 5), 0x01)
        out[:, :]['variance'] = 0.0  # zero variance, only one sample!
        out[:, :]['min'] = np.nan    # min
        out[:, :]['max'] = np.nan    # max

    def _data_get_handler_float32_v2(self, start, stop, out):
        k = self._downsampled(start, stop, STATS_FIELD_NAMES)
        out[:, 0]['mean'] = k['current']
        out[:, 1]['mean'] = k['voltage']
        out[:, 2]['mean'] = k['power']
        out[:, 3]['mean'] = k['current_range']
        out[:, 4]['mean'] = k['current_lsb']
        out[:, 5]['mean'] = k['voltage_lsb']
        out[:, :]['variance'] = 0.0  # zero variance, only one sample!
        out[:, :]['min'] = np.nan    # min
        out[:, :]['max'] = np.nan    # max

    def data_get(self, start, stop, increment=None, units=None, out=None):
        """Get the samples with statistics.

        :param start: The starting sample id (inclusive).
        :param stop: The ending sample id (exclusive).
        :param increment: The number of raw samples per output statistic.
        :param units: The units for start and stop.
            'seconds' or None is in floating point seconds relative to the view.
            'samples' is in stream buffer sample indices.
        :param out: The optional output np.ndarray((N, STATS_FIELD_COUNT), dtype=DTYPE)
            to populate with the result.  None (default) will construct and return
            a new array.
        :return: The np.ndarray((N, STATS_FIELD_COUNT), dtype=DTYPE) data.
        """
        log.debug('DataReader.data_get(start=%r,stop=%r,increment=%r)', start, stop, increment)
        start, stop = self.normalize_time_arguments(start, stop, units)
        if increment is None:
            increment = 1
        if self._fh is None:
            raise IOError('file not open')
        increment = max(1, int(np.round(increment)))
        out_len = (stop - start) // increment
        if out_len <= 0:
            return stats_array_factory(0)
        if out is not None:
            if len(out) < out_len:
                raise ValueError('out too small')
        else:
            out = stats_array_factory(out_len)
        out_len = len(out)

        if increment == 1:
            self._data_get_handler(start, stop, out)
        elif increment == self.config['samples_per_reduction']:
            out = self.get_reduction(start, stop, out=out)
        elif increment > self.config['samples_per_reduction']:
            r_out = self.get_reduction(start, stop)
            increment = int(increment / self.config['samples_per_reduction'])
            out = reduction_downsample(r_out, 0, len(r_out), increment)
        else:
            k_start = start
            for idx in range(out_len):
                k_stop = k_start + increment
                self._statistics_get_handler(k_start, k_stop, out[idx, :])
                k_start = k_stop
        return out
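
    # Sketch (illustration only, `reader` is an assumed open DataReader): fetch
    # roughly 1000 statistics rows spanning the full recording by deriving an
    # increment from the sample range. Columns follow the reduction field order:
    # current, voltage, power, current_range, current_lsb, voltage_lsb.
    #
    #   s_start, s_stop = reader.sample_id_range
    #   increment = max(1, (s_stop - s_start) // 1000)
    #   stats = reader.data_get(s_start, s_stop, increment=increment, units='samples')
    #   current_mean = stats[:, 0]['mean']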

    def summary_string(self):
        s = [str(self)]
        config_fields = ['sampling_frequency', 'samples_per_reduction',
                         'samples_per_tlv', 'samples_per_block']
        for field in config_fields:
            s.append(' %s = %r' % (field, self.config[field]))
        return '\n'.join(s)

    def time_to_sample_id(self, t):
        if t is None:
            return None
        s_min, s_max = self.sample_id_range
        s = int(t * self.sampling_frequency)
        if s < s_min or s > s_max:
            log.warning('time %.6f out of range', t)
            return None
        return s

    def sample_id_to_time(self, s):
        if s is None:
            return None
        return s / self.sampling_frequency

    def normalize_time_arguments(self, start, stop, units=None):
        """Normalize time arguments to the available range.

        :param start: The start time or samples.
            None gets the first sample, equivalent to self.sample_id_range[0].
        :param stop: The stop time or samples.
            None gets the last sample, equivalent to self.sample_id_range[1].
        :param units: The time units which is one of ['seconds', 'samples'].
            None (default) is equivalent to 'samples'.
        :return: (start_sample, stop_sample).  When units=='samples', negative
            values are interpreted in standard Python notation relative to the
            last sample.
        """
        s_min, s_max = self.sample_id_range
        if units == 'seconds':
            start = self.time_to_sample_id(start)
            stop = self.time_to_sample_id(stop)
        elif units is None or units == 'samples':
            if start is not None and start < 0:
                start = s_max + start
            if stop is not None and stop < 0:
                stop = s_max + stop
        else:
            raise ValueError(f'invalid time units: {units}')
        s1 = s_min if start is None else start
        s2 = s_max if stop is None else stop
        if s1 == s_min and s1 == s2:
            pass  # ok, zero length capture
        elif not s_min <= s1 < s_max:
            raise ValueError(f'start sample out of range: {s1}')
        elif not s_min <= s2 <= s_max:
            raise ValueError(f'stop sample out of range: {s2}')
        return s1, s2

    def _stats_update(self, stats, x, length):
        stats['mean'] = np.mean(x, axis=0)
        stats['variance'] = np.var(x, axis=0) * length
        stats['min'] = np.amin(x, axis=0)
        stats['max'] = np.amax(x, axis=0)

    def _statistics_get_handler_none(self, start, stop, stats):
        return stop - start

    def _statistics_get_handler_raw(self, s1, s2, stats):
        _, d_bits, z = self._raw(s1, s2)
        i, v = z[:, 0], z[:, 1]
        p = i * v
        zi = np.isfinite(i)
        i_view = i[zi]
        length = len(i_view)
        stats[:]['length'] = length
        if length:
            i_range = np.bitwise_and(d_bits, 0x0f)
            i_lsb = np.right_shift(np.bitwise_and(d_bits, 0x10), 4)
            v_lsb = np.right_shift(np.bitwise_and(d_bits, 0x20), 5)
            for idx, field in enumerate([i_view, v[zi], p[zi], i_range, i_lsb[zi], v_lsb[zi]]):
                self._stats_update(stats[idx], field, length)
        else:
            stats_invalidate(stats)
        return length

    def _statistics_get_handler_float32_v2(self, s1, s2, stats):
        rv = {'signals': {}}
        self._samples_get_handler_float32_v2(s1, s2, STATS_FIELD_NAMES, rv)
        zi = np.isfinite(rv['signals']['current']['value'])
        length = np.count_nonzero(zi)
        stats[:]['length'] = length
        if length:
            for idx, field in enumerate(STATS_FIELD_NAMES):
                self._stats_update(stats[idx], rv['signals'][field]['value'][zi], length)
        else:
            stats_invalidate(stats)
        return length

    def _samples_get_handler_none(self, start, stop, fields, rv):
        return None

    def _samples_get_handler_raw(self, start, stop, fields, rv):
        raw, bits, cal = self._raw(start, stop)
        signals = rv['signals']
        if fields is None:
            fields = ['current', 'voltage', 'power', 'current_range',
                      'current_lsb', 'voltage_lsb', 'raw']
        for field in fields:
            d = {'units': _SIGNALS_UNITS.get(field, '')}
            if field == 'current':
                v = cal[:, 0]
            elif field == 'voltage':
                v = cal[:, 1]
            elif field == 'power':
                v = cal[:, 0] * cal[:, 1]
            elif field == 'raw':
                v = raw
                d['voltage_range'] = self._voltage_range
            elif field == 'raw_current':
                v = np.right_shift(raw[:, 0], 2)  # raw is (N, 2): column 0 is current
            elif field == 'raw_voltage':
                v = np.right_shift(raw[:, 1], 2)  # raw is (N, 2): column 1 is voltage
            elif field == 'bits':
                v = bits
            elif field == 'current_range':
                v = np.bitwise_and(bits, 0x0f)
            elif field == 'current_lsb':
                v = np.bitwise_and(np.right_shift(bits, 4), 1)
            elif field == 'voltage_lsb':
                v = np.bitwise_and(np.right_shift(bits, 5), 1)
            else:
                log.warning('Unsupported field %s', field)
                v = np.array([])
            d['value'] = v
            signals[field] = d
        return rv

    def _samples_get_handler_float32_v2(self, start, stop, fields, rv):
        k = self._downsampled(start, stop, fields)
        for field, value in k.items():
            units = _SIGNALS_UNITS.get(field, '')
            rv['signals'][field] = {'value': value, 'units': units}
        return rv

    def samples_get(self, start=None, stop=None, units=None, fields=None):
        """Get exact samples over a range.

        :param start: The starting time.
        :param stop: The ending time.
        :param units: The units for start and stop.
            'seconds' or None is in floating point seconds relative to the view.
            'samples' is in stream buffer sample indices.
        :param fields: The fields to get.
        """
        log.debug('samples_get(%s, %s, %s)', start, stop, units)
        s1, s2 = self.normalize_time_arguments(start, stop, units)
        t1, t2 = s1 / self.sampling_frequency, s2 / self.sampling_frequency
        rv = {
            'time': {
                'range': {'value': [t1, t2], 'units': 's'},
                'delta': {'value': t2 - t1, 'units': 's'},
                'sample_id_range': {'value': [s1, s2], 'units': 'samples'},
                'sample_id_limits': {'value': self.sample_id_range, 'units': 'samples'},
                'samples': {'value': s2 - s1, 'units': 'samples'},
                'input_sampling_frequency': {'value': self.input_sampling_frequency, 'units': 'Hz'},
                'output_sampling_frequency': {'value': self.output_sampling_frequency, 'units': 'Hz'},
                'sampling_frequency': {'value': self.sampling_frequency, 'units': 'Hz'},
            },
            'signals': {},
        }
        return self._samples_get_handler(s1, s2, fields, rv)

    def _statistics_get(self, start, stop):
        s1, s2 = start, stop
        (k1, k2), s = self._get_reduction_stats(s1, s2)
        if k1 >= k2:
            # compute directly over samples
            stats = stats_factory()
            if not self._statistics_get_handler(s1, s2, stats):
                stats[3]['mean'] = I_RANGE_MISSING
                stats[3]['variance'] = 0
                stats[3]['min'] = I_RANGE_MISSING
                stats[3]['max'] = I_RANGE_MISSING
            s = Statistics(stats=stats)
        else:
            if s1 < k1:
                s_start = stats_factory()
                self._statistics_get_handler(s1, k1, s_start)
                s.combine(Statistics(stats=s_start))
            if s2 > k2:
                s_stop = stats_factory()
                self._statistics_get_handler(k2, s2, s_stop)
                s.combine(Statistics(stats=s_stop))
        return s

    def statistics_get(self, start=None, stop=None, units=None):
        """Get the statistics for the collected sample data over a time range.

        :param start: The starting time relative to the first sample.
        :param stop: The ending time.
        :param units: The units for start and stop.
            'seconds' is in floating point seconds relative to the view.
            'samples' or None is in stream buffer sample indices.
        :return: The statistics data structure.  See the
            `statistics documentation <statistics.html>`_ for details.
        """
        log.debug('statistics_get(%s, %s, %s)', start, stop, units)
        s1, s2 = self.normalize_time_arguments(start, stop, units)
        if s1 == s2:
            s2 = s1 + 1  # always try to produce valid statistics
        s = self._statistics_get(s1, s2)
        t_start = s1 / self.sampling_frequency
        t_stop = s2 / self.sampling_frequency
        return stats_to_api(s.value, t_start, t_stop)
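

# Usage sketch (illustration only, not part of this module): open a recording,
# inspect it, and extract calibrated samples and aggregate statistics using the
# DataReader API above. The file name is hypothetical.
#
#   reader = DataReader().open('20181113_145703.jls')
#   print(reader.summary_string())
#   samples = reader.samples_get(start=0.0, stop=1.0, units='seconds',
#                                fields=['current', 'voltage'])
#   stats = reader.statistics_get(start=0.0, stop=1.0, units='seconds')
#   reader.close()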