> ## Documentation Index
> Fetch the complete documentation index at: https://motiadev-add-real-system-tutorial-round-2.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Ch. 4: Make it durable

> Move click-row writes onto a queue, then broadcast link events to independent subscribers.

In Chapter 3 every redirect triggers `link::record_click` directly, so the database write runs on
the redirect's hot path and a slow write slows the redirect. In this chapter you move that work onto
a **queue** so redirects return immediately, then use **pub/sub** to broadcast link events to
independent subscribers: a Python analytics worker and a cache refresher, both decoupled from the
`link` worker.

## Add the workers

This chapter uses two workers. `iii-queue` is already in your project from `iii project init`. Add
`iii-pubsub` now; you publish your first event to it later in the chapter:

```bash theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
iii worker add iii-pubsub
```

## Make redirects fast with a queue

A queue holds work that is accepted now and run later: each message in the `clicks` queue is one
"insert a row recording that someone followed code X at time T." Define the queue on the `iii-queue`
worker (already in your project from `iii project init`), but requiring a few updates to
`queue_configs`:

```yaml {5-9} config.yaml theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
workers:
  # ...
  - name: iii-queue
    config:
      queue_configs:
        clicks:
          type: standard
          max_retries: 5
          concurrency: 5
      adapter:
        name: builtin
```

You already wrote `link::record_click` in Chapter 3, where `http::redirect` triggers it directly.
Nothing about the function changes. You only change how it's invoked. First import `TriggerAction`:

```typescript src/index.ts theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
import { registerWorker, Logger, TriggerAction } from "iii-sdk";
```

Then add an `action` to the existing `link::record_click` call in `http::redirect` so the
`iii-queue` worker enqueues it instead of running it inline:

```typescript src/index.ts {5} theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
// Enqueue the click and return right away; the consumer drains it later.
await worker.trigger({
  function_id: "link::record_click",
  payload: { code, clicked_at: new Date().toISOString() },
  action: TriggerAction.Enqueue({ queue: "clicks" }),
});
return { status_code: 302, headers: { Location: url } };
```

The redirect now returns as soon as the click is accepted onto the queue. `link::record_click`
drains the queue in the background, with retries and a dead-letter queue if a write keeps failing.

## Broadcast events with pub/sub

A queue delivers each message to one consumer. When several unrelated parts of the system need to
react to the same event, use a publish subscribe design instead.

<Info>
  We ship both a `iii-queue` and `iii-pubsub` worker. While `iii-queue` provides standard queueing
  it also provides its own durable publish and subscribe.

  When you need a publish and subscribe flow to be guaranteed to succeed (or fail to a DLQ) then use
  `iii-queue`s `iii::durable::publish` and `durable:subscriber`.

  When you don't need a publish and subscribe flow to be guaranteed then use `iii-pubsub`s `publish`
  and `subscribe`.
</Info>

Here we'll implement topics that publish when a link is created, and when a link is updated. We
don't have link updating functionality yet, so we'll add that and an HTTP endpoint for it too.

### Publish on link.created

Publish an event whenever a link is created or its target changes. Inside `link::create`, after the
database write and `state::set`, trigger the built-in `publish` function:

```typescript src/index.ts theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
// ...inside link::create, after the database write and state::set:
await worker.trigger({
  function_id: "publish",
  payload: { topic: "link.created", data: { code, url } },
});
```

### Publish on link.updated

Add an update path so a link's target can change, and announce it. First, the domain function: it
updates the database row and publishes a `link.updated` event through durable pub/sub
(`iii::durable::publish`, served by `iii-queue`):

```typescript src/index.ts theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
worker.registerFunction("link::update", async (payload: { code: string; url: string }) => {
  const url = /^https?:\/\//i.test(payload.url) ? payload.url : `https://${payload.url}`;
  await worker.trigger({
    function_id: "database::execute",
    payload: {
      db: DB,
      sql: "UPDATE links SET url = ? WHERE code = ?",
      params: [url, payload.code],
    },
  });
  await worker.trigger({
    function_id: "iii::durable::publish",
    payload: { topic: "link.updated", data: { code: payload.code, url } },
  });
  return { code: payload.code, url };
});
```

### Expose link updating via HTTP

Then the HTTP handler that validates input and calls the domain function:

```typescript src/index.ts theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
worker.registerFunction("http::update", async (req) => {
  const code = req.path_params.code;
  const url = req.body?.url;
  if (!url) {
    return {
      status_code: 400,
      body: { error: 'missing "url"' },
      headers: { "Content-Type": "application/json" },
    };
  }
  const link = await worker.trigger<{ code: string; url: string }, { code: string; url: string }>({
    function_id: "link::update",
    payload: { code, url },
  });
  return { status_code: 200, body: link, headers: { "Content-Type": "application/json" } };
});
```

And the trigger that binds it to `PUT /links/:code`:

```typescript src/index.ts theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
worker.registerTrigger({
  type: "http",
  function_id: "http::update",
  config: { api_path: "/links/:code", http_method: "PUT" },
});
```

## Add reactive state: Keep the cache correct without coupling

`link::update` changes the database but not the state cache, so a query could serve stale data.
Rather than handle refreshing the state cache inside `link::update`, subscribe to the `link.updated`
event with a **durable** subscriber:

<Info>
  **Durable vs. regular pub/sub.** `link.updated` uses durable pub/sub: `iii::durable::publish` with
  a `durable:subscriber` trigger, both served by the `iii-queue` worker. Consumers like this cache
  refresher must receive every update. A dropped event would leave the cache pointing at a stale
  URL. `link.created` stays on regular pub/sub (`iii-pubsub`). Its only consumer is a best-effort
  daily counter, so an occasional miss is harmless. Use durable pub/sub when a missed event would
  corrupt state, and regular pub/sub for fire-and-forget fan-out.
</Info>

```typescript src/index.ts theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
worker.registerFunction("link::on_link_updated", async (data: { code: string; url: string }) => {
  await worker.trigger({
    function_id: "state::set",
    payload: { scope: "links", key: data.code, value: { url: data.url } },
  });
});

