3. Build and configure your own client/worker
You can develop a custom client to submit jobs to ArmoniK and invoke services embedded in a worker shared library.
3.1. Background: execution paths, blobs, and the convention
3.1.1. Two execution paths
The C++ SDK supports two ways for a client to communicate with a worker:
Convention path |
Legacy path |
|
|---|---|---|
Task input type |
|
|
Payload wire format |
JSON envelope with named inputs |
C++-specific binary encoding |
Library identification |
|
filename: |
Cross-SDK compatible |
Yes (C++, Java, C#) |
No (C++ only) |
Status |
Recommended |
Deprecated |
The DynamicWorker selects the path at runtime: the presence of the ConventionVersion key in TaskOptions.options triggers the convention path; its absence falls through to the unchanged legacy path.
3.1.2. Blobs
A blob is a named, immutable byte sequence stored in ArmoniK’s result storage and identified by a UUID. Blobs are the standard unit for passing data between clients and workers in the convention path — the concept and terminology are shared with the Java and C# SDKs.
In the convention path:
Task inputs are uploaded as blobs before submission. The SDK does this automatically when you use
BlobDefinition::FromData.The worker library itself can also be uploaded as a blob via
SessionService::UploadLibrary, so the worker pod can fetch and load it at execution time without needing it on the local filesystem.The output of a completed task is stored as a blob. Its ID (
result_id) is passed toHandleResponseand can be fed directly into a downstream task viaBlobDefinition::FromBlobId— no re-upload needed.
In the legacy path the payload is embedded directly in the task submission request. Blobs are still used internally — the SDK serialises the payload into a blob and the worker retrieves it from blob storage — but this is entirely transparent: the TaskPayload interface hides the blob lifecycle from the caller, and HandleResponse does not expose a result_id for the result blob.
3.1.3. The convention
The convention is an agreed encoding that allows a client written on top of any supported SDK (C++, Java, C#) to submit tasks to a worker written on top of any supported SDK. It consists of two parts:
Task options keys — carried in the flat TaskOptions.options map:
Key |
Set by |
Purpose |
|---|---|---|
|
|
Marks the task as a convention task; value is |
|
|
Path to the |
|
|
Blob ID of the uploaded |
|
|
Method name forwarded to |
Payload format — a JSON envelope {"inputs":{...},"outputs":{...}} replacing the C++-specific binary encoding. This is an internal wire format managed by the SDK; workers and clients do not need to parse it directly.
3.1.4. Backward compatibility
All changes are backward compatible. Existing code that uses TaskPayload and HandleResponse(result_payload, taskId) continues to compile and run without modification. The only visible change is a compiler deprecation warning at TaskPayload call sites. The convention is strictly opt-in: if ConventionVersion is absent from task options the DynamicWorker follows the legacy path unchanged.
For new code, use TaskDefinition and override HandleResponse(result_payload, taskId, result_id). If you have existing code that overrides HandleResponse(result_payload, taskId), it will still compile and run — the SDK calls the result_id overload internally and the default implementation delegates to the legacy one — but HandleResponse(result_payload, taskId) is deprecated and will be removed in a future release.
3.2. Client
The SessionService class in ArmoniK::Sdk::Client is the entry point for task submission. Its constructor takes a Properties object (configuration + default task options) and a logger.
3.2.1. Configure the session
The Properties object is loaded from a JSON file or environment variables:
Endpoint— control plane addressmTLS— enable/disable mTLScaCert/clientCert/clientKey— certificate files for mTLSsslValidation— strict SSL validation (default:true)
Environment variables override the JSON file. The default task options carried in Properties include:
MaxRetries(default: 3)Priority(default: 2)AppName,AppVersion,AppNamespace,AppService— library and service identificationPartitionId— infrastructure partition to route tasks toFor the convention path:
DynamicLibraryconfiguration encoded viaSetDynamicLibrary
3.2.2. Invocation handler
Implement IServiceInvocationHandler before submitting tasks. Override HandleResponse (with result_id) and HandleError:
class MyHandler : public ArmoniK::Sdk::Client::IServiceInvocationHandler {
public:
void HandleResponse(const std::string &result_payload,
const std::string &taskId,
const std::string &result_id) override {
// result_payload: the worker's return value (raw bytes)
// result_id: blob ID of the result — pass to BlobDefinition::FromBlobId
// to chain tasks without re-uploading
}
void HandleError(const std::exception &e, const std::string &taskId) override {
// handle task failure
}
};
Warning:
HandleResponseandHandleErrorcan be called concurrently for different tasks. Your implementation must be thread-safe: protect any shared state (counters, result collections, etc.) with a mutex or equivalent.
HandleError is called once per task, after the task is permanently aborted. This happens in two cases:
The worker threw
ArmoniKSdkException— permanent failure, ArmoniK does not retry regardless ofmax_retries.The worker threw any other exception — transient failure, ArmoniK retried up to
max_retriestimes and all attempts failed.
3.2.3. Submitting tasks — convention path (recommended)
Two deployment options exist for the worker library. Choose based on whether the .so is already present on every worker node.
3.2.3.1. Option A — library pre-installed on the worker filesystem
Use this when the .so is copied to shared storage or baked into the worker image. Set library_path to its absolute path on the worker filesystem:
ArmoniK::Sdk::Common::DynamicLibrary lib;
lib.library_path = "/data/libMyWorker.so"; // absolute path on the worker filesystem
lib.symbol = "square"; // method name forwarded to call()
ArmoniK::Sdk::Common::TaskOptions opts("libMyWorker.so", "1.0.0",
"MyNamespace", "MyService", partitionId);
opts.SetDynamicLibrary(lib);
// opts.options contains ConventionVersion, LibraryPath, Symbol
Then submit tasks. The SDK uploads raw input data automatically:
ArmoniK::Sdk::Client::SessionService service(properties, logger);
auto handler = std::make_shared<MyHandler>();
service.Submit(
{ArmoniK::Sdk::Common::TaskDefinition(
"square",
{{"x", ArmoniK::Sdk::Common::BlobDefinition::FromData("7")}})},
handler, opts);
service.WaitResults();
3.2.3.2. Option B — upload the .so as a blob
Use this when the library is not pre-installed on the worker. UploadLibrary reads the file from the client filesystem, uploads it to ArmoniK blob storage, and sets lib.library_blob_id. The worker then downloads and dlopens it at task execution time; library_path is not used by the worker.
ArmoniK::Sdk::Common::DynamicLibrary lib;
lib.symbol = "square";
// Reads from the client filesystem and stores the blob ID in lib.library_blob_id.
service.UploadLibrary("/path/to/libMyWorker.so", lib);
ArmoniK::Sdk::Common::TaskOptions opts("libMyWorker.so", "1.0.0",
"MyNamespace", "MyService", partitionId);
opts.SetDynamicLibrary(lib);
// opts.options contains ConventionVersion, Symbol, LibraryBlobId
// (LibraryPath is absent — not needed when LibraryBlobId is set)
Then submit tasks exactly as in Option A:
service.Submit(
{ArmoniK::Sdk::Common::TaskDefinition(
"square",
{{"x", ArmoniK::Sdk::Common::BlobDefinition::FromData("7")}})},
handler, opts);
service.WaitResults();
To reference a blob that was already uploaded (avoiding a redundant upload):
BlobDefinition::FromBlobId(existing_blob_id)
3.2.4. Submitting tasks — legacy path
TaskPayload is deprecated but still functional. No convention keys are needed in task options.
service.Submit(
{ArmoniK::Sdk::Common::TaskPayload("methodName", serialized_args)},
handler);
Migrate to TaskDefinition when convenient. There is no flag-day deadline but TaskPayload will be removed in a future major release.
3.2.5. Task chaining with result_id
result_id in HandleResponse is the blob ID of the completed task’s output blob. Pass it directly to BlobDefinition::FromBlobId for a downstream task — no re-download and re-upload needed:
class ChainHandler : public ArmoniK::Sdk::Client::IServiceInvocationHandler {
public:
std::string last_result_id;
void HandleResponse(const std::string &, const std::string &,
const std::string &result_id) override {
last_result_id = result_id;
}
void HandleError(const std::exception &e, const std::string &) override { /*...*/ }
};
// Compute 2² and 3² in parallel, then feed their results into an "add" task.
// handler_a and handler_b collect the result blob IDs.
service.Submit({TaskDefinition("square", {{"x", BlobDefinition::FromData("2")}})}, handler_a, opts);
service.Submit({TaskDefinition("square", {{"x", BlobDefinition::FromData("3")}})}, handler_b, opts);
service.WaitResults();
// handler_a->last_result_id and handler_b->last_result_id are now set.
service.Submit(
{TaskDefinition("add",
{{"a", BlobDefinition::FromBlobId(handler_a->last_result_id)},
{"b", BlobDefinition::FromBlobId(handler_b->last_result_id)}})},
handler_c, opts);
service.WaitResults();
// result: 4 + 9 = 13
3.2.6. Waiting for results
WaitResults blocks until the requested tasks complete and calls the handler for each result:
service.WaitResults(); // wait for all submitted tasks
service.WaitResults({"id1", "id2"}); // wait for a specific subset
service.WaitResults({}, Any | BreakOnError); // return on first completion or abort
Parameters:
task_ids— IDs to wait on; empty means all tasks submitted with this service.waitBehavior—All(default) orAny; combine withBreakOnErrorto exit as soon as a result is aborted.waitOptions— polling interval and other timing controls.
3.2.7. Cross-SDK interoperability Samples
A C++ worker built with the convention path can receive tasks from clients written in other SDKs. See the reference in the ArmoniK Samples repository:
The worker library used by both of them is also in the Samples repository, see the worker code in the ChainedArithmetic sample.
3.3. Worker
The ArmoniK DynamicWorker receives tasks and dispatches them to a shared library. It selects the execution path at runtime:
If
ConventionVersionis present inTaskOptions.options→ convention path: the worker resolves the library (from filesystem or blob storage), parses the JSON payload, and callsServiceBase::call(ctx, name, inputs_map).Otherwise → legacy path: the worker locates the library by
application_name/application_versionfilename and callsServiceBase::call(ctx, name, raw_payload).
In both cases the implementation lives in a shared library (ServiceBase subclass + armonik_create_service).