Amazon Kinesis とは

AWS のリアルタイムストリーミングデータ処理プラットフォーム

出典: Amazon Kinesis - Big Data Analytics Options on AWS

“Amazon Kinesis is a platform for streaming data on AWS that makes it easy to load and analyze streaming data.”

Kinesis ファミリーの構成

Kinesis は単一のサービスではなく、4つのサービスで構成されるプラットフォーム

出典: Amazon Kinesis - Big Data Analytics Options on AWS

“Currently there are four pieces of the Kinesis platform that can be utilized based on your use case”

サービス役割いつ使うか
Kinesis Data Streamsデータの取り込み・保持カスタム処理が必要なとき
Amazon Data Firehoseデータの自動配信S3/Redshift等に送るだけでいいとき
Managed Service for Apache Flinkストリーム分析SQL/Javaでリアルタイム分析したいとき
Kinesis Video Streams動画処理カメラ映像を扱うとき

このメモでは主に Kinesis Data Streams を扱う


なぜ Kinesis Data Streams が必要か

Producer から Consumer に直接データを送る構成でも動作はする

Producer → Consumer(直接送信)

ただし、この構成では対応できないケースがある

Kinesis が解決する問題

1. Producer と Consumer の速度差の吸収

出典: Amazon Kinesis Data Streams FAQs

“Kinesis Data Streams is useful for rapidly moving data off data producers and then continuously processing the data”

Producer からデータを素早く受け取り、Consumer は自分のペースで処理できる

例:
  Producer: 毎秒10,000件送る
  Consumer: 毎秒1,000件しか処理できない
  → 直接送ると Consumer がパンクしてデータが捨てられる

Kinesis を挟むと:
  Producer → [Kinesis] → Consumer
  → Kinesis がバッファとして一時的に溜める
  → Consumer は自分のペースで処理できる

2. Producer 障害時のデータ保護

出典: Amazon Kinesis Data Streams FAQs

“Instead of waiting to batch the data, you can have your data producers push data to a Kinesis data stream as soon as the data is produced, preventing data loss in case of producer failure.”

Producer がクラッシュしても、既に Kinesis に送ったデータは失われない。ローカルに溜めてからバッチで送る方式だと、クラッシュ時にローカルのデータが消える

3. Consumer 障害時のデータ保護

出典: Amazon Kinesis Data Streams FAQs

“The default retention period of 24 hours covers scenarios where intermittent lags in processing require catch-up with the real-time data.”

Kinesis はデータを保持する(デフォルト24時間、最大365日)。Consumer が落ちている間のデータも、復旧後に処理できる

直接送信の場合:
  Producer → Consumer
  → Consumer が落ちている間のデータは消える

Kinesis を挟む場合:
  Producer → [Kinesis] → Consumer
  → Kinesis がデータを保持
  → Consumer が復旧したら、落ちてた間のデータも処理できる

4. 複数 Consumer での同一データ利用

出典: Amazon Kinesis Data Streams FAQs

“Ability for multiple applications to consume the same stream concurrently. For example, you have one application that updates a real-time dashboard and another that archives data to Amazon Redshift. You want both applications to consume data from the same stream concurrently and independently.”

1回送れば、複数の Consumer が同じデータを読める

直接送信の場合:
  Producer → Consumer A
  → Consumer B も同じデータが欲しい場合、もう一度送る必要がある

Kinesis を挟む場合:
  Producer → [Kinesis] → Consumer A(ダッシュボード用)
                      → Consumer B(アーカイブ用)
                      → Consumer C(分析用)
  → 1回送れば全員が読める

5. Producer と Consumer の疎結合化

出典: Service introduction - Reactive Systems on AWS

“Amazon Kinesis Data Streams is a real-time data streaming service used to decouple key application components”

Producer は Consumer の存在を知らなくていい

直接送信の場合:
  Producer → Consumer
  → Consumer の IP やエンドポイントが変わったら Producer を修正する必要がある

Kinesis を挟む場合:
  Producer → [Kinesis] → Consumer
  → Producer は Kinesis に送るだけ
  → Consumer が増えても減っても Producer は変更不要

Kinesis の役割まとめ

機能直接送信Kinesis
速度差の吸収できないできる
データの保持できない最大365日
複数 Consumer都度送信1回で済む
疎結合密結合疎結合

出典: Amazon Kinesis Data Streams FAQs

“Kinesis Data Streams manages the infrastructure, storage, networking, and configuration needed to stream your data at the level of your data throughput.”

Kinesis はインフラ、ストレージ、ネットワーク、設定を管理する。データを受け取って保持し、Consumer に渡すという仕組み全体をマネージドで提供している

