Транзакции в процессах

Этот раздел перенесён из документации Camunda 7 и в дальнейшем будет доработан с учётом особенностей OpenBPM Engine

Process Engine — это пассивный Java-код, который выполняется в потоке (Thread) клиента. Например, если у вас есть веб‑приложение, позволяющее пользователям запускать новый экземпляр процесса, и пользователь нажимает соответствующую кнопку, то некоторый поток из http-thread-pool application server вызовет API‑метод runtimeService.startProcessInstanceByKey(…​), тем самым входя в process engine и запуская новый экземпляр процесса. Мы называем это «заимствованием потока клиента» (borrowing the client thread).

При любом таком внешнем триггере (то есть при старте процесса, завершении задачи, сигнале execution) runtime движка будет продвигаться по процессу до тех пор, пока не достигнет состояний ожидания (wait states) на каждом активном пути выполнения. Wait state — это задача, которая выполняется позже, то есть движок сохраняет текущее execution в базу данных и ждёт, пока его снова не триггернут. Например, в случае user task внешний триггер «завершение задачи» заставляет runtime выполнить следующую часть процесса до тех пор, пока снова не будут достигнуты wait states (или пока экземпляр не завершится). В отличие от user task, timer event не триггерится извне. Вместо этого он продолжается внутренним триггером. Именно поэтому движку также нужен активный компонент — job executor, — который умеет забирать зарегистрированные jobs и обрабатывать их асинхронно.

Состояния ожидания (Wait States)

Мы говорили о wait states как о границах транзакций, где состояние процесса сохраняется в базе данных, поток возвращается клиенту, а транзакция фиксируется (commit). Следующие BPMN‑элементы всегда являются wait states:

receive task element

user task element

message event element

timer event element

signal event

event based gateway

service task

Помните, что асинхронные продолжения (Asynchronous Continuations) также могут добавлять границы транзакций и к другим задачам.

Границы транзакций (Transaction Boundaries)

Переход от одного такого стабильного состояния к другому стабильному состоянию всегда выполняется в рамках одной транзакции — то есть либо целиком завершается успешно, либо откатывается (rollback) при любом исключении, возникшем во время выполнения. Это проиллюстрировано следующим примером:

Transaction Boundaries

Мы видим фрагмент BPMN‑процесса с user task, service task и timer event. Timer event обозначает следующее wait state. Следовательно, завершение user task и проверка адреса являются частью одной единицы работы (unit of work), то есть должны завершиться успешно или завершиться неуспешно атомарно. Это означает, что если service task выбрасывает исключение, мы хотим откатить текущую транзакцию, чтобы выполнение вернулось к user task, и user task по‑прежнему присутствовала в базе данных. Это также является поведением process engine по умолчанию.

В 1 поток приложения или клиентский поток завершает задачу. В этом же потоке runtime движка теперь выполняет service task и продвигается до тех пор, пока не достигнет wait state на timer event (2). Затем управление возвращается вызывающей стороне (3), потенциально фиксируя транзакцию (если она была начата движком).

Асинхронные продолжения (Asynchronous Continuations)

Зачем нужны асинхронные продолжения? (Why Asynchronous Continuations?)

В некоторых случаях синхронное поведение нежелательно. Иногда полезно иметь пользовательский контроль над границами транзакций в процессе. Самая распространённая мотивация — необходимость выделять логические единицы работы (logical units of work). Рассмотрим следующий фрагмент процесса:

Asynchronous Continuations

Мы завершаем user task, генерируем счёт (invoice) и затем отправляем этот счёт клиенту. Можно утверждать, что генерация счёта не является частью той же единицы работы: мы не хотим откатывать завершение user task, если генерация счёта завершилась ошибкой. В идеале process engine должен завершить user task (1), зафиксировать транзакцию (commit) и вернуть управление вызывающему приложению (2). В фоновом потоке (3) он бы сгенерировал счёт. Именно такое поведение обеспечивают асинхронные продолжения: они позволяют задавать границы транзакций в процессе.

