ストリーミング API の使用

学習の目的

この単元を完了すると、次のことができるようになります。
  • プル技術を凌ぐプッシュ技術の主な利点を説明する。
  • PushTopic を作成してイベント通知を受信する。
  • プラットフォームイベントを定義し、登録チャネルを派生させる。
  • 汎用ストリーミングを使用して通知をブロードキャストする。
  • デュラブルストリーミングの再生オプションを指定する。

ストリーミングイベント

Salesforce のデータ API に関する調査の結論を出すために、まったく異なるユースケースに用いられるもう 1 つの API を見てみましょう。ストリーミング API では、クライアントアプリケーションは定義した条件に基づいた一連の通知を Salesforce からプッシュで受け取れます。他の API ではクライアントアプリケーションが Salesforce にデータを要求する方式 (プル型) が採られていますが、プッシュ型通知はこのプル型とどのように異なるのでしょうか? ここで、船長になった気分でこの違いを検証してみましょう。

船長となったあなたは外洋航海中、予測される危険や他の船舶、そして宝に溢れた島々を見逃さないようにしたいと考えています。船員の 1 人を見張り台に登らせ、異変を積極的に見つけ出すよう指示します。では開発者の立場に戻ります。あなたは開発者で、取引先が更新されていないかどうかを定期的に確認する REST または SOAP API を使用して、アプリケーションを記述しているとします。取引先データを絶えず要求して、古いデータと一致しているかどうかを確認することは、異変を積極的に見つけ出すことに通じる手法です。

さて、あなたは再び船上にいるとします。ただし、今回は最新のレーダー表示装置を利用できるものとします。気になる物体が近づくと表示装置から信号音が発せられるため、見張り台に船員を立たせておく必要がありません。

ストリーミング API は、このレーダーの役割を果たします。API でイベントを定義し、イベントが発生したときにクライアントアプリケーションに通知を転送 (プッシュ) できます。データの変更を積極的に見つけ出す必要がありません。Salesforce に絶えずポーリングして、不要な API 要求を行う必要がありません。

ストリーミング API はレーダーと同様、データの変更を検知して通知を送信することができます。

Salesforce の外部にあるシステムにビジネスデータが保存されている場合は特に Salesforce のデータ変更の追跡が役立ちます。ストリーミング API を使用すれば、PushTopic イベントと変更データキャプチャイベントを使用して、外部ソースを Salesforce データと常に同期させておくことができます。また、ストリーミング API により、Salesforce でのデータの変更に応じて、外部システムでビジネスプロセスを処理できます。たとえば、ストリーミング API を使用すれば、商談が更新されるたびに注文発送センターに通知することができます。

メモ

メモ

変更データキャプチャの詳細は、「変更データキャプチャの基礎」 Trailhead モジュールを参照してください。

データ変更のほかにも、ストリーミング API では、プラットフォームイベントと汎用ストリーミングを使用してカスタム通知をブロードキャストできます。たとえば、アプリケーションは、注文フルフィルメントサービスが処理する注文のプラットフォームイベント通知を生成できます。または、汎用イベントをリスンし、システムメンテナンス期間の開始前やユーザへの新機能の提供時などにメッセージを表示できます。

PushTopics

PushTopic は、特定のオブジェクトのデータ変更など、リスンするイベントの条件を含む sObject です。SOQL クエリとして定義して、通知の対象となるレコード操作 (作成、更新、削除、復元) を指定します。PushTopic は、イベントの条件のほか、クライアントアプリケーションが登録するチャネルも表します。

この点については、PushTopic の作成時に詳しく説明します。

PushTopic クエリでサポートされるオブジェクト

PushTopic クエリでは、すべてのカスタムオブジェクトとよく使用される標準オブジェクト (取引先、取引先責任者、商談など) がサポートされます。サポートされる標準オブジェクトの完全なリストについては、「リソース」セクションの『ストリーミング API 開発者ガイド』を参照してください。

PushTopic と通知

