Developing Task Workers
Each LittleHorse SDK provides an LHTaskWorker object or struct that lets you turn an arbitrary function or method into a LittleHorse Task.
Quickstart
The LHTaskWorker object allows you to create and start a Task Worker in all three of our SDKs. Below, you will find complete, ready-to-compile programs that:
- Register a TaskDef called greet, which takes one STR variable as input.
- Start a Task Worker that polls the LH Cluster for tasks to execute.
- Java
- Go
- Python
To create a Task Worker in Java, you need to do the following:
- Create an LHConfig (see this configuration documentation).
- Write a Task Worker class with a method annotated with @LHTaskMethod.
- Create an LHTaskWorker object with your config and Task Worker object.
- Register the TaskDef with worker.registerTaskDef().
- Finally, call .start().
Let's build a Task Worker for a TaskDef named greet that takes in a String and returns a String. The program below defines the Task Worker object and then registers and starts the worker:
package io.littlehorse.quickstart;

import java.io.IOException;

import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.worker.LHTaskMethod;
import io.littlehorse.sdk.worker.LHTaskWorker;

class MyWorker {

    @LHTaskMethod("greet")
    public String greeting(String firstName) {
        String result = "Hello there, " + firstName + "!";
        System.out.println(result);
        return result;
    }
}

public class Main {

    public static void main(String[] args) throws IOException, InterruptedException {
        LHConfig config = new LHConfig();
        MyWorker executable = new MyWorker();
        LHTaskWorker greetWorker = new LHTaskWorker(executable, "greet", config);

        // Close the worker cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(greetWorker::close));

        greetWorker.registerTaskDef();
        greetWorker.start();
    }
}
The TaskDef is generated by the registerTaskDef() call, which uses reflection to determine the parameter names. If you compile with the -parameters flag (for example, options.compilerArgs << '-parameters' in Gradle), the resulting TaskDef variable names will match your method's parameter names rather than arg0, arg1, ..., argN.
To create a Task Worker in Go, you need to do three things:
- Create an LHConfig, for example with littlehorse.NewConfigFromEnv() (see this configuration documentation).
- Write a Go func which you will use as your Task Function.
- Use the littlehorse.NewTaskWorker() function to create an LHTaskWorker with your config and Task Function.
At this point, you can use your LHTaskWorker to register your TaskDef and to start executing tasks.
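package main

import (
	"log"

	"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)

// Greet is the Task Function executed for each "greet" TaskRun.
func Greet(firstName string) string {
	return "Hello there, " + firstName + "!"
}

func main() {
	config := littlehorse.NewConfigFromEnv()

	// Do not discard the error: a failure here means the worker cannot run.
	worker, err := littlehorse.NewTaskWorker(config, Greet, "greet")
	if err != nil {
		log.Fatal(err)
	}

	worker.RegisterTaskDef()
	worker.Start()
}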
To create a Task Worker in Python, you need to do the following:
- Create an LHConfig (see this configuration documentation).
- Write an async Python function which you will use as your Task Function.
- Create and start an LHTaskWorker with that function.
Here is an example:
import asyncio

import littlehorse
from littlehorse.config import LHConfig
from littlehorse.worker import LHTaskWorker


async def greeting(first_name: str) -> str:
    msg = f"Hello there, {first_name}!"
    print(msg)
    return msg


async def main() -> None:
    config = LHConfig()
    worker = LHTaskWorker(greeting, "greet", config)
    worker.register_task_def()

    # Give the TaskDef a moment to be registered before polling starts.
    await asyncio.sleep(1.0)

    await littlehorse.start(worker)


if __name__ == "__main__":
    asyncio.run(main())
Advanced Usage
The Task Worker library has some features that make advanced use cases easier.
Throwing Workflow EXCEPTIONs
As described in our Failure Handling Concept Docs, LittleHorse distinguishes between technical ERRORs and business EXCEPTIONs:
- A technical ERROR denotes a technological failure, such as a timeout caused by a network outage, or an unexpected error returned by your Task Worker.
- A business EXCEPTION represents an unhappy-path case in your business logic, such as when an item is out of stock or a credit card is declined.
If your Task Worker throws an uncaught error (or the equivalent in your language), it is treated as a LittleHorse ERROR with the error code LHErrorType.TASK_FAILURE. However, sometimes your Task Worker notices that a business-process-level failure (what LittleHorse calls an EXCEPTION) has occurred. For example, the Task Worker could notice that a credit card was declined. In this case, you can make the TaskRun throw a LittleHorse EXCEPTION by using the LHTaskException object.
A LittleHorse EXCEPTION result is NOT retryable. That means that if your Task Method throws an LHTaskException, it will not be retried. If it throws any error or exception other than an LHTaskException, it is treated as a LittleHorse ERROR, which is retryable.
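The retry rule can be modeled with a small, self-contained sketch. It does not use the LittleHorse SDK: BusinessException and run_with_retries are hypothetical names invented for this illustration, and in a real deployment retries are driven by LittleHorse rather than by a loop in your own code. The sketch only shows the rule itself: a business EXCEPTION ends the task immediately, while any other error is retried until the attempts run out.

```python
from typing import Callable


class BusinessException(Exception):
    """Hypothetical stand-in for a business EXCEPTION (not the SDK's LHTaskException)."""

    def __init__(self, name: str, message: str) -> None:
        super().__init__(message)
        self.name = name


def run_with_retries(task: Callable[[], str], max_attempts: int) -> tuple[str, int]:
    """Run `task`, retrying technical errors only.

    Returns a tuple of (outcome, attempts_used).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return f"COMPLETED: {task()}", attempt
        except BusinessException as exc:
            # Business EXCEPTION: retrying cannot change the outcome, so stop now.
            return f"EXCEPTION: {exc.name}", attempt
        except Exception:
            # Technical ERROR: possibly transient, so try again if attempts remain.
            continue
    return "ERROR: TASK_FAILURE", max_attempts
```

With this model, a task that raises BusinessException("card-declined", ...) finishes after a single attempt, while a task that fails with a timeout is attempted again up to max_attempts times.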
In the following example, we throw the user-defined business EXCEPTION out-of-stock if the item is out of stock.
- Java
- Go
- Python
package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.exception.LHTaskException;
import io.littlehorse.sdk.worker.LHTaskMethod;

class MyWorker {

    @LHTaskMethod("ship-item")
    public String shipItem(String itemSku) {
        if (isOutOfStock(itemSku)) {
            throw new LHTaskException("out-of-stock", "Some human readable message");
        }

        return "Item " + itemSku + " successfully shipped!";
    }
}
package quickstart

import (
	"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
	"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)

const TaskDefName string = "ship-item"

func ShipItem(itemSku string) (string, error) {
	if isOutOfStock(itemSku) {
		return "", &littlehorse.LHTaskException{
			Name:    "out-of-stock",
			Message: "some human readable message",
			Content: &lhproto.VariableValue{},
		}
	}

	return "Item " + itemSku + " successfully shipped!", nil
}
from littlehorse.exceptions import LHTaskException


async def ship_item(item_sku: str) -> str:
    # Pass the SKU so the stock check applies to the item being shipped.
    if is_out_of_stock(item_sku):
        raise LHTaskException("out-of-stock", "some descriptive message")

    return f"successfully shipped {item_sku}!"
The first argument to the LHTaskException constructor is the name of the EXCEPTION we are going to throw. This is useful if you want to catch specific EXCEPTIONs by name in your Failure Handlers. The second argument is a human-readable error message that shows up on the NodeRun's output as the error_message field, which is useful for debugging.
If you want to throw a Failure that has content which can be caught in your Failure Handler using the INPUT variable name, you can use a third argument named content. It is optional in Python, is available through an overloaded constructor in Java, and corresponds to the Content field of the struct in Go. Below is an example of how you might throw such an EXCEPTION:
- Java
- Go
- Python
package io.littlehorse.quickstart;

import io.littlehorse.sdk.common.exception.LHTaskException;
import io.littlehorse.sdk.worker.LHTaskMethod;

class MyWorker {

    @LHTaskMethod("ship-item")
    public String shipItem(String itemSku) {
        if (isOutOfStock(itemSku)) {
            int daysUntilBackInStock = calculateDaysUntilBackInStock(itemSku);

            // The `content` of the `Failure` that is thrown will be an INT variable containing
            // the number of days until the item is expected to be back in stock.
            throw new LHTaskException(
                    "out-of-stock",
                    "Some human readable message",
                    daysUntilBackInStock);
        }

        return "Item " + itemSku + " successfully shipped!";
    }
}
package quickstart

import (
	"github.com/littlehorse-enterprises/littlehorse/sdk-go/lhproto"
	"github.com/littlehorse-enterprises/littlehorse/sdk-go/littlehorse"
)

const WorkflowName string = "basic-workflow"
const TaskDefName string = "ship-item"

func ShipItem(itemSku string) (string, error) {
	if isOutOfStock(itemSku) {
		daysUntilBackInStock := calculateDaysUntilBackInStock(itemSku)

		return "", &littlehorse.LHTaskException{
			Name:    "out-of-stock",
			Message: "some human readable message",
			Content: &lhproto.VariableValue{
				Value: &lhproto.VariableValue_Int{
					Int: daysUntilBackInStock,
				},
			},
		}
	}

	return "Item " + itemSku + " successfully shipped!", nil
}
from littlehorse import to_variable_value
from littlehorse.exceptions import LHTaskException


# The function returns a string on the happy path, so the return type is str.
async def ship_item(item_sku: str) -> str:
    if is_out_of_stock(item_sku):
        days_until_back_in_stock = get_days_until_back_in_stock(item_sku)
        failure_content = to_variable_value(days_until_back_in_stock)
        raise LHTaskException("out-of-stock", "some descriptive message", content=failure_content)

    return f"successfully shipped {item_sku}!"
JSON Deserialization
In some SDKs, LittleHorse will automatically deserialize JSON variables into objects or structs for you.
- Java
- Go
- Python
Let's say we have a class MyCar as follows:
class MyCar {
    String make;
    String model;

    public MyCar(String make, String model) {
        this.make = make;
        this.model = model;
    }

    // getters, setters omitted
}
And one of the Variables (for example, my-obj) in our WfSpec is of type JSON_OBJ.
Let's say there's a TaskDef called json-example with one input variable of type JSON_OBJ. We can have a Task Worker defined as follows:
class MyWorker {

    @LHTaskMethod("json-example")
    public void executeTask(MyCar input) {
        System.out.println(input.getMake());
        System.out.println(input.getModel());
    }
}
The library will deserialize JSON such as {"make": "Ford", "model": "Explorer"} into an actual MyCar object.
Let's say we have a struct MyCar as follows:
type MyCar struct {
	Make  string `json:"make"`
	Model string `json:"model"`
}
And one of the Variables (for example, my-obj) in our WfSpec is of type JSON_OBJ.
Let's say there's a TaskDef called json-example with one input variable of type JSON_OBJ. We can have a Task Function that looks like:
func MyTaskFunc(car *MyCar) string {
	return "the make of your car is " + car.Make + "!"
}
The library will deserialize JSON such as {"make": "Ford", "model": "Explorer"} into an actual MyCar struct.
Let's say we have a Python Task Function as follows:
from typing import Any


async def describe_car(car: dict[str, Any]) -> str:
    msg = f"You drive a {car['brand']} model {car['model']}"
    return msg
The library will deserialize JSON such as {"brand": "Ford", "model": "Explorer"} into a Python dict.
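To make the Python case concrete, here is a minimal standalone sketch of what the Task Function receives. It uses only the standard library, not the LittleHorse SDK: deserialize_json_obj is a hypothetical helper standing in for the conversion that the library performs for you, and it assumes the JSON_OBJ value arrives as a JSON string.

```python
import asyncio
import json
from typing import Any


async def describe_car(car: dict[str, Any]) -> str:
    return f"You drive a {car['brand']} model {car['model']}"


def deserialize_json_obj(raw: str) -> dict[str, Any]:
    """Hypothetical helper: parse a JSON_OBJ payload into a plain dict."""
    value = json.loads(raw)
    if not isinstance(value, dict):
        # A JSON_OBJ is expected to be a JSON object, not an array or scalar.
        raise TypeError("expected a JSON object")
    return value


raw_payload = '{"brand": "Ford", "model": "Explorer"}'
message = asyncio.run(describe_car(deserialize_json_obj(raw_payload)))
```

Because the argument is a plain dict, a missing key such as brand raises KeyError inside the Task Function, so it may be worth validating the keys you depend on.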
Accessing Metadata
Sometimes, your Task Worker needs to know something about where the TaskRun came from. Each LittleHorse SDK offers a WorkerContext object or struct that exposes this metadata to the Task Worker.
- Java
- Go
- Python
If you need to access metadata about the Task Run that is being executed, you can add a WorkerContext parameter to the end of the method signature of your Task Method.
Let's say you have a TaskDef with one input parameter of type INT. You can access the WorkerContext by doing the following:
class SomeWorker {

    @LHTaskMethod("my-task")
    public void doTask(long inputLong, WorkerContext context) {
        String wfRunId = context.getWfRunId();
        TaskRunId taskRunId = context.getTaskRunId();
        NodeRunId nodeRunId = context.getNodeRunId();
        Date timeWhenTaskWasScheduled = context.getScheduledTime();

        context.log(
            "This is a message that gets sent to the log output on the scheduler"
        );

        int attemptNumber = context.getAttemptNumber();
        if (attemptNumber == 0) {
            // then this is the first time this Task Run has been attempted.
        } else {
            // then this is a retry.
        }

        // This is a constant value between all attempts for this TaskRun.
        // Useful to allow retries to third-party APIs that accept idempotency
        // keys, such as Stripe.
        String idempotencyKey = context.getIdempotencyKey();
    }
}
If you need to access metadata about the Task Run that is being executed, you can add a WorkerContext parameter to the end of the signature of your Task Function.
Let's say you have a TaskDef with one input parameter of type INT. You can access the WorkerContext by doing the following:
func DoTask(inputLong int64, context *littlehorse.WorkerContext) {
	wfRunId := context.GetWfRunId()
	taskRunId := context.GetTaskRunId()
	nodeRunId := context.GetNodeRunId()
	timeWhenTaskWasScheduled := context.GetScheduledTime()

	context.Log(
		"This is a message that gets sent to the log output on the scheduler",
	)

	attemptNumber := context.GetAttemptNumber()
	if attemptNumber == 0 {
		// then this is the first time this Task Run has been attempted.
	} else {
		// then this is a retry.
	}

	// This is a constant value between all attempts for this TaskRun.
	idempotencyKey := context.GetIdempotencyKey()

	// Go rejects unused local variables, so discard the values in this illustration.
	_, _, _, _, _ = wfRunId, taskRunId, nodeRunId, timeWhenTaskWasScheduled, idempotencyKey
}
If you need to access metadata about the Task Run that is being executed, you can add an LHWorkerContext parameter to the end of the signature of your Task Function.
Let's say you have a TaskDef with one input parameter of type STR. You can access the LHWorkerContext by doing the following:
async def greeting(name: str, ctx: LHWorkerContext) -> str:
    task_run_id = ctx.task_run_id
    node_run_id = ctx.node_run_id
    wf_run_id = ctx.wf_run_id
    time_task_was_scheduled = ctx.scheduled_time

    attempt_number = ctx.attempt_number
    if attempt_number > 0:
        # this is a retry
        pass
    else:
        # this is not a retry
        pass

    # This value stays constant across all attempts for this TaskRun.
    idempotency_key = ctx.idempotency_key

    return f"Hello there, {name}!"
Best Practices
Client ID
Every Task Worker instance should have a unique LHC_CLIENT_ID set in its configuration. This is important so that you can audit which client executed which Task, and also so that the LH Server can efficiently assign partitions of work to your Task Workers.
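One possible way to keep client IDs unique is to derive them at startup. The sketch below assumes that your worker's configuration is read from environment variables; build_client_id and its naming scheme (service name, host name, random suffix) are hypothetical choices for this illustration, not an SDK feature.

```python
import os
import socket
import uuid


def build_client_id(service_name: str) -> str:
    """Hypothetical naming scheme: service name, host name, and a random suffix."""
    suffix = uuid.uuid4().hex[:8]
    return f"{service_name}-{socket.gethostname()}-{suffix}"


# Set the variable before the worker's configuration object is created.
os.environ["LHC_CLIENT_ID"] = build_client_id("greet-worker")
```

The random suffix keeps two workers on the same host distinct, at the cost of a new ID after every restart. If you need IDs that are stable across restarts for auditing, a pod name or instance ID may be a better suffix.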
Idempotence
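As with all workflow engines, it is best when your tasks are idempotent. You can use the NodeRunId from WorkerContext::getNodeRunId(), or the value returned by WorkerContext::getIdempotencyKey(), as an idempotency key, since both stay constant across all attempts of a TaskRun.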
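The following standalone sketch shows why a constant key matters when a task is retried. FakePaymentApi and charge_customer are hypothetical names invented for this illustration. The fake API stands in for a third-party service that deduplicates requests by idempotency key, and the key string stands in for the value you would read from the worker context.

```python
class FakePaymentApi:
    """Hypothetical third-party API that deduplicates requests by idempotency key."""

    def __init__(self) -> None:
        self._charges: dict[str, int] = {}
        self.charges_created = 0

    def charge(self, idempotency_key: str, amount_cents: int) -> int:
        # A repeated key returns the original result instead of charging again.
        if idempotency_key in self._charges:
            return self._charges[idempotency_key]
        self.charges_created += 1
        self._charges[idempotency_key] = amount_cents
        return amount_cents


def charge_customer(api: FakePaymentApi, idempotency_key: str, amount_cents: int) -> str:
    """Task body: safe to run more than once with the same idempotency key."""
    charged = api.charge(idempotency_key, amount_cents)
    return f"charged {charged} cents"
```

If the first attempt charges the card but fails before reporting its result, the retry calls charge_customer with the same key and the customer is still charged only once.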