Batch операции

Этот раздел перенесён из документации Camunda 7 и в дальнейшем будет доработан с учётом особенностей OpenBPM Engine

Следующие операции могут выполняться асинхронно:

Все batch-операции полагаются на соответствующие методы, которые предоставляют возможность работать со списком сущностей асинхронно. См. общую документацию по Batch, чтобы лучше понять процесс создания.

Асинхронные операции могут быть выполнены на базе списка конкретных экземпляров, а также на результатах запроса, предоставляющего список экземпляров. Если предоставлены как список экземпляров, так и запрос, результирующий набор подвергаемых обработке экземпляров будет объединением двух подмножеств.

Все перечисленные операции, кроме установки времени удаления для исторических batch, зависят от деплоймента. В частности, это означает, что seed-джоба и джобы выполнения получат deploymentId, чтобы зависящие от деплоймента job executors-ы могли извлекать эти джобы из batch для выполнения на их узлах. Deployment id seed-джобы выбирается из списка связанных с ней деплойментов. Этот список получается из результирующего множества подвергаемых обработке экземпляров. Джобы выполнения созержат только элементы из того же самого деплоймента и привязаны к id этого деплоймента.

Отмена запущенных экземпляров процесса

Отмена запущенных экземпляров процесса может производиться асинхронно с использованием вызовов следующих методов Java API:

List<String> processInstanceIds = ...;
runtimeService.deleteProcessInstancesAsync(processInstanceIds,null, REASON);

Существуют перегруженные методы для приведенного выше примера, которые позволяют вам контролировать следующие параметры:

  • skipCustomListeners: Пропустить вызов слушателя выполнения (execution listener) для активностей, который запустились или завершились, как часть этого запроса.

  • skipSubprocesses: Пропустить удаление подпроцесса, относящен=гося к удаленным процессам, как часть этого запроса.

  • skipIoMappings: Пропустить IO-маппинги, если экземпляр процесса их содержит, что в противном случае помешало бы удалению.

Удаление экземпляров исторических процессов

Удаление экземпляров исторических процессов может выполняться асинхронно с использованием следующего вызова метода из Java API:

List<String> historicProcessInstanceIds = ...;
historyService.deleteHistoricProcessInstancesAsync(
        historicProcessInstanceIds, TEST_REASON);

Обновление состояния приостановки экземпляров процессов

Обновление состояния приостановки многочисленных экземпляров процессов может выполняться асинхронно с использованием следующего вызова метода из Java API:

List<String> processInstanceIds = ...;
runtimeService.updateProcessInstanceSuspensionState().byProcessInstanceIds(
  processInstanceIds).suspendAsync();

Установка количества повторных попыток и дат выполнения для джоб с использованием паттерна Builder

Установка количества повторных попыток для джоб может быть выполнена асинхронно с использованием Builder. Существует два способа ссылаться на джобы: по id/запросу или по процессу. Примеры, приведенные ниже, показывают, как использовать оба API:

managementService.setJobRetriesByJobsAsync(retries)
  .jobIds(myJobIdList)
  .jobQuery(myJobQuery)
  .dueDate(myDueDate)
  .executeAsync();
managementService.setJobRetriesByProcessAsync(retries)
  .processInstanceIds(myProcessInstanceIdsList)
  .processInstanceQuery(myProcessInstanceQuery)
  .historicProcessInstanceQuery(myHistoricProcessInstanceQuery)
  .dueDate(myDueDate)
  .executeAsync();

Установка количества попыток для джоб, ассоциированных с экземплярами процесса

Установка количества попыток для джоб, ассоциированных с экземплярами процесса может выполняться асинхронно с использованием следующего вызова метода из Java API:

List<String> processInstanceIds = ...;
int retries = ...;
managementService.setJobRetriesAsync(
        processInstanceIds, null, retries);

Установка количества попыток для внешних задач

Установка количества попыток для внешних задач может выполняться асинхронно с использованием следующего вызова метода из Java API:

List<String> externalTaskIds = ...;
externalTaskService.setRetriesAsync(
        externalTaskIds, TEST_REASON);

Установка переменных для экземпляров процесса