PushTopic では、イベント通知を受信するオブジェクト、項目、および条件を定義できます。Apex で定義して挿入された PushTopic の例を次に示します。この PushTopic の作成後にこの PushTopic チャネルに登録すると、請求先市区郡がサンフランシスコの取引先への変更を追跡できます。この PushTopic では、各イベント通知で Id、[名前]、[電話] 項目を返すように指定されています。デフォルトでは、クエリの条件と一致する作成、更新、削除、復元操作に関する通知が送信されます。
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'AccountUpdates';
pushTopic.Query = 'SELECT Id, Name, Phone FROM Account WHERE BillingCity=\'San Francisco\'';
pushTopic.ApiVersion = 37.0;
insert pushTopic;

最低でも PushTopic の名前、クエリ、および API バージョンを定義します。残りのプロパティはデフォルト値を使用できます。デフォルトでは、SELECT ステートメントの項目リストおよび WHERE 句の項目が、通知をトリガする項目です。通知は、WHERE 句の条件に一致したレコードについてのみ送信されます。通知には、SELECT 句の項目が含まれます。通知をトリガする項目を変更するには、pushTopic.NotifyForFields を次のいずれかの値に設定します。

NotifyForFields の値
説明
すべて
評価されるレコードが WHERE 句で指定された条件と一致した場合、すべてのレコード項目変更について通知が生成されます。
Referenced (デフォルト)
SELECT 句と WHERE 句で参照されている項目への変更が評価されます。評価されるレコードが WHERE 句で指定された条件と一致した場合、そのレコードのみについて通知が生成されます。
選択
SELECT 句で参照されている項目への変更が評価されます。評価されるレコードが WHERE 句で指定された条件と一致した場合、そのレコードのみについて通知が生成されます。
場所
WHERE 句で参照されている項目への変更が評価されます。評価されるレコードが WHERE 句で指定された条件と一致した場合、そのレコードのみについて通知が生成されます。
通知設定を明示的に設定するには、次のプロパティを true または false に設定します。デフォルトでは、すべての値が true に設定されます。
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;

取引先を作成すると、イベント通知が生成されます。通知は JSON 形式で、PushTopic クエリで指定した Id[名前][電話] 項目が含まれます。イベント通知は次のようになります。

{
  "clientId": "lxdl9o32njygi1gj47kgfaga4k",
  "data": {
    "event": {
      "createdDate": "2016-09-16T19:45:27.454Z",
      "replayId": 1,
      "type": "created"
    },
    "sobject": {
      "Phone": "(415) 555-1212",
      "Id": "001D000000KneakIAB",
      "Name": "Blackbeard"
    }
  },
  "channel": "/topic/AccountUpdates"
}

通知メッセージには、PushTopic のチャネルが /topic/PushTopicName 形式で含まれます。PushTopic を作成すると、自動的にチャネルが作成されます。

PushTopic クエリ

ここで、PushTopic に定義したクエリについて詳しく説明します。PushTopic クエリは通常の SOQL クエリで、SOQL に慣れている方は新たな形式を習得する必要がありません。クエリの形式は、次のようになります。
SELECT <comma-separated list of fields> FROM <Salesforce object> WHERE <filter criteria>

通知が遅滞なく送信されるように、PushTopic クエリには次の要件が適用されます。

  • SELECT ステートメントの項目リストに Id を含める必要があります。
  • 1 つのクエリに指定できるオブジェクトは 1 つのみです。
  • オブジェクトは、指定された API バージョンで有効である必要があります。
集計クエリや準結合など一部のクエリはサポートされません。

プラットフォームイベントを使用したカスタム通知

プラットフォームイベントを使用して、事前定義されたスキーマを使用したカスタム通知を公開および登録します。PushTopic イベントや変更データキャプチャイベントとは異なり、プラットフォームイベントは Salesforce レコードには関連付けられず、Salesforce で自動公開されることはありません。代わりに、プラットフォームイベントを作成し、項目を追加して、プラットフォームイベントメッセージのスキーマを定義します。また、クライアントは、Lightning プラットフォーム、Apex、または API で宣言型ツールを使用してプラットフォームイベントを公開します。

プラットフォームイベントのバージョン付きスキーマで、登録者はイベントを確定的に解析できます。各スキーマのバージョンは、イベント通知メッセージに含まれる一意のスキーマ ID に対応しています。

プラットフォームイベントの定義

