Function Block Creation¶
- intermediate programming knowledge required:
- ROS (Robotic Operating System)
- Python
Background¶
The programming paradigm skill-based programming aims to define reusable building blocks of small pieces of logic. These building blocks can be combined in various ways to compose a complex program. In many ways this paradigm resembles LEGO bricks.
drag&bot uses clear names for its different skill types:
- There are two types of skills: You build Programs using Function Blocks
- There are three types of function blocks: Basic, Subprogram, Control
If you look at the main builder area, you can recognize the structure:
Programs and function blocks share the same base skill definition in JSON. Each type then adds some additional fields for its specific usecase.
// BASE SKILL
{
title: String,
description: String,
image: String,
owner: ObjectId,
visible: Boolean,
rev_hash: String
// + attribute here
}
+ ATTRIBUTE | Program | Subprogram | Basic Block | Control Block |
---|---|---|---|---|
inputParameters | -- | ✔ | ✔ | ✔ |
outputParameters | -- | ✔ | ✔ | ✔ |
script | -- | -- | ✔ | ✔ |
skillflow | ✔ | ✔ | -- | ✔ |
controlflow | -- | -- | -- | ✔ |
Function Blocks have parameters, programs do not!
- Program
- A program groups function blocks in its
skillflow
array - Function Blocks
- Function Blocks can have
parameters
. - Subprogram Block: if a program is used in another program, it will be inserted as a subprogram block.
- Basic Block: contains python
script
to execute some action in the robot system. - Control Block: can change the execution order
Control Blocks contain one or more skillflows as nested array in
skillflow
, for example two if it is a if-then-else condition block. Withcontrolflow
is set how the skillflows are handled: "loop", "parallel" or "condition". Whencontrolflow
= "condition" Control Blocks may optionally contain pythonscript
to evaluate a condition deciding which skillflow to follow.
How To¶
Creating a Basic Block¶
Currently, blocks can only be created manually by using the developer tools in the drag&bot builder. In the toolbar of the builder is a Python Editor (for basic blocks and control blocks) and a JSON Editor available next to the usual graphical Block Editor.
First we will create a new Basic Block, then add input and output parameters in the meta data and then write the python script with the Python Editor.
A simplified editor for function blocks will be available soon
Create the Basis¶
1. Create a Skill via "Create Basic Block" in the program overview
Configure Input and Output Parameters¶
You can add standard parameters in the meta data section of the skill. You can open this area in the builder via "edit information" in the toolbar. The parameters defined here will be handed over to the python script as array in the same order.
If you look at the JSON Editor after adding parameters (reload/load the JSON in the editor with "get JSON"), you can see how the parameters are added to the skill.
In case of the primitive parameter datatype string
, it's important to know that later an empty string is only allowed for the parameter, if the default value is not null
. If the default value of the string parameter is null
, an empty string will not be allowed.
Non-standard parameters can be defined in the JSON Editor directly (Save changes with "set JSON"):
Parameters can be nested using type
="group" and the subparameter array parameters
to form more complex types like "position" (example simplified):
{
title: "Position",
type: "group",
dataType: "position",
parameters: [{
title: "X",
type: "primitive",
dataType: "float"
},{
title: "Y",
type: "primitive",
dataType: "float"
},{
title: "Z",
type: "primitive",
dataType: "float"
}]
}
There are unlimited levels possible, just keep stacking type
="group" and parameters
.
Further its possible to create lists of parameters by using the type
="list". A list can only contain elements of the same group or primitive (example simplified):
{
title: "Some String List",
type: "list",
dataType: "string_list",
parameters: [{
title: "Element1",
type: "primitive",
dataType: "string"
},{
title: "Element2",
type: "primitive",
dataType: "string"
},{
title: "Element3",
type: "primitive",
dataType: "string"
}]
}
List elements can be added during program creation time and list elements can be changed from the function block during execution time. A list has to have at least one element. Usually a list has one element when the function block is designed.
Setup the Execution Logic¶
Now switch to the Python Editor to start writing code! The boilerplate template for basic block python code looks like this:
#!/usr/bin/env python
# optional IMPORTS
# e.g import rospy
def execute(input_parameters, output_parameters):
# RUNS ON BLOCK EXECUTION
# do actions with hardware, software
# produce outputs
return output_parameters
def error(input_parameters, output_parameters):
# RUNS ON ERROR
# try recovery and return
return output_parameters
def abort():
# RUNS ON MANUAL ABORT OR AFTER ERROR
# stop pending actions, clean up resources
The Python snippet contains:
-
(Required) Python Shebang at the beginning:
#!/usr/bin/env python
-
(Optional) Library imports. For example:
import rospy # ROS API import time
To access the ROS API we need to import rospy
. Also we can import any standard python library like time
.
- (Required) Execute function:
def execute(input_parameters, output_parameters): return output_parameters
This is the main function of the python snippet where the Function Block logic happens. The function is executed inside an state machine executor in the drag&bot Runtime System. The function receives the input_parameters
as array (python dictionary) in the order of definition in the JSON. If the parameters used referenced values instead of static values, they are already resolved by the executor. It also receives an array of output_parameters
it is expected to return. It must return the output parameters again, otherwise the execution fails. If the function block ran before (in a loop), the output parameters array is already filled with the values of the last run. If no modification is done on the output parameters, they can be returned directly as in the example.
For example, if we have a block with 2 input parameters "IO number" and "IO value", we can access to their values from their corresponding value field as follows:
number = int(input_parameters[0]['value'])
value = bool(input_parameters[1]['value'])
Although the executor tries to match types, it is recommended to cast input parameter values to the desired python type to avoid exceptions. This makes the Function Block more robust. Outputs do not need to be cast again as they are already in the correct type if above recommendation is followed.
A parameter can contain subparameters. Subparameters are stored as a list of parameters in the parameters
field of the parameter dictionary.
This is how the complex parameter "position", we created earlier, is filled:
output_parameters[0]['parameters'][0]['value'] = position.x
output_parameters[0]['parameters'][1]['value'] = position.y
output_parameters[0]['parameters'][2]['value'] = position.z
- (Optional) Error recovery function:
def error(input_parameters, output_parameters):
return output_parameters
This function has a similar structure to the execute function. If an error occurs (Python exception) during the execute function execution, then the error function is called. This function is optional but if it doesn't exist, an exception will escalate the error to the upper-level error recovery / handling system. Escalation can be also manually triggered by raising an exception in this function. The function must return the output parameters. If this function terminates correctly and returns the output parameters, then the error will be considered recovered and the execution of the program will continue.
- (Optional) Abort function:
def abort():
pass
Abort function is executed during program termination as a dispose function in order to clean and close pending connections and provide the posibility to interrupt threads. For example, a blocking ROS service call in the execute function must be interrupted in the abort function to avoid future connection errors.
Python snippet execution process¶
The state machine executor in the drag&bot Runtime System first loads a program. During loading, all the libraries needed for each snippet are imported and global variables initialized. Then it starts the program execution. Once it is a block`s turn, the execute function is called. If it returns the output parameters correctly, the normal state machine execution continues. If not or if an exception was raised, the error function is called to try to recover programmatically from the error. If the error function returns the output parameters correctly, the error is considered recovered and normal execution continues. If a new exception happens or is manually raised, the state machine may look for other ways to handle the error in the layer above or stops the execution. If the error is not recovered, the program execution terminates and the abort function is called to dispose the snippet.
The Python snippet will be loaded as independent module. This means the global scope will be local to the module and be kept between snippet function callings. This allows for example to create counters inside the loops:
#!/usr/bin/env python # this global counter will preserve its value over block runs counter = 0 def execute(input_parameters, output_parameters): global counter counter = counter + 1 output_parameters[0]['value'] = str(counter) output_parameters[1]['value'] = counter return output_parameters
More Examples¶
Wait Function block This blocks waits a given number of seconds as input parameter. Event library is used because it can be interrupted.
#!/usr/bin/env python
from threading import Event
exit = Event()
def execute(input_parameters, output_parameters):
exit.wait(float(input_parameters[0]['value']))
return output_parameters
def abort():
exit.set()
Get Position block This block reads the current robot position from ROS and publishes it as output parameter:
#!/usr/bin/env python
import rospy
from robot_movement_interface.msg import *
subscriber = None
position = None
abort = False
def callback(data):
global position
position = data
def execute(input_parameters, output_parameters):
global subscriber
global position
global abort
abort = False
position = None
subscriber = rospy.Subscriber("/tool_frame", EulerFrame, callback)
while not position and not abort:
rospy.sleep(0.01)
output_parameters[0]['parameters'][0]['parameters'][0]['value'] = position.x
output_parameters[0]['parameters'][0]['parameters'][1]['value'] = position.y
output_parameters[0]['parameters'][0]['parameters'][2]['value'] = position.z
output_parameters[0]['parameters'][1]['parameters'][0]['value'] = position.alpha
output_parameters[0]['parameters'][1]['parameters'][1]['value'] = position.beta
output_parameters[0]['parameters'][1]['parameters'][2]['value'] = position.gamma
subscriber.unregister()
return output_parameters
def abort():
global abort
abort = True
Move IIWA robot block This block calls the ROS action called commands_action_server which is the standard ROS Robot Movement Interface that drag&bot supports for moving the robots. In this case, abort mechanism first cancel pending action calls and then stops the robot by calling robot stop service to be sure that the robot stops.
#!/usr/bin/env python
import rospy
import yaml
import json
import actionlib
import unicodedata
import random
import std_srvs.srv
from robot_movement_interface.msg import *
def execute(input_parameters, output_parameters):
global client
# Stablish the connection
client = actionlib.SimpleActionClient('commands_action_server', robot_movement_interface.msg.CommandsAction)
client.wait_for_server()
command_action_goal = robot_movement_interface.msg.CommandsGoal()
command_list = CommandList()
command_list.commands = []
command_list.replace_previous_commands = True
command_id = random.randint(0,2047483647)
# Create a ROS action call message with the incoming information
for waypoint in input_parameters[0]['parameters']:
all_params = dict((y['title'], y) for y in waypoint['parameters'])
# target pose
pose_target = dict((y['title'], y) for y in all_params['Pose']['parameters'])
pose_target_position = dict((y['title'], y) for y in pose_target['Position']['parameters'])
pose_target_orientation = dict((y['title'], y) for y in pose_target['Orientation']['parameters'])
# the reference system
pose_ref = dict((y['title'], y) for y in all_params['Pose reference']['parameters'])
pose_ref_position = dict((y['title'], y) for y in pose_ref['Position']['parameters'])
pose_ref_orientation = dict((y['title'], y) for y in pose_ref['Orientation']['parameters'])
# the movement parameters
speed = dict((y['title'], y) for y in all_params['Speed']['parameters'])
acceleration = dict((y['title'], y) for y in all_params['Acceleration']['parameters'])
blending = dict((y['title'], y) for y in all_params['Blending']['parameters'])
# create the reference and prefill
ref = EulerFrame()
ref.x = float(pose_ref_position['x']['value'])
ref.y = float(pose_ref_position['y']['value'])
ref.z = float(pose_ref_position['z']['value'])
# alpha, beta, gamma is essentially rz, ry, rx
ref.alpha = float(pose_ref_orientation['z']['value'])
ref.beta = float(pose_ref_orientation['y']['value'])
ref.gamma = float(pose_ref_orientation['x']['value'])
command = Command()
command.command_id = command_id
command.pose_reference_frame = ref
command.command_type = unicodedata.normalize('NFKD', all_params['Movement type']['value']).encode('ascii','ignore')
command.pose_type = 'EULER_INTRINSIC_ZYX'
command.pose = [float(pose_target_position['x']['value']),
float(pose_target_position['y']['value']),
float(pose_target_position['z']['value']),
float(pose_target_orientation['z']['value']),
float(pose_target_orientation['y']['value']),
float(pose_target_orientation['x']['value'])]
command.velocity_type = speed['speed_type']['value']
command.velocity = [float(speed['speed_value']['value'])]
command.blending_type = blending['blending_type']['value']
command.blending = [float(blending['blending_value']['value'])]
command.acceleration_type = acceleration['acceleration_type']['value']
command.acceleration = [float(acceleration['acceleration_value']['value'])]
#command.pose_reference = all_params['pose_reference']['value']
command.pose_reference = ''
command.additional_parameters = ['iiwa']
command.additional_parameters.append(str(all_params['Tool']['value']))
command.additional_parameters.append('')
#command.additional_parameters.append(str(all_params['Workpiece']['value']))
redundancy = float(all_params['Redundancy']['value']) * 3.14159265 / 180.0
if all_params['Control command']['value']:
command.additional_parameters.append(str(all_params['Control command']['value']) + ' redundancy ' + str(redundancy))
else:
command.additional_parameters.append('redundancy ' + str(redundancy))
command_list.commands.append(command)
command_id += 1
#command_action_goal.goal = command_list
command_action_goal.commands = command_list
#print command_action_goal
# Send the action call and wait until it is finished
client.send_goal(command_action_goal)
client.wait_for_result()
result_msg = client.get_result()
if result_msg.result.result_code != 0:
raise Exception("Robot command didn't finish correctly")
return output_parameters
def abort():
global client
rospy.loginfo("Handling abort()-Section in move skill.")
client.cancel_all_goals()
stop_service = rospy.ServiceProxy('stop_robot_right_now', std_srvs.srv.Trigger)
stop_service()
# IMPORTANT: Wait for result, otherwise actions will continue infinitely
client.wait_for_result()