主なユースケース

出典: Amazon Kinesis - Big Data Analytics Options on AWS

1. リアルタイムデータ分析

“Real-time data analytics – Kinesis Data Streams enables real-time data analytics on streaming data, such as analyzing website clickstream data and customer engagement analytics.”

例えば EC サイトで「今この瞬間、どのページが見られているか」をリアルタイムで把握したい場合。バッチ処理だと1時間前のデータしか見れないが、Kinesis なら数秒前のデータを分析できる

2. ログの即時取り込み

“With Kinesis Data Streams, you can have producers push data directly into an Amazon Kinesis stream… This prevents the log data from being lost if the front-end or application server fails”

サーバーがクラッシュしても、既に Kinesis に送ったログは失われない。ローカルにログを貯めてからバッチで送る方式だと、クラッシュ時にローカルのログが消える

3. リアルタイムダッシュボード

“You can use data ingested into Kinesis Data Streams for extracting metrics and generating KPIs to power reports and dashboards at real-time speeds.”

「今の売上」「今のアクセス数」をリアルタイムで表示するダッシュボードを作れる

向いていないケース

出典: Amazon Kinesis - Big Data Analytics Options on AWS

“Small scale consistent throughput – Even though Kinesis Data Streams works for streaming data at 200 KB per second or less, it is designed and optimized for larger data throughputs.” “Long-term data storage and analytics – Kinesis Data Streams is not suited for long-term data storage.”

  • 小規模データ(200KB/秒以下): Kinesis は大量データ向けに最適化されている。小規模なら SQS の方がシンプルでコスト効率が良い
  • 長期保存: 最大365日しか保持できない。長期保存なら S3 に送るべき

SNS / SQS との違い

3つのサービスの設計思想

出典: Amazon SQS, Amazon SNS, or Amazon EventBridge?

“Amazon SQS is a pull-based message queuing service… Amazon SNS is a push-based pub/sub messaging service…”

出典: Message driven - Reactive Systems on AWS

“Amazon Kinesis Data Streams and Amazon MSK are real-time data streaming services that enable decoupling services and allow event replay within a defined retention period.”

多分こういうこと?

サービス何をするかデータの扱い
SQSタスクを順番に処理する処理したら消える
SNS通知を配る配ったら消える
Kinesisデータを流し続ける一定期間残る

基本コンポーネント

全体像

Producer → [Kinesis Data Stream] → Consumer
              │
              └─ 複数の Shard で構成
                   │
                   └─ 各 Shard に Record が入る

用語の定義

出典: Amazon Kinesis Data Streams Terminology and concepts

Stream(ストリーム)

“A Kinesis data stream is a set of shards.”

シャードの集合体。「my-order-stream」のような名前を付けて作成する。データの論理的なまとまり

Shard(シャード)

“A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity.”

ストリーム内のデータ分割単位。詳細は後述

Record(レコード)

“A data record is the unit of data stored in a Kinesis data stream. Data records are composed of a sequence number, a partition key, and a data blob”

実際のデータ1件。「ユーザーAが商品Bをクリックした」のような1つのイベント

Producer(プロデューサー)

“Producers put records into Amazon Kinesis Data Streams.”

データを送信する側。Webサーバー、IoTデバイス、モバイルアプリなど

Consumer(コンシューマー)

“Consumers get records from Amazon Kinesis Data Streams and process them.”

データを受信・処理する側。Lambda、EC2上のアプリ、Firehose など


シャード(Shard)とは何か

シャードの定義

出典: Amazon Kinesis Data Streams Terminology and concepts

“A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity.”

シャードは「データの流れる水路」のようなもの。1本の水路には流せる水の量に限界がある

シャードの処理能力

出典: Amazon Kinesis Data Streams FAQs

“A shard supports 1 MB/second and 1,000 records per second for writes and 2 MB/second for reads.”

方向制限
書き込み1MB/秒、1,000レコード/秒
読み取り2MB/秒、5トランザクション/秒

この制限は固定。超えたい場合はシャードを増やす

なぜシャードという単位が必要か

シャードがない(1本のパイプだけ)だと:

  • 処理能力に上限がある(スケールできない)
  • 1つのコンシューマーしか処理できない(並列処理できない)
  • 1箇所が詰まると全体が止まる

シャードがあると:

  1. スケーラビリティ: シャード数を増やせば処理能力が増える
  2. 並列処理: 各シャードを別々のコンシューマーで同時に処理できる
  3. 順序保証: シャード内ではレコードの順序が保証される

「関連するデータだけ同じシャードに入れる」ことで、シャード内では順序保証、シャード間では並列処理を実現する

