---
title: "Callback Hooks"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Callback Hooks}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---


## Setup

```r
library(jobqueue)
jq <- jobqueue(workers = 1)
```


## Introduction

Callbacks are the cornerstone of asynchronous programming.

If you want to calculate `2 + 2` and show the results, the synchronous
programming approach would be:

```r
message('Result = ', 2 + 2)
#> Result = 4
```

In asynchronous programming, this task is broken apart into two discrete steps:
*computation* and *result handling*. Using jobqueue, this looks like:

```r
job <- jq$run({ 2 + 2 })
job$on('done', ~message('Result = ', .$result))
#> Result = 4
```

The asynchronous format allows parts of your code to run independently, in this
case on separate R processes. When a part finishes, the callback will be
executed to allow you to work with the result.

Callbacks are not limited to just the `job` finishing. See the "Triggers"
section for a list of all events that can trigger a callback.

> **Important**
> 
> Hooks are evaluated on the main process, not the background processes.
> Therefore, ensure that callback functions execute quickly so that they do
> not delay `job` handling.



## A Callback Function

The callback function should accept one argument: the object that triggered the
callback. Functions accepting zero or multiple arguments are also allowed.

This is a great place for R's shorthand function definitions (R >= 4.1.0).
Formula-style lambdas can also be used (see `rlang::as_function()`).

```r
# jobqueue hooks
hook <- function (jq) { message('jobqueue is ', jq$state) }
hook <- \(jq) message('jobqueue is ', jq$state)
hook <- ~message('jobqueue is ', .$state)

# worker hooks
hook <- function (worker) { message('worker is ', worker$state) }
hook <- \(w) message('worker is ', w$state)
hook <- ~message('worker is ', .$state)

# job hooks
hook <- function (job) { message('job is ', job$state) }
hook <- \(j) message('job is ', j$state)
hook <- ~message('job is ', .$state)
```
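
Because a hook is an ordinary function of the triggering object, it can be exercised without a running queue. The sketch below calls two of the hook styles above on a stand-in list. `fake_job` and `capture_msg()` are hypothetical names used only for illustration, and a real `job` is a richer object than a plain list.

```r
# Stand-in for a job: only the field the hook reads. (Hypothetical; a real
# job object has many more fields.)
fake_job <- list(state = 'done')

hook_long  <- function (job) { message('job is ', job$state) }
hook_short <- \(j) message('job is ', j$state)   # requires R >= 4.1.0

# Helper: run a hook and return the text of the message it emits,
# or NA if the hook emits nothing.
capture_msg <- function (hook, obj) {
  tryCatch(
    expr    = { hook(obj); NA_character_ },
    message = function (m) conditionMessage(m) )
}

msg_long  <- capture_msg(hook_long,  fake_job)
msg_short <- capture_msg(hook_short, fake_job)

# Both styles produce the same message text.
identical(msg_long, msg_short)
#> [1] TRUE
```

Checking a hook this way before attaching it makes it easier to confirm that it is cheap to run on the main process.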


## Triggers

`jobqueue`, `worker`, and `job` objects update their `$state` as described in
the tables below. Each time the state changes, any callbacks registered to that
state are executed. In addition, you can register `state = '*'` or
`state = '.next'`, which trigger regardless of the state name.


### Special Triggers

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'*'`          | Every time the state changes.                           |
| `'.next'`      | Only one time, the next time the state changes.         |
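
To make the difference between a named state, `'*'`, and `'.next'` concrete, here is a toy model of state-triggered callbacks written in base R. It is only a sketch of the semantics in the table above, not jobqueue's implementation. `new_emitter()`, `set_state()`, and `seen` are hypothetical names.

```r
# Toy model of state-triggered callbacks. NOT jobqueue's implementation.
new_emitter <- function () {
  hooks <- list()   # each entry: list(state = <chr>, fn = <function>)

  on <- function (state, fn) {
    hooks[[length(hooks) + 1]] <<- list(state = state, fn = fn)
    invisible(NULL)
  }

  set_state <- function (new_state) {
    # Hooks for this state, plus the two special triggers, will fire.
    fire <- vapply(hooks, function (h) h$state %in% c(new_state, '*', '.next'), logical(1))
    todo <- hooks[fire]
    # '.next' hooks are one-shot: forget them before running anything.
    keep <- vapply(hooks, function (h) h$state != '.next', logical(1))
    hooks <<- hooks[keep]
    for (h in todo) h$fn(new_state)
    invisible(NULL)
  }

  list(on = on, set_state = set_state)
}

seen <- character(0)
em   <- new_emitter()
em$on('*',     function (s) seen <<- c(seen, paste('any:',  s)))
em$on('.next', function (s) seen <<- c(seen, paste('next:', s)))
em$on('done',  function (s) seen <<- c(seen, paste('done:', s)))

em$set_state('running')
em$set_state('done')

# seen now holds, in order:
#   "any: running", "next: running", "any: done", "done: done"
```

The `'.next'` hook fires once, on the first change, while the `'*'` hook fires on both changes.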


### `jobqueue` States

| State          | Triggers                                                   |
|----------------|------------------------------------------------------------|
| `'starting'`   | After initialization, before `worker`s are started.        |
| `'idle'`       | When all `worker`s are idle. Also, after initial startup.  |
| `'busy'`       | At least one `worker` is busy.                             |
| `'stopped'`    | After `<jobqueue>$stop()` is called.                       |


### `worker` States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'starting'`   | Background process is being configured.                 |
| `'idle'`       | Waiting on `job`s to be submitted.                      |
| `'busy'`       | After a `job` starts running.                           |
| `'stopped'`    | After `<worker>$stop()` is called.                      |


### `job` States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'created'`    | After `job` object initialization.                      |
| `'submitted'`  | After `<job>$queue` is assigned.                        |
| `'queued'`     | After `stop_id` and `copy_id` are resolved.             |
| `'starting'`   | Before evaluation begins.                               |
| `'running'`    | After `<job>$worker` is set and evaluation begins.      |
| `'done'`       | After `<job>$output` is assigned.                       |



## Attaching

Callbacks can be attached to `jobqueue`, `worker`, or `job` objects.

You can add callbacks either in the constructor, or later with `$on()`.

```r
hook <- ~message(.$uid, ' is ', .$state)

jq <- jobqueue(hooks         = list(q_idle = hook))
w  <- worker_class$new(hooks = list(idle   = hook))
j  <- job_class$new(hooks    = list(done   = hook))

jq$on('busy',    hook)
w$on('busy',     hook)
j$on('starting', hook)
```

In `jobqueue()`, `hooks` can set hooks for the `jobqueue`, `worker`, and `job`.
The rules are:

* Prefixing with `q_`, `w_`, or `j_` attaches the hook to the `jobqueue`, `worker`, or `job`, respectively.
* Non-prefixed hooks are attached to `job`s.
* Alternatively, a list of lists can be assigned to `hooks`, of the format:
  
```r
jobqueue(
  'hooks' = list(
    'queue'  = list(idle = hook), 
    'worker' = list(idle = hook), 
    'job'    = list(done = hook) ))
```
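
The prefix rules above amount to a mapping from a flat list to the nested form. The following base R sketch spells that mapping out. `split_hooks()` is a hypothetical helper written for this illustration, and it is not how jobqueue itself parses `hooks`.

```r
# Hypothetical helper: expand a flat, prefixed hooks list into the nested
# queue/worker/job form. Mirrors the rules described above; not jobqueue code.
split_hooks <- function (hooks) {
  nested <- list(queue = list(), worker = list(), job = list())
  target <- c(q_ = 'queue', w_ = 'worker', j_ = 'job')

  for (name in names(hooks)) {
    prefix <- substr(name, 1, 2)
    if (prefix %in% names(target)) {
      # Prefixed: strip the prefix and file under the matching object type.
      nested[[target[[prefix]]]][[substring(name, 3)]] <- hooks[[name]]
    } else {
      # Non-prefixed: treated as a job hook.
      nested[['job']][[name]] <- hooks[[name]]
    }
  }
  nested
}

hook   <- function (x) NULL
nested <- split_hooks(list(q_idle = hook, w_busy = hook, done = hook, j_created = hook))

names(nested$queue)    # "idle"
names(nested$worker)   # "busy"
names(nested$job)      # "done" and "created"
```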



## Removing

Callbacks attached in the constructor cannot be removed.

When you attach a callback with `$on()`, the return value is a function, which,
when called, will remove that callback from the object.

```r
job <- job_class$new(
  'expr'  = { 3.14 }, 
  'hooks' = list(done = ~message('ABC')) )

off <- job$on('done', ~message('XYZ'))
off()

jq$submit(job)
#> ABC
```
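
The "return a remover" pattern is easy to reproduce with closures. The toy registry below is a sketch of the idea only, with hypothetical names (`new_registry()`, `fire()`, `size()`). It makes no claim about how `$on()` is implemented inside jobqueue.

```r
# Toy registry illustrating the "return a remover" pattern. Not jobqueue code.
new_registry <- function () {
  hooks   <- list()
  next_id <- 0

  on <- function (fn) {
    next_id <<- next_id + 1
    key <- as.character(next_id)
    hooks[[key]] <<- fn
    # The returned closure remembers `key` and deletes only that entry.
    function () { hooks[[key]] <<- NULL; invisible(NULL) }
  }

  fire <- function () for (fn in hooks) fn()
  size <- function () length(hooks)

  list(on = on, fire = fire, size = size)
}

reg   <- new_registry()
off_a <- reg$on(function () message('ABC'))
off_b <- reg$on(function () message('XYZ'))

off_b()       # remove only the second callback
reg$size()
#> [1] 1

reg$fire()
#> ABC
```

Keeping the returned function in a variable is what makes later removal possible, so assign the result of `$on()` whenever a callback may need to be detached.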


## Default `job` Hooks

When you create a `jobqueue`, you can define a set of callbacks to automatically
apply to all `job`s created with the `<jobqueue>$run()` command.

```r
n  <- 0
jq <- jobqueue(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:5) jq$run({ 'Hi' })

n
#> [1] 5
```

How does `jobqueue()` know to apply the `hooks` to `job` objects instead of the `jobqueue` or `worker` objects? Unless otherwise indicated, `jobqueue()` `hooks` are assumed to be for `job`s. You can also explicitly specify that `hooks` are for `job`s by using the formats described in the "Attaching" section above.

```r
jq <- jobqueue(hooks = list(j_created = ~{ n <<- n + 1 } ))
# or
jq <- jobqueue(hooks = list(job = list(created = ~{ n <<- n + 1 } )))
```

If you set `hooks` in `<jobqueue>$run()`, those hooks will REPLACE the `job` hooks from `jobqueue()`.

```r
n  <- 0
jq <- jobqueue(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:3) jq$run({ 'Hi' }, hooks = list(done = ~message(.$result)))
#> Hi
#> Hi
#> Hi

n
#> [1] 0
```
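
If the defaults should survive alongside per-job hooks, one option is to keep the defaults in a variable and merge the two lists yourself before passing the result to `<jobqueue>$run()`. The sketch below shows only the list merge, using base R's `modifyList()`. `default_hooks` and `job_hooks` are illustrative names, and the sketch assumes at most one function per state name.

```r
default_hooks <- list(created = function (job) message('created'))
job_hooks     <- list(done    = function (job) message('done'))

# Entries in job_hooks win when both lists name the same state.
merged <- modifyList(default_hooks, job_hooks)
names(merged)
#> [1] "created" "done"

# The merged list could then be supplied as: jq$run({ 'Hi' }, hooks = merged)
```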


## Use Case: Priority Setting

Below, we'll set up a callback function that triggers when each `job` enters the `'queued'` state. It will modify the `job`, adding a custom `<job>$priority` field to the `job` object. Then it will modify the `jobqueue`'s internal list of jobs (`<jobqueue>$jobs`), sorting them according to each `job`'s `<job>$priority`. Last, it will attach additional callbacks to the `job` to output timing information upon exit from the `'queued'` state and upon entry into the `'done'` state.

```r
library(glue)
library(jobqueue)

# Our callback/hook function.
prioritize <- function (job) {

  queue      <- job$jobqueue
  queue_jobs <- job$jobqueue$jobs

  # Apply a random priority to this job.
  job$priority <- round(runif(1) * 10) - 5
  
  # Sort all this jobqueue's jobs by priority (including this job).
  priorities        <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
  
  # Add hooks to this job to report queued/total times.
  t1    <- Sys.time()
  tdiff <- function () format(round(Sys.time() - t1, 1))
  
  job$on('.next', ~message(glue(
    'job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}' )))
    
  job$on('done', ~message(glue(
    'job {.$uid} (priority {.$priority}) finished in {tdiff()}' )))
}

# A single worker best illustrates processing order.
jq <- jobqueue(
  'workers' = 1, 
  'hooks'   = list(queued = prioritize) )

for (i in 1:5) {
  job <- jq$run({ 3.14 })
  message(glue_data(job, 'Created job {uid} with priority {priority}'))
}
#> job J11 (priority -3) was dispatched after 0.1 secs
#> Created job J11 with priority -3
#> Created job J12 with priority -2
#> Created job J13 with priority 1
#> Created job J14 with priority 0
#> Created job J15 with priority -1
#> job J11 (priority -3) finished in 0.7 secs
#> job J12 (priority -2) was dispatched after 0.6 secs
#> job J12 (priority -2) finished in 1.1 secs
#> job J15 (priority -1) was dispatched after 0.7 secs
#> job J15 (priority -1) finished in 1.3 secs
#> job J14 (priority 0) was dispatched after 1.4 secs
#> job J14 (priority 0) finished in 2 secs
#> job J13 (priority 1) was dispatched after 2.1 secs
#> job J13 (priority 1) finished in 2.6 secs
```
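
The sorting step in `prioritize()` can be tried in isolation. The sketch below uses plain lists as stand-ins for queued jobs (an assumption made for illustration; real `job` objects are not plain lists) and swaps `sapply()` for `vapply()` so that a missing priority is reported immediately.

```r
# Stand-ins for queued jobs (hypothetical; real jobs are not plain lists).
queue_jobs <- list(
  list(uid = 'J1', priority =  1),
  list(uid = 'J2', priority = -3),
  list(uid = 'J3', priority =  1),
  list(uid = 'J4', priority =  0) )

# vapply() fails fast if any job lacks a numeric priority, whereas sapply()
# would silently return a list that order() cannot sort.
priorities <- vapply(queue_jobs, function (j) j$priority, numeric(1))
sorted     <- queue_jobs[order(priorities)]

vapply(sorted, function (j) j$uid, character(1))
#> [1] "J2" "J4" "J1" "J3"
```

Lower numbers come first, and jobs with equal priority keep their submission order because `order()` leaves ties in place.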

In an actual application, you could set `<job>$priority` based on `<job>$vars`.

```r
prioritize <- function (job) {
  
  # Give priority to lower number of replications
  job$priority <- job$vars$replications
  
  queue_jobs        <- job$jobqueue$jobs
  priorities        <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}
jq <- jobqueue(hooks = list(queued = prioritize))

#                                         vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
for (reps in 5:1) job <- jq$run({ 3.14 }, vars = list(replications = reps))
```

Or, define the custom `<job>$priority` field in `<jobqueue>$run()`.

```r
prioritize <- function (job) {
  queue_jobs        <- job$jobqueue$jobs
  priorities        <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}
jq <- jobqueue(hooks = list(queued = prioritize))

#                                         vvvvvvvvvvvvvvv
for (reps in 5:1) job <- jq$run({ 3.14 }, priority = reps)
```

Or, set `<job>$priority` in a hook that triggers before `prioritize()` is triggered.

```r
set_priority <- function (job) {
  job$priority <- job$vars$replications
}

prioritize <- function (job) {
  queue_jobs        <- job$jobqueue$jobs
  priorities        <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}

#                           vvvvvvvvvvvvvvvvvvvvvv
jq <- jobqueue(hooks = list(created = set_priority, queued = prioritize))
for (reps in 5:1) job <- jq$run({ 3.14 }, vars = list(replications = reps))
```


## Use Case: Rate Limiting

Say you're hosting a web service, where users are allowed to submit one `job`
every 30 seconds. `job`s can take more than 30 seconds, so the solution is more
complex than setting `stop_id = user_id`. And what if you want to stop the new
`job` instead of the old one?

```r
rate_limit <- function (job) {

  job$t_start <- Sys.time()

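  # Compare in seconds explicitly; a bare difference of two times may be
  # reported in minutes or hours once the gap grows.
  for (j in job$jobqueue$jobs)
    if (j$user_id == job$user_id)
      if (difftime(job$t_start, j$t_start, units = 'secs') < 30)
        job$stop('Rate Limit Exceeded')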
}

jq <- jobqueue(hooks = list(submitted = rate_limit))

j_A1 <- jq$run({ 42 }, user_id = 'A')
j_B1 <- jq$run({ 42 }, user_id = 'B')
j_B2 <- jq$run({ 42 }, user_id = 'B')

j_B1$result
#> [1] 42

j_B2$result
#> <interrupt: Rate Limit Exceeded>
```

Note that the above code won't completely solve the rate limiting task. If a
user's `job` only takes five seconds to complete, then they could submit a `job`
every six seconds and the `jobqueue` would be none the wiser.

To give the `jobqueue` awareness of previously completed `job`s, you'll need to
store per-user `job` start times persistently, as in the solution below.

```r
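t_user <- list()

rate_limit <- function (job) {
  t_start <- Sys.time()
  t_prev  <- t_user[[job$user_id]]   # NULL on a user's first job

  # Seconds elapsed since this user's last accepted job.
  t_diff <- if (!is.null(t_prev)) as.numeric(difftime(t_start, t_prev, units = 'secs'))

  if (isTRUE(t_diff < 30)) { job$stop('Rate Limit Exceeded')   }
  else                     { t_user[[job$user_id]] <<- t_start }
}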

jq <- jobqueue(hooks = list(created = rate_limit))
```

In the first example, we attached a hook to `'submitted'` because that's when
`<job>$jobqueue` becomes available in callbacks. In the latter example, we
attached a hook to `'created'` instead because we didn't need `<job>$jobqueue`
for that solution. Check the trigger order listed in the "Triggers"
section above, and attach callbacks as early as possible to expedite `job`
handling.
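
Both rate limiters rest on comparing an elapsed time against 30 seconds. One base R detail worth keeping in mind is that subtracting two `POSIXct` values yields a `difftime` whose units are chosen automatically, so the same numeric threshold can mean seconds, minutes, or hours depending on the gap. The sketch below uses fixed timestamps (chosen arbitrarily for illustration) to show the effect and one way to pin the units.

```r
t_prev  <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')
t_start <- t_prev + 90          # 90 seconds later

# Auto-selected units: the gap is reported in minutes, not seconds.
auto <- t_start - t_prev
units(auto)
#> [1] "mins"
as.numeric(auto) < 30           # TRUE, although 90 seconds have elapsed

# Pinning the units makes the comparison mean what it says.
secs <- as.numeric(difftime(t_start, t_prev, units = 'secs'))
secs < 30                       # FALSE
```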
