Amazon Kinesis とは
AWS のリアルタイムストリーミングデータ処理プラットフォーム
出典: Amazon Kinesis - Big Data Analytics Options on AWS
“Amazon Kinesis is a platform for streaming data on AWS that makes it easy to load and analyze streaming data.”
Kinesis ファミリーの構成
Kinesis は単一のサービスではなく、4つのサービスで構成されるプラットフォーム
出典: Amazon Kinesis - Big Data Analytics Options on AWS
“Currently there are four pieces of the Kinesis platform that can be utilized based on your use case”
| サービス | 役割 | いつ使うか |
|---|---|---|
| Kinesis Data Streams | データの取り込み・保持 | カスタム処理が必要なとき |
| Amazon Data Firehose | データの自動配信 | S3/Redshift等に送るだけでいいとき |
| Managed Service for Apache Flink | ストリーム分析 | SQL/Javaでリアルタイム分析したいとき |
| Kinesis Video Streams | 動画処理 | カメラ映像を扱うとき |
このメモでは主に Kinesis Data Streams を扱う
なぜ Kinesis Data Streams が必要か
Producer から Consumer に直接データを送る構成でも動作はする
Producer → Consumer(直接送信)
ただし、この構成では対応できないケースがある
Kinesis が解決する問題
1. Producer と Consumer の速度差の吸収
出典: Amazon Kinesis Data Streams FAQs
“Kinesis Data Streams is useful for rapidly moving data off data producers and then continuously processing the data”
Producer からデータを素早く受け取り、Consumer は自分のペースで処理できる
例:
Producer: 毎秒10,000件送る
Consumer: 毎秒1,000件しか処理できない
→ 直接送ると Consumer がパンクしてデータが捨てられる
Kinesis を挟むと:
Producer → [Kinesis] → Consumer
→ Kinesis がバッファとして一時的に溜める
→ Consumer は自分のペースで処理できる
2. Producer 障害時のデータ保護
出典: Amazon Kinesis Data Streams FAQs
“Instead of waiting to batch the data, you can have your data producers push data to a Kinesis data stream as soon as the data is produced, preventing data loss in case of producer failure.”
Producer がクラッシュしても、既に Kinesis に送ったデータは失われない。ローカルに溜めてからバッチで送る方式だと、クラッシュ時にローカルのデータが消える
3. Consumer 障害時のデータ保護
出典: Amazon Kinesis Data Streams FAQs
“The default retention period of 24 hours covers scenarios where intermittent lags in processing require catch-up with the real-time data.”
Kinesis はデータを保持する(デフォルト24時間、最大365日)。Consumer が落ちている間のデータも、復旧後に処理できる
直接送信の場合:
Producer → Consumer
→ Consumer が落ちている間のデータは消える
Kinesis を挟む場合:
Producer → [Kinesis] → Consumer
→ Kinesis がデータを保持
→ Consumer が復旧したら、落ちてた間のデータも処理できる
4. 複数 Consumer での同一データ利用
出典: Amazon Kinesis Data Streams FAQs
“Ability for multiple applications to consume the same stream concurrently. For example, you have one application that updates a real-time dashboard and another that archives data to Amazon Redshift. You want both applications to consume data from the same stream concurrently and independently.”
1回送れば、複数の Consumer が同じデータを読める
直接送信の場合:
Producer → Consumer A
→ Consumer B も同じデータが欲しい場合、もう一度送る必要がある
Kinesis を挟む場合:
Producer → [Kinesis] → Consumer A(ダッシュボード用)
→ Consumer B(アーカイブ用)
→ Consumer C(分析用)
→ 1回送れば全員が読める
5. Producer と Consumer の疎結合化
出典: Service introduction - Reactive Systems on AWS
“Amazon Kinesis Data Streams is a real-time data streaming service used to decouple key application components”
Producer は Consumer の存在を知らなくていい
直接送信の場合:
Producer → Consumer
→ Consumer の IP やエンドポイントが変わったら Producer を修正する必要がある
Kinesis を挟む場合:
Producer → [Kinesis] → Consumer
→ Producer は Kinesis に送るだけ
→ Consumer が増えても減っても Producer は変更不要
Kinesis の役割まとめ
| 機能 | 直接送信 | Kinesis |
|---|---|---|
| 速度差の吸収 | できない | できる |
| データの保持 | できない | 最大365日 |
| 複数 Consumer | 都度送信 | 1回で済む |
| 疎結合 | 密結合 | 疎結合 |
出典: Amazon Kinesis Data Streams FAQs
“Kinesis Data Streams manages the infrastructure, storage, networking, and configuration needed to stream your data at the level of your data throughput.”
Kinesis はインフラ、ストレージ、ネットワーク、設定を管理する。データを受け取って保持し、Consumer に渡すという仕組み全体をマネージドで提供している
主なユースケース
出典: Amazon Kinesis - Big Data Analytics Options on AWS
1. リアルタイムデータ分析
“Real-time data analytics – Kinesis Data Streams enables real-time data analytics on streaming data, such as analyzing website clickstream data and customer engagement analytics.”
例えば EC サイトで「今この瞬間、どのページが見られているか」をリアルタイムで把握したい場合。バッチ処理だと1時間前のデータしか見れないが、Kinesis なら数秒前のデータを分析できる
2. ログの即時取り込み
“With Kinesis Data Streams, you can have producers push data directly into an Amazon Kinesis stream… This prevents the log data from being lost if the front-end or application server fails”
サーバーがクラッシュしても、既に Kinesis に送ったログは失われない。ローカルにログを貯めてからバッチで送る方式だと、クラッシュ時にローカルのログが消える
3. リアルタイムダッシュボード
“You can use data ingested into Kinesis Data Streams for extracting metrics and generating KPIs to power reports and dashboards at real-time speeds.”
「今の売上」「今のアクセス数」をリアルタイムで表示するダッシュボードを作れる
向いていないケース
出典: Amazon Kinesis - Big Data Analytics Options on AWS
“Small scale consistent throughput – Even though Kinesis Data Streams works for streaming data at 200 KB per second or less, it is designed and optimized for larger data throughputs.” “Long-term data storage and analytics – Kinesis Data Streams is not suited for long-term data storage.”
- 小規模データ(200KB/秒以下): Kinesis は大量データ向けに最適化されている。小規模なら SQS の方がシンプルでコスト効率が良い
- 長期保存: 最大365日しか保持できない。長期保存なら S3 に送るべき
SNS / SQS との違い
3つのサービスの設計思想
出典: Amazon SQS, Amazon SNS, or Amazon EventBridge?
“Amazon SQS is a pull-based message queuing service… Amazon SNS is a push-based pub/sub messaging service…”
出典: Message driven - Reactive Systems on AWS
“Amazon Kinesis Data Streams and Amazon MSK are real-time data streaming services that enable decoupling services and allow event replay within a defined retention period.”
多分こういうこと?
| サービス | 何をするか | データの扱い |
|---|---|---|
| SQS | タスクを順番に処理する | 処理したら消える |
| SNS | 通知を配る | 配ったら消える |
| Kinesis | データを流し続ける | 一定期間残る |
基本コンポーネント
全体像
Producer → [Kinesis Data Stream] → Consumer
│
└─ 複数の Shard で構成
│
└─ 各 Shard に Record が入る
用語の定義
出典: Amazon Kinesis Data Streams Terminology and concepts
Stream(ストリーム)
“A Kinesis data stream is a set of shards.”
シャードの集合体。「my-order-stream」のような名前を付けて作成する。データの論理的なまとまり
Shard(シャード)
“A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity.”
ストリーム内のデータ分割単位。詳細は後述
Record(レコード)
“A data record is the unit of data stored in a Kinesis data stream. Data records are composed of a sequence number, a partition key, and a data blob”
実際のデータ1件。「ユーザーAが商品Bをクリックした」のような1つのイベント
Producer(プロデューサー)
“Producers put records into Amazon Kinesis Data Streams.”
データを送信する側。Webサーバー、IoTデバイス、モバイルアプリなど
Consumer(コンシューマー)
“Consumers get records from Amazon Kinesis Data Streams and process them.”
データを受信・処理する側。Lambda、EC2上のアプリ、Firehose など
シャード(Shard)とは何か
シャードの定義
出典: Amazon Kinesis Data Streams Terminology and concepts
“A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity.”
シャードは「データの流れる水路」のようなもの。1本の水路には流せる水の量に限界がある
シャードの処理能力
出典: Amazon Kinesis Data Streams FAQs
“A shard supports 1 MB/second and 1,000 records per second for writes and 2 MB/second for reads.”
| 方向 | 制限 |
|---|---|
| 書き込み | 1MB/秒、1,000レコード/秒 |
| 読み取り | 2MB/秒、5トランザクション/秒 |
この制限は固定。超えたい場合はシャードを増やす
なぜシャードという単位が必要か
シャードがない(1本のパイプだけ)だと:
- 処理能力に上限がある(スケールできない)
- 1つのコンシューマーしか処理できない(並列処理できない)
- 1箇所が詰まると全体が止まる
シャードがあると:
- スケーラビリティ: シャード数を増やせば処理能力が増える
- 並列処理: 各シャードを別々のコンシューマーで同時に処理できる
- 順序保証: シャード内ではレコードの順序が保証される
「関連するデータだけ同じシャードに入れる」ことで、シャード内では順序保証、シャード間では並列処理を実現する
パーティションキー(Partition Key)とは何か
パーティションキーの定義
出典: Amazon Kinesis Data Streams Terminology and concepts
“A partition key is used to group data by shard within a stream… An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards.”
パーティションキーは、どのシャードに入れるかを決めるためのラベル
なぜパーティションキーが必要か
データを送るとき、Kinesis は「どのシャードに入れるか」を決める必要がある
この振り分けルールを決めるのがパーティションキー
同じパーティションキー = 同じシャードに入る = 順序が保証される
例:
user-123 の操作:
10:00 注文 → Shard 0
10:01 支払い → Shard 0(同じキーなので同じシャード)
10:02 発送 → Shard 0
→ 順番通り処理される
user-456 の操作:
10:00 注文 → Shard 1(別のキーなので別シャード)
→ user-123 とは並列処理される
パーティションキーとシャードIDの違い
| パーティションキー | シャードID | |
|---|---|---|
| 誰が決める | ユーザー | AWS |
| いつ使う | データ送信時 | データ取得時(直接API) |
| 目的 | 関連データをまとめる | シャードを識別する |
| 例 | user-123, order-456 | shardId-000000000001 |
もしシャードIDを直接指定する設計だったら:
- シャード数が変わったらアプリのコードを全部書き換える必要がある
- どのシャードに入れるか、毎回自分で計算する必要がある
パーティションキーがあると:
- シャード数が変わっても同じコードで動く(AWS が自動で振り分ける)
- ビジネス上の意味のあるキー(ユーザーID、注文IDなど)で考えられる
パーティションキーはシャードIDの抽象化と言っていい(?)
処理を記述する上ではユーザーはシャードの存在を意識しなくて良い
コンシューマーの種類
コンシューマーとは
出典: Amazon Kinesis Data Streams Terminology and concepts
“Consumers get records from Amazon Kinesis Data Streams and process them.”
コンシューマーは Kinesis からデータを取得して処理するアプリケーション。Lambda、EC2上のアプリ、Firehose などが該当する
データ取得方式の選択肢
コンシューマーが Kinesis からデータを取得する方式は2種類ある
出典: Amazon Kinesis Data Streams FAQs
“What is a consumer, and what are different consumer types offered by Kinesis Data Streams? … You can choose between shared fan-out and enhanced fan-out consumer types to read data from a Kinesis data stream.”
共有スループット(標準コンシューマー)
コンシューマーが定期的に GetRecords API を呼び出してデータを取得する方式(ポーリング)
ドキュメントによって呼び方が異なる:
出典: Develop enhanced fan-out consumers with dedicated throughput
“The following are the differences between shared throughput consumers and enhanced fan-out consumers”
出典: Using Lambda to process records from Amazon Kinesis Data Streams
“Standard iterators use HTTP protocol to poll each shard for records and share read throughput with other consumers.”
「shared throughput consumer」「standard iterator」「標準コンシューマー」は全て同じものを指す
出典: Amazon Kinesis Data Streams FAQs
“The shared fan-out consumers all share a shard’s 2 MB/second of read throughput and five transactions per second limits”
1シャードあたり2MB/秒の帯域を、全てのコンシューマーで共有する。コンシューマーが増えると、1つあたりの帯域が減り、レイテンシーが悪化する
Enhanced Fan-Out
Kinesis がコンシューマーにデータをプッシュする方式。SubscribeToShard API を使用する
出典: Amazon Kinesis Data Streams FAQs
“An enhanced fan-out consumer gets its own 2 MB/second allotment of read throughput, allowing multiple consumers to read data from the same stream in parallel, without contending for read throughput with other consumers.”
各コンシューマーが専用の2MB/秒帯域を持つ。コンシューマーが増えても帯域を奪い合わない
2つの方式の比較
出典: Develop enhanced fan-out consumers with dedicated throughput
“Message propagation delay… Typically an average of 70 ms whether you have one consumer or five consumers.”
共有スループット(標準):
┌─── 1MB/秒 ───→ Consumer A
Shard (2MB/秒) ─────┤
└─── 1MB/秒 ───→ Consumer B
→ 2MB/秒を全員で分け合う
→ コンシューマーが増えると1つあたりの帯域が減る
→ レイテンシー: 200ms〜1000ms
Enhanced Fan-Out:
┌─── 2MB/秒 ───→ Consumer A
Shard (2MB/秒) ─────┤
└─── 2MB/秒 ───→ Consumer B
→ 各コンシューマーが専用の2MB/秒を持つ
→ コンシューマーが増えても帯域は減らない
→ レイテンシー: 約70ms
| 種類 | 仕組み | レイテンシー | コスト |
|---|---|---|---|
| 共有スループット | GetRecords API でポーリング | 200ms〜1000ms | 追加料金なし |
| Enhanced Fan-Out | SubscribeToShard API でプッシュ | 約70ms | 追加料金あり |
どちらを選ぶか
出典: Amazon Kinesis Data Streams FAQs
“We recommend using enhanced fan-out consumers if you want to add more than one consumer to your data stream.”
- コンシューマーが1つだけ、またはレイテンシー要件が緩い → 共有スループット
- コンシューマーが複数、または低レイテンシーが必要 → Enhanced Fan-Out
Lambda との連携
Lambda を使うメリット
直接 API を使う場合、自分で実装すること:
- シャード一覧の取得
- 各シャードのイテレータ取得
- ポーリング
- チェックポイント管理(どこまで処理したか)
- シャード分割時の対応
- エラー時のリトライ
Lambda を使う場合、これらを全て AWS が管理してくれる。ビジネスロジックだけ書けばいい
Lambda 同時実行数とシャードの関係
基本: 1シャード = 1 Lambda 実行
3シャード → 最大3つの Lambda が同時に動く
100シャード → 最大100の Lambda が同時に動く
なぜか: AWS が1シャードにつき1つの Lambda を割り当てるから。これにより、シャード内の順序保証が維持される
ParallelizationFactor とは
出典: Using Lambda to process records from Amazon Kinesis Data Streams
“You can specify the number of concurrent batches that Lambda polls from a shard via a parallelization factor from 1 (default) to 10.” “For example, when you set ParallelizationFactor to 2, you can have 200 concurrent Lambda invocations at maximum to process 100 Kinesis data shards”
「1シャードに対して複数の Lambda を同時に動かす」設定
- デフォルト: 1(1シャード = 1 Lambda)
- 最大: 10(1シャード = 10 Lambda)
いつ使うか:
- 1シャードに大量のデータが流れている
- Lambda 1つでは処理が追いつかない
- でもシャードは増やしたくない(コストや設計の都合)
計算式:
同時実行 Lambda 数 = シャード数 × ParallelizationFactor
例: 10シャード × Factor 5 = 最大50 Lambda
設定場所: Lambda のイベントソースマッピング
注意: Factor を上げると、同じシャード内でも並列処理される。パーティションキー単位での順序は保証されるが、シャード全体の順序は保証されなくなる
レイテンシー改善
方法1: Enhanced Fan-Out を使用する
出典: Develop enhanced fan-out consumers with dedicated throughput
“Message propagation delay… Typically an average of 70 ms whether you have one consumer or five consumers.”
最も効果的な方法。標準コンシューマーの200ms〜1000msから、約70msに改善できる
標準コンシューマー(ポーリング):
┌─→ Consumer A
Shard ─── 2MB/秒を共有 ───┼─→ Consumer B
└─→ Consumer C
→ コンシューマーが増えるとレイテンシー悪化
Enhanced Fan-Out(プッシュ):
┌─── 専用 2MB/秒 ───→ Consumer A
Shard ┼─── 専用 2MB/秒 ───→ Consumer B
└─── 専用 2MB/秒 ───→ Consumer C
→ コンシューマーが増えてもレイテンシー変わらず
方法2: シャード数を増やす
出典: Using Lambda to process records from Amazon Kinesis Data Streams
“To increase the speed at which your function processes records, add shards to your data stream.”
シャード数を増やすと並列処理数が増え、全体の処理速度が向上する
シャード 2つ:
Shard 0 ──→ Lambda A ─┐
Shard 1 ──→ Lambda B ─┴─→ 2並列で処理
シャード 4つに増やす:
Shard 0 ──→ Lambda A ─┐
Shard 1 ──→ Lambda B ─┤
Shard 2 ──→ Lambda C ─┼─→ 4並列で処理(2倍速)
Shard 3 ──→ Lambda D ─┘
方法3: ParallelizationFactor を上げる
出典: Using Lambda to process records from Amazon Kinesis Data Streams
“This helps scale up the processing throughput when the data volume is volatile and the IteratorAge is high.”
シャード数を増やさずに並列処理数を増やせる。IteratorAge(処理遅延)が大きい場合に有効
Factor = 1(デフォルト):
Shard 0 ──→ Lambda A(1つ)
Shard 1 ──→ Lambda B(1つ)
→ 合計 2 Lambda
Factor = 3 に変更:
Shard 0 ──→ Lambda A-1, A-2, A-3(3つ同時)
Shard 1 ──→ Lambda B-1, B-2, B-3(3つ同時)
→ 合計 6 Lambda(3倍速)
計算式:
同時実行 Lambda 数 = シャード数 × ParallelizationFactor