Настройка асинхронных продолжений (Configure Asynchronous Continuations)

Асинхронные продолжения можно настраивать до и после активности. Кроме того, сам экземпляр процесса может быть настроен на асинхронный старт.

Асинхронное продолжение до активности включается с помощью extension‑атрибута camunda:asyncBefore:

<serviceTask id="service1" name="Generate Invoice" camunda:asyncBefore="true" camunda:class="my.custom.Delegate" />

Асинхронное продолжение после активности включается с помощью extension‑атрибута camunda:asyncAfter:

<serviceTask id="service1" name="Generate Invoice" camunda:asyncAfter="true" camunda:class="my.custom.Delegate" />

Асинхронная инстанциация (создание) экземпляра процесса включается с помощью extension‑атрибута camunda:asyncBefore на start event уровня процесса. При инстанциации экземпляр процесса будет создан и сохранён (persisted) в базе данных, однако выполнение будет отложено. Также execution listeners не будут вызываться синхронно. Это может быть полезно в различных ситуациях, таких как heterogeneous clusters, когда класс execution listener недоступен на узле, который инстанциирует процесс.

<startEvent id="theStart" name="Invoice Received" camunda:asyncBefore="true" />

Асинхронные продолжения для multi-instance активностей (Asynchronous Continuations of Multi-Instance Activities)

multi-instance activity может быть настроена на асинхронное продолжение так же, как и другие активности. Объявление асинхронного продолжения для multi-instance активности делает асинхронным тело multi-instance, то есть процесс продолжится асинхронно до того, как будут созданы экземпляры этой активности, или после того, как завершатся все экземпляры.

Кроме того, внутренняя активность также может быть настроена на асинхронное продолжение с помощью extension‑атрибутов camunda:asyncBefore и camunda:asyncAfter на элементе multiInstanceLoopCharacteristics:

<serviceTask id="service1" name="Generate Invoice" camunda:class="my.custom.Delegate">
	<multiInstanceLoopCharacteristics isSequential="false" camunda:asyncBefore="true">
 		<loopCardinality>5</loopCardinality>
	</multiInstanceLoopCharacteristics>
</serviceTask>

Объявление асинхронного продолжения для внутренней активности делает каждый экземпляр multi-instance активности асинхронным. В приведённом выше примере будут созданы все экземпляры параллельной multi-instance активности, но их выполнение будет отложено. Это может быть полезно, чтобы более точно контролировать границы транзакций multi-instance активности или включить истинный параллелизм в случае параллельной multi-instance активности.

Понимание работы асинхронных продолжений (Understand Asynchronous Continuations)

Чтобы понять, как работают асинхронные продолжения, сначала нужно понять, как выполняется активность:

Asynchronous Continuations

Иллюстрация выше показывает, как выполняется обычная активность, в которую входят и из которой выходят по sequence flow:

  1. Вызываются listeners "TAKE" на sequence flow, входящем в активность.

  2. Вызываются listeners "START" на самой активности.

  3. Выполняется поведение активности: конкретное поведение зависит от типа активности: в случае Service Task поведение состоит в вызове Delegation Code, в случае User Task поведение состоит в создании экземпляра Task в task list и т. п.

  4. Вызываются listeners "END" на активности.

  5. Вызываются listeners "TAKE" исходящего sequence flow.

Асинхронные продолжения позволяют вставлять точки разрыва (break points) между выполнением sequence flow и выполнением активности:

process engine async

Иллюстрация выше показывает, где разные типы асинхронных продолжений разрывают поток выполнения:

  • Асинхронное продолжение BEFORE активности разрывает поток выполнения между вызовом TAKE‑listeners входящего sequence flow и выполнением START‑listeners активности.

  • Асинхронное продолжение AFTER активности разрывает поток выполнения между вызовом END‑listeners активности и TAKE‑listeners исходящего sequence flow.

