// task.js
/**
 * Support for concurrent task management and synchronization in web
 * applications.
 *
 * @author Dave Longley
 * @author David I. Lehn <dlehn@digitalbazaar.com>
 *
 * Copyright (c) 2009-2013 Digital Bazaar, Inc.
 */
var forge = require('./forge');
require('./debug');
require('./log');
require('./util');

// category under which this module logs and registers debug data
var cat = 'forge.task';

// Verbosity of debug logging: 0 = off, 1 = a little, 2 = a whole lot.
// Each verbose log call is guarded by a level check so that the logging
// code is not even invoked unless that level is enabled. For performance
// reasons do not use level 2 in production.
// ex: if(sVL >= 2) forge.log.verbose(....)
var sVL = 0;

// id handed to the next task that gets created
var sNextTaskId = 0;
// tasks that have been created and not yet finished, keyed by task id
// (tracked for debugging)
var sTasks = {};
// a map of task type to the queue (array) of tasks of that type
var sTaskQueues = {};

// expose the task map and the queues through the debug API
forge.debug.set(cat, 'tasks', sTasks);
forge.debug.set(cat, 'queues', sTaskQueues);

// name given to tasks that are created without one
var sNoTaskName = '?';

// number of doNext() recursions allowed before a context swap is forced
// FIXME: might need to tweak this based on the browser
var sMaxRecursions = 30;

// time (in milliseconds) tasks may run before a context swap is forced
// FIXME: might need to tweak this based on the browser
var sTimeSlice = 20;

/**
 * Task states.
 *
 * READY: the task may start processing
 * RUNNING: the task or one of its subtasks is running
 * BLOCKED: the task is waiting for N permits to be released
 * SLEEPING: the task is sleeping for a period of time
 * DONE: the task is done
 * ERROR: the task has an error
 */
var READY = 'ready';
var RUNNING = 'running';
var BLOCKED = 'blocked';
var SLEEPING = 'sleeping';
var DONE = 'done';
var ERROR = 'error';

/**
 * Task actions. Used to control state transitions.
 *
 * STOP: stop processing
 * START: start processing tasks
 * BLOCK: block the task until 1 or more permits are released
 * UNBLOCK: release one or more permits
 * SLEEP: sleep for a period of time
 * WAKEUP: wake up early from the SLEEPING state
 * CANCEL: cancel further tasks
 * FAIL: a failure occurred
 */
var STOP = 'stop';
var START = 'start';
var BLOCK = 'block';
var UNBLOCK = 'unblock';
var SLEEP = 'sleep';
var WAKEUP = 'wakeup';
var CANCEL = 'cancel';
var FAIL = 'fail';

/**
 * State transition table.
 *
 * nextState = sStateTable[currentState][action]
 *
 * Note that the READY row only defines STOP, START, CANCEL and FAIL.
 */
var sStateTable = {};

// Fill the table from a list of [state, [[action, nextState], ...]] rows.
(function(rows) {
  for(var r = 0; r < rows.length; ++r) {
    var transitions = rows[r][1];
    var row = sStateTable[rows[r][0]] = {};
    for(var t = 0; t < transitions.length; ++t) {
      row[transitions[t][0]] = transitions[t][1];
    }
  }
})([
  [READY, [
    [STOP, READY],
    [START, RUNNING],
    [CANCEL, DONE],
    [FAIL, ERROR]
  ]],
  [RUNNING, [
    [STOP, READY],
    [START, RUNNING],
    [BLOCK, BLOCKED],
    [UNBLOCK, RUNNING],
    [SLEEP, SLEEPING],
    [WAKEUP, RUNNING],
    [CANCEL, DONE],
    [FAIL, ERROR]
  ]],
  [BLOCKED, [
    [STOP, BLOCKED],
    [START, BLOCKED],
    [BLOCK, BLOCKED],
    [UNBLOCK, BLOCKED],
    [SLEEP, BLOCKED],
    [WAKEUP, BLOCKED],
    [CANCEL, DONE],
    [FAIL, ERROR]
  ]],
  [SLEEPING, [
    [STOP, SLEEPING],
    [START, SLEEPING],
    [BLOCK, SLEEPING],
    [UNBLOCK, SLEEPING],
    [SLEEP, SLEEPING],
    [WAKEUP, SLEEPING],
    [CANCEL, DONE],
    [FAIL, ERROR]
  ]],
  [DONE, [
    [STOP, DONE],
    [START, DONE],
    [BLOCK, DONE],
    [UNBLOCK, DONE],
    [SLEEP, DONE],
    [WAKEUP, DONE],
    [CANCEL, DONE],
    [FAIL, ERROR]
  ]],
  [ERROR, [
    [STOP, ERROR],
    [START, ERROR],
    [BLOCK, ERROR],
    [UNBLOCK, ERROR],
    [SLEEP, ERROR],
    [WAKEUP, ERROR],
    [CANCEL, ERROR],
    [FAIL, ERROR]
  ]]
]);

