This documentation describes an old release, version 3.5.0. Documentation for the latest release, 3.6.2, can be found here.
Injecting Python Code¶
Certain Zuar Runner inputters, transforms, and steps allow the injection of Python code into a running job:
- inputter: ExampleInput
- transform: PythonTransform
- step: PythonStep
TODO: add links to relevant docs
General¶
The value of python_code can be any of the following:
- A string containing the name of a file located in /var/mitto/data containing valid Python code.
- A string containing the fully-qualified path to a file containing valid Python code.
- A list of one or more strings, with each string being a line of valid Python. The individual strings are joined into a single string that is passed to the Python exec function.
Depending upon where the python_code is used, additional constraints
may be placed on the code.
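The three accepted forms can be pictured with a short sketch. The helper name resolve_python_code and the newline join are assumptions made for illustration, not Zuar Runner's actual implementation; only the /var/mitto/data directory comes from the text above.

```python
import os

# Directory named in the documentation for bare file names.
DATA_DIR = "/var/mitto/data"


def resolve_python_code(python_code, data_dir=DATA_DIR):
    """Hypothetical helper: return the source text described by python_code."""
    if isinstance(python_code, list):
        # Assumption: the lines are joined with newlines before exec().
        return "\n".join(python_code)
    # A string is treated as a file: absolute paths are used as-is,
    # bare names are looked up under data_dir.
    if os.path.isabs(python_code):
        path = python_code
    else:
        path = os.path.join(data_dir, python_code)
    with open(path, encoding="utf-8") as handle:
        return handle.read()
```

In this sketch the returned string is what would eventually be handed to exec.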
Formatting the List of Strings¶
When python_code is a list of strings, a non-standard formatting
convention is used due to inconsistent handling of indentation by
HJSON. This is best explained by example:
{
use: mitto.iov2.steps.builtin#PythonStep
python_code: [
# Executed in the context of an instance of the PythonStep class
# Because this uses the store as input, the job must be configured
# with a store.
def _dynamic_step(self):
. logging.info("start")
. from mitto.iov2.input import StoreInput
. from mitto.io.db.utils import (DEFAULT_ENCODE_ERRORS, to_copyfrom_line)
. from mitto.io.db.redshift import StreamIter
. streamer = StreamIter(
. to_copyfrom_line(record, DEFAULT_ENCODE_ERRORS).encode("utf-8")
. for record in self.environ[STORE].list()
. )
. data = streamer.read()
. logging.info("stop")
# Function must be assigned to `step`
self.step = _dynamic_step
]
}
Things to note:
- The first non-space character on the line is considered to be “column 1”.
- If the first non-space character is a ., it is converted to a space.
- Python comments can be used.
- The variables available for use depend upon the context of execution.
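The leading-dot convention can be sketched as a small preprocessing pass. The function name normalize_lines is hypothetical, and the real preprocessing inside Zuar Runner may differ; the sketch only follows the two rules stated above.

```python
def normalize_lines(lines):
    """Hypothetical sketch of the leading-dot convention described above."""
    normalized = []
    for line in lines:
        # The first non-space character becomes column 1.
        stripped = line.lstrip()
        if stripped.startswith("."):
            # A leading "." stands in for a space, so indentation survives HJSON.
            stripped = " " + stripped[1:]
        normalized.append(stripped)
    # Assumption: lines are joined with newlines before being executed.
    return "\n".join(normalized)


example = [
    "def double(x):",
    ".   return x * 2",
]
source = normalize_lines(example)
namespace = {}
exec(source, namespace)
result = namespace["double"](21)
```

Here the dot plus three spaces becomes four spaces of indentation, so the joined string is ordinary Python that exec accepts.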
Execution Context and Other Requirements¶
PythonStep¶
When using the PythonStep step, python_code must define a function
that will be valid as a method of the PythonStep class. The
function must:
- Accept a single argument: self
- Expect to be called once during the execution of the job
- Not return a value
- Be assigned to the step attribute of the class instance
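To make these requirements concrete, here is a toy stand-in for the execution context. ToyPythonStep is hypothetical and is not the real mitto class; how the real PythonStep exposes self and logging, and how it binds and calls the function, are assumptions.

```python
import logging
import types


class ToyPythonStep:
    """Toy stand-in for PythonStep; NOT the real mitto class."""

    def __init__(self, source, environ):
        self.environ = environ
        # Assumption: the injected code runs with `self` and `logging` in scope.
        exec(source, {"self": self, "logging": logging})

    def run(self):
        # Assumption: the plain function is bound to the instance before the call.
        bound = types.MethodType(self.step, self)
        return bound()


source = "\n".join([
    "def _dynamic_step(self):",
    "    self.environ['visited'] = True",
    "self.step = _dynamic_step",
])

step = ToyPythonStep(source, environ={})
outcome = step.run()
```

The injected function takes only self, is called once, returns nothing, and is reachable through the step attribute, which matches the four requirements listed above.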
PythonTransform¶
When using the PythonTransform transform, python_code must define
a function that will be valid as a method of the PythonTransform
class. The function must:
- Accept two arguments: self and record
- Expect to be called once for each row of data
- Return record or a modified version of record
- Be assigned to the transform_ attribute of the class instance
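A similar toy harness illustrates the per-record contract. ToyPythonTransform and its apply method are hypothetical, and the record shape (a dict with a name key) is invented for the example; the real PythonTransform may invoke the function differently.

```python
class ToyPythonTransform:
    """Toy stand-in for PythonTransform; NOT the real mitto class."""

    def __init__(self, source):
        # Assumption: the injected code runs with `self` in scope.
        exec(source, {"self": self})

    def apply(self, records):
        # Assumption: the function is called once per record,
        # with the instance passed explicitly as the first argument.
        return [self.transform_(self, record) for record in records]


source = "\n".join([
    "def transform_(self, record):",
    "    record['name'] = record['name'].strip().title()",
    "    return record",
    "self.transform_ = transform_",
])

transform = ToyPythonTransform(source)
rows = transform.apply([{"name": "  ada lovelace "}, {"name": "GRACE HOPPER"}])
```

Each record passes through transform_ exactly once and the returned (modified) record is what continues downstream in this sketch.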
Tips and Tricks¶
If you are running the job manually using the CLI via job_io.py config.json, you can invoke the Python debugger via, e.g.:
{
use: mitto.iov2.steps.builtin#PythonStep
python_code: [
import pdb; pdb.set_trace()
]
}
Note: this is not possible when the job is being run from the UI, the scheduler, a sequence, or via mitto run.
You can easily add logging statements.
To log every row at a certain point in a set of transforms:
{
use: mitto.iov2.transform.builtin#PythonTransform
python_code: [
def transform_(self, record):
. logging.info("record=%s", record)
. return record
self.transform_ = transform_
]
}
To log the job execution environment at a certain point in the steps:
{
use: mitto.iov2.steps.builtin#PythonStep
python_code: [
logging.info("environ=%s", self.environ)
]
}