Skip to content

Function Block Creation

intermediate programming knowledge required:
ROS (Robotic Operating System)
Python

Background

The programming paradigm skill-based programming aims to define reusable building blocks of small pieces of logic. These building blocks can be combined in various ways to compose a complex program. In many ways this paradigm resembles LEGO bricks.

drag&bot uses clear names for its different skill types:

  1. There are two types of skills: You build Programs using Function Blocks
  2. There are three types of function blocks: Basic, Subprogram, Control
graph TD A[Skill] --> B[Program] A --> C[Function Block] C --> D[Basic Block] C --> E[Subprogram Block] C --> F[Control Block] style D stroke: none style E stroke: none style F stroke: none style B stroke: green style C stroke: blue

If you look at the main builder area, you can recognize the structure: programs and function blocks in the builder

Programs and function blocks share the same base skill definition in JSON. Each type then adds some additional fields for its specific usecase.

// BASE SKILL
{
    title: String,
    description: String,
    image: String,
    owner: ObjectId,
    visible: Boolean,
    rev_hash: String

    // + attribute here
}
+ ATTRIBUTE Program Subprogram Basic Block Control Block
inputParameters --
outputParameters --
script -- --
skillflow --
controlflow -- -- --

Function Blocks have parameters, programs do not!

Program
A program groups function blocks in its skillflow array
Function Blocks
Function Blocks can have parameters.
Subprogram Block: if a program is used in another program, it will be inserted as a subprogram block.
Basic Block: contains python script to execute some action in the robot system.
Control Block: can change the execution order Control Blocks contain one or more skillflows as nested array in skillflow, for example two if it is a if-then-else condition block. With controlflow is set how the skillflows are handled: "loop", "parallel" or "condition". When controlflow = "condition" Control Blocks may optionally contain python script to evaluate a condition deciding which skillflow to follow.

How To

Creating a Basic Block

Currently, blocks can only be created manually by using the developer tools in the drag&bot builder. In the toolbar of the builder is a Python Editor (for basic blocks and control blocks) and a JSON Editor available next to the usual graphical Block Editor.

available developer tools

First we will create a new Basic Block, then add input and output parameters in the meta data and then write the python script with the Python Editor.

A simplified editor for function blocks will be available soon

Create the Basis

New Program Button 1. Create a Skill via "Create Basic Block" in the program overview

Configure Input and Output Parameters

You can add standard parameters in the meta data section of the skill. You can open this area in the builder via "edit information" in the toolbar. The parameters defined here will be handed over to the python script as array in the same order.

If you look at the JSON Editor after adding parameters (reload/load the JSON in the editor with "get JSON"), you can see how the parameters are added to the skill.

In case of the primitive parameter datatype string, it's important to know that later an empty string is only allowed for the parameter, if the default value is not null. If the default value of the string parameter is null, an empty string will not be allowed.

Non-standard parameters can be defined in the JSON Editor directly (Save changes with "set JSON"):

Parameters can be nested using type="group" and the subparameter array parameters to form more complex types like "position" (example simplified):

{
    title: "Position",
    type: "group",
    dataType: "position",
    parameters: [{
        title: "X",
        type: "primitive",
        dataType: "float"
    },{
        title: "Y",
        type: "primitive",
        dataType: "float"
    },{
        title: "Z",
        type: "primitive",
        dataType: "float"
    }]
}

There are unlimited levels possible, just keep stacking type="group" and parameters.

Further its possible to create lists of parameters by using the type="list". A list can only contain elements of the same group or primitive (example simplified):

{
    title: "Some String List",
    type: "list",
    dataType: "string_list",
    parameters: [{
        title: "Element1",
        type: "primitive",
        dataType: "string"
    },{
        title: "Element2",
        type: "primitive",
        dataType: "string"
    },{
        title: "Element3",
        type: "primitive",
        dataType: "string"
    }]
}

List elements can be added during program creation time and list elements can be changed from the function block during execution time. A list has to have at least one element. Usually a list has one element when the function block is designed.

Setup the Execution Logic

Now switch to the Python Editor to start writing code! The boilerplate template for basic block python code looks like this:

#!/usr/bin/env python

# optional IMPORTS
# e.g import rospy

def execute(input_parameters, output_parameters):
    # RUNS ON BLOCK EXECUTION
    # do actions with hardware, software
    # produce outputs
    return output_parameters

def error(input_parameters, output_parameters):
    # RUNS ON ERROR
    # try recovery and return
    return output_parameters

def abort():
    # RUNS ON MANUAL ABORT OR AFTER ERROR
    # stop pending actions, clean up resources

The Python snippet contains:

  • (Required) Python Shebang at the beginning:

    #!/usr/bin/env python
    

  • (Optional) Library imports. For example:

    import rospy # ROS API
    import time
    