Асинхронные продолжения напрямую связаны с границами транзакций: установка асинхронного продолжения до или после активности создаёт границу транзакции до или после активности:

process engine async transactions

Более того, асинхронные продолжения всегда выполняются компонентом Job Executor.

Откат при исключении (Rollback on Exception)

Хотим подчеркнуть: в случае необработанного исключения текущая транзакция откатывается (rollback), и экземпляр процесса оказывается в последнем wait state (save point). Следующее изображение это визуализирует.

Rollback

Если исключение происходит при вызове startProcessInstanceByKey, экземпляр процесса вообще не будет сохранён в базе данных.

Обоснование такого дизайна (Reasoning for This Design)

Описанное выше решение обычно приводит к обсуждениям, поскольку люди ожидают, что process engine остановится на задаче, вызвавшей исключение. Также другие BPM suites часто реализуют каждую задачу как wait state. Однако у этого подхода есть несколько преимуществ:

  • В тестах вы точно знаете состояние движка после вызова метода, что упрощает проверки (assertions) состояния процесса или результатов сервисных вызовов.

  • В production‑коде верно то же самое; это позволяет использовать синхронную логику, если это требуется, например, когда вы хотите обеспечить синхронный пользовательский опыт на фронтенде.

  • Выполнение — это обычные вычисления на Java, что очень эффективно с точки зрения производительности.

  • Вы всегда можете переключиться на asyncBefore/asyncAfter=true, если требуется другое поведение.

Однако есть и последствия, о которых стоит помнить:

  • В случае исключений состояние откатывается к последнему сохранённому (persistent) wait state экземпляра процесса. Это может даже означать, что экземпляр процесса никогда не будет создан. Нельзя просто и однозначно «привязать» исключение к узлу процесса, который его вызвал. Исключение должен обработать клиент.

  • Параллельные пути процесса не выполняются параллельно в терминах Java Threads: разные пути выполняются последовательно, так как у нас есть и используется только один Thread.

  • Таймеры не могут срабатывать до того, как транзакция будет зафиксирована в базе данных. Таймеры более подробно объясняются далее, но они триггерятся единственной активной частью Process Engine, где используются собственные Threads: Job Executor. Следовательно, они выполняются в отдельном потоке, который получает «созревшие» (due) таймеры из базы данных. Однако в базе данных таймеры не видны до тех пор, пока не станет видна текущая транзакция. Поэтому следующий таймер никогда не сработает:

Not Working Timeout

Интеграция транзакций (Transaction Integration)

Process engine может либо управлять транзакциями самостоятельно («Standalone» управление транзакциями), либо интегрироваться с платформенным transaction manager.

Standalone управление транзакциями (Standalone Transaction Management)

Если process engine настроен на standalone управление транзакциями, он всегда открывает новую транзакцию для каждой выполняемой команды. Чтобы настроить process engine на standalone управление транзакциями, используйте io.openbpm.bpm.engine.impl.cfg.StandaloneProcessEngineConfiguration:

ProcessEngineConfiguration.createStandaloneProcessEngineConfiguration()
  ...
  .buildProcessEngine();

Сценарии для standalone управления транзакциями — это ситуации, когда process engine не требуется интегрировать с другими транзакционными ресурсами, такими как дополнительные datasources или messaging systems.

В дистрибутиве Tomcat process engine настроен на standalone управление транзакциями.

Интеграция с transaction manager (Transaction Manager Integration)

Process engine можно настроить на интеграцию с transaction manager (или системами управления транзакциями). Из коробки process engine поддерживает интеграцию со Spring и JTA transaction management. Дополнительную информацию можно найти в следующих главах:

Сценарии для интеграции с transaction manager — это ситуации, когда process engine нужно интегрировать с:

  • транзакционно‑ориентированными моделями программирования, такими как Java EE или Spring (подумайте о transaction‑scoped JPA entity managers в Java EE),

  • другими транзакционными ресурсами, такими как вторичные datasources, messaging systems или прочий транзакционный middleware, например стек web services.