/**
 * Creates a new task in the READY state and registers it in the map of
 * tracked tasks under a freshly assigned id.
 *
 * @param options options for this task
 *   run: the run function for the task, called with the task as its
 *     only argument (required)
 *   name: a human readable name for the task (optional, defaults to the
 *     name used for unnamed tasks)
 *   parent: parent of this task (optional, defaults to null)
 *
 * @return the empty task.
 */
var Task = function(options) {
  // placeholder; the real id is assigned once the task is registered below
  this.id = -1;

  // who the task is and where it sits in the task tree
  this.name = options.name || sNoTaskName;
  this.parent = options.parent || null;

  // what the task does, and the subtasks queued to run after it
  this.run = options.run;
  this.subtasks = [];

  // bookkeeping: nothing has failed and nothing has started yet
  this.error = false;
  this.state = READY;

  // number of outstanding blocks, i.e. how many permits have to be
  // released before the task continues running
  this.blocks = 0;

  // id of the pending timeout while the task sleeps
  this.timeoutId = null;

  // time of the last context swap (none yet)
  this.swapTime = null;

  // user data (none yet)
  this.userData = null;

  // register the task under the next id
  // FIXME: deal with overflow
  var id = sNextTaskId++;
  this.id = id;
  sTasks[id] = this;
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] init', this.id, this.name, this);
  }
};

/**
 * Writes a debug log entry describing this task (id, name, the task
 * object and its number of queued subtasks) together with the current
 * task queues.
 *
 * @param msg a message to log ahead of the task details (optional,
 *          defaults to the empty string).
 */
Task.prototype.debug = function(msg) {
  var prefix = msg || '';
  var pending = this.subtasks.length;
  forge.log.debug(
    cat, prefix,
    '[%s][%s] task:', this.id, this.name, this,
    'subtasks:', pending,
    'queue:', sTaskQueues);
};

/**
 * Queues a subtask on this task. The subtask is created already in the
 * RUNNING state with this task as its parent, and inherits this task's
 * type and success/failure callbacks.
 *
 * @param name human readable name for the subtask (optional; when the
 *          first argument is a function it is used as subrun and the
 *          subtask takes this task's name).
 * @param subrun a function to run that takes the current task as
 *          its first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.next = function(name, subrun) {
  // next(subrun): shift the arguments and reuse this task's name
  var nameOmitted = (typeof name === 'function');
  if(nameOmitted) {
    subrun = name;
    name = this.name;
  }

  var child = new Task({
    run: subrun,
    name: name,
    parent: this
  });

  // subtasks do not go through start(); they are born running
  child.state = RUNNING;

  // hand down the type and the callbacks
  child.type = this.type;
  child.successCallback = this.successCallback || null;
  child.failureCallback = this.failureCallback || null;

  this.subtasks.push(child);
  return this;
};

/**
 * Queues a subtask that, once it runs, starts every given function as
 * its own task and blocks until each of those tasks has reported back,
 * whether through its success or its failure callback.
 *
 * @param name human readable name for this task (optional; when the
 *          first argument is an array it is used as subrun and this
 *          task's name is used).
 * @param subrun functions to run that take the current task as
 *          their first parameter.
 *
 * @return the current task (useful for chaining next() calls).
 */
