# Workers

> Deploying and integrating workers into an iii project.

## How workers expand iii

Workers add capability to an iii system. Each one contributes functions and triggers the engine can
route to. This page covers deploying and wiring workers into a project.

Once connected, a worker exposes:

* Functions, callable by `function_id` from anywhere in the system (see
  [Using iii / Functions](../using-iii/functions)).
* Triggers it advertises, which other workers can bind their functions to (see
  [Using iii / Triggers](../using-iii/triggers)).

<Note>
  For the full SDK surface each Worker can use when interacting with iii, see the complete SDK
  reference by language: [Node](../sdk-reference/node-sdk), [Python](../sdk-reference/python-sdk),
  [Rust](../sdk-reference/rust-sdk), or [Browser](../sdk-reference/browser-sdk).
</Note>

## Scaffold a new worker

`iii worker init` creates a new standalone worker from scratch. The command writes a
language-specific project directory with the iii SDK installed, an `iii.worker.yaml` manifest, and
example function and trigger registrations you can replace with your own.

```bash theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
# Interactive: prompts for the language
iii worker init my-worker

# Fully scripted: pass --language to skip the prompt
iii worker init my-worker --language typescript
```

Supported languages: `typescript` (`ts`), `javascript` (`js`), `python` (`py`), `rust` (`rs`).

The positional `NAME` is the target directory; pass `--directory` to override it.

Re-running `iii worker init` on a directory that already holds an iii worker (i.e., one that has
`.iii/worker.ini`) makes no changes to the worker.

By default, `iii worker init` fails when the target is any other non-empty directory. Pass
`--allow-non-empty` to scaffold into it anyway.

<Note>
  To install an existing worker from the registry instead of scaffolding a new one, use
  `iii worker add`. See [Using iii / Workers](../using-iii/workers#finding-workers) for the
  registry surface.
</Note>

## Connecting to the engine

A worker connects to the engine over WebSocket. The convention is to set the engine URL via the
`III_URL` environment variable, but it can also be passed explicitly to `register_worker`
(`registerWorker` in Node). The connection string is the only coupling between a worker and the iii
instance it joins, so the worker process can be deployed anywhere that can reach the engine over
the network.

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import { registerWorker } from "iii-sdk";

    const url = process.env.III_URL;
    if (!url) throw new Error("III_URL must be set");
    const worker = registerWorker(url, {
      workerName: "my-worker",
    });
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import os
    from iii import register_worker, InitOptions

    worker = register_worker(
        os.environ.get("III_URL"),
        InitOptions(worker_name="my-worker"),
    )
    ```
  </Tab>

  <Tab title="Rust">
    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    use iii_sdk::{InitOptions, WorkerMetadata, register_worker};

    let url = std::env::var("III_URL").expect("III_URL must be set");
    let worker = register_worker(
        &url,
        InitOptions {
            metadata: Some(WorkerMetadata {
                name: "my-worker".into(),
                ..Default::default()
            }),
            ..Default::default()
        },
    );
    ```
  </Tab>
</Tabs>
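
The "environment variable by convention, explicit value when needed" rule described above can be captured in a small helper. This is a minimal sketch, not part of the iii SDK: `resolve_engine_url` is a hypothetical name, and letting the explicit argument win over `III_URL` is an assumption of this example.

```python
import os
from typing import Mapping, Optional


def resolve_engine_url(
    explicit: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> str:
    """Return the engine URL, preferring an explicit value over III_URL."""
    # Assumption of this sketch: an explicitly passed URL takes precedence.
    url = explicit or env.get("III_URL")
    if not url:
        # Fail fast, mirroring the Node and Rust examples above.
        raise RuntimeError("III_URL must be set or a URL passed explicitly")
    return url
```

The returned string would then be passed as the first argument to `register_worker`, as in the Python tab above.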

## Worker lifecycle

### States

Workers transition through a small set of states after connecting:
`connecting → connected → available / busy → disconnected`. `connecting` is the WebSocket handshake.
`connected` means the Worker has joined the Engine's registry. `available` and `busy` describe
whether the Worker is currently handling invocations. `disconnected` is the terminal state when the
WebSocket closes. The Engine tracks these transitions and surfaces them to other Workers and tooling
through its discovery functions, so the rest of the system can react.
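
For tooling that mirrors these states locally, the lifecycle can be modelled as a small state machine. The state names below come from the description above; the transition table is an assumption made for illustration (for example, that any non-terminal state can drop to `disconnected`) and is not taken from the Engine itself. `WorkerState`, `ALLOWED`, and `can_transition` are hypothetical names.

```python
from enum import Enum


class WorkerState(Enum):
    CONNECTING = "connecting"
    CONNECTED = "connected"
    AVAILABLE = "available"
    BUSY = "busy"
    DISCONNECTED = "disconnected"


# Assumed transition table for illustration only.
ALLOWED = {
    WorkerState.CONNECTING: {WorkerState.CONNECTED, WorkerState.DISCONNECTED},
    WorkerState.CONNECTED: {
        WorkerState.AVAILABLE,
        WorkerState.BUSY,
        WorkerState.DISCONNECTED,
    },
    WorkerState.AVAILABLE: {WorkerState.BUSY, WorkerState.DISCONNECTED},
    WorkerState.BUSY: {WorkerState.AVAILABLE, WorkerState.DISCONNECTED},
    WorkerState.DISCONNECTED: set(),  # terminal: the WebSocket has closed
}


def can_transition(current: WorkerState, nxt: WorkerState) -> bool:
    """Return True if moving from `current` to `nxt` is in the assumed table."""
    return nxt in ALLOWED[current]
```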

### Inspecting the live registry

To see what's currently connected to the Engine, invoke one of the `engine::*::list` Functions to
get the current state of the registry. Each returns a list:

| Function                      | What it returns                                                 |
| ----------------------------- | --------------------------------------------------------------- |
| `engine::workers::list`       | Every connected Worker with metrics.                            |
| `engine::functions::list`     | Every registered Function. Filterable by `include_internal`.    |
| `engine::triggers::list`      | Every registered Trigger. Filterable by `include_internal`.     |
| `engine::trigger-types::list` | Every advertised Trigger type with its config and call schemas. |

<Accordion title="Example: list registry contents">
  <Tabs>
    <Tab title="Node / TypeScript">
      ```typescript theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      // engine::workers::list, pass { worker_id: "<uuid>" } to look up one worker
      const { workers } = await worker.trigger({
        function_id: "engine::workers::list",
        payload: {},
      });

      // engine::functions::list
      const { functions } = await worker.trigger({
        function_id: "engine::functions::list",
        payload: { include_internal: false },
      });

      // engine::triggers::list
      const { triggers } = await worker.trigger({
        function_id: "engine::triggers::list",
        payload: { include_internal: false },
      });

      // engine::trigger-types::list
      const { trigger_types } = await worker.trigger({
        function_id: "engine::trigger-types::list",
        payload: { include_internal: false },
      });
      ```
    </Tab>

    <Tab title="Python">
      ```python theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      # engine::workers::list, pass {"worker_id": "<uuid>"} to look up one worker
      workers = worker.trigger({
          "function_id": "engine::workers::list",
          "payload": {},
      })["workers"]

      # engine::functions::list
      functions = worker.trigger({
          "function_id": "engine::functions::list",
          "payload": {"include_internal": False},
      })["functions"]

      # engine::triggers::list
      triggers = worker.trigger({
          "function_id": "engine::triggers::list",
          "payload": {"include_internal": False},
      })["triggers"]

      # engine::trigger-types::list
      trigger_types = worker.trigger({
          "function_id": "engine::trigger-types::list",
          "payload": {"include_internal": False},
      })["trigger_types"]
      ```
    </Tab>

    <Tab title="Rust">
      ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      use iii_sdk::TriggerRequest;
      use serde_json::json;

      // engine::workers::list, pass json!({ "worker_id": "<uuid>" }) to look up one worker
      let workers = worker
          .trigger(TriggerRequest {
              function_id: "engine::workers::list".into(),
              payload: json!({}),
              action: None,
              timeout_ms: None,
          })
          .await?;

      // engine::functions::list
      let functions = worker
          .trigger(TriggerRequest {
              function_id: "engine::functions::list".into(),
              payload: json!({ "include_internal": false }),
              action: None,
              timeout_ms: None,
          })
          .await?;

      // engine::triggers::list
      let triggers = worker
          .trigger(TriggerRequest {
              function_id: "engine::triggers::list".into(),
              payload: json!({ "include_internal": false }),
              action: None,
              timeout_ms: None,
          })
          .await?;

      // engine::trigger-types::list
      let trigger_types = worker
          .trigger(TriggerRequest {
              function_id: "engine::trigger-types::list".into(),
              payload: json!({ "include_internal": false }),
              action: None,
              timeout_ms: None,
          })
          .await?;
      ```
    </Tab>
  </Tabs>
</Accordion>

### Handling Worker disconnects

When a Worker's WebSocket closes, the Engine cleans up after it automatically. Its Functions and
Triggers leave the live registry, and any in-flight invocations of those Functions are cancelled.

#### In-flight requests

In-flight requests fail with an `invocation_stopped` error. Catch these errors and treat them as
cancellations. Retrying will fail until the Worker that owns the Function reconnects.

<Accordion title="Example: catch `invocation_stopped`">
  <Tabs>
    <Tab title="Node / TypeScript">
      ```typescript theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      import { IIIInvocationError } from "iii-sdk";

      try {
        const result = await worker.trigger({
          function_id: "math::add",
          payload: { a: 1, b: 2 },
        });
      } catch (err) {
        if (err instanceof IIIInvocationError && err.code === "invocation_stopped") {
          // Worker disconnected mid-invocation. Subscribe to `engine::functions-available`
          // (see "Subscribe to changes" below) to know when to retry.
          return;
        }
        throw err;
      }
      ```
    </Tab>

    <Tab title="Python">
      ```python theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      from iii import IIIInvocationError

      try:
          result = worker.trigger({
              "function_id": "math::add",
              "payload": {"a": 1, "b": 2},
          })
      except IIIInvocationError as err:
          if err.code == "invocation_stopped":
              # Worker disconnected mid-invocation. Subscribe to `engine::functions-available`
              # (see "Subscribe to changes" below) to know when to retry.
              return
          raise
      ```
    </Tab>

    <Tab title="Rust">
      ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      use iii_sdk::{IIIError, TriggerRequest};
      use serde_json::json;

      let result = worker
          .trigger(TriggerRequest {
              function_id: "math::add".into(),
              payload: json!({ "a": 1, "b": 2 }),
              action: None,
              timeout_ms: None,
          })
          .await;

      match result {
          Err(IIIError::Remote { code, .. }) if code == "invocation_stopped" => {
              // Worker disconnected mid-invocation. Subscribe to `engine::functions-available`
              // (see "Subscribe to changes" below) to know when to retry.
          }
          Err(e) => return Err(e.into()),
          Ok(value) => { /* use value */ }
      }
      ```
    </Tab>
  </Tabs>
</Accordion>

#### Subscribe to changes

You can register a Trigger against one of the engine's discovery events to react to topology changes
as they happen. This is particularly useful for continuing work when a Worker comes back online.

| Trigger                       | When it fires                             |
| ----------------------------- | ----------------------------------------- |
| `engine::workers-available`   | A Worker connects or disconnects.         |
| `engine::functions-available` | A Function is registered or unregistered. |

<Accordion title="Example: subscribe to discovery events">
  <Tabs>
    <Tab title="Node / TypeScript">
      ```typescript theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      worker.registerFunction(
        "discovery::on-workers",
        async (data: { event: string; worker_id: string }) => {
          if (data.event === "worker_connected") {
            // A Worker just joined the registry; its Functions are callable now.
          }
        },
      );
      worker.registerTrigger({
        type: "engine::workers-available",
        function_id: "discovery::on-workers",
        config: {},
      });

      worker.registerFunction(
        "discovery::on-functions",
        async (data: { event: string; functions: { function_id: string }[] }) => {
          // `functions` is the full snapshot after the change.
          const ids = data.functions.map((f) => f.function_id);
        },
      );
      worker.registerTrigger({
        type: "engine::functions-available",
        function_id: "discovery::on-functions",
        config: {},
      });
      ```
    </Tab>

    <Tab title="Python">
      ```python theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      async def on_workers(data: dict) -> None:
          if data["event"] == "worker_connected":
              # A Worker just joined the registry; its Functions are callable now.
              pass

      worker.register_function("discovery::on-workers", on_workers)
      worker.register_trigger({
          "type": "engine::workers-available",
          "function_id": "discovery::on-workers",
          "config": {},
      })

      async def on_functions(data: dict) -> None:
          # `functions` is the full snapshot after the change.
          ids = [f["function_id"] for f in data.get("functions", [])]

      worker.register_function("discovery::on-functions", on_functions)
      worker.register_trigger({
          "type": "engine::functions-available",
          "function_id": "discovery::on-functions",
          "config": {},
      })
      ```
    </Tab>

    <Tab title="Rust">
      ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
      use iii_sdk::{RegisterFunction, RegisterTriggerInput};
      use schemars::JsonSchema;
      use serde::Deserialize;
      use serde_json::{Value, json};

      #[derive(Deserialize, JsonSchema)]
      struct WorkersAvailable { event: String, worker_id: String }

      #[derive(Deserialize, JsonSchema)]
      struct FunctionsAvailable { event: String, functions: Vec<Value> }

      worker.register_function(RegisterFunction::new_async(
          "discovery::on-workers",
          |input: WorkersAvailable| async move {
              if input.event == "worker_connected" {
                  // A Worker just joined the registry; its Functions are callable now.
              }
              Ok::<_, String>(())
          },
      ));
      worker.register_trigger(RegisterTriggerInput {
          trigger_type: "engine::workers-available".into(),
          function_id: "discovery::on-workers".into(),
          config: json!({}),
          metadata: None,
      })?;

      worker.register_function(RegisterFunction::new_async(
          "discovery::on-functions",
          |input: FunctionsAvailable| async move {
              // `functions` is the full snapshot after the change.
              let _count = input.functions.len();
              Ok::<_, String>(())
          },
      ));
      worker.register_trigger(RegisterTriggerInput {
          trigger_type: "engine::functions-available".into(),
          function_id: "discovery::on-functions".into(),
          config: json!({}),
          metadata: None,
      })?;
      ```
    </Tab>
  </Tabs>
</Accordion>
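
Catching `invocation_stopped` and subscribing to `engine::functions-available` can be combined into a retry that waits for the Function to come back. The following is a minimal sketch written without the SDK so it runs standalone. `FunctionWatcher`, `InvocationStopped`, and `call_with_retry` are hypothetical names; `on_functions` assumes the payload shape used in the Python subscription example above. In a real worker, `on_functions` would be registered as the handler for the `engine::functions-available` Trigger, and `InvocationStopped` would be replaced by the check on the SDK error's `code`.

```python
import asyncio
from typing import Any, Awaitable, Callable


class InvocationStopped(Exception):
    """Stand-in for an SDK error whose code is `invocation_stopped`."""


class FunctionWatcher:
    """Tracks which function ids appear in the latest registry snapshot."""

    def __init__(self) -> None:
        self._ids = set()
        self._changed = asyncio.Event()

    async def on_functions(self, data: dict) -> None:
        # `functions` is assumed to be the full snapshot after the change.
        self._ids = {f["function_id"] for f in data.get("functions", [])}
        self._changed.set()

    async def wait_for(self, function_id: str, timeout: float) -> None:
        async def _wait() -> None:
            while function_id not in self._ids:
                self._changed.clear()
                await self._changed.wait()

        # Raises asyncio.TimeoutError if the function does not reappear.
        await asyncio.wait_for(_wait(), timeout)


async def call_with_retry(
    call: Callable[[], Awaitable[Any]],
    watcher: FunctionWatcher,
    function_id: str,
    timeout: float = 30.0,
) -> Any:
    try:
        return await call()
    except InvocationStopped:
        # The owning Worker dropped mid-invocation: wait until the function
        # shows up in a snapshot again, then retry exactly once.
        await watcher.wait_for(function_id, timeout)
        return await call()
```

The sketch trusts the watcher's latest snapshot. If the snapshot reflecting the disconnect has not arrived when the error is caught, the retry can run too early, so production code would likely bound and repeat the retry.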

## Worker manifest

`iii.worker.yaml` is the manifest at the worker's root that tells iii how to install dependencies,
run the worker, and pass through configuration. This applies to both the iii worker CLI commands
(e.g. [`start`, `stop`, `restart`](../using-iii/workers#starting-and-stopping-workers)) and to workers
that iii starts automatically when they're specified in iii's
[`config.yaml`](../using-iii/engine#configuration-file-structure).

```yaml theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
name: math-worker
runtime:
  kind: python
  package_manager: pip
  entry: math_worker.py
scripts:
  install: "pip install -r requirements.txt"
  start: "python math_worker.py"
```

The manifest is metadata about *starting* the Worker. Once a Worker is running, iii treats it like
any other. A Worker started by `iii` from its `config.yaml`, one started with `iii worker start`,
and a manually run process that uses the iii SDK all behave identically with the Engine.

<Note>
  If a worker isn't starting correctly, check its manifest and
  [`iii worker logs`](../using-iii/workers#inspecting-a-worker).
</Note>

## Shutting down a worker

Call the SDK's `shutdown` to close the WebSocket cleanly. The engine removes the worker's Functions
and Triggers from the registry, fires `engine::workers-available` with `worker_disconnected`, and
cancels in-flight invocations targeting them with `invocation_stopped`.

Without `shutdown`, an abrupt process exit reaches the same state once the engine notices the
dropped socket; graceful shutdown makes it deterministic and faster.

<Tabs>
  <Tab title="Node / TypeScript">
    ```typescript theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    process.on("SIGTERM", async () => {
      await worker.shutdown();
      process.exit(0);
    });
    ```
  </Tab>

  <Tab title="Python">
    ```python theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    import signal

    def _on_term(*_):
        worker.shutdown()
        raise SystemExit(0)

    signal.signal(signal.SIGTERM, _on_term)
    ```
  </Tab>

  <Tab title="Rust">
    ```rust theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
    // Rust threads do not keep the process alive on their own; await this
    // before `main` returns so the connection thread exits cleanly.
    worker.shutdown_async().await;
    ```
  </Tab>
</Tabs>

<Note>
  Shutdown is especially useful for **one-shot / ephemeral workers**. Kubernetes Jobs, serverless
  containers, and scheduled scripts can connect just like any other Worker, do their work, and then
  call `shutdown()` (`shutdown_async().await` in Rust).
</Note>
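
One way to make sure a one-shot worker always shuts down, even when its work raises, is to wrap the connection in a context manager. This is a minimal sketch under the assumption that the worker object exposes a synchronous `shutdown()` as in the Python tab above; `one_shot` and `SupportsShutdown` are hypothetical names, not SDK exports.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, Protocol, TypeVar


class SupportsShutdown(Protocol):
    def shutdown(self) -> None: ...


W = TypeVar("W", bound=SupportsShutdown)


@contextmanager
def one_shot(connect: Callable[[], W]) -> Iterator[W]:
    """Connect, yield the worker, and always shut it down afterwards."""
    worker = connect()
    try:
        yield worker
    finally:
        # Runs on success and on error, so the socket is closed cleanly
        # instead of being left for the engine to notice.
        worker.shutdown()
```

In a real job, `connect` would be something like `lambda: register_worker(url, InitOptions(worker_name="my-worker"))`, following the Python connection example earlier on this page, and the job's work would run inside the `with one_shot(...) as worker:` block.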
