Kinesis

CloudMock emulates Amazon Kinesis Data Streams, supporting stream lifecycle, record ingestion (single and batch), shard iteration with sequential reads, and tagging.

| Operation | Status | Notes |
| --- | --- | --- |
| CreateStream | Supported | Creates a stream with the specified shard count |
| DeleteStream | Supported | Deletes a stream and all its data |
| DescribeStream | Supported | Returns stream metadata and shard details |
| ListStreams | Supported | Returns all stream names |
| PutRecord | Supported | Writes a single record to a stream |
| PutRecords | Supported | Writes up to 500 records in one call |
| GetShardIterator | Supported | Returns a shard iterator for reading |
| GetRecords | Supported | Returns records from a shard iterator |
| IncreaseStreamRetentionPeriod | Supported | Increases the retention period (in hours) |
| DecreaseStreamRetentionPeriod | Supported | Decreases the retention period |
| AddTagsToStream | Supported | Adds tags to a stream |
| RemoveTagsFromStream | Supported | Removes tags |
| ListTagsForStream | Supported | Returns tags for a stream |
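In real Kinesis, each record is routed to a shard by taking the MD5 hash of its partition key and finding the shard whose hash key range contains it. The sketch below models that routing for intuition only (it is plain Python, not CloudMock's implementation) and assumes shards evenly split the 128-bit hash space:

```python
import hashlib

def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Map a partition key to a shard index, Kinesis-style:
    MD5 the key, then bucket the 128-bit hash into one of
    shard_count evenly sized ranges (an assumption here)."""
    h = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return h * shard_count // 2**128

# The same partition key always maps to the same shard,
# which is what preserves per-key ordering.
print(shard_for_key("user-123", 4) == shard_for_key("user-123", 4))  # True
```

With a single shard, as in the examples below, every partition key maps to shard 0.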
```sh
# Create a stream
curl -X POST http://localhost:4566 \
  -H "X-Amz-Target: Kinesis_20131202.CreateStream" \
  -H "Content-Type: application/x-amz-json-1.1" \
  -d '{"StreamName": "events", "ShardCount": 1}'

# Put a record
curl -X POST http://localhost:4566 \
  -H "X-Amz-Target: Kinesis_20131202.PutRecord" \
  -H "Content-Type: application/x-amz-json-1.1" \
  -d '{"StreamName": "events", "Data": "eyJldmVudCI6ImNsaWNrIn0=", "PartitionKey": "user-123"}'
```
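In the raw JSON API, the Data field must be base64-encoded; the payload in the PutRecord call above decodes back to the original JSON event. A short round-trip sketch:

```python
import base64

# The Data value from the curl example above.
encoded = "eyJldmVudCI6ImNsaWNrIn0="

# Decoding recovers the original JSON payload.
decoded = base64.b64decode(encoded)
print(decoded)  # b'{"event":"click"}'

# Encoding for PutRecord: serialize the payload, then base64-encode it.
payload = base64.b64encode(b'{"event":"click"}').decode("ascii")
print(payload == encoded)  # True
```

The SDK examples below skip this step because the SDKs base64-encode binary Data automatically on the wire.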
```ts
import {
  KinesisClient,
  CreateStreamCommand,
  PutRecordsCommand,
  GetShardIteratorCommand,
  GetRecordsCommand,
} from '@aws-sdk/client-kinesis';

const kinesis = new KinesisClient({
  endpoint: 'http://localhost:4566',
  region: 'us-east-1',
  credentials: { accessKeyId: 'test', secretAccessKey: 'test' },
});

await kinesis.send(new CreateStreamCommand({ StreamName: 'orders', ShardCount: 1 }));

await kinesis.send(new PutRecordsCommand({
  StreamName: 'orders',
  Records: [
    { Data: Buffer.from(JSON.stringify({ orderId: 'o-1' })), PartitionKey: 'p1' },
    { Data: Buffer.from(JSON.stringify({ orderId: 'o-2' })), PartitionKey: 'p2' },
  ],
}));
```
```python
import json

import boto3

kinesis = boto3.client(
    'kinesis',
    endpoint_url='http://localhost:4566',
    aws_access_key_id='test',
    aws_secret_access_key='test',
    region_name='us-east-1',
)

kinesis.create_stream(StreamName='orders', ShardCount=1)

kinesis.put_records(
    StreamName='orders',
    Records=[
        {'Data': json.dumps({'orderId': 'o-1'}), 'PartitionKey': 'p1'},
        {'Data': json.dumps({'orderId': 'o-2'}), 'PartitionKey': 'p2'},
    ],
)

stream = kinesis.describe_stream(StreamName='orders')
shard_id = stream['StreamDescription']['Shards'][0]['ShardId']

iterator = kinesis.get_shard_iterator(
    StreamName='orders', ShardId=shard_id, ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']

records = kinesis.get_records(ShardIterator=iterator)
for r in records['Records']:
    print(json.loads(r['Data']))
```
cloudmock.yml

```yaml
services:
  kinesis:
    enabled: true
```

No additional service-specific configuration is required.

  • Records are stored in memory per shard. Sequence numbers are monotonically increasing integers.
  • GetRecords advances the iterator; subsequent calls return newer records.
  • Enhanced fan-out (SubscribeToShard) is not implemented.
  • Stream encryption and server-side encryption are accepted but not enforced.
  • Shard splitting and merging are not implemented.
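The shard semantics in the notes above (monotonically increasing sequence numbers, iterators that advance past records already read) can be captured in a few lines. This is a toy in-memory model for intuition, not CloudMock's actual code:

```python
class ToyShard:
    """Toy in-memory shard: records get increasing sequence numbers,
    and an iterator is just an offset that advances on each read."""

    def __init__(self):
        self.records = []

    def put_record(self, data):
        seq = len(self.records)  # monotonically increasing
        self.records.append((seq, data))
        return seq

    def get_shard_iterator(self, iterator_type="TRIM_HORIZON"):
        # TRIM_HORIZON starts at the oldest record; LATEST after the newest.
        return 0 if iterator_type == "TRIM_HORIZON" else len(self.records)

    def get_records(self, iterator, limit=10000):
        batch = self.records[iterator:iterator + limit]
        return batch, iterator + len(batch)  # records plus the next iterator

shard = ToyShard()
shard.put_record("a")
shard.put_record("b")

it = shard.get_shard_iterator()
batch, it = shard.get_records(it)
print([data for _, data in batch])  # ['a', 'b']

batch, it = shard.get_records(it)  # nothing newer has arrived yet
print(batch)  # []
```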
| Code | HTTP Status | Description |
| --- | --- | --- |
| ResourceNotFoundException | 400 | The specified stream does not exist |
| ResourceInUseException | 400 | The stream is being created or deleted |
| InvalidArgumentException | 400 | An argument is not valid |
| ProvisionedThroughputExceededException | 400 | The request rate exceeds the shard throughput |
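Of the errors above, only ProvisionedThroughputExceededException is transient and typically worth retrying with backoff; the others indicate a problem with the request or stream state. A hypothetical helper for classifying them (the function name is ours, not part of any SDK):

```python
# Error codes from the table above; only the throughput error is
# transient and normally retried with backoff.
RETRYABLE = {"ProvisionedThroughputExceededException"}

def is_retryable(error_code: str) -> bool:
    """Hypothetical helper: decide whether a Kinesis error code
    from the table above should be retried."""
    return error_code in RETRYABLE

print(is_retryable("ProvisionedThroughputExceededException"))  # True
print(is_retryable("ResourceNotFoundException"))  # False
```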