Task.prototype.parallel = function(name, subrun) {
  // parallel(subrun): shift the arguments and reuse this task's name
  if(forge.util.isArray(name)) {
    subrun = name;
    name = this.name;
  }

  // The parallel tasks are launched from inside a regular subtask so
  // that they start at the proper time.
  return this.next(name, function(task) {
    var total = subrun.length;

    // one permit per parallel task
    task.block(total);

    // each parallel task gives back one permit, whatever its outcome
    var release = function() {
      task.unblock();
    };

    // A function per index keeps `index` fixed for the closures below.
    var launch = function(index) {
      forge.task.start({
        // the type must be unique so the tasks start in parallel:
        //    name + private string + task id + sub-task index
        type: name + '__parallel-' + task.id + '-' + index,
        run: function(child) {
          subrun[index](child);
        },
        success: release,
        failure: release
      });
    };

    for(var index = 0; index < total; ++index) {
      launch(index);
    }
  });
};

/**
 * Applies the STOP action to this task: its state becomes whatever the
 * state transition table lists for STOP in the current state.
 */
Task.prototype.stop = function() {
  var transitions = sStateTable[this.state];
  this.state = transitions[STOP];
};

/**
 * Starts running a task: clears its error flag, applies the START
 * action and, if that leaves the task RUNNING, calls its run function
 * and then processes its subtasks.
 *
 * The time at which the task began running is recorded in
 * `startTime`. It must not be stored under `start`: that would shadow
 * this method on the instance and make the task impossible to start
 * again.
 */
Task.prototype.start = function() {
  this.error = false;
  this.state = sStateTable[this.state][START];

  // try to restart
  if(this.state === RUNNING) {
    this.startTime = new Date();
    this.run(this);
    runNext(this, 0);
  }
};

/**
 * Blocks a task until one or more permits have been released. The
 * requested permits are added to the task's block count; while that
 * count is positive the BLOCK action is applied to the task's state.
 * The task does not resume until the permits have been released with
 * call(s) to unblock().
 *
 * @param n number of permits to wait for (default: 1).
 */
Task.prototype.block = function(n) {
  var permits = (n === undefined) ? 1 : n;
  this.blocks = this.blocks + permits;

  // nothing outstanding: leave the state alone
  if(!(this.blocks > 0)) {
    return;
  }
  this.state = sStateTable[this.state][BLOCK];
};

/**
 * Releases permits held against a task. If the task was blocked by
 * requesting N permits via block(), it only continues running once the
 * block count has come back down to exactly 0 through unblock() calls;
 * a task that is already DONE is not resumed.
 *
 * If multiple processes need to synchronize with a single task then
 * use a condition variable (see forge.task.createCondition). It is
 * an error to unblock a task more times than it has been blocked.
 *
 * @param n number of permits to release (default: 1).
 *
 * @return the current block count (task is unblocked when count is 0)
 */
Task.prototype.unblock = function(n) {
  var permits = (n === undefined) ? 1 : n;
  this.blocks -= permits;

  var resumable = (this.blocks === 0) && (this.state !== DONE);
  if(resumable) {
    this.state = RUNNING;
    runNext(this, 0);
  }

  return this.blocks;
};

/**
 * Sleep for a period of time before resuming tasks. The SLEEP action is
 * applied to the task's state and a timeout is scheduled; when it fires
 * the task is set back to RUNNING and processing of its subtasks
 * continues.
 *
 * @param n number of milliseconds to sleep (default: 0).
 */