При настройке transaction manager убедитесь, что он действительно управляет data source, который вы настроили для process engine. Если это не так, data source работает в режиме auto-commit. Это может привести к несогласованностям в базе данных, потому что commits и rollbacks транзакций больше не выполняются.

Транзакции и контекст process engine (Transactions and the Process Engine Context)

Когда выполняется команда Process Engine (Process Engine Command), движок создаёт Process Engine Context. Context кеширует сущности базы данных, так что многократные операции с одной и той же сущностью не приводят к многократным запросам к базе данных. Это также означает, что изменения этих сущностей накапливаются и сбрасываются (flush) в базу данных, как только команда возвращает управление. Однако следует отметить, что текущая транзакция может быть зафиксирована (committed) позже.

Если одна команда Process Engine вложена в другую (то есть команда выполняется внутри другой команды), поведение по умолчанию — переиспользовать существующий Process Engine Context. Это означает, что вложенная команда будет иметь доступ к тем же кешированным сущностям и к изменениям, внесённым в них.

Когда вложенную команду нужно выполнить в новой транзакции, для её выполнения нужно создать новый Process Engine Context. В этом случае вложенная команда будет использовать новый кеш для сущностей базы данных, независимый от кеша предыдущей (внешней) команды. Это означает, что изменения в кеше одной команды невидимы для другой команды, и наоборот. Когда вложенная команда возвращает управление, изменения сбрасываются в базу данных независимо от Process Engine Context внешней команды.

Вспомогательный класс ProcessEngineContext можно использовать, чтобы объявить Process Engine, что необходимо создать новый Process Engine Context для того, чтобы операции с базой данных во вложенной команде Process Engine были отделены и выполнялись в новой транзакции. Следующий пример кода на Java показывает, как можно использовать этот класс:

try {

  // declare new Process Engine Context
  ProcessEngineContext.requiresNew();

  // call engine APIs
  execution.getProcessEngineServices()
    .getRuntimeService()
    .startProcessInstanceByKey("EXAMPLE_PROCESS");

} finally {
  // clear declaration for new Process Engine Context
  ProcessEngineContext.clear();
}

Оптимистическая блокировка (Optimistic Locking)

OpenBPM Engine может использоваться в многопоточных приложениях. В таких условиях, когда несколько потоков взаимодействуют с process engine одновременно, может случиться так, что эти потоки попытаются изменить одни и те же данные. Например: два потока пытаются завершить одну и ту же User Task в одно и то же время (конкурентно). Такая ситуация является конфликтом: задача может быть завершена только один раз.

OpenBPM Engine использует хорошо известную технику, называемую «Optimistic Locking» (или Optimistic Concurrency Control), чтобы обнаруживать и разрешать такие ситуации.

Этот раздел состоит из двух частей: первая часть вводит Optimistic Locking как концепцию. Вы можете пропустить её, если уже знакомы с Optimistic Locking. Вторая часть объясняет использование Optimistic Locking в OpenBPM Engine.

Что такое Optimistic Locking? (What is Optimistic Locking?)

Optimistic Locking (также Optimistic Concurrency Control) — это метод управления конкурентным доступом (concurrency control), который используется в системах, основанных на транзакциях. Optimistic Locking наиболее эффективен в ситуациях, когда данные читаются чаще, чем изменяются. Многие потоки могут читать одни и те же объекты данных одновременно, не исключая друг друга. Согласованность (consistency) затем обеспечивается обнаружением конфликтов и предотвращением обновлений в ситуациях, когда несколько потоков пытаются изменять одни и те же объекты данных конкурентно. Если такой конфликт обнаружен, обеспечивается, что только одно обновление завершается успешно, а все остальные завершаются неуспешно.

Пример (Example)

Предположим, у нас есть таблица базы данных со следующей записью:

Id Version Name Address …​

8

1

Steve