パーティションキー(Partition Key)とは何か

パーティションキーの定義

出典: Amazon Kinesis Data Streams Terminology and concepts

“A partition key is used to group data by shard within a stream… An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards.”

パーティションキーは、どのシャードに入れるかを決めるためのラベル

なぜパーティションキーが必要か

データを送るとき、Kinesis は「どのシャードに入れるか」を決める必要がある

この振り分けルールを決めるのがパーティションキー

同じパーティションキー = 同じシャードに入る = 順序が保証される

例:

user-123 の操作:
  10:00 注文   → Shard 0
  10:01 支払い → Shard 0(同じキーなので同じシャード)
  10:02 発送   → Shard 0
  → 順番通り処理される

user-456 の操作:
  10:00 注文   → Shard 1(別のキーなので別シャード)
  → user-123 とは並列処理される

パーティションキーとシャードIDの違い

パーティションキーシャードID
誰が決めるユーザーAWS
いつ使うデータ送信時データ取得時(直接API)
目的関連データをまとめるシャードを識別する
user-123, order-456shardId-000000000001

もしシャードIDを直接指定する設計だったら:

  • シャード数が変わったらアプリのコードを全部書き換える必要がある
  • どのシャードに入れるか、毎回自分で計算する必要がある

パーティションキーがあると:

  • シャード数が変わっても同じコードで動く(AWS が自動で振り分ける)
  • ビジネス上の意味のあるキー(ユーザーID、注文IDなど)で考えられる

パーティションキーはシャードIDの抽象化と言っていい(?)

処理を記述する上ではユーザーはシャードの存在を意識しなくて良い

コンシューマーの種類

コンシューマーとは

出典: Amazon Kinesis Data Streams Terminology and concepts

“Consumers get records from Amazon Kinesis Data Streams and process them.”

コンシューマーは Kinesis からデータを取得して処理するアプリケーション。Lambda、EC2上のアプリ、Firehose などが該当する

データ取得方式の選択肢

コンシューマーが Kinesis からデータを取得する方式は2種類ある

出典: Amazon Kinesis Data Streams FAQs

“What is a consumer, and what are different consumer types offered by Kinesis Data Streams? … You can choose between shared fan-out and enhanced fan-out consumer types to read data from a Kinesis data stream.”

共有スループット(標準コンシューマー)

コンシューマーが定期的に GetRecords API を呼び出してデータを取得する方式(ポーリング)

ドキュメントによって呼び方が異なる:

出典: Develop enhanced fan-out consumers with dedicated throughput

“The following are the differences between shared throughput consumers and enhanced fan-out consumers”

出典: Using Lambda to process records from Amazon Kinesis Data Streams

“Standard iterators use HTTP protocol to poll each shard for records and share read throughput with other consumers.”

「shared throughput consumer」「standard iterator」「標準コンシューマー」は全て同じものを指す

出典: Amazon Kinesis Data Streams FAQs

“The shared fan-out consumers all share a shard’s 2 MB/second of read throughput and five transactions per second limits”

1シャードあたり2MB/秒の帯域を、全てのコンシューマーで共有する。コンシューマーが増えると、1つあたりの帯域が減り、レイテンシーが悪化する

Enhanced Fan-Out

Kinesis がコンシューマーにデータをプッシュする方式。SubscribeToShard API を使用する

出典: Amazon Kinesis Data Streams FAQs

“An enhanced fan-out consumer gets its own 2 MB/second allotment of read throughput, allowing multiple consumers to read data from the same stream in parallel, without contending for read throughput with other consumers.”

各コンシューマーが専用の2MB/秒帯域を持つ。コンシューマーが増えても帯域を奪い合わない

2つの方式の比較

出典: Develop enhanced fan-out consumers with dedicated throughput

“Message propagation delay… Typically an average of 70 ms whether you have one consumer or five consumers.”

共有スループット(標準):

                      ┌─── 1MB/秒 ───→ Consumer A
  Shard (2MB/秒) ─────┤
                      └─── 1MB/秒 ───→ Consumer B
  
  → 2MB/秒を全員で分け合う
  → コンシューマーが増えると1つあたりの帯域が減る
  → レイテンシー: 200ms〜1000ms


Enhanced Fan-Out:

                      ┌─── 2MB/秒 ───→ Consumer A
  Shard (2MB/秒) ─────┤
                      └─── 2MB/秒 ───→ Consumer B
  
  → 各コンシューマーが専用の2MB/秒を持つ
  → コンシューマーが増えても帯域は減らない
  → レイテンシー: 約70ms
種類仕組みレイテンシーコスト
共有スループットGetRecords API でポーリング200ms〜1000ms追加料金なし
Enhanced Fan-OutSubscribeToShard API でプッシュ約70ms追加料金あり

