Version: 6.3.3

Build Jobs

General instructions for defining Skills of all types are found on the Define Skills page. This page contains details specific to developing job-type Skills.

When to use jobs

Jobs are ideal for Agents that need to perform batch work, such as training an ML model, rather than run an on-demand function or a continuously running daemon. Use jobs when:

  • You need to run high-performance batch processes.
  • You need more resources than an on-demand function or daemon typically uses.
  • You do not need a Skill or action that is continuously running.

A job in Fabric consists of a definition (metadata) that is orchestrated and run in Kubernetes.

A Task is the actual running instance of a job's definition in Kubernetes.

Several types of job templates are available to accelerate development and improve scaling and throughput. These job types are detailed in the Job Types section.

Template files

When you run the cortex workspaces generate command, or create a Skill in any other way, template files are generated. You may modify these files during the Skill building lifecycle.

  • Dockerfile - Instructions for building the Docker image

  • main.py - A starting action definition that uses the cortex-python library

  • README.md - A getting started helper for Skill collaboration or sharing

  • requirements.txt - A list of dependency libraries

  • skill.yaml - A starter Skill definition for the runtime selected

  • message.json - Contains the payload needed to invoke the Skill

The simple job file structure looks like this:

    /Users/smichalski/job
    ├─ skills
    │  └─ smmjob
    │     ├─ skill.yaml
    │     ├─ actions
    │     │  └─ smmjob
    │     │     ├─ requirements.txt
    │     │     ├─ main.py
    │     │     └─ Dockerfile
    │     └─ invoke
    │        └─ request
    │           └─ message.json
    └─ docs
       └─ smmjob
          └─ README.md
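The layout above is regular enough to inspect programmatically. The following minimal sketch is a hypothetical helper, not part of the Cortex CLI. It assumes the generated layout shown (skills/<skill>/skill.yaml, skills/<skill>/actions/<action>/Dockerfile, skills/<skill>/invoke/<input>/*.json) and reports, for each Skill, its definition file, the action directories that contain a Dockerfile, and the sample message files.

```python
from pathlib import Path


def describe_workspace(root):
    """Map each Skill in a workspace to its definition, actions, and sample messages.

    Hypothetical helper; assumes the layout produced by `cortex workspaces generate`:
      skills/<skill>/skill.yaml
      skills/<skill>/actions/<action>/Dockerfile
      skills/<skill>/invoke/<input>/*.json
    """
    skills = {}
    for skill_yaml in sorted(Path(root, "skills").glob("*/skill.yaml")):
        skill_dir = skill_yaml.parent
        skills[skill_dir.name] = {
            "definition": skill_yaml,
            # Each action is a directory that holds its own Docker build context
            "actions": sorted(p.parent.name for p in skill_dir.glob("actions/*/Dockerfile")),
            # Sample invocation messages, relative to the Skill directory
            "messages": sorted(
                p.relative_to(skill_dir).as_posix() for p in skill_dir.glob("invoke/*/*.json")
            ),
        }
    return skills
```

For the tree above, describe_workspace("/Users/smichalski/job") would list one Skill, smmjob, with one action and the message file invoke/request/message.json.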

Define job main.py

The main.py file provides the instructions for running (invoking) the action and producing its response.

For job actions you must define:

  • Input

  • Print (output) parameters

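    Job main.py

    import json
    import sys

    if __name__ == '__main__':
        # Fabric passes the invocation message as a JSON string argument
        params = json.loads(sys.argv[1])
        # Output printed to stdout is captured as job log output
        print(f'Received: {params["payload"]}')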
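To try the same parsing logic outside Fabric, it can help to factor it into a function and pass it an argument list shaped like the one the job receives. This is a minimal sketch: run_job and the sample message are hypothetical, and it assumes the message is a JSON object with a payload key, as main.py expects.

```python
import json


def run_job(argv):
    """Parse the invocation message from an argv-style list and log it (hypothetical helper)."""
    if len(argv) < 2:
        raise SystemExit("Message/payload argument is required")
    # The message arrives as a single JSON string; use the last argument
    params = json.loads(argv[-1])
    line = f'Received: {params["payload"]}'
    print(line)  # stdout is captured as job log output
    return line


if __name__ == '__main__':
    # Hypothetical sample message, standing in for invoke/request/message.json
    run_job(["main.py", json.dumps({"payload": {"message": "hello"}})])
```

Running the file prints Received: {'message': 'hello'}.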

Modify requirements.txt File

The contents of this file vary by use case. The requirements below are typical of a simple job.

requirements.txt example

cortex-python

Skill Definition

The Skill provides a wrapper for the action (and, optionally, the model) that specifies the properties, parameters, and routing required to run it.

The Skill is defined in the skill.yaml. For general information about Skills go to the Define Skills page.

camel: 1.0.0
name: smmjob
title: smmjob Title
description: smmjob Description
inputs:
  - name: request
    title: Job Request
    parameters:
      - name: params
        type: object
        description: Request Parameters
        required: true
    routing:
      all:
        action: smmjob
        runtime: cortex/jobs
        output: response
outputs:
  - name: response
    title: Job Response
    parameters:
      - name: result
        type: object
        description: Job Response
        required: true
actions:
  - name: smmjob
    type: job
    image: smmjob
    environmentVariables: '"TEST"="value"'
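The routing block ties each input to an action and an output by name, so a typo in any of those names breaks the Skill. The sketch below is a hypothetical consistency check, not a Fabric tool. It mirrors the skill.yaml above as a Python dictionary (to avoid assuming a YAML library is installed) and checks only the all routing form used here.

```python
# Mirror of the smmjob skill.yaml as a Python dict (only the fields the check needs)
SKILL = {
    "name": "smmjob",
    "inputs": [
        {
            "name": "request",
            "routing": {"all": {"action": "smmjob", "runtime": "cortex/jobs", "output": "response"}},
        }
    ],
    "outputs": [{"name": "response"}],
    "actions": [{"name": "smmjob", "type": "job", "image": "smmjob"}],
}


def routing_problems(skill):
    """Return routing references that do not resolve (hypothetical lint, `all` routing only)."""
    actions = {a["name"] for a in skill.get("actions", [])}
    outputs = {o["name"] for o in skill.get("outputs", [])}
    problems = []
    for inp in skill.get("inputs", []):
        route = inp.get("routing", {}).get("all", {})
        if route.get("action") not in actions:
            problems.append(f"input {inp['name']}: unknown action {route.get('action')!r}")
        if route.get("output") not in outputs:
            problems.append(f"input {inp['name']}: unknown output {route.get('output')!r}")
    return problems


if __name__ == "__main__":
    print(routing_problems(SKILL) or "routing OK")
```

An empty list means every input routes to a defined action and output.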

Save Skill and Build Skill Images

  1. Modify the Dockerfile as needed. The default Dockerfile sets an ENTRYPOINT that uses Scuttle to launch the Python executable with /app/main.py when the container starts.

    For best practices on how to write a Dockerfile, see Dockerfile best practices.

    Default Dockerfile

    FROM c12e/cortex-python-lib:fabric6
    COPY --from=redboxoss/scuttle:latest /scuttle /bin/scuttle
    ENV ENVOY_ADMIN_API=http://localhost:15000
    ENV ISTIO_QUIT_API=http://localhost:15020
    ENV SCUTTLE_LOGGING=false
    ADD . /app
    RUN pip install -r /app/requirements.txt
    ENTRYPOINT ["scuttle", "python", "/app/main.py"]

    NOTE: Scuttle is required to run jobs in Fabric.

    NOTE: If you run jobs with Istio sidecar injection enabled, so that they communicate with the Fabric API Gateway through an Envoy proxy, see Invoking Jobs on clusters with Envoy enabled below.

  2. Build a new local Docker image in the Skill directory that contains your Dockerfile.

    cortex workspaces build

    This creates Docker images tagged <image-name>:<version>, where the image name is the same as the Skill name.

    To build a single Skill, append --skill skillName to the command.

    Example

    cortex workspaces build --skill smmjob

Publish Skills to Image Repository and Deploy

To publish all workspace Skill images to the image registry connected to your Kubernetes instance, and deploy them, run:

cortex workspaces publish

Images published to the repository connected to your Fabric Kubernetes instance are also deployed automatically.

To publish a single Skill, append --skill skillName to the command.

Example

cortex workspaces publish --skill smmjob

Additional Job Configuration

Log job messages

To log messages from your job, write them to stdout or stderr, or configure your standard logging library to send its output to those streams. All output written to these streams is returned in the job response payload and stored as log output.

Python example

import sys

# Some work happens
sys.stdout.write('Progress: 10 percent\n')
# More work happens, then something goes wrong
sys.stderr.write('Error: description of error\n')
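The paragraph above also mentions configuring your standard logging library. One way to do that with Python's built-in logging module is sketched below. The split (records below WARNING to stdout, WARNING and above to stderr), the format string, and the function name configure_job_logging are illustrative choices, not Fabric requirements.

```python
import logging
import sys


def configure_job_logging(level=logging.INFO):
    """Send log records below WARNING to stdout and WARNING and above to stderr."""
    fmt = logging.Formatter("%(levelname)s: %(message)s")

    out = logging.StreamHandler(sys.stdout)
    out.setLevel(level)
    # Keep warnings and errors off stdout so each record is written only once
    out.addFilter(lambda record: record.levelno < logging.WARNING)
    out.setFormatter(fmt)

    err = logging.StreamHandler(sys.stderr)
    err.setLevel(logging.WARNING)
    err.setFormatter(fmt)

    root = logging.getLogger()
    root.handlers.clear()  # replace any previously configured handlers
    root.setLevel(level)
    root.addHandler(out)
    root.addHandler(err)
    return root


if __name__ == "__main__":
    configure_job_logging()
    logging.info("Progress: 10 percent")
    logging.error("Error: description of error")
```

Because both streams are captured by Fabric, either handler's output ends up in the job's log output.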

Job response

The following image displays messages collected from a job instance as a single line in the response payload.

[Image: job-log-response]

Invoking Jobs on clusters with Envoy enabled

The Istio sidecar (Envoy) is required for Fabric jobs to participate in the Istio service mesh. By default Fabric disables Istio sidecar injection for jobs, so the job container can communicate directly with the Fabric API Gateway.

Jobs with Istio sidecar injection enabled communicate with the Fabric API Gateway through an Envoy proxy. The extra container in the job’s pod introduces two main issues:

  • First, because it takes time for the Envoy container to download metadata from the Istio daemon, the job container may fail or exit if it makes network calls before Envoy is ready.
  • Second, because the Envoy container is designed not to exit or shut down while it is handling network requests, Kubernetes cannot identify when the job has completed.

To work around these issues, your job container MUST include logic that waits for Envoy to be ready to communicate and terminates Envoy when the job's main process has exited.

Traditionally, this is implemented as a bash script wrapper with sleeps and a trap. However, more robust options, such as Scuttle, are available.

Scuttle waits for Envoy to be ready and terminates Envoy when the job’s process exits.

To use Scuttle with jobs in Kubernetes, add the Scuttle binary to your job's Docker image as follows:

FROM python:3.7.4-buster
COPY --from=c12e/scuttle:latest /scuttle /bin/scuttle
ADD . /app
ENV ENVOY_ADMIN_API=http://localhost:15000
ENV ISTIO_QUIT_API=http://localhost:15020
RUN pip install -r /app/requirements.txt
ENTRYPOINT ["scuttle", "python", "/app/main.py"]
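To make the wait-and-terminate logic concrete, here is a minimal Python sketch of what such a wrapper does. It is not Scuttle's implementation. It assumes the Envoy admin API (ENVOY_ADMIN_API) answers GET /ready with HTTP 200 once Envoy is live, and that the Istio agent at ISTIO_QUIT_API accepts POST /quitquitquit to stop the sidecar; run_with_sidecar, envoy_ready, quit_sidecar, and main are hypothetical names.

```python
import os
import subprocess
import time
import urllib.request


def envoy_ready(admin_api):
    """True when the Envoy admin API answers GET /ready with HTTP 200 (assumed endpoint)."""
    try:
        with urllib.request.urlopen(f"{admin_api}/ready", timeout=1) as resp:
            return resp.status == 200
    except OSError:  # connection refused, timeout, or an HTTP error status
        return False


def quit_sidecar(quit_api):
    """Ask the Istio agent to stop the sidecar via POST /quitquitquit (assumed endpoint)."""
    req = urllib.request.Request(f"{quit_api}/quitquitquit", method="POST")
    try:
        urllib.request.urlopen(req, timeout=1).close()
    except OSError:
        pass  # no sidecar (injection disabled) or it is already gone


def run_with_sidecar(cmd, is_ready, stop_sidecar, timeout=60.0, interval=1.0):
    """Wait up to `timeout` seconds for the sidecar, run cmd, then always stop the sidecar."""
    deadline = time.monotonic() + timeout
    # After the deadline, run the job anyway rather than waiting forever
    while not is_ready() and time.monotonic() < deadline:
        time.sleep(interval)
    try:
        # Pass the job's exit code through so Kubernetes sees success or failure
        return subprocess.call(cmd)
    finally:
        stop_sidecar()


def main(cmd):
    admin = os.environ.get("ENVOY_ADMIN_API", "http://localhost:15000")
    quit_api = os.environ.get("ISTIO_QUIT_API", "http://localhost:15020")
    return run_with_sidecar(cmd, lambda: envoy_ready(admin), lambda: quit_sidecar(quit_api))
```

Injecting is_ready and stop_sidecar keeps the control flow testable without a cluster, and the finally clause stops the sidecar even if the job crashes. In an image, such a wrapper could be invoked as sys.exit(main(["python", "/app/main.py"])), but the Scuttle ENTRYPOINT shown above remains the documented approach.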

Send Message Feature

The Send Message feature allows users to send individual output records to the next Skill in an Agent.

For example, suppose you are an insurance company with an application that reviews claims for evidence of fraud, and you want to see the output for each claim as the system processes it. In an Agent with a normal set of Skills performing this kind of action, the job output is not passed to the next Skill until the job has completed.

The send_message API method allows each claim's output to be sent to the next Skill as soon as that claim is processed.

To use the feature, call the send_message method of SkillClient (from the cortex-python library) in your job code, as in the main.py example that follows.

EXAMPLE

In this example, the send-message job generates N records that are sent to the Get Message Skill.

skill.yaml file config:

camel: 1.0.0
name: send-message
title: Generate random people
description: Job generates N json records calling send message
inputs:
  - name: input
    title: Job Input
    parameters:
      - name: num_records
        type: object
        description: Number of records
        required: false
    routing:
      all:
        action: send-message-example
        runtime: cortex/jobs
        output: output
outputs:
  - name: output
    title: Job result
    parameters:
      - name: document_key
        type: string
        description: Location of file in managed content
actions:
  - name: send-message-example
    type: job
    image: send-message-example:latest
    environmentVariables: "\"TEST\"=\"value\""

main.py file config:

import json
import sys

from cortex import Cortex
from cortex.skill import SkillClient


def process(params):
    # Build a Cortex client from the invocation message
    client = Cortex.from_message(params, verify_ssl_cert=False)
    # Validate before reading, so a missing key produces a clear error
    if 'activationId' not in params:
        raise Exception("'activationId' is required in the payload")
    payload = params['payload']
    activationId = params['activationId']
    channelId = params['channelId']
    # Default to 10 records unless the caller passes num_records
    num_records = 10
    if 'num_records' in payload:
        num_records = payload['num_records']
    print(f'Generating datafile with {num_records} records')
    skillClient = SkillClient(client)
    for i in range(1, num_records + 1):
        rec = {'text': f'Send message {i}'}
        # Send each record to the 'output' output as soon as it is generated
        res = skillClient.send_message(activationId, channelId, 'output', rec)
        print(res)
    print(f'Generated {num_records} messages')


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Message/payload argument is required")
        sys.exit(1)
    # The last argument in sys.argv is the payload from cortex
    process(json.loads(sys.argv[-1]))
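Because process() needs a live Fabric connection, its record loop is hard to test locally. A minimal sketch of one hypothetical refactor injects the send function, so SkillClient(client).send_message can be passed in production and a stub in tests. generate_records and send_all are illustrative names, not part of cortex-python.

```python
from typing import Callable, List


def generate_records(num_records: int) -> List[dict]:
    """Build the same records the send-message job emits."""
    return [{'text': f'Send message {i}'} for i in range(1, num_records + 1)]


def send_all(send: Callable[[str, str, str, dict], object], params: dict) -> int:
    """Send each record as soon as it is built and return how many were sent.

    `send` is called as send(activationId, channelId, 'output', rec), the same way
    main.py calls SkillClient.send_message.
    """
    if 'activationId' not in params:
        raise ValueError("'activationId' is required in the message")
    num_records = params.get('payload', {}).get('num_records', 10)
    for rec in generate_records(num_records):
        # Each record is handed to `send` immediately, not batched until the end
        print(send(params['activationId'], params['channelId'], 'output', rec))
    return num_records
```

In the job, process() could call send_all(SkillClient(client).send_message, params); in a test, a stub that records its arguments is enough.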

Example Output:

{
  "success": true,
  "requestId": "41675c78-ffc2-47d7-9655-a80461924275",
  "agentName": "send-message",
  "serviceName": "input",
  "sessionId": "41675c78-ffc2-47d7-9655-a80461924275",
  "projectId": "vtest",
  "username": "cortex@example.com",
  "payload": {
    "num_records": 10
  },
  "start": 1646933424374,
  "status": "COMPLETE",
  "end": 1646933438199,
  "response": [
    { "wrote": { "text": "Send message 1" } },
    { "wrote": { "text": "Send message 2" } },
    { "wrote": { "text": "Send message 3" } },
    { "wrote": { "text": "Send message 4" } },
    { "wrote": { "text": "Send message 5" } },
    { "wrote": { "text": "Send message 6" } },
    { "wrote": { "text": "Send message 7" } },
    { "wrote": { "text": "Send message 8" } },
    { "wrote": { "text": "Send message 9" } },
    { "wrote": { "text": "Send message 10" } }
  ]
}