3, Workflow Boulevard, Token Town

…​

…​

…​

…​

…​

…​

Таблица выше показывает одну строку, содержащую данные пользователя. У пользователя есть уникальный Id (primary key), версия (version), имя (name) и текущий адрес.

Теперь построим ситуацию, в которой 2 транзакции пытаются обновить эту запись: одна пытается изменить адрес, другая — удалить пользователя. Ожидаемое поведение: одна из транзакций успешно выполняется, а другая прерывается с ошибкой, указывающей, что обнаружен конфликт конкурентного доступа. Затем пользователь может решить, повторять ли транзакцию, основываясь на последнем состоянии данных:

Transactions with Optimistic Locking

Как видно на изображении выше, Transaction 1 читает данные пользователя, делает что‑то с данными, удаляет пользователя и затем фиксирует транзакцию (commit). Transaction 2 стартует в то же время, читает те же данные пользователя и также работает с ними. Когда Transaction 2 пытается обновить адрес пользователя, обнаруживается конфликт (так как Transaction 1 уже удалил пользователя).

Конфликт обнаруживается, потому что текущее состояние данных пользователя читается в момент, когда Transaction 2 выполняет обновление. В этот момент конкурентная Transaction 1 уже пометила строку на удаление. Теперь база данных ждёт завершения Transaction 1. После её завершения Transaction 2 может продолжить. В этот момент строки больше не существует, и обновление выполняется, но сообщает, что изменило 0 строк. Приложение может отреагировать на это и откатить Transaction 2, чтобы предотвратить применение других изменений, сделанных этой транзакцией.

Далее приложение (или пользователь, который его использует) может решить, следует ли повторять Transaction 2. В нашем примере транзакция тогда не найдёт данных пользователя и сообщит, что пользователь был удалён.

Optimistic Locking vs. Pessimistic Locking

Pessimistic Locking работает с read locks. Read lock блокирует объект данных при чтении, предотвращая чтение этого объекта другими конкурентными транзакциями. Таким образом конфликты предотвращаются заранее.

В примере выше Transaction 1 заблокировал бы данные пользователя в момент чтения. При попытке прочитать те же данные Transaction 2 был бы заблокирован и не смог бы продвигаться. Как только Transaction 1 завершится, Transaction 2 сможет продолжить и прочитает актуальное состояние. Таким образом конфликты предотвращаются, поскольку транзакции всегда эксклюзивно работают с последним состоянием данных.

Pessimistic Locking эффективен в ситуациях, когда записи (writes) происходят так же часто, как чтения (reads), и при высокой конкуренции (contention).

Однако, поскольку pessimistic locks являются эксклюзивными, параллелизм снижается, ухудшая производительность. Поэтому Optimistic Locking, который обнаруживает конфликты вместо того, чтобы предотвращать их, предпочтительнее в условиях высокого уровня конкурентности и когда чтения происходят чаще, чем записи. Кроме того, Pessimistic Locking может быстро приводить к deadlocks.

Дополнительное чтение (Further Reading)

Optimistic Locking в OpenBPM Engine (Optimistic Locking in OpenBPM Engine)

OpenBPM Engine использует Optimistic Locking для контроля конкурентного доступа. Если обнаружен конфликт, выбрасывается исключение и транзакция откатывается. Конфликты обнаруживаются при выполнении операторов UPDATE или DELETE. Выполнение операторов delete или update возвращает количество затронутых строк (affected rows count). Если это число равно нулю, это указывает, что строка ранее была обновлена или удалена. В таких случаях конфликт считается обнаруженным и выбрасывается OptimisticLockingException.

Исключение OptimisticLockingException (The OptimisticLockingException)

OptimisticLockingException может быть выброшено методами API. Рассмотрим следующий вызов метода completeTask(…​):

taskService.completeTask(aTaskId); // may throw OptimisticLockingException

Указанный выше метод может выбросить OptimisticLockingException, если выполнение вызова метода приводит к конкурентной модификации данных.