Иногда бывает необходимо добавить или обновить данные в уже запущенном экземпляре процесса. Например, когда пользователь вводит неправильные данные в начале процесса, эти данные необходимо исправить налету.

Эта batch-операция помогает вам установить переменные для корневой области видимости экземпляров процесса асинхронно.

Вы можете либо (1) отфильтровать экземпляры процесса, используя HistoricProcessInstanceQuery или ProcessInstanceQuery, либо (2) напрямую передать набор id экземпляров процесса.

Ниже показано, как вызывать Java API:

List<String> processInstanceIds = ...;
Map<String, Object> variables = Variables.putValue("my-variable", "my-value");
runtimeService.setVariablesAsync(processInstanceIds, variables);

В настоящее время не представляется возможным устанавливать транзиентные переменные через batch-операции. Однако, вы можете устанавливать транзиентные переменные синхронно.

Джобы выполнения в этом batch могут запускаться по расписанию job executor-ом как эксклюзивные джобы. В результате выполнение некоторых из этих batch-jobs может быть задержано другими эксклюзивными джобами, относящимися к тому же экземпляру процесса, для которого должны быть установлены переменные. Однако, эксклюзивное расписание применяется только тогда, когда джобы в данном batch относятся ровно к одному экземпляру процесса. Этим можно управлять через конфигурацию свойства invocationsPerBatchJob property.

Корреляция между сообщениями и экземпляром процесса

Эта batch-операция поможет вам коррелировать сообщения с несколькими экземплярами процессов асинхронно. Более того, вы можете установить переменные также для корневой зоны видимости этих экземпляров процессов.

Вы можете либо (1) отфильтровать экземпляры процесса, используя HistoricProcessInstanceQuery или ProcessInstanceQuery, либо (2) напрямую передать набор id экземпляров процесса.

Ниже показано, как вызывать Java API:

List<String> processInstanceIds = ...;
Map<String, Object> variables = Variables.putValue("my-variable", "my-value");

Batch batch = runtimeService.createMessageCorrelationAsync("myMessage")
  .setVariables(variables)
  .processInstanceIds(processInstanceIds)
  .correlateAllAsync();

Коррелировать на события появления сообщений о запуске, происходящие на уровне определений, через эту batch-операцию невозможно. Однако, вы можете осуществлять корреляцию на стартовые сообщения синхронно.

Джобы выполнения этого batch могут выполняться по расписанию job executor-ом как эксклюзивные джобы. В результате выполнение некоторых из этих batch-jobs может быть задержано другими эксклюзивными джобами, относящимися к тому же экземпляру процесса, для которого должны быть скоррелированы сообщения. Однако, эксклюзивное расписание применяется только тогда, когда джобы в данном batch относятся ровно к одному экземпляру процесса. Этим можно управлять через конфигурацию свойства invocationsPerBatchJob property.

Установить время удаления

Иногда необходимо отложить или даже предотвратить удаление некоторых исторических экземпляров. Время удаления может устанавливаться асинхронно для исторических процессов, решений и batch.

Могут быть выбраны следующие режимы:

  • Absolute: Устанавливает время удаления на произвольно выбранную дату:

    `.absoluteRemovalTime(Date removalTime)`
  • Cleared: Сбрасывает время удаления (передается как null-значение): экземпляры без времени удаления не зачищаются:

    `.clearedRemovalTime()`
  • Calculated: Пересчитывает время удаления на основании настроек движка рабочих процессов (базовое время + время жизни)

    `.calculatedRemovalTime()`

Исторические экземпляры процессов и решений могут быть частью иерархии. Чтобы установить одно и то же время удаления для всех экземпляров внутри иерархии, необходимо вызвать метод .hierarchical().

Установка времени удаления на уже запущенных экземплярах процессов удалит данные из таблиц исторической базы данных (таблиц, чьи имена начинаются с ACT_HI_*), но не из рантайм таблиц базы данных (таблиц, чьи имена начинаются с i.e. tables ACT_RU_*).

Исторические экземпляры процессов

HistoricProcessInstanceQuery query =
  historyService.createHistoricProcessInstanceQuery();