どちらを選ぶか

出典: Amazon Kinesis Data Streams FAQs

“We recommend using enhanced fan-out consumers if you want to add more than one consumer to your data stream.”

  • コンシューマーが1つだけ、またはレイテンシー要件が緩い → 共有スループット
  • コンシューマーが複数、または低レイテンシーが必要 → Enhanced Fan-Out

Lambda との連携

Lambda を使うメリット

直接 API を使う場合、自分で実装すること:

  • シャード一覧の取得
  • 各シャードのイテレータ取得
  • ポーリング
  • チェックポイント管理(どこまで処理したか)
  • シャード分割時の対応
  • エラー時のリトライ

Lambda を使う場合、これらを全て AWS が管理してくれる。ビジネスロジックだけ書けばいい

Lambda 同時実行数とシャードの関係

基本: 1シャード = 1 Lambda 実行

3シャード → 最大3つの Lambda が同時に動く
100シャード → 最大100の Lambda が同時に動く

なぜか: AWS が1シャードにつき1つの Lambda を割り当てるから。これにより、シャード内の順序保証が維持される

ParallelizationFactor とは

出典: Using Lambda to process records from Amazon Kinesis Data Streams

“You can specify the number of concurrent batches that Lambda polls from a shard via a parallelization factor from 1 (default) to 10.” “For example, when you set ParallelizationFactor to 2, you can have 200 concurrent Lambda invocations at maximum to process 100 Kinesis data shards”

「1シャードに対して複数の Lambda を同時に動かす」設定

  • デフォルト: 1(1シャード = 1 Lambda)
  • 最大: 10(1シャード = 10 Lambda)

いつ使うか:

  • 1シャードに大量のデータが流れている
  • Lambda 1つでは処理が追いつかない
  • でもシャードは増やしたくない(コストや設計の都合)

計算式:

同時実行 Lambda 数 = シャード数 × ParallelizationFactor

例: 10シャード × Factor 5 = 最大50 Lambda

設定場所: Lambda のイベントソースマッピング

注意: Factor を上げると、同じシャード内でも並列処理される。パーティションキー単位での順序は保証されるが、シャード全体の順序は保証されなくなる


レイテンシー改善

方法1: Enhanced Fan-Out を使用する

出典: Develop enhanced fan-out consumers with dedicated throughput

“Message propagation delay… Typically an average of 70 ms whether you have one consumer or five consumers.”

最も効果的な方法。標準コンシューマーの200ms〜1000msから、約70msに改善できる

標準コンシューマー(ポーリング):
                            ┌─→ Consumer A
  Shard ─── 2MB/秒を共有 ───┼─→ Consumer B
                            └─→ Consumer C
  → コンシューマーが増えるとレイテンシー悪化

Enhanced Fan-Out(プッシュ):
        ┌─── 専用 2MB/秒 ───→ Consumer A
  Shard ┼─── 専用 2MB/秒 ───→ Consumer B
        └─── 専用 2MB/秒 ───→ Consumer C
  → コンシューマーが増えてもレイテンシー変わらず

方法2: シャード数を増やす

出典: Using Lambda to process records from Amazon Kinesis Data Streams

“To increase the speed at which your function processes records, add shards to your data stream.”

シャード数を増やすと並列処理数が増え、全体の処理速度が向上する

シャード 2つ:
  Shard 0 ──→ Lambda A ─┐
  Shard 1 ──→ Lambda B ─┴─→ 2並列で処理

シャード 4つに増やす:
  Shard 0 ──→ Lambda A ─┐
  Shard 1 ──→ Lambda B ─┤
  Shard 2 ──→ Lambda C ─┼─→ 4並列で処理(2倍速)
  Shard 3 ──→ Lambda D ─┘

方法3: ParallelizationFactor を上げる

出典: Using Lambda to process records from Amazon Kinesis Data Streams

“This helps scale up the processing throughput when the data volume is volatile and the IteratorAge is high.”

シャード数を増やさずに並列処理数を増やせる。IteratorAge(処理遅延)が大きい場合に有効

Factor = 1(デフォルト):
  Shard 0 ──→ Lambda A(1つ)
  Shard 1 ──→ Lambda B(1つ)
  → 合計 2 Lambda

Factor = 3 に変更:
  Shard 0 ──→ Lambda A-1, A-2, A-3(3つ同時)
  Shard 1 ──→ Lambda B-1, B-2, B-3(3つ同時)
  → 合計 6 Lambda(3倍速)

計算式:

同時実行 Lambda 数 = シャード数 × ParallelizationFactor