ユーザインターフェースでプラットフォームイベントを定義するには、[設定] から、[クイック検索] ボックスに「プラットフォームイベント」と入力し、[プラットフォームイベント] を選択します。プラットフォームイベントに項目を追加する手順は、カスタムオブジェクトに項目を追加する手順と似ています。項目種別のサブセットがサポートされます。

プラットフォームイベントの API 参照名には、__e というサフィックスが含まれます。たとえば、Order Event という表示ラベルのプラットフォームイベントを作成する場合、API 参照名は Order_Event__e となります。

プラットフォームイベントを定義したら、チャネル名が自動的に提供されます。チャネル名はイベントの API 参照名に基づき、形式は /event/Event_Name です。たとえば、/event/Order_Event__e のようになります。

プラットフォームイベントの公開

Lightning プラットフォームで次の宣言的ツールまたはプログラム型ツールを使用して、プラットフォームイベントを公開できます。

  • [レコードを作成] アクションを使用するプロセスビルダー
  • [レコード作成] 要素を使用するフロー
  • Apex EventBus.publish() メソッド
  • REST API sobjects リソース
  • SOAP API create() コール

詳細は、「リソース」セクションのプラットフォームイベントに関するドキュメントを参照してください。

プラットフォームイベントの登録

ストリーミング API には、プラットフォームイベントを含め、複数のイベント種別の登録メカニズムが用意されています。ストリーミング API のほかに、Lightning プラットフォームを使用してプラットフォームイベントに登録することができます。

  • プラットフォームイベントの発生時に開始されるプロセスを使用するプロセスビルダー
  • プラットフォームイベントの発生を待機するフロー
  • Apex トリガ
  • CometD メッセージングライブラリを使用するストリーミング API
  • empApi Lightning コンポーネント

この例は、注文イベントのプラットフォームイベントメッセージです。

{
  "data": {
    "schema": "dffQ2QLzDNHqwB8_sHMxdA",
    "payload": {
      "CreatedDate": "2018-08-22T12:11:40.517Z",
      "CreatedById": "005D0000001cSZs",
      "Order_Number__c": "12345",
      "Has_Shipped__c": true
    },
    "event": {
      "replayId": 1
    }
  },
  "channel": "/event/Order_Event__e"
}

詳細は、「リソース」セクションのプラットフォームイベントに関するドキュメントを参照してください。

汎用ストリーミングを使用したカスタム通知

単独航海に出航する前に、汎用ストリーミングについても簡単に説明しておきましょう。ストリーミング API は、Salesforce データの変更に関連付けられていない、汎用ペイロードを使用した通知の送信をサポートしています。
任意のペイロードを使用し、事前定義されたスキーマを使用せずにカスタム通知を送受信するには、汎用ストリーミングを使用します。汎用ストリーミングでは、対象となるユーザに通知を公開できます。汎用ストリーミングを使用する場合は、以下のものが必要です。
  • チャネルを定義するストリーミングチャネル
  • チャネルに登録された 1 つ以上のクライアント
  • チャネルのイベントを監視したり呼び出したりする Streaming Channel Push リソース
汎用ストリーミングのストリーミングチャネルは、ストリーミングチャネルアプリケーションのユーザインターフェースか API を使用して作成できます。ストリーミングチャネルは StreamingChannel sObject で表されるため、Apex、REST API、または SOAP API を使用して作成できます。汎用ストリーミングのチャネル名の形式は、/u/ChannelName です。たとえば、この Apex スニペットは Broadcast という名前のチャネルを作成します。
StreamingChannel ch = new StreamingChannel();
ch.Name = '/u/Broadcast';
insert ch;

また、ストリーミングチャネルが存在しない場合に Salesforce で動的に作成されるようにするオプションを選択できます。組織で動的ストリーミングチャネルを有効にするには、[設定] から [クイック検索] ボックスに「ユーザインターフェース」と入力し、[ユーザインターフェース] を選択します。[ユーザインターフェース] ページで、[動的ストリーミングチャネル作成を有効化] オプションを選択します。

CometD クライアントを使用してチャネルに登録できます。(「リソース」セクションに『ストリーミング API 開発者ガイド』のサンプルウォークスルーへのリンクがあります)。

