---
title: "Callback Hooks"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Callback Hooks}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

## Setup

```r
library(jobqueue)
jq <- jobqueue(workers = 1)
```

## Introduction

Callbacks are the cornerstone of asynchronous programming. If you want to calculate `2 + 2` and show the results, the synchronous programming approach would be:

```r
message('Result = ', 2 + 2)
#> Result = 4
```

In asynchronous programming, this task is broken apart into two discrete steps: *computation* and *result handling*. Using jobqueue, this looks like:

```r
job <- jq$run({ 2 + 2 })

job$on('done', ~message('Result = ', .$result))
#> Result = 4
```

The asynchronous format allows parts of your code to run independently, in this case on separate R processes. When a part finishes, the callback will be executed to allow you to work with the result.

Callbacks are not limited to just the `job` finishing. See the "Triggers" section for a list of all events that can trigger a callback.

> **Important**
>
> Hooks are evaluated on the main process, not the background processes.
> Therefore, ensure that the callback functions execute quickly so as to not
> delay `job` handling.

## A Callback Function

The callback function should accept one argument: the object that triggered the callback. Functions accepting zero or multiple arguments are also allowed.

This is a great place for R's new shorthand function definitions (R >= 4.1.0). Lambda syntax can also be used (see `rlang::as_function()`).

```r
# jobqueue hooks
hook <- function (jq) { message('jobqueue is ', jq$state) }
hook <- \(jq) message('jobqueue is ', jq$state)
hook <- ~message('jobqueue is ', .$state)

# worker hooks
hook <- function (worker) { message('worker is ', worker$state) }
hook <- \(w) message('worker is ', w$state)
hook <- ~message('worker is ', .$state)

# job hooks
hook <- function (job) { message('job is ', job$state) }
hook <- \(j) message('job is ', j$state)
hook <- ~message('job is ', .$state)
```

## Triggers

`jobqueue`, `worker`, and `job` objects update their `$state` as described in the tables below. Each time the state changes, any callbacks registered to that state are executed. In addition, you can register `state = '*'` or `state = '.next'`, which trigger regardless of the new state's name.

### Special Triggers

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'*'`          | Every time the state changes.                           |
| `'.next'`      | Only one time, the next time the state changes.         |

### `jobqueue` States

| State          | Triggers                                                  |
|----------------|-----------------------------------------------------------|
| `'starting'`   | After initialization, before `worker`s are started.       |
| `'idle'`       | When all `worker`s are idle. Also, after initial startup. |
| `'busy'`       | At least one `worker` is busy.                            |
| `'stopped'`    | After `$stop()` is called.                                |

### `worker` States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'starting'`   | Background process is being configured.                 |
| `'idle'`       | Waiting on `job`s to be submitted.                      |
| `'busy'`       | After a `job` starts running.                           |
| `'stopped'`    | After `$stop()` is called.                              |

### `job` States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'created'`    | After `job` object initialization.                      |
| `'submitted'`  | After `$queue` is assigned.                             |
| `'queued'`     | After `stop_id` and `copy_id` are resolved.             |
| `'starting'`   | Before evaluation begins.                               |
| `'running'`    | After `$worker` is set and evaluation begins.           |
| `'done'`       | After `$output` is assigned.                            |

## Attaching

Callbacks can be attached to `jobqueue`, `worker`, or `job` objects. You can add callbacks either in the constructor, or later with `$on()`.

```r
hook <- ~message(.$uid, ' is ', .$state)

jq <- jobqueue(hooks = list(q_idle = hook))
w  <- worker_class$new(hooks = list(idle = hook))
j  <- job_class$new(hooks = list(done = hook))

jq$on('busy', hook)
w$on('busy', hook)
j$on('starting', hook)
```

In `jobqueue()`, `hooks` can set hooks for the `jobqueue`, `worker`, and `job`. The rules are:

* Prefixing with `q_`, `w_`, or `j_` attaches the hook to the `jobqueue`, `worker`, or `job`, respectively.
* Non-prefixed hooks are attached to `job`s.
* Alternatively, a list of lists can be assigned to `hooks`, of the format:

```r
jobqueue(
  'hooks' = list(
    'queue'  = list(idle = hook),
    'worker' = list(idle = hook),
    'job'    = list(done = hook)
))
```

## Removing

Callbacks attached in the constructor cannot be removed.

When you attach a callback with `$on()`, the return value is a function which, when called, removes that callback from the object.

```r
job <- job_class$new(
  'expr'  = { 3.14 },
  'hooks' = list(done = ~message('ABC')) )

off <- job$on('done', ~message('XYZ'))
off()

jq$submit(job)
#> ABC
```

## Default `job` Hooks

When you create a `jobqueue`, you can define a set of callbacks to automatically apply to all `job`s created with the `$run()` command.

```r
n  <- 0
jq <- jobqueue(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:5) jq$run({ 'Hi' })

n
#> [1] 5
```

How does `jobqueue()` know to apply the `hooks` to `job` objects instead of the `jobqueue` or `worker` objects? Unless otherwise indicated, `jobqueue()` `hooks` are assumed to be for `job`s.

You can also explicitly specify that `hooks` are for `job`s by using the formats described in the "Attaching" section above.

```r
jq <- jobqueue(hooks = list(j_created = ~{ n <<- n + 1 } ))
# or
jq <- jobqueue(hooks = list(job = list(created = ~{ n <<- n + 1 } )))
```

If you set `hooks` in `$run()`, those hooks will REPLACE the `job` hooks from `jobqueue()`.

```r
n  <- 0
jq <- jobqueue(hooks = list(created = ~{ n <<- n + 1 } ))

for (i in 1:3)
  jq$run({ 'Hi' }, hooks = list(done = ~message(.$result)))
#> Hi
#> Hi
#> Hi

n
#> [1] 0
```

## Use Case: Priority Setting

Below, we'll set up a callback function that triggers when each `job` enters the `'queued'` state. It will modify the `job`, adding a custom `$priority` field to the `job` object. Then it will modify the `jobqueue`'s internal list of jobs (`$jobs`), sorting them according to each `job`'s `$priority`. Last, it will attach additional callbacks to the `job` to output timing information upon exit from the `'queued'` state and upon entry into the `'done'` state.

```r
library(glue)
library(jobqueue)

# Our callback/hook function.
prioritize <- function (job) {

  queue      <- job$jobqueue
  queue_jobs <- job$jobqueue$jobs

  # Apply a random priority to this job.
  job$priority <- round(runif(1) * 10) - 5

  # Sort all this jobqueue's jobs by priority (including this job).
  priorities       <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]

  # Add hooks to this job to report queued/total times.
  t1    <- Sys.time()
  tdiff <- function () format(round(Sys.time() - t1, 1))
  job$on('.next', ~message(glue(
    'job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}' )))
  job$on('done', ~message(glue(
    'job {.$uid} (priority {.$priority}) finished in {tdiff()}' )))
}

# A single worker best illustrates processing order.
jq <- jobqueue(
  'workers' = 1,
  'hooks'   = list(queued = prioritize) )

for (i in 1:5) {
  job <- jq$run({ 3.14 })
  message(glue_data(job, 'Created job {uid} with priority {priority}'))
}
#> job J11 (priority -3) was dispatched after 0.1 secs
#> Created job J11 with priority -3
#> Created job J12 with priority -2
#> Created job J13 with priority 1
#> Created job J14 with priority 0
#> Created job J15 with priority -1
#> job J11 (priority -3) finished in 0.7 secs
#> job J12 (priority -2) was dispatched after 0.6 secs
#> job J12 (priority -2) finished in 1.1 secs
#> job J15 (priority -1) was dispatched after 0.7 secs
#> job J15 (priority -1) finished in 1.3 secs
#> job J14 (priority 0) was dispatched after 1.4 secs
#> job J14 (priority 0) finished in 2 secs
#> job J13 (priority 1) was dispatched after 2.1 secs
#> job J13 (priority 1) finished in 2.6 secs
```

In an actual application, you could set `$priority` based on `$vars`.

```r
prioritize <- function (job) {

  # Give priority to lower number of replications
  job$priority <- job$vars$replications

  queue_jobs        <- job$jobqueue$jobs
  priorities        <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}

jq <- jobqueue(hooks = list(queued = prioritize))

#                         vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
for (reps in 5:1)
  job <- jq$run({ 3.14 }, vars = list(replications = reps))
```

Or, define the custom `$priority` field in `$run()`.

```r
prioritize <- function (job) {
  queue_jobs        <- job$jobqueue$jobs
  priorities        <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}

jq <- jobqueue(hooks = list(queued = prioritize))

#                         vvvvvvvvvvvvvvv
for (reps in 5:1)
  job <- jq$run({ 3.14 }, priority = reps)
```

Or, set `$priority` in a hook that triggers before `prioritize()` is triggered.

```r
set_priority <- function (job) {
  job$priority <- job$vars$replications
}

prioritize <- function (job) {
  queue_jobs        <- job$jobqueue$jobs
  priorities        <- sapply(queue_jobs, `[[`, 'priority')
  job$jobqueue$jobs <- queue_jobs[order(priorities)]
}

#                           vvvvvvvvvvvvvvvvvvvvvv
jq <- jobqueue(hooks = list(created = set_priority, queued = prioritize))

for (reps in 5:1)
  job <- jq$run({ 3.14 }, vars = list(replications = reps))
```

## Use Case: Rate Limiting

Say you're hosting a web service, where users are allowed to submit one `job` every 30 seconds. `job`s can take more than 30 seconds, so the solution is more complex than setting `stop_id = user_id`. And what if you want to stop the new `job` instead of the old one?

```r
rate_limit <- function (job) {
  job$t_start <- Sys.time()
  for (j in job$jobqueue$jobs)
    if (j$user_id == job$user_id)
      # Compare in seconds explicitly; a bare difference may be reported in minutes.
      if (difftime(job$t_start, j$t_start, units = 'secs') < 30)
        job$stop('Rate Limit Exceeded')
}

jq <- jobqueue(hooks = list(submitted = rate_limit))

j_A1 <- jq$run({ 42 }, user_id = 'A')
j_B1 <- jq$run({ 42 }, user_id = 'B')
j_B2 <- jq$run({ 42 }, user_id = 'B')

j_B1$result
#> [1] 42

j_B2$result
#>
```

Note that the above code won't completely solve the rate limiting task. If a user's `job` only takes five seconds to complete, then they could submit a `job` every six seconds and the `jobqueue` would be none the wiser. To give the `jobqueue` awareness of previously completed `job`s, you'll need to persistently store per-user `job` start times, as in the solution below.

```r
t_user <- list()

rate_limit <- function (job) {
  t_start <- Sys.time()
  t_prev  <- t_user[[job$user_id]]  # NULL for a user's first job

  # Elapsed time is (now - previous), measured in seconds.
  if (!is.null(t_prev) && difftime(t_start, t_prev, units = 'secs') < 30) {
    job$stop('Rate Limit Exceeded')
  } else {
    t_user[[job$user_id]] <<- t_start
  }
}

jq <- jobqueue(hooks = list(created = rate_limit))
```

In the first example, we attached a hook to `'submitted'` because that's when `$jobqueue` becomes available in callbacks. In the latter example, we attached a hook to `'created'` instead because we didn't need `$jobqueue` for that solution. Check the trigger order listed in the "Triggers" section above, and attach callbacks as early as possible to expedite `job` handling.
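
The per-user bookkeeping used for rate limiting can be exercised without starting a `jobqueue`. The sketch below is base R only and rests on some assumptions: `make_rate_limiter()` and `mock_job()` are hypothetical helpers that are not part of jobqueue, and the mock `job` is just an environment with a `user_id` and a `$stop()` that records the reason. Passing `now` explicitly makes the timing deterministic.

```r
# Hypothetical helper (not part of jobqueue): returns a hook-like function that
# remembers the last accepted start time for each user_id.
make_rate_limiter <- function (min_gap_secs = 30) {

  t_user <- list()  # last accepted start time, keyed by user_id

  function (job, now = Sys.time()) {

    t_prev <- t_user[[job$user_id]]  # NULL if this user has no accepted job yet

    # Elapsed seconds since the user's last accepted job.
    too_soon <- !is.null(t_prev) &&
      as.numeric(difftime(now, t_prev, units = 'secs')) < min_gap_secs

    if (too_soon) {
      job$stop('Rate Limit Exceeded')
    } else {
      t_user[[job$user_id]] <<- now
    }

    invisible(job)
  }
}

# Hypothetical stand-in for a job object: an environment whose `$stop()`
# simply records the reason it was given.
mock_job <- function (user_id) {
  job         <- new.env()
  job$user_id <- user_id
  job$stopped <- NULL
  job$stop    <- function (reason) assign('stopped', reason, envir = job)
  job
}

limiter <- make_rate_limiter(30)
t0      <- as.POSIXct('2024-01-01 00:00:00', tz = 'UTC')

a1 <- limiter(mock_job('A'), now = t0)        # first job for user A: accepted
a2 <- limiter(mock_job('A'), now = t0 + 10)   # 10 seconds later: stopped
b1 <- limiter(mock_job('B'), now = t0 + 10)   # different user: accepted
a3 <- limiter(mock_job('A'), now = t0 + 45)   # 45 seconds after a1: accepted

a2$stopped
#> [1] "Rate Limit Exceeded"
```

Because a rejected `job` does not update the stored time, the 30-second window is measured from the user's last *accepted* `job`. With a real `jobqueue`, the same closure could presumably be registered as a `'created'` hook, since hooks may accept a single argument.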