プラットフォームイベントトリガーの記述に関するベストプラクティスを適用する
学習の目的
この単元を完了すると、次のことができるようになります。
- 制限例外の発生後にトリガーを再開する機能を使用するプラットフォームイベントトリガーを記述する。
- 例外の発生時にトリガーを再試行する機能を使用するプラットフォームイベントトリガーを記述する。
- プラットフォームイベントトリガーのベースとしてトリガーテンプレートを使用する。
注文イベントが公開されると、注文履行アプリケーションがそのイベントをリアルタイムで受信し、注文を処理できます。注文履行アプリケーションは Salesforce Platform の外部に存在する場合もあります。たとえば、Pub/Sub API を使用して注文履行システムをコールする外部クライアントなどです。また、注文アプリケーションはプラットフォーム上に存在する場合もあります。たとえば Apex プラットフォームイベントトリガーなどです。このモジュールで Cloud Kicks が使用している注文履行アプリケーションは、Order_Event__e のプラットフォームイベントトリガーです。
チェックポイントでプラットフォームイベントトリガーの回復力を高める
プラットフォームイベントまたは変更イベントに Apex トリガーを記述して、リアルタイムのイベントメッセージに登録できます。例外が発生しても正しく処理できる堅牢なトリガーを記述することが重要です。堅牢なトリガーでは、setResumeCheckpoint メソッドを使用することで、制限例外などのキャッチできない例外や未処理の例外が発生した後でも実行を再開できます。
DML や SOQL クエリの例外などの Apex 制限例外が発生した場合、その例外は try-catch ブロックではキャッチできません。コードは失敗し、トリガーに配信された現在のイベントバッチは利用できなくなります。そのバッチで未処理のイベントは再配信されません。
制限例外ではない例外が発生した場合は、try-catch ブロックを使用してキャッチできます。エラーを処理してデバッグログに記録し、イベントの処理を続行できます。ですが、トリガーで例外をキャッチして処理しない場合は、例外の発生後にトリガーの実行が停止し、同じイベントバッチの未処理イベントを再び取得することはできません。
制限例外または未処理の例外が発生したときに未処理のイベントバッチが失われるのを避けるには、setResumeCheckpoint メソッドを使用します。setResumeCheckpoint メソッドはイベントストリーム内にチェックポイントを設定し、プラットフォームイベントトリガーはその位置から新しい呼び出しで実行を再開します。
新しい呼び出しでトリガーを再開すると、新しい呼び出しでリセットされる制限例外や、一時的な制限例外ではない例外に対処しやすくなります。例外がスローされると、次回のトリガー実行時にそのチェックポイントが使用されます。トリガー処理は、最後に正常にチェックポイント設定されたイベントメッセージの後から再開されます。これにより、前回の実行で未処理だったイベントが失われず、次回のトリガー呼び出しでそれらのイベントを処理できます。
トリガーが再開されると、新しいバッチは再実行 ID を使用して setResumeCheckpoint で設定したイベントメッセージの次のメッセージから始まります。イベントは、変更されない ReplayId 項目値に基づいて元の順序で再送されます。トリガーは、再送されたイベントとその後のバッチを順番に処理します。また、再開後の新しいトリガー呼び出しでは、Apex ガバナ制限がリセットされます。その結果、制限例外がすぐに再発することはありません。
このトリガー例では、各反復で最後に処理したイベントメッセージの再実行 ID を設定します。例外が発生すると、トリガーが再び起動され、設定された再実行 ID を持つイベントメッセージの次のメッセージから処理を再開します。
trigger ResumeEventProcessingTrigger on Order_Event__e (after insert) {
for (Order_Event__e event : Trigger.New) {
// Process the event message.
// ...
// Set the Replay ID of the last successfully processed event message.
// If a limit is hit, the trigger refires and processing starts with the
// event after the last one processed (the set Replay ID).
EventBus.TriggerContext.currentContext().setResumeCheckpoint(event.replayId);
}
}詳細は、『プラットフォームイベント開発者ガイド』の「Resume a Platform Event Trigger After an Uncaught Exception (キャッチされない例外の発生後にプラットフォームイベントトリガーを再開する)」を参照してください。
再試行によって一時的なエラーを処理する
チェックポイントを使用してトリガーを再開する方法のほかにも、EventBus.RetryableException を使用してイベントのバッチ全体でトリガーを再試行する方法があります。トリガーを再試行すると、一時的なエラーが発生した場合や、条件が変わるのを待つ必要がある場合に、イベントメッセージをあらためて処理できます。
イベントトリガーを再試行するには、EventBus.RetryableException をスローします。イベントは短い遅延の後に再送され、再試行が重なるにつれて遅延が長くなります。
イベントは、変更されない ReplayId 項目値に基づいて元の順序で再送されます。トリガーは、再送されたイベントとその後のイベントバッチを順番に処理します。再送されたイベントの項目値は元のイベントと同じです。再送されるイベントバッチは大きくなる場合があります。トリガーが再試行されると、再試行前にそのトリガーで実行された DML 操作はロールバックされ、変更は保存されません。
トリガーは、再試行によって最大 10 回まで実行できます (初回実行と 9 回の再試行)。トリガーが 9 回再試行された後は、エラー状態に移行して新しいイベントの処理を停止します。詳細は、『プラットフォームイベント開発者ガイド』の「Retry Event Triggers with EventBus.RetryableException (EventBus.RetryableException を使用してイベントトリガーを再試行する)」を参照してください。
次の疑似コードは、EventBus.RetryableException をどのようにスローし、再試行回数をどのように制限するかの概要を示しています。このトリガーでは try-catch ブロックを使用し、catch ブロックで EventBus.RetryableException をスローします。このコードは、例外が一時的なものであり、今後のトリガー呼び出しでは発生しないことを前提としています。
trigger ResendEventsTrigger on Order_Event__e (after insert) {
try {
for (Order_Event__e event : Trigger.New) {
// Process platform event.
// ...
}
} catch(Exception e) {
System.debug('Caught exception: ' + e.getMessage() +
' Stack trace:' + e.getStackTrace());
System.debug('Retrying the platform event trigger');
// Retry the trigger up to 4 times and resend batch of events
if (EventBus.TriggerContext.currentContext().retries < 4) {
throw new EventBus.RetryableException(
'Retrying the platform event trigger.');
}
}プラットフォームイベントトリガーテンプレートを使用する
このプラットフォームイベントトリガーは、回復力のある方法でイベントを処理するためのモデルであり、チェックポイントと再試行可能な例外を組み合わせています。setResumeCheckpoint() をコールして例外を処理し、try-catch ブロックを使用して制限例外ではない例外を捕捉して、必要に応じて EventBus.RetryableException でトリガーを再び起動します。setResumeCheckpoint() メソッドは、少なくとも 1 件のイベントが正常に処理され、setResumeCheckpoint() が少なくとも 1 回コールされた場合に有効になります。
トリガーが制限例外を処理する方法
制限例外を処理する場合、トリガーは新しいトリガー呼び出しで再開されるため、制限はリセットされます。
- 少なくとも 1 件のイベントが正常に処理され、
setResumeCheckpoint()がコールされた後で制限例外が発生した場合、トリガーは最後に正常に処理されたイベントの後から新しい呼び出しで再開されます。制限がリセットされるため、制限例外がすぐに再発する可能性は低くなります。 - 最初のイベントが処理されて
setResumeCheckpoint()がコールされる前に制限例外が発生した場合、トリガーはその制限例外を捕捉できず、実行を停止してイベントバッチ全体を破棄します。制限例外は通常、いくつかのイベントを処理した後に発生するため、このケースが頻繁に発生することは想定されていません。
トリガーが制限例外ではない例外を処理するしくみ
制限例外ではない例外を try-catch ブロックで処理すると、例外メッセージを記録できるためデバッグに役立ちます。また、制限例外ではない例外が一時的なものであれば、イベントバッチ全体でトリガーを再試行して、イベントを処理する別の機会を得ることができます。
setResumeCheckpoint()がコールされる前のループの最初の反復で制限例外ではない例外が発生した場合 (processedEventsCounter == 0)、トリガーはその例外を捕捉し、catchブロックでEventBus.RetryableExceptionをスローします。EventBus.RetryableExceptionによって、トリガーは元のイベントバッチ全体で再び起動されます。再び起動される回数には上限があります。トリガーを再び起動することで、今後のトリガー呼び出しでは再発しない一時的な例外に対処できます。その例外が繰り返し発生し、トリガーの再試行回数も使い切った場合は、最初のイベントがスキップされ、ループはバッチ内の次のイベントに進みます。- 少なくとも 1 件のイベントが正常に処理された後で制限例外ではない例外が発生した場合 (
processedEventsCounter > 0)、その例外はデバッグログに記録されます。次に、その例外は再スローされ、トリガーは終了し、前に設定されたチェックポイントにより新しい呼び出しが開始されます。新しいイベントバッチは、例外の原因となった最後のイベントから始まります。この新しい呼び出しで例外が再発した場合は、トリガーが処理するバッチ内の最初のイベントであるため、EventBus.RetryableExceptionがコールされます。EventBus.RetryableExceptionによって、イベントバッチは一定回数再試行されます。トリガーを再び起動することで、今後のトリガー呼び出しでは再発しない一時的な例外に対処できます。その例外が繰り返し発生し、トリガーの再試行回数も使い切った場合は、最初のイベントがスキップされ、ループはバッチ内の次のイベントに進みます。
trigger EventTriggerTemplate on Order_Event__e (after insert) {
Integer processedEventsCounter = 0;
Boolean shouldContinueToProcess = true;
for (Order_Event__e event : Trigger.New) {
try {
// Process event message.
// ....
// Set Replay ID after which to resume event processing
// in new trigger execution.
EventBus.TriggerContext.currentContext().setResumeCheckpoint(
event.ReplayId);
System.debug('Processed event with Replay ID: ' + event.ReplayId);
processedEventsCounter++;
if (shouldContinueToProcess != true) {
// Resume after the last successfully processed event message
// after the trigger stops running.
// Exit for loop.
break;
}
} catch(Exception e) {
// This catch block works only for non-limit exceptions
// because limit exceptions cannot be caught.
//
// If no events have been processed, throw new EventBus.RetryableException
if (processedEventsCounter == 0) {
// Only throw RetryableException when the first event
// is processed but before setResumeCheckpoint is called.
if (EventBus.TriggerContext.currentContext().retries < 4) {
throw new EventBus.RetryableException();
}
} else {
// Else log the exception in the debug log. The trigger will
// resume from the checkpoint next time.
System.debug('An exception occurred: ' + e.getMessage());
// Rethrow exception - trigger exits and resumes with this event
// as the first event in the batch of the new invocation.
throw e;
}
}
}
}予期しない失敗の発生後に再開するトリガーを記述する
Vijay は、注文イベントを受信して処理するプラットフォームイベントトリガーを記述したいと考えています。また、プラットフォームイベントに登録するプラットフォームイベントトリガーは、堅牢にしておく必要があります。このトリガーは、制限例外から回復でき、その他の例外も処理できます。Vijay は、前の「プラットフォームイベントトリガーテンプレートを使用する」セクションにあるトリガーテンプレートを使用して変更を加えます。
OrderEventTrigger トリガーは、プラットフォームイベントトリガーテンプレートに基づいています。このトリガーは、テストコンテキストでこのユーティリティメソッド PlatformEventsTestUtil.forceExceptionForTesting() をコールして制限例外が発生するように変更されています。テストコンテキストで例外が発生した場合は、チェックポイントによって例外の発生後にトリガーが再開されることを確認できます。このトリガーは、例外のテストを可能にするユーティリティクラス PlatformEventsTestUtil に依存しています。また、定数を保存する PlatformEventsConstants クラスにも依存しています。トリガーが例外の発生後に再開されることを確認できるように、次のセクションにテストクラスが用意されています。
Trailhead Playground に OrderEventTrigger クラスを追加する手順は、次のとおりです。
- 開発者コンソールで、[New (新規)] | [Apex Class (Apex クラス)] をクリックします。
- クラス名として、
PlatformEventsConstantsと入力します。 - デフォルトのコードを、次のトリガー本文に置き換えます。
public class PlatformEventsConstants {
public static final Integer MAX_RETRIES = 3;
}- 同じ手順を繰り返して、
PlatformEventsTestUtilクラスを追加します。
public class PlatformEventsTestUtil {
private static Boolean shouldForceException = false;
private static Boolean isCatchableException = false;
private static Boolean isContinuous = false;
private static Integer runCounter = 0;
private static Integer exceptionThrownAtRun = 0;
public class MyCustomException extends Exception {}
// Throws a limit exception at a specific trigger run.
// The test method calls this utility method.
public static void throwLimitExceptionAtRun(Integer exThrownAtRun) {
enableExceptionForTesting(false, exThrownAtRun, false);
}
// Throws a catchable exception at a specific trigger run.
// The test method calls this utility method.
public static void throwCatchableExceptionAtRun(Integer exThrownAtRun) {
enableExceptionForTesting(true, exThrownAtRun, false);
}
// Throws a catchable exception starting at a specific trigger run
// and keeps throwing it again.
// The test method calls this utility method.
public static void throwCatchableExceptionContinuousAtRun(Integer exThrownAtRun) {
enableExceptionForTesting(true, exThrownAtRun, true);
}
// Sets up the test parameters.
private static void enableExceptionForTesting(Boolean isCatchableEx,
Integer exThrownAtRun,
Boolean isCont) {
if (Test.isRunningTest()) {
shouldForceException = true;
isCatchableException = isCatchableEx;
isContinuous = isCont;
runCounter = 0;
exceptionThrownAtRun = exThrownAtRun;
}
}
// Enables testing of exceptions in an Apex trigger.
// Runs only in test context.
// The Apex trigger calls this method.
public static void forceExceptionForTesting() {
if (Test.isRunningTest() && shouldForceException) {
runCounter++;
System.debug('runCounter=' + runCounter);
Boolean shouldThrowException = (runCounter == exceptionThrownAtRun) ||
(runCounter >= exceptionThrownAtRun && isContinuous);
if (shouldThrowException) {
if (isCatchableException) {
System.debug('Throwing a catchable exception.');
throw new MyCustomException();
} else {
System.debug('Throwing a limit exception.');
throw new System.LimitException();
}
}
}
}
// Cleans up the test parameters. The test method calls this method.
public static void cleanUp() {
shouldForceException = false;
isCatchableException = false;
isContinuous = false;
runCounter = 0;
exceptionThrownAtRun = 0;
}
public static Integer getRunCounter() {
return runCounter;
}
}- 同じ手順を繰り返して、[New (新規)] | [Apex Trigger (Apex トリガー)] を選択し、
OrderEventTriggerクラスを追加します。
trigger OrderEventTrigger on Order_Event__e (after insert) {
private Integer processedEventsCounter = 0;
for (Order_Event__e event : Trigger.New) {
try {
// Enable testing exceptions. Runs only in test context.
PlatformEventsTestUtil.forceExceptionForTesting();
// Process event message.
// ...
// Set Replay ID after which to resume event processing
// in a new trigger execution.
EventBus.TriggerContext.currentContext().setResumeCheckpoint(
event.ReplayId);
System.debug('Processed event with Replay ID: ' + event.ReplayId);
processedEventsCounter++;
} catch(Exception e) {
// This catch block works only for non-limit exceptions
// because limit exceptions cannot be caught.
//
// If no events have been processed, throw new EventBus.RetryableException
if (processedEventsCounter == 0) {
// Only throw RetryableException when the first event
// is processed but before setResumeCheckpoint is called.
if (EventBus.TriggerContext.currentContext().retries <
PlatformEventsConstants.MAX_RETRIES) {
throw new EventBus.RetryableException();
}
}
// This block is reached when catchable exception is received and
// one of these statements is true:
// - At least one event was processed.
// - No event was processed but the number of retries
// for RetryableException is exhausted.
//
// Log the exception in the debug log. The trigger will
// resume from the checkpoint if a checkpoint was set.
System.debug('An exception occurred in OrderEventTrigger: ' + e.getTypeName());
// Rethrow exception: Trigger exits and resumes with this event
// as the first event in the batch of the new invocation.
throw e;
}
}
}Apex テストでトリガーの再開をテストする
元のテストイベントバッチと残りのテストイベントがトリガーに配信される回数を想定して、その回数だけ Test.getEventBus().deliver(); メソッドをコールすることによって、Apex テストでトリガーの再開を確認できます。deliver() テストメソッドを使用すると、イベントバッチがトリガーに配信されます。トリガーがイベントバッチの一部しか処理しない場合は、Test.getEventBus().deliver(); を再度コールします。次回の deliver() コールで、バッチ内の残りの未処理イベントが送信され、トリガーが再開されます。
OrderEventTriggerTest テストクラスには、さまざまなシナリオをテストするテストメソッドが含まれています。これらのテストでは、キャッチできない制限例外とキャッチできる例外の発生後にトリガーが再開されることをテストします。その他のテストでは、RetryableException を使用したトリガーの再試行を確認します。EventBusSubscriber.Position 項目を使用して、処理されたイベント数を確認します。テストコンテキストでは ReplayId が 1 から始まって順に増加するため、これを使用できます。非テストコンテキストでは、ReplayId はこのようにはなりません。
OrderEventTriggerTest クラスを Trailhead Playground に追加します。
@isTest
public class OrderEventTriggerTest {
// This test causes the trigger to hit a non-catchable limit exception
// on the third event.
// Expected: The trigger resumes where it left off and processes the next events.
@isTest
static void testResumeAfterLimitException() {
PlatformEventsTestUtil.throwLimitExceptionAtRun(3);
try {
// Publish 5 events.
// First event batch should have 2 successfully processed events
// because the exception occurs on the 3rd event.
// Verify that trigger resumes and processes remaining events.
publishAndDeliverEvents(5, 2);
} finally {
// Clean up test parameters so they don't affect other tests
PlatformEventsTestUtil.cleanUp();
}
}
// This test causes the trigger to hit a catchable exception on the second event.
// Expected: The trigger resumes where it left off and processes the next events.
@isTest
static void testResumeAfterCatchableExceptionOnSecondEvent() {
PlatformEventsTestUtil.throwCatchableExceptionAtRun(2);
try {
// Publish 5 events.
// First event batch should have 1 successfully processed event
// and setResumeCheckpoint was called.
// The exception occurs on the 2nd event.
// Verify that trigger resumes and processes remaining events.
publishAndDeliverEvents(5, 1);
} finally {
// Clean up test parameters so they don't affect other tests
PlatformEventsTestUtil.cleanUp();
}
}
// This test causes the trigger to hit a catchable exception on the first event.
// Expected: The trigger is retried with the entire batch of events.
@isTest
static void testRetryAfterCatchableExceptionOnFirstEvent() {
PlatformEventsTestUtil.throwCatchableExceptionAtRun(1);
try {
// Publish 5 events.
// First batch of events delivered is empty because
// the execution jumps to the catch block.
// Because setResumeCheckpoint has not been called yet,
// the trigger will be retried with EventBus.RetryableException
// with the entire batch of events.
publishAndDeliverEvents(5, 0);
} finally {
// Clean up test parameters so they don't affect other tests
PlatformEventsTestUtil.cleanUp();
}
}
// This test causes the trigger to hit a catchable exception on the first event and
// exhausts the number of retries.
// Expected: The trigger is retried with the entire batch of events MAX_RETRIES + 1 times.
// After the retries are exhausted, the trigger rethrows the exception.
@isTest
static void testRetryAfterCatchableExceptionOnFirstEventMaxRetriesReached() {
PlatformEventsTestUtil.throwCatchableExceptionContinuousAtRun(1);
try {
publishEvents(2);
// The trigger should run (max retry + 1) times
for (Integer i = 1; i <= PlatformEventsConstants.MAX_RETRIES + 1; i++) {
Test.getEventBus().deliver();
Assert.areEqual(i, PlatformEventsTestUtil.getRunCounter(),
'Trigger should have retried.');
}
// Calling deliver more times won't cause the trigger to run,
// since we're not throwing the Retryable exception anymore.
Test.getEventBus().deliver();
Assert.areEqual(PlatformEventsConstants.MAX_RETRIES + 1,
PlatformEventsTestUtil.getRunCounter(),
'Trigger should NOT have retried.');
} finally {
// Clean up test parameters so they don't affect other tests
PlatformEventsTestUtil.cleanUp();
}
}
// Helper method to publish events.
private static void publishEvents(Integer totalEvents) {
List<Order_Event__e> eventList = new List<Order_Event__e>();
// Create test events. Number of test events to create is totalEvents.
for(Integer i = 0; i < totalEvents; i++) {
Order_Event__e event = (Order_Event__e)Order_Event__e.sObjectType.newSObject(
null, true);
event.Order_Id__c='dummyOrderId' + i;
eventList.add(event);
}
// Publish all events
EventBus.publish(eventList);
}
// Helper method to publish events and call deliver twice to verify
// that the trigger resumes after an exception.
private static void publishAndDeliverEvents(Integer totalEvents,
Integer expectedFirstDeliveryCount) {
publishEvents(totalEvents);
// Deliver the batch of events
Test.getEventBus().deliver();
// Verify that the trigger processed expectedFirstDeliveryNum of events
// because it processed only that many events before the exception
// was hit.
Assert.areEqual(expectedFirstDeliveryCount,
getTriggerPosition(), 'Unexpected number of events processed.');
// Call deliver() again to deliver the remaining events in the batch
// and cause the trigger resume after the exception.
Test.getEventBus().deliver();
// Verify that the trigger resumes and remaining events are processed
// for the remaining events, so there are a total of totalEvents processed events.
Assert.areEqual(totalEvents,
getTriggerPosition(),
'Unexpected number of events processed.');
getTriggerPosition();
}
// Helper method to return the ReplayId of the last processed event.
// In test context, replayIDs start from 1 and are incremented.
private static Integer getTriggerPosition() {
Integer position = 0;
EventBusSubscriber[] subscribers =
[SELECT Name, Type, Position, Retries, LastError
FROM EventBusSubscriber WHERE Topic='Order_Event__e'];
for (EventBusSubscriber sub : subscribers) {
if (sub.Name == 'OrderEventTrigger') {
System.debug('sub.Position='+sub.Position);
position = sub.Position;
}
}
return position;
}
}Trailhead Playground の開発者コンソールでテストクラスを実行する
- 開発者コンソールで、[Test (テスト)] | [New Run (新規実行)] をクリックします。
- [Test Classes (テストクラス)] 列で、OrderEventTriggerTest を選択します。
- テストを選択して、[Run (実行)] をクリックします。
- 完了したテスト実行をダブルクリックして、結果を詳細ビューで開きます。すべてのテストが成功し、
OrderEventTriggerのコードカバー率が 100% であることを確認できます。
この単元では、障害に強いトリガーを記述するのに役立つ機能について学習しました。チェックポイントを使用すると、制限例外や未処理の例外が発生した後にトリガーを再開できます。イベントの完全なバッチでトリガーを再試行して、イベントを改めて処理できます。これらの手法に加えて、制限に達しないようにトリガーバッチのサイズを制御することもできます。その方法は、次の単元で学習します。
リソース
- プラットフォームイベント開発者ガイド: Resume a Platform Event Trigger After an Uncaught Exception (キャッチされない例外の発生後にプラットフォームイベントトリガーを再開する)
- プラットフォームイベント開発者ガイド: EventBus.RetryableException によるイベントトリガーの再試行
- プラットフォームイベント開発者ガイド: Comparing setResumeCheckpoint() and EventBus.RetryableException (setResumeCheckpoint() と EventBus.RetryableException を比較する)
- プラットフォームイベント開発者ガイド: Deliver Test Event Messages (テストイベントメッセージを配信する)
- プラットフォームイベント開発者ガイド: プラットフォームイベントの割り当て
- プラットフォームイベント開発者ガイド: View and Manage an Event’s Subscribers on the Platform Event’s Detail Page (プラットフォームイベントの詳細ページでイベントのサブスクライバーを表示および管理する)
- Apex 開発者ガイド: 実行ガバナと制限
- Apex 開発者ガイド: Running Apex Within Governor Execution Limits (ガバナ実行制限内で Apex を実行する)
- Apex 開発者ガイド: Exceptions in Apex (Apex 開発者ガイド: Apex の例外)
- Trailhead: Apex Replay Debugger でバグを見つけて修正する
