src.processors package

Submodules

src.processors.models module

class src.processors.models.Processor(processor_id: str, blueprint_id: str, blueprint_path: str, init_params: dict, topic: str, source_topic: str, source_format: str, min_input_spacing: float, min_step_spacing: float, min_output_spacing: float, processor_root_dir: str, kafka_server: str)

Bases: object

The main process endpoint for processor processes

retrieve_init_results()

Waits for and returns the results from the process initialization

Can only be called once after initialization. Should be run in a separate thread to prevent the connection from blocking the main thread.

Returns

the processor's status as a dict

set_inputs(input_refs, measurement_refs, measurement_proportions)

Sets the input values. Must not be called before start.

Parameters

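  • input_refs – the indices of the inputs that will be used

  • measurement_refs – the indices of the input data values that will be used. Must be in the same order as input_refs.

  • measurement_proportions – list of scales to be used on values before inputting them. Must be in the same order as input_refs.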

set_outputs(output_refs)

Sets the output values. Must not be called before start.

Parameters

output_refs – the indices of the outputs that will be used

start(input_refs, measurement_refs, measurement_proportions, output_refs, start_params)

Starts the process. Must not be called before the initialization results have been retrieved with retrieve_init_results.

Parameters
  • input_refs – the indices of the inputs that will be used

  • measurement_refs – the indices of the input data values that will be used. Must be in the same order as input_refs.

  • measurement_proportions – list of scales to be used on values before inputting them. Must be in the same order as input_refs.

  • output_refs – the indices of the outputs that will be used

  • start_params – the processor's start parameters as a dict

Returns

the processor's status as a dict
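The three input lists are parallel: position i in each of them describes the same input. The following is a minimal sketch of how such lists could be validated and applied, not the package's own implementation. The helper `scale_inputs`, the sample values, and the assumption that a "scale" is a plain multiplication are all illustrative.

```python
def scale_inputs(input_refs, measurement_refs, measurement_proportions, measurements):
    """Return {input_ref: scaled value} for three parallel reference lists."""
    if not (len(input_refs) == len(measurement_refs) == len(measurement_proportions)):
        raise ValueError(
            "input_refs, measurement_refs and measurement_proportions must have equal length"
        )
    # Entry i of each list belongs to the same input, so zip keeps them aligned.
    return {
        input_ref: measurements[measurement_ref] * proportion
        for input_ref, measurement_ref, proportion in zip(
            input_refs, measurement_refs, measurement_proportions
        )
    }


# One incoming sample with three measured values (made-up numbers).
sample = [10.0, 20.0, 30.0]
scaled = scale_inputs([0, 1], [2, 0], [0.5, 2.0], sample)
```

Here input 0 receives measurement 2 scaled by 0.5 and input 1 receives measurement 0 scaled by 2.0. Reordering one list without the others would silently pair inputs with the wrong measurements, which is why the ordering requirement matters.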

async stop()

Attempts to stop the process nicely, killing it otherwise
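The "stop nicely, kill otherwise" pattern can be sketched as below. This is an illustration only: it uses `subprocess` rather than the multiprocessing process and asynchronous method documented here, and `stop_gracefully` and its timeout are hypothetical.

```python
import subprocess
import sys


def stop_gracefully(proc, timeout=2.0):
    """Ask the child to exit, then force-kill it if it has not exited in time."""
    proc.terminate()  # polite request (SIGTERM on POSIX)
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # last resort
        proc.wait()  # reap the child so it does not linger as a zombie
    return proc.returncode


# A child that would otherwise run for a minute.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_gracefully(child)
```

Waiting with a timeout before escalating gives the child a chance to clean up while still guaranteeing that the call returns.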

class src.processors.models.Variable(valueReference: int, name: str)

Bases: object

A simple container class for variable attributes

src.processors.models.processor_process(connection: multiprocessing.connection.Connection, blueprint_path: str, init_params: dict, processor_dir: str, topic: str, source_topic: str, source_format: str, kafka_server: str, min_input_spacing: float, min_step_spacing: float, min_output_spacing: float)

Runs the given blueprint as a processor

Is meant to be run in a separate process

Parameters
  • connection – a connection object to communicate with the main process

  • blueprint_path – the path to the blueprint folder

  • init_params – the initialization parameters to the processor as a dictionary

  • processor_dir – the directory the created process will run in

  • topic – the topic the process will send results to

  • source_topic – the topic the process will receive data from

  • source_format – the byte format of the data the process will receive

  • kafka_server – the address of the kafka bootstrap server the process will use

  • min_input_spacing – the minimum time between each input to the processor

  • min_step_spacing – the minimum time between each step function call on the processor

  • min_output_spacing – the minimum time between each results retrieval from the processor
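The three minimum-spacing parameters all describe the same idea: an action may run only if enough time has passed since it last ran. A minimal sketch of such a gate follows. The class `MinSpacing` and the injectable clock are assumptions made for illustration, and the package may enforce the spacing differently.

```python
import time


class MinSpacing:
    """Allows an action at most once per `min_spacing` seconds."""

    def __init__(self, min_spacing, clock=time.monotonic):
        self.min_spacing = min_spacing
        self.clock = clock
        self._last = None  # time of the last allowed action

    def ready(self):
        """Return True and record the time if enough time has passed."""
        now = self.clock()
        if self._last is None or now - self._last >= self.min_spacing:
            self._last = now
            return True
        return False


# A fake clock keeps the example deterministic.
ticks = iter([0.0, 0.3, 1.0, 1.2])
step_gate = MinSpacing(1.0, clock=lambda: next(ticks))
decisions = [step_gate.ready() for _ in range(4)]
```

With a spacing of 1.0 second, the calls at 0.0 and 1.0 are allowed and the calls at 0.3 and 1.2 are skipped.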

src.processors.views module

src.processors.views.dumps(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=<function make_serializable>, sort_keys=False, **kw)

A version of json.dumps that applies make_serializable recursively to make objects serializable
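The signature above matches json.dumps with only the `default` argument changed. One way to get that behaviour is sketched below. The body of `make_serializable` is hypothetical, since the real function is not documented here, and the local `Variable` class is a stand-in for the one in src.processors.models.

```python
import functools
import json


def make_serializable(obj):
    """Hypothetical fallback: convert objects that json cannot encode."""
    if isinstance(obj, (set, frozenset)):
        return sorted(obj)
    if hasattr(obj, "__dict__"):
        return vars(obj)
    raise TypeError(f"{type(obj).__name__} is not JSON serializable")


# Same call signature as json.dumps, with a different default for `default`.
dumps = functools.partial(json.dumps, default=make_serializable)


class Variable:
    """Local stand-in for the documented container class."""

    def __init__(self, valueReference, name):
        self.valueReference = valueReference
        self.name = name


encoded = dumps({"outputs": [Variable(0, "temperature")], "tags": {"b", "a"}})
```

The json encoder calls `default` for every object it cannot encode, including nested ones, which is what makes the conversion recursive.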

src.processors.views.get_initialization_results(app, processor_instance)

Get the initialization results from a processor

Is meant to be run as the target of a Thread. Puts the results in app['topics'].
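Running a blocking call as a thread target and publishing its result on a shared mapping could look like the sketch below. `FakeProcessor`, `collect_init_results`, the status payload, and keying the results by processor id are all assumptions; a plain dict stands in for the aiohttp application.

```python
import threading
import time


class FakeProcessor:
    """Hypothetical stand-in exposing the documented blocking method."""

    processor_id = "demo"

    def retrieve_init_results(self):
        time.sleep(0.05)  # simulates waiting for the child process
        return {"status": "initialized"}  # made-up payload


def collect_init_results(app, processor_instance):
    # Blocks in the worker thread, then publishes the result on the shared mapping.
    results = processor_instance.retrieve_init_results()
    app["topics"][processor_instance.processor_id] = results


app = {"topics": {}}  # plain dict standing in for the aiohttp application
worker = threading.Thread(target=collect_init_results, args=(app, FakeProcessor()))
worker.start()
worker.join()  # only needed here so the example can inspect the result
```

In a server the main thread would not join the worker. It would keep handling requests and read the mapping once the result has arrived.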

async src.processors.views.processor_create(request: aiohttp.web_request.Request)

Create a new processor from a POST request.

Post params:

  • id:* id of the new processor instance. Max 20 chars; the first char must be alphabetic or an underscore, the other chars must be alphabetic, digits or underscores.

  • blueprint:* id of the blueprint to be used. Max 20 chars; the first char must be alphabetic or an underscore, the other chars must be alphabetic, digits or underscores.

  • init_params: the processor specific initialization variables as a json string

  • topic:* topic to use as input to processor

  • min_output_interval: the shortest time allowed between each output from processor in seconds
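The id rule stated for `id` and `blueprint` can be expressed as a single regular expression. This is a sketch rather than the validation the view actually performs. It assumes "alphabetic" means ASCII letters, and `ID_PATTERN` and `is_valid_id` are illustrative names.

```python
import re

# ASCII letters are assumed; the source only says "alphabetic".
ID_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,19}")


def is_valid_id(candidate):
    """True if `candidate` has 1 to 20 chars, starts with a letter or underscore,
    and otherwise contains only letters, digits or underscores."""
    return ID_PATTERN.fullmatch(candidate) is not None


checks = {name: is_valid_id(name) for name in ["pump_1", "_tmp", "1pump", "a" * 21, ""]}
```

`fullmatch` anchors the pattern at both ends, so the `{0,19}` repetition also enforces the 20-character limit.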

async src.processors.views.processor_delete(request: aiohttp.web_request.Request)

Delete the processor with the given id.

async src.processors.views.processor_detail(request: aiohttp.web_request.Request)

Get detailed information for the processor with the given id

  • Append /subscribe to subscribe to the processor

  • Append /unsubscribe to unsubscribe from the processor

  • Append /stop to stop the processor

  • Append /delete to delete the processor

  • Append /outputs to get the outputs of the processor

  • Append /inputs to get the inputs of the processor

async src.processors.views.processor_inputs_update(request: aiohttp.web_request.Request)

Update the processor inputs

Post params:

  • input_ref: reference values to the inputs to be used

  • measurement_ref: reference values to the measurement inputs to be used for the inputs. Must be in the same order as input_ref.

  • measurement_proportion: scale to be used on measurement values before inputting them. Must be in the same order as input_ref.

async src.processors.views.processor_list(request: aiohttp.web_request.Request)

List all created processors.

Returns a JSON object mapping processor ids to processor status objects.

  • Append a processor id to get more information about a listed processor

  • Append /create to create a new processor instance

  • Append /clear to delete stopped processors

async src.processors.views.processor_outputs_update(request: aiohttp.web_request.Request)

Update the processor outputs

Post params:

  • output_ref: reference values to the outputs to be used

async src.processors.views.processor_start(request: aiohttp.web_request.Request)

Start a processor from a POST request.

Post params:

  • id:* id of the processor instance. Max 20 chars; the first char must be alphabetic or an underscore, the other chars must be alphabetic, digits or underscores.

  • start_params: the processor specific start parameters as a json string

  • input_ref: list of reference values to the inputs to be used

  • output_ref: list of reference values to the outputs to be used

  • measurement_ref: list of reference values to the measurement inputs to be used for the inputs. Must be in the same order as input_ref.

  • measurement_proportion: list of scales to be used on measurement values before inputting them. Must be in the same order as input_ref.

async src.processors.views.processor_stop(request: aiohttp.web_request.Request)

Stop the processor with the given id.

async src.processors.views.processor_subscribe(request: aiohttp.web_request.Request)

Subscribe to the processor with the given id

async src.processors.views.processor_unsubscribe(request: aiohttp.web_request.Request)

Unsubscribe from the processor with the given id

async src.processors.views.processors_clear(request: aiohttp.web_request.Request)

Delete data from all processors that are not running