To access the ROS API we need to import rospy. Also we can import any standard python library like time.

  • (Required) Execute function:
    def execute(input_parameters, output_parameters):
        return output_parameters
    

This is the main function of the python snippet where the Function Block logic happens. The function is executed inside an state machine executor in the drag&bot Runtime System. The function receives the input_parameters as array (python dictionary) in the order of definition in the JSON. If the parameters used referenced values instead of static values, they are already resolved by the executor. It also receives an array of output_parameters it is expected to return. It must return the output parameters again, otherwise the execution fails. If the function block ran before (in a loop), the output parameters array is already filled with the values of the last run. If no modification is done on the output parameters, they can be returned directly as in the example.

For example, if we have a block with 2 input parameters "IO number" and "IO value", we can access to their values from their corresponding value field as follows:

number = int(input_parameters[0]['value'])
value = bool(input_parameters[1]['value'])

Although the executor tries to match types, it is recommended to cast input parameter values to the desired python type to avoid exceptions. This makes the Function Block more robust. Outputs do not need to be cast again as they are already in the correct type if above recommendation is followed.

A parameter can contain subparameters. Subparameters are stored as a list of parameters in the parameters field of the parameter dictionary. This is how the complex parameter "position", we created earlier, is filled:

output_parameters[0]['parameters'][0]['value'] = position.x
output_parameters[0]['parameters'][1]['value'] = position.y
output_parameters[0]['parameters'][2]['value'] = position.z

  • (Optional) Error recovery function:
def error(input_parameters, output_parameters):
    return output_parameters

This function has a similar structure to the execute function. If an error occurs (Python exception) during the execute function execution, then the error function is called. This function is optional but if it doesn't exist, an exception will escalate the error to the upper-level error recovery / handling system. Escalation can be also manually triggered by raising an exception in this function. The function must return the output parameters. If this function terminates correctly and returns the output parameters, then the error will be considered recovered and the execution of the program will continue.

  • (Optional) Abort function:
def abort():
    pass

Abort function is executed during program termination as a dispose function in order to clean and close pending connections and provide the posibility to interrupt threads. For example, a blocking ROS service call in the execute function must be interrupted in the abort function to avoid future connection errors.

Python snippet execution process