イベントを生成するには、次の REST リソースへの POST 要求を行います。XX.0 を API バージョン、Streaming Channel ID をチャネルの ID に置き換えます。

/services/data/vXX.0/sobjects/StreamingChannel/Streaming Channel ID/push
メモ

メモ

チャネル ID を取得するには、StreamingChannel で SELECT Id, Name FROM StreamingChannel などの SOQL クエリを実行します。

REST リクエストボディの例

{
  "pushEvents": [
      {
          "payload": "Broadcast message to all subscribers",
          "userIds": []
      }
   ]
}
メモ

メモ

すべての登録者にブロードキャストする代わりに、省略可能な userIds 項目を使用して、通知を送信する登録ユーザのリストを指定します。また、Streaming Channel Push REST API リソースの GET メソッドを使用して、チャネルのアクティブな登録者のリストを取得することもできます。

登録クライアントが受信するイベント通知は次のようになります。

{
  "clientId": "1p145y6g3x3nmnlodd7v9nhi4k",
  "data": {
    "payload": "Broadcast message to all subscribers",
    "event": {
      "createdDate": "2016-09-16T20:43:39.392Z",
      "replayId": 1
    }
  },
  "channel": "/u/Broadcast"
}

このイベント通知には replayId 項目が含まれています。 

イベント通知のおかげで、自信を持って荒波を航行し、宝物がたくさんある島に向かうことができます。

デュラブルストリーミングを使用した過去の通知の取得

ここまで、さまざまなイベント種別について学びました。クライアントがイベントチャネルに登録する前、またはクライアントが接続していないときに、Salesforce レコードが作成または更新されている場合はどうなるでしょうか? API バージョン 37.0 より前は、対応する通知がクライアントに送信されませんでした。API バージョン 37.0 以降は、PushTopic イベント、汎用イベント、標準規模のプラットフォームイベントは 24 時間、大規模イベントは 72 時間それぞれ保存されます。この時間内であればいつでもイベントメッセージを取得できます。やりました!

API バージョン 37.0 以降、各イベント通知メッセージには replayId という項目が含まれています。動画の再生のように、ストリーミング API は replayId 項目を使用して、送信されたイベント通知を再生します。

各イベントメッセージには、ReplayId 項目に含まれる不透明な ID が割り当てられます。ReplayId 項目値は、イベントが登録者に配信されるときにシステムによって入力され、イベントストリーム内のイベントの位置を参照します。連続するイベントの再実行 ID 値が連番になるという保証はありません。たとえば、ID 999 のイベントの次のイベントの ID が 1,025 になることもあり得ます。登録者は、再実行 ID 値を保存し、その ID を再登録で使用して保持期間内のイベントを取得することができます。たとえば、登録者は接続障害が発生した後に、欠落イベントを取得できます。登録者が、保存された再実行 ID に基づいて新しい再実行 ID を計算し、ストリーム内の他のイベントを参照することはできません。

表 1. 再生オプション
再生オプション
説明
使用方法
再実行 ID
登録者は、replayId 値で指定したイベント後に発生したすべての保存されたイベントと新しいイベントを受信します。
特定のイベントメッセージ (接続障害など) の後に、見逃したイベントを確認することができます。特定の再生 ID を使用して登録するには、取得する保存されたイベントの最初のイベントメッセージの再生 ID を保存します。そして、再登録するときにこの再生 ID を使用します。
-1
登録者は、クライアントが登録した後にブロードキャストされる新規イベントを受信します。
新しいイベントメッセージを受信するように、クライアントは -1 オプションを使用して登録することをお勧めします。クライアントが以前のイベントメッセージを取得する必要がある場合は、他の再生オプションを使用できます。
-2
登録者は、保管期間中の過去のイベントと、新規イベントを含め、すべてのイベントを受信します。
たとえば接続障害などの後に、見逃したイベントを確認し、すべての保存されているイベントを取得することができます。このオプションは慎重に使用してください。多数のイベントメッセージが保存されている場合に -2 オプションを使用して登録すると、パフォーマンスが低下する恐れがあります。

次の図は、イベントコンシューマがさまざまな再生オプションを使用してイベントのストリームを読み取る方法を示しています。

ストリーミングイベントと再生オプション