Batch batch = historyService.setRemovalTimeToHistoricProcessInstances()
  .absoluteRemovalTime(new Date()) // sets an absolute removal time
   // .clearedRemovalTime()        // resets the removal time to null
   // .calculatedRemovalTime()     // calculation based on the engine's configuration
  .byQuery(query)
  .byIds("693206dd-11e9-b7cb-be5e0f7575b7", "...")
   // .hierarchical()              // sets a removal time across the hierarchy
  .executeAsync();

Исторические экземпляры решений

HistoricDecisionInstanceQuery query =
  historyService.createHistoricDecisionInstanceQuery();

Batch batch = historyService.setRemovalTimeToHistoricDecisionInstances()
  .absoluteRemovalTime(new Date()) // sets an absolute removal time
   // .clearedRemovalTime()        // resets the removal time to null
   // .calculatedRemovalTime()     // calculation based on the engine's configuration
  .byQuery(query)
  .byIds("693206dd-11e9-b7cb-be5e0f7575b7", "...")
   // .hierarchical()              // sets a removal time across the hierarchy
  .executeAsync();

Флаг .hierarchical() для batch-операции для экземпляров решений устанавливает время удаления только в пределах иерархии решения. Если решение было вызвано задачей бизнес-правила (Business Rule Task), экземпляры вызывающего процесса (включая другие экземпляры процессов, присутствующие в иерархии) не обновляются.

Чтобы обновить все дочерние экземпляры внутри иерархии экземпляра корневого процесса (все экземпляры как процессов, так и решений), используйте batch-операцию для экземпляров процессов с включенным флагом .hierarchical().

Исторические batch

HistoricBatchQuery query = historyService.createHistoricBatchQuery();

Batch batch = historyService.setRemovalTimeToHistoricBatches()
  .absoluteRemovalTime(new Date()) // sets an absolute removal time
   // .clearedRemovalTime()        // resets the removal time to null
   // .calculatedRemovalTime()     // calculation based on the engine's configuration
  .byQuery(query)
  .byIds("693206dd-11e9-b7cb-be5e0f7575b7", "...")
  .executeAsync();

Обновление фрагментами (чанками)

Batch-операции обновляют время удаления всех исторических сущностей, относящихся к корневому элементу, например, к историческому экземпляру процесса. Этии обновления производятся внутри одной транзакции в базе данных. Для большого числа сущностей такие обновления могут занять много времени, транзакции в базе данных упадут по таймауту, а джобы выполнения внутри batch завершатся неудачей. Кроме того, это может заблокировать job executor, если он попытается выполнить эти длинные джобы выполнения, относящиеся к batch.

Для исторических экземпляров процессов вы, таким образом, можете сконфигурировать batch-операцию для обновления соответствующих исторических таблиц баз данных чанками, используя .updateInChunks(). Это ограничивает количество обновленных записей на таблицу до заданного в конфигурации движка управления процессами значения свойства removalTimeUpdateChunkSize. Вы можете переопределить этот предел для отдельных batch-выполнений при помощи опции chunkSize(int chunkSize).

С такой конфигурацией джобы выполнения batch работают каждая в точности с одним историческим экземпляром процесса, фиксируя значение свойства invocationsPerBatchJob в 1. Более того, джобы выполнения batch повторяются до тех пор, пока все относящиеся к ним исторические таблицы не будут обновлены. Таким образом обновления базы данных разбиваются на многочисленные транзакции, что помогает оставаться в пределах таймаута для транзакций.

Флаг .updateInChunks() для batch-операций исторических экземпляров процессов ведет к более сложным запросам на обновление, которые содержат условия для ограничения числа обновляемых записей. Это может понизить производительность некоторых баз данных.

Исопльзуйте эту опцию только для сценариев с большим количеством исторических элементов, относящихся к историческому экземпляру процесса, которые в противном случае будут блокировать job executor на долгое время или вызывать срабатывание таймаута на транзакциях базы данных.

Использование такого режима обновления чанками на уже запущенных экземплярах процесса возможно, но может привести к нестабильным представлениям данных по истории. Мы рекомендуем использовать его только на завершенных или отмененных экземплярах процесса.

Лицензия и атрибуция

Эта документация была создана на базе материала "Camunda 7 Docs" от Camunda, находится под лицензией Creative Commons Attribution-ShareAlike 3.0 Unported License .

Оригинал документации: https://docs.camunda.org