Выполнение job также может привести к выбрасыванию OptimisticLockingException. Поскольку это ожидаемое поведение, выполнение будет повторено (retried).

Обработка исключений Optimistic Locking (Handling Optimistic Locking exceptions)

Если текущая команда (Command) запускается Job Executor’ом, OptimisticLockingException обрабатываются автоматически с использованием повторов (retries). Так как это исключение ожидаемо, оно не уменьшает счётчик повторов.

Если текущая команда запускается внешним вызовом API, OpenBPM Engine откатывает текущую транзакцию к последней точке сохранения (save point, wait state). Теперь пользователь должен решить, как следует обработать исключение — нужно ли повторять транзакцию или нет. Также учтите, что даже если транзакция была откатана, у неё могли быть нетранзакционные побочные эффекты, которые не были откатаны.

Чтобы контролировать область транзакций, можно добавлять явные точки сохранения (save points) до и после активностей с помощью асинхронных продолжений (Asynchronous Continuations).

Типичные места, где выбрасываются Optimistic Locking exceptions (Common Places Where Optimistic Locking Exceptions Are Thrown)

Есть несколько типичных мест, где может быть выброшено OptimisticLockingException. Например:

  • Конкурирующие внешние запросы: конкурентное завершение одной и той же задачи дважды.

  • Точки синхронизации внутри процесса: например, parallel gateway, multi instance и т. п.

Следующая модель показывает parallel gateway, на котором может возникнуть OptimisticLockingException.

Optimistic Locking in parallel gateway

После открывающего parallel gateway есть две user task. Закрывающий parallel gateway после user task объединяет executions в одну. В большинстве случаев одна из user task будет завершена первой. Затем выполнение будет ожидать на закрывающем parallel gateway, пока не будет завершена вторая user task.

Однако возможно и конкурентное завершение обеих user task. Допустим, user task сверху завершена. Транзакция предполагает, что она первая на закрывающем parallel gateway. Нижняя user task завершается конкурентно, и транзакция также предполагает, что она первая на закрывающем parallel gateway. Обе транзакции пытаются обновить строку, которая указывает, что они первые на закрывающем parallel gateway. В таких случаях выбрасывается OptimisticLockingException. Одна из транзакций откатывается, а другая успешно обновляет строку.

Optimistic Locking и нетранзакционные побочные эффекты (Optimistic Locking and Non-Transactional Side Effects)

После возникновения OptimisticLockingException транзакция откатывается. Любая транзакционная работа будет отменена. Нетранзакционная работа, такая как создание файлов или эффекты вызова нетранзакционных web services, отменена не будет. Это может привести к несогласованному состоянию.

Существует несколько решений этой проблемы; самое распространённое — eventual consolidation с использованием повторов (retries).

Внутренние детали реализации (Internal Implementation Details)

Большинство таблиц базы данных OpenBPM Engine содержат колонку REV_. Эта колонка представляет версию ревизии (revision). При чтении строки данные читаются на конкретной «ревизии». Модификации (UPDATE и DELETE) всегда пытаются обновить ревизию, которая была прочитана текущей командой. Обновления инкрементируют ревизию. После выполнения оператора модификации проверяется количество затронутых строк. Если это число равно 1, делается вывод, что прочитанная версия всё ещё была актуальной в момент выполнения модификации. Если количество затронутых строк равно 0, значит другая транзакция изменила те же данные, пока выполнялась текущая транзакция. Это означает, что обнаружен конфликт конкурентного доступа, и этой транзакции нельзя позволять фиксироваться (commit). Впоследствии транзакция откатывается (или помечается как rollback-only) и выбрасывается OptimisticLockingException.

Лицензия и атрибуция

Эта документация была создана на базе материала "Camunda 7 Docs" от Camunda, находится под лицензией Creative Commons Attribution-ShareAlike 3.0 Unported License .

Оригинал документации: https://docs.camunda.org