Task.prototype.sleep = function(n) {
  var delay = (n === undefined) ? 0 : n;
  var task = this;

  task.state = sStateTable[task.state][SLEEP];

  // remember the timeout id so the sleep can be cut short
  task.timeoutId = setTimeout(function() {
    task.timeoutId = null;
    task.state = RUNNING;
    runNext(task, 0);
  }, delay);
};

/**
 * Makes this task wait on a condition variable by handing the task to
 * the condition's wait(). The next task will not be scheduled until
 * notification. A condition variable can be created with
 * forge.task.createCondition().
 *
 * Once cond.notify() is called, the task will continue.
 *
 * @param cond the condition variable to wait on.
 */
Task.prototype.wait = function(cond) {
  var waiter = this;
  cond.wait(waiter);
};

/**
 * If sleeping, wakeup and continue running tasks: the pending sleep
 * timeout is cleared, the task is set back to RUNNING and processing of
 * its subtasks continues. Does nothing when the task is not SLEEPING.
 */
Task.prototype.wakeup = function() {
  if(this.state === SLEEPING) {
    // clearTimeout is the standard counterpart of the setTimeout call
    // made in sleep(); without it the timer would fire later and resume
    // the task a second time
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
    this.state = RUNNING;
    runNext(this, 0);
  }
};

/**
 * Cancel all remaining subtasks of this task. The CANCEL action is
 * applied to the task's state, its outstanding block count is reset,
 * any pending sleep timeout is cleared and its subtask queue is
 * emptied.
 */
Task.prototype.cancel = function() {
  this.state = sStateTable[this.state][CANCEL];
  // remove permits needed (`blocks` is the counter that block() and
  // unblock() maintain)
  this.blocks = 0;
  // cancel timeouts
  if(this.timeoutId !== null) {
    clearTimeout(this.timeoutId);
    this.timeoutId = null;
  }
  // remove subtasks
  this.subtasks = [];
};

/**
 * Finishes this task with failure and sets its error flag.
 *
 * When a task to continue at is given, this task's error flag, swap
 * time and user data are copied onto it and processing resumes there.
 * This allows levels of subtasks to be skipped: to abort only this
 * task's subtasks call fail(task.parent); to abort this task's subtasks
 * and its parent's subtasks call fail(task.parent.parent).
 *
 * When no task is given (fail() or fail(null)) everything is aborted:
 * the same information is copied onto every ancestor below the root,
 * the root task is finished (which removes it from its task queue) and
 * this task's failure callback, if any, is called.
 *
 * @param next the task to continue at, or null to abort entirely.
 */
Task.prototype.fail = function(next) {
  // flag the error and finish this task without firing callbacks
  this.error = true;
  finish(this, true);

  if(next) {
    // hand the task info to the continuation point and resume there
    next.error = this.error;
    next.swapTime = this.swapTime;
    next.userData = this.userData;
    runNext(next, 0);
    return;
  }

  // aborting entirely: climb to the root task, propagating task info
  // to every ancestor passed on the way
  var ancestor = this.parent;
  if(ancestor !== null) {
    for(; ancestor.parent !== null; ancestor = ancestor.parent) {
      ancestor.error = this.error;
      ancestor.swapTime = this.swapTime;
      ancestor.userData = this.userData;
    }
    // finishing the root ensures it is removed from the task queue
    finish(ancestor, true);
  }

  // call failure callback if one exists
  if(this.failureCallback) {
    this.failureCallback(this);
  }
};

/**
 * Asynchronously start a task. The error flag is cleared and the START
 * action is applied right away; the task's run function is only called
 * from a zero-delay timeout, and only if the task is RUNNING by then.
 *
 * @param task the task to start.
 */
var start = function(task) {
  task.error = false;
  task.state = sStateTable[task.state][START];

  var begin = function() {
    // the state may have changed between scheduling and now
    if(task.state !== RUNNING) {
      return;
    }
    // the time slice is measured from this moment
    task.swapTime = new Date().getTime();
    task.run(task);
    runNext(task, 0);
  };

  setTimeout(begin, 0);
};