worker.registerTrigger({
  type: "durable:subscriber",
  function_id: "link::on_link_updated",
  config: { topic: "link.updated" },
});
```

## Create an analytics worker in Python

Queues and events are useful within a single worker but also between workers. Thus far we've been
writing all of our code in TypeScript. However workers are not restricted to specific languages or
runtimes. So this time we'll create an analytics worker in Python to count links.

### Create a new worker

Scaffold a Python worker the same way you scaffolded the `link` worker in Chapter 1. That generates
an `analytics/` worker with a `main.py` example, and a `iii.worker.yaml` manifest.

```bash theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
iii worker init analytics --language python
```

### Subscribe to link.created events

Replace the example `main.py` with this one that subscribes to `link.created` events and keeps count
of every time that a new short link is created:

<Accordion title="analytics/main.py">
  ```python analytics/main.py theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
  import os
  from datetime import datetime, timezone

  from iii import register_worker, InitOptions, Logger

  worker = register_worker(
      os.environ.get("III_URL", "ws://localhost:49134"),
      InitOptions(worker_name="analytics"),
  )
  logger = Logger()

  DB = "analytics"

  def ensure_schema() -> None:
      """The analytics worker owns its own table, in its own database."""
      worker.trigger(
          {
              "function_id": "database::execute",
              "payload": {
                  "db": DB,
                  "sql": "CREATE TABLE IF NOT EXISTS daily_link_counts (day TEXT PRIMARY KEY, count INTEGER NOT NULL)",
              },
          }
      )

  def on_link_created(data: dict) -> dict:
      """Runs whenever link publishes `link.created`. Counts links per day."""
      day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
      worker.trigger(
          {
              "function_id": "database::execute",
              "payload": {
                  "db": DB,
                  "sql": "INSERT INTO daily_link_counts (day, count) VALUES (?, 1) "
                  "ON CONFLICT(day) DO UPDATE SET count = count + 1",
                  "params": [day],
              },
          }
      )
      logger.info(f"counted new link {data.get('code')} for {day}")
      return {"ok": True}

  ensure_schema()

  worker.register_function("analytics::on_link_created", on_link_created)
  worker.register_trigger(
      {
          "type": "subscribe",
          "function_id": "analytics::on_link_created",
          "config": {"topic": "link.created"},
      }
  )

  print("Analytics worker started")
  ```
</Accordion>

### Configure the worker's manifest

The generated manifest has no run scripts yet, so give it an install and start script:

```yaml iii.worker.yaml theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
scripts:
  install: "pip install ."
  start: "watchfiles 'python main.py'"
```

Analytics keeps its counts in its own database, so the `link` worker never has to know it exists.
Add an `analytics` database to the `database` worker, alongside the `primary` one from Chapter 3:

```yaml {9-10} config.yaml theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
workers:
  # ...
  - name: database
    config:
      databases:
        primary:
          # ...
          url: sqlite:./data/iii.db
        analytics:
          url: sqlite:./data/analytics.db
```

Finally, add the new analytics worker to your `config.yaml`:

```bash theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
iii worker add ./analytics
```

### See it work

Create five links, follow one a few times, and change its target:

```bash theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
# Make some new links
for n in $(seq 1 5); do
  curl -s -X POST http://127.0.0.1:3111/links \
    -H 'Content-Type: application/json' -d "{\"url\":\"https://iii.dev/$n\",\"code\":\"link$n\"}"
done
```

The Python worker keeps count of new link creations as expected:

```bash theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
iii trigger database::query db=analytics sql="SELECT day, count FROM daily_link_counts"
```

```json theme={"theme":{"light":"catppuccin-latte","dark":"dark-plus"}}
{ "rows": [{ "day": "2026-05-27", "count": 5 }], "row_count": 1 }
```

## Conclusion

Redirects no longer wait on a database write: click rows ride a queue, drained in the background.
Link events fan out over pub/sub to a Python analytics counter and a cache refresher, and the `link`
worker does not know either of them exists. Next, in
[Ch. 5: Stream live clicks](/tutorials/linkly/streaming), you broadcast every click in real time
from a dedicated `click-streamer` worker.