execution flow
The state machine executor in the drag&bot Runtime System first loads a program. During loading, all the libraries needed for each snippet are imported and global variables initialized. Then it starts the program execution. Once it is a block`s turn, the execute function is called. If it returns the output parameters correctly, the normal state machine execution continues. If not or if an exception was raised, the error function is called to try to recover programmatically from the error. If the error function returns the output parameters correctly, the error is considered recovered and normal execution continues. If a new exception happens or is manually raised, the state machine may look for other ways to handle the error in the layer above or stops the execution. If the error is not recovered, the program execution terminates and the abort function is called to dispose the snippet.

The Python snippet will be loaded as independent module. This means the global scope will be local to the module and be kept between snippet function callings. This allows for example to create counters inside the loops:

#!/usr/bin/env python

# this global counter will preserve its value over block runs
counter = 0

def execute(input_parameters, output_parameters):
    global counter
    counter = counter + 1
    output_parameters[0]['value'] = str(counter)
    output_parameters[1]['value'] = counter
    return output_parameters

More Examples

Wait Function block This blocks waits a given number of seconds as input parameter. Event library is used because it can be interrupted.

#!/usr/bin/env python
from threading import Event

exit = Event()

def execute(input_parameters, output_parameters):
    exit.wait(float(input_parameters[0]['value']))
    return output_parameters

def abort():
    exit.set()

Get Position block This block reads the current robot position from ROS and publishes it as output parameter:

#!/usr/bin/env python

import rospy
from robot_movement_interface.msg import *

subscriber = None
position = None
abort = False

def callback(data):
    global position
    position = data

def execute(input_parameters, output_parameters):
    global subscriber
    global position
    global abort

    abort = False
    position = None
    subscriber = rospy.Subscriber("/tool_frame", EulerFrame, callback)

    while not position and not abort:
        rospy.sleep(0.01)

    output_parameters[0]['parameters'][0]['parameters'][0]['value'] = position.x
    output_parameters[0]['parameters'][0]['parameters'][1]['value'] = position.y
    output_parameters[0]['parameters'][0]['parameters'][2]['value'] = position.z
    output_parameters[0]['parameters'][1]['parameters'][0]['value'] = position.alpha
    output_parameters[0]['parameters'][1]['parameters'][1]['value'] = position.beta
    output_parameters[0]['parameters'][1]['parameters'][2]['value'] = position.gamma

    subscriber.unregister()

    return output_parameters

def abort():
    global abort
    abort = True

Move IIWA robot block This block calls the ROS action called commands_action_server which is the standard ROS Robot Movement Interface that drag&bot supports for moving the robots. In this case, abort mechanism first cancel pending action calls and then stops the robot by calling robot stop service to be sure that the robot stops.

#!/usr/bin/env python
import rospy
import yaml
import json
import actionlib
import unicodedata
import random
import std_srvs.srv

from robot_movement_interface.msg import *

def execute(input_parameters, output_parameters):

    global client

    # Stablish the connection
    client = actionlib.SimpleActionClient('commands_action_server', robot_movement_interface.msg.CommandsAction)
    client.wait_for_server()

    command_action_goal = robot_movement_interface.msg.CommandsGoal()
    command_list = CommandList()
    command_list.commands = []
    command_list.replace_previous_commands = True
    command_id = random.randint(0,2047483647)

    # Create a ROS action call message with the incoming information
    for waypoint in input_parameters[0]['parameters']:
        all_params = dict((y['title'], y) for y in waypoint['parameters'])

        # target pose
        pose_target = dict((y['title'], y) for y in all_params['Pose']['parameters'])
        pose_target_position = dict((y['title'], y) for y in pose_target['Position']['parameters'])
        pose_target_orientation = dict((y['title'], y) for y in pose_target['Orientation']['parameters'])

        # the reference system
        pose_ref = dict((y['title'], y) for y in all_params['Pose reference']['parameters'])
        pose_ref_position = dict((y['title'], y) for y in pose_ref['Position']['parameters'])
        pose_ref_orientation = dict((y['title'], y) for y in pose_ref['Orientation']['parameters'])

        # the movement parameters
        speed = dict((y['title'], y) for y in all_params['Speed']['parameters'])
        acceleration = dict((y['title'], y) for y in all_params['Acceleration']['parameters'])
        blending = dict((y['title'], y) for y in all_params['Blending']['parameters'])

        # create the reference and prefill
        ref = EulerFrame()
        ref.x = float(pose_ref_position['x']['value'])
        ref.y = float(pose_ref_position['y']['value'])
        ref.z = float(pose_ref_position['z']['value'])
        # alpha, beta, gamma is essentially rz, ry, rx
        ref.alpha = float(pose_ref_orientation['z']['value'])
        ref.beta =  float(pose_ref_orientation['y']['value'])
        ref.gamma = float(pose_ref_orientation['x']['value'])

        command = Command()
        command.command_id = command_id
        command.pose_reference_frame = ref
        command.command_type = unicodedata.normalize('NFKD', all_params['Movement type']['value']).encode('ascii','ignore')
        command.pose_type = 'EULER_INTRINSIC_ZYX'
        command.pose = [float(pose_target_position['x']['value']),
                        float(pose_target_position['y']['value']),
                        float(pose_target_position['z']['value']),
                        float(pose_target_orientation['z']['value']),
                        float(pose_target_orientation['y']['value']),
                        float(pose_target_orientation['x']['value'])]
        command.velocity_type = speed['speed_type']['value']
        command.velocity = [float(speed['speed_value']['value'])]
        command.blending_type = blending['blending_type']['value']
        command.blending = [float(blending['blending_value']['value'])]
        command.acceleration_type = acceleration['acceleration_type']['value']
        command.acceleration = [float(acceleration['acceleration_value']['value'])]
        #command.pose_reference = all_params['pose_reference']['value']
        command.pose_reference = ''

        command.additional_parameters = ['iiwa']
        command.additional_parameters.append(str(all_params['Tool']['value']))
        command.additional_parameters.append('')
        #command.additional_parameters.append(str(all_params['Workpiece']['value']))

        redundancy = float(all_params['Redundancy']['value']) * 3.14159265 / 180.0
        if all_params['Control command']['value']:
            command.additional_parameters.append(str(all_params['Control command']['value']) + ' redundancy ' + str(redundancy))
        else:
            command.additional_parameters.append('redundancy ' + str(redundancy))

        command_list.commands.append(command)
        command_id += 1

    #command_action_goal.goal = command_list
    command_action_goal.commands = command_list

    #print command_action_goal

    # Send the action call and wait until it is finished
    client.send_goal(command_action_goal)
    client.wait_for_result()

    result_msg = client.get_result()

    if result_msg.result.result_code != 0:
        raise Exception("Robot command didn't finish correctly")

    return output_parameters

def abort():
    global client
    rospy.loginfo("Handling abort()-Section in move skill.")

    client.cancel_all_goals()

    stop_service = rospy.ServiceProxy('stop_robot_right_now', std_srvs.srv.Trigger)
    stop_service()

    # IMPORTANT: Wait for result, otherwise actions will continue infinitely
    client.wait_for_result()