/**
 * Run the next subtask or finish this task.
 *
 * If the task is RUNNING and has queued subtasks, the first one is
 * removed from the queue, given the task's error flag, swap time and
 * user data, run, and then processed in turn (unless its run function
 * set its error flag). If there are no subtasks left the task is
 * finished and, unless it has an error, processing continues with its
 * parent.
 *
 * The work is done synchronously unless a context swap is due, in which
 * case it is deferred with a zero-delay timeout. A swap is due when the
 * recursion count exceeds the maximum or when more than the time slice
 * has passed since the task's last swap time.
 *
 * @param task the task to process.
 * @param recurse the recursion count.
 */
var runNext = function(task, recurse) {
  // get time since last context swap (ms), if enough time has passed set
  // swap to true to indicate that doNext was performed asynchronously
  // also, if recurse is too high do asynchronously
  var swap =
    (recurse > sMaxRecursions) ||
    (+new Date() - task.swapTime) > sTimeSlice;

  var doNext = function(recurse) {
    recurse++;
    if(task.state === RUNNING) {
      if(swap) {
        // update swap time
        task.swapTime = +new Date();
      }

      if(task.subtasks.length > 0) {
        // run next subtask
        var subtask = task.subtasks.shift();
        subtask.error = task.error;
        subtask.swapTime = task.swapTime;
        subtask.userData = task.userData;
        subtask.run(subtask);
        if(!subtask.error) {
          runNext(subtask, recurse);
        }
      } else {
        finish(task);

        if(!task.error) {
          // chain back up and run parent
          if(task.parent !== null) {
            // propagate task info
            task.parent.error = task.error;
            task.parent.swapTime = task.swapTime;
            task.parent.userData = task.userData;

            // no subtasks left, call run next subtask on parent
            runNext(task.parent, recurse);
          }
        }
      }
    }
  };

  if(swap) {
    // we're swapping, so run asynchronously; the call stack is unwound
    // by the swap, so the recursion count starts over at zero. doNext
    // must not be handed to setTimeout directly: it would be called
    // without a count, the count would become NaN and the recursion
    // limit would never trigger again for the rest of the chain.
    setTimeout(function() {
      doNext(0);
    }, 0);
  } else {
    // not swapping, so run synchronously
    doNext(recurse);
  }
};

/**
 * Finishes a task and looks for the next task in the queue to start.
 *
 * The task is marked DONE and removed from the map of tracked tasks.
 * For a root task (one without a parent) the task is also removed from
 * the front of the queue for its type: an emptied queue is deleted,
 * otherwise the task now at the front is started. A missing queue, an
 * empty queue, or a queue whose first entry is some other task is
 * logged as an error and the queue is left untouched. Finally, unless
 * suppressed, the root task's failure or success callback is called
 * according to its error flag.
 *
 * @param task the task to finish.
 * @param suppressCallbacks true to suppress callbacks.
 */
var finish = function(task, suppressCallbacks) {
  // the task is now done and no longer tracked
  task.state = DONE;
  delete sTasks[task.id];
  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] finish',
      task.id, task.name, task);
  }

  // queue processing and callbacks only apply to root tasks
  if(task.parent !== null) {
    return;
  }

  var type = task.type;
  if(!(type in sTaskQueues)) {
    // report error if queue is missing
    forge.log.error(cat,
      '[%s][%s] task queue missing [%s]',
      task.id, task.name, type);
  } else {
    var queue = sTaskQueues[type];
    if(queue.length === 0) {
      // report error if queue is empty
      forge.log.error(cat,
        '[%s][%s] task queue empty [%s]',
        task.id, task.name, type);
    } else if(queue[0] !== task) {
      // report error if this task isn't the first in the queue
      forge.log.error(cat,
        '[%s][%s] task not first in queue [%s]',
        task.id, task.name, type);
    } else {
      // take this task off the front of its queue
      queue.shift();
      if(queue.length > 0) {
        // another task of this type is waiting: start it
        if(sVL >= 1) {
          forge.log.verbose(cat,
            '[%s][%s] queue start next [%s] remain:%s',
            task.id, task.name, type,
            queue.length);
        }
        queue[0].start();
      } else {
        if(sVL >= 1) {
          forge.log.verbose(cat, '[%s][%s] delete queue [%s]',
            task.id, task.name, type);
        }
        /* Note: Only a task can delete a queue of its own type. This
         is used as a way to synchronize tasks. If a queue for a certain
         task type exists, then a task of that type is running.
         */
        delete sTaskQueues[type];
      }
    }
  }

  if(suppressCallbacks) {
    return;
  }

  // call final callback if one exists
  if(task.error) {
    if(task.failureCallback) {
      task.failureCallback(task);
    }
  } else if(task.successCallback) {
    task.successCallback(task);
  }
};

