src.processors package¶
Submodules¶
src.processors.models module¶
-
class
src.processors.models.
Processor
(processor_id: str, blueprint_id: str, blueprint_path: str, init_params: dict, topic: str, source_topic: str, source_format: str, min_input_spacing: float, min_step_spacing: float, min_output_spacing: float, processor_root_dir: str, kafka_server: str)¶ Bases:
object
The main process endpoint for processor processes
-
retrieve_init_results
()¶ Waits for and returns the results from the process initialization
Can only be called once after initialization. Should be run in a separate thread to prevent the connection from blocking the main thread.
- Returns
the processor's status as a dict
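The threading advice above can be sketched as follows. This is only an illustration: FakeProcessor is a hypothetical stand-in that mimics a blocking retrieve_init_results call, and the returned status dict is made up, since the real status fields are not documented here.

```python
import threading
import time


class FakeProcessor:
    """Hypothetical stand-in for Processor; only mimics the blocking call."""

    def retrieve_init_results(self):
        time.sleep(0.1)  # simulate waiting on the processor process
        return {"status": "initialized"}  # made-up status payload


def collect_init_results(store, key, processor):
    # Runs in a worker thread, so the blocking wait does not stall the caller.
    store[key] = processor.retrieve_init_results()


store = {}
worker = threading.Thread(
    target=collect_init_results, args=(store, "demo", FakeProcessor())
)
worker.start()
# The main thread is free to do other work here.
worker.join()
print(store["demo"])
```

Because the method can only be called once, the worker stores the result so that later readers do not need to call it again.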
-
set_inputs
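(input_refs, measurement_refs, measurement_proportions)¶ Sets the input values. Must not be called before start.
- Parameters
input_refs – the indices of the inputs that will be used
measurement_refs – the indices of the input data values that will be used. Must be in the same order as input_refs.
measurement_proportions – list of scales to be used on values before inputting them. Must be in the same order as input_refs.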
-
set_outputs
(output_refs)¶ Sets the output values. Must not be called before start.
- Parameters
output_refs – the indices of the outputs that will be used
-
start
(input_refs, measurement_refs, measurement_proportions, output_refs, start_params)¶ Starts the process. Must not be called before the initialization results have been retrieved.
- Parameters
input_refs – the indices of the inputs that will be used
measurement_refs – the indices of the input data values that will be used. Must be in the same order as input_refs.
measurement_proportions – list of scales to be used on values before inputting them. Must be in the same order as input_refs.
output_refs – the indices of the outputs that will be used
start_params – the processor's start parameters as a dict
- Returns
the processor's status as a dict
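The "same order" requirement on the three input lists can be illustrated with a small sketch. It assumes that entry i of each list describes the same input, and that a measurement is multiplied by its proportion before being fed in; the helper names pair_inputs and scale_measurements are hypothetical and not part of this package.

```python
def pair_inputs(input_refs, measurement_refs, measurement_proportions):
    """Zip the three parallel lists, rejecting mismatched lengths."""
    if not (len(input_refs) == len(measurement_refs) == len(measurement_proportions)):
        raise ValueError("the three lists must have the same length")
    return list(zip(input_refs, measurement_refs, measurement_proportions))


def scale_measurements(measurements, mapping):
    """Return {input_ref: scaled value} for one row of measurement data."""
    return {
        input_ref: measurements[measurement_ref] * proportion
        for input_ref, measurement_ref, proportion in mapping
    }


# Input 0 reads measurement 0 unscaled; input 2 reads measurement 3 at half scale.
mapping = pair_inputs([0, 2], [0, 3], [1.0, 0.5])
scaled = scale_measurements([10.0, 20.0, 30.0, 40.0], mapping)
print(scaled)
```

Checking the lengths up front surfaces an ordering mistake before any values reach the processor.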
-
async
stop
()¶ Attempts to stop the process gracefully and kills it if that fails
-
-
class
src.processors.models.
Variable
(valueReference: int, name: str)¶ Bases:
object
A simple container class for variable attributes
-
src.processors.models.
processor_process
(connection: multiprocessing.connection.Connection, blueprint_path: str, init_params: dict, processor_dir: str, topic: str, source_topic: str, source_format: str, kafka_server: str, min_input_spacing: float, min_step_spacing: float, min_output_spacing: float)¶ Runs the given blueprint as a processor
Is meant to be run in a separate process
- Parameters
connection – a connection object to communicate with the main process
blueprint_path – the path to the blueprint folder
init_params – the initialization parameters to the processor as a dictionary
processor_dir – the directory the created process will run in
topic – the topic the process will send results to
source_topic – the topic the process will receive data from
source_format – the byte format of the data the process will receive
kafka_server – the address of the Kafka bootstrap server the process will use
min_input_spacing – the minimum time between each input to the processor
min_step_spacing – the minimum time between each step function call on the processor
min_output_spacing – the minimum time between each results retrieval from the processor
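One way to read the three min_*_spacing parameters is as a gate that lets an action through only when enough time has passed since the last one. The sketch below shows that idea only; MinSpacingGate is a hypothetical helper, and how processor_process actually enforces the spacing is not documented here.

```python
import time


class MinSpacingGate:
    """Allow an action only if min_spacing seconds have passed since the last one."""

    def __init__(self, min_spacing, clock=time.monotonic):
        self.min_spacing = min_spacing
        self._clock = clock  # injectable so the gate can be tested without sleeping
        self._last = None

    def ready(self):
        now = self._clock()
        if self._last is None or now - self._last >= self.min_spacing:
            self._last = now  # remember when the action was last allowed
            return True
        return False


# Drive the gate with a fake clock instead of real time.
fake_times = iter([0.0, 0.5, 1.0, 1.5, 2.5])
gate = MinSpacingGate(1.0, clock=lambda: next(fake_times))
decisions = [gate.ready() for _ in range(5)]
print(decisions)
```

Using a monotonic clock by default avoids surprises when the system wall clock is adjusted.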
src.processors.views module¶
-
src.processors.views.
dumps
(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=<function make_serializable>, sort_keys=False, **kw)¶ A version of json.dumps that uses make_serializable recursively to make objects serializable
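The signature suggests json.dumps with its default argument pre-bound. A minimal sketch of that pattern follows; the make_serializable shown here is a hypothetical fallback written for illustration, not the package's own function, and DemoVariable is a made-up container.

```python
import json
from functools import partial


def make_serializable(obj):
    """Hypothetical fallback: convert objects json.dumps cannot encode."""
    if isinstance(obj, (set, frozenset)):
        return sorted(obj)
    if hasattr(obj, "__dict__"):
        return vars(obj)  # plain attribute dict; nested values are handled in turn
    raise TypeError(f"cannot serialize {type(obj).__name__}")


# Same call signature as json.dumps, with the fallback pre-bound.
dumps = partial(json.dumps, default=make_serializable)


class DemoVariable:
    """Made-up container used only for this example."""

    def __init__(self, valueReference, name):
        self.valueReference = valueReference
        self.name = name


text = dumps({"var": DemoVariable(3, "temperature"), "tags": {"b", "a"}})
print(text)
```

The recursion comes from the JSON encoder itself: whatever the default function returns is encoded again, so nested unsupported objects also pass through the fallback.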
-
src.processors.views.
get_initialization_results
(app, processor_instance)¶ Get the initialization results from a processor
Is meant to be run as a target in a Thread. Will put the results in app['topics'].
-
async
src.processors.views.
processor_create
(request: aiohttp.web_request.Request)¶ Create a new processor from post request.
Post params:
id:* id of the new processor instance. Max 20 chars; the first char must be alphabetic or underscore, other chars must be alphabetic, digit or underscore
blueprint:* id of the blueprint to be used. Max 20 chars; the first char must be alphabetic or underscore, other chars must be alphabetic, digit or underscore
init_params: the processor specific initialization variables as a json string
topic:* topic to use as input to processor
min_output_interval: the shortest time allowed between each output from processor in seconds
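The id and blueprint rules above can be expressed as a regular expression. This is a sketch under one assumption: "alphabetic" is taken to mean ASCII letters. The name is_valid_id is hypothetical, and the view's own validation may differ.

```python
import re

# First char: letter or underscore; up to 19 more letters, digits or underscores.
ID_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,19}")


def is_valid_id(value):
    """Return True if value satisfies the documented id constraints."""
    return ID_PATTERN.fullmatch(value) is not None


for candidate in ["pump_1", "_tmp", "1pump", "my-id", "a" * 21]:
    print(candidate, is_valid_id(candidate))
```

Using fullmatch rather than match ensures that trailing invalid characters are rejected as well.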
-
async
src.processors.views.
processor_delete
(request: aiohttp.web_request.Request)¶ Delete the processor with the given id.
-
async
src.processors.views.
processor_detail
(request: aiohttp.web_request.Request)¶ Get detailed information for the processor with the given id
Append /subscribe to subscribe to the processor
Append /unsubscribe to unsubscribe from the processor
Append /stop to stop the processor
Append /delete to delete the processor
Append /outputs to get the outputs of the processor
Append /inputs to get the inputs of the processor
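The suffixes listed above can be combined with a detail URL as in the following sketch. The path /processors/demo/ is a hypothetical example, since the actual route prefix is not given here, and action_url is an illustrative helper rather than part of the package.

```python
# Suffixes taken from the processor_detail description.
DETAIL_ACTIONS = {"subscribe", "unsubscribe", "stop", "delete", "outputs", "inputs"}


def action_url(detail_url, action):
    """Append a documented action suffix to a processor detail URL."""
    if action not in DETAIL_ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    return detail_url.rstrip("/") + "/" + action


# "/processors/demo/" is a made-up detail URL for illustration.
stop_url = action_url("/processors/demo/", "stop")
print(stop_url)
```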
-
async
src.processors.views.
processor_inputs_update
(request: aiohttp.web_request.Request)¶ Update the processor inputs
Post params:
input_ref: reference values to the inputs to be used
measurement_ref: reference values to the measurement inputs to be used for the inputs. Must be in the same order as input_ref.
measurement_proportion: scale to be used on measurement values before inputting them. Must be in the same order as input_ref.
-
async
src.processors.views.
processor_list
(request: aiohttp.web_request.Request)¶ List all created processors.
Returns a json object of processor id to processor status objects.
Append a processor id to get more information about a listed processor
Append /create to create a new processor instance
Append /clear to delete stopped processors
-
async
src.processors.views.
processor_outputs_update
(request: aiohttp.web_request.Request)¶ Update the processor outputs
Post params:
output_ref: reference values to the outputs to be used
-
async
src.processors.views.
processor_start
(request: aiohttp.web_request.Request)¶ Start a processor from post request.
Post params:
id:* id of the processor instance. Max 20 chars; the first char must be alphabetic or underscore, other chars must be alphabetic, digit or underscore
start_params: the processor specific start parameters as a json string
input_ref: list of reference values to the inputs to be used
output_ref: list of reference values to the outputs to be used
measurement_ref: list of reference values to the measurement inputs to be used for the inputs. Must be in the same order as input_ref.
measurement_proportion: list of scales to be used on measurement values before inputting them. Must be in the same order as input_ref.
-
async
src.processors.views.
processor_stop
(request: aiohttp.web_request.Request)¶ Stop the processor with the given id.
-
async
src.processors.views.
processor_subscribe
(request: aiohttp.web_request.Request)¶ Subscribe to the processor with the given id
-
async
src.processors.views.
processor_unsubscribe
(request: aiohttp.web_request.Request)¶ Unsubscribe from the processor with the given id
-
async
src.processors.views.
processors_clear
(request: aiohttp.web_request.Request)¶ Delete data from all processors that are not running