/* Tasks API */
// Reuse forge.task if it has already been defined, otherwise create it as an
// empty object; the same object is exported from this module.
module.exports = forge.task = forge.task || {};

/**
 * Starts a new task that will run the passed function asynchronously.
 *
 * The task must have a type (a string identifier) that can be used to
 * synchronize it with other tasks of the same type. That type can also
 * be used to cancel tasks that haven't started yet. If no queue exists
 * for the type, one is created and the task is started; otherwise the
 * task is appended to the existing queue and runs after a task with the
 * same type completes.
 *
 * To start a task, the following object must be provided as a parameter
 * (each function takes a task object as its first parameter):
 *
 * {
 *   type: the type of task.
 *   run: the function to run to execute the task.
 *   name: a human readable name for the task (optional).
 *   success: a callback to call when the task succeeds (optional).
 *   failure: a callback to call when the task fails (optional).
 * }
 *
 * @param options the object as described above.
 */
forge.task.start = function(options) {
  // build the root task from the options
  var task = new Task({
    run: options.run,
    name: options.name || sNoTaskName
  });
  var type = options.type;
  task.type = type;
  task.successCallback = options.success || null;
  task.failureCallback = options.failure || null;

  if(type in sTaskQueues) {
    // a task of this type already exists: wait in line behind it
    sTaskQueues[type].push(task);
    return;
  }

  if(sVL >= 1) {
    forge.log.verbose(cat, '[%s][%s] create queue [%s]',
      task.id, task.name, task.type);
  }
  // first task of its type: create the queue with it and start it
  sTaskQueues[type] = [task];
  start(task);
};

/**
 * Cancels all tasks of the given type that haven't started yet by
 * replacing the type's queue with one that holds only its first
 * (current) task. Does nothing when no queue exists for the type.
 *
 * @param type the type of task to cancel.
 */
forge.task.cancel = function(type) {
  if(!(type in sTaskQueues)) {
    return;
  }
  // keep only the task at the front of the queue
  var current = sTaskQueues[type][0];
  sTaskQueues[type] = [current];
};

/**
 * Creates a condition variable to synchronize tasks. To make a task wait
 * on the condition variable, call task.wait(condition). To notify all
 * tasks that are waiting, call condition.notify().
 *
 * @return the condition variable.
 */
forge.task.createCondition = function() {
  var cond = {
    // tasks currently blocked on this condition, keyed by task id
    tasks: {},

    /**
     * Blocks the given task until notify is called. A task that is
     * already waiting on this condition is left as it is.
     *
     * @param task the task to cause to wait.
     */
    wait: function(task) {
      if(task.id in cond.tasks) {
        // already waiting; only block once
        return;
      }
      task.block();
      cond.tasks[task.id] = task;
    },

    /**
     * Unblocks every task that is waiting on this condition.
     */
    notify: function() {
      // unblock() will run the next task from here, so swap in an empty
      // list of waiting tasks before unblocking anything
      var waiting = cond.tasks;
      cond.tasks = {};
      for(var id in waiting) {
        waiting[id].unblock();
      }
    }
  };

  return cond;
};