real-time-streaming
Use this skill when building real-time data pipelines, stream processing jobs, or change data capture systems. Triggers on tasks involving Apache Kafka (producers, consumers, topics, partitions, consumer groups, Connect, Streams), Apache Flink (DataStream API, windowing, checkpointing, stateful processing), event sourcing implementations, CDC with Debezium, stream processing patterns (windowing, watermarks, exactly-once semantics), and any pipeline that processes unbounded data in motion rather than data at rest.
real-time-streaming is a production-ready AI agent skill for claude-code, gemini-cli, openai-codex, and mcp. Use it when building real-time data pipelines, stream processing jobs, or change data capture systems.
Quick Facts
| Field | Value |
|---|---|
| Category | data |
| Version | 0.1.0 |
| Platforms | claude-code, gemini-cli, openai-codex, mcp |
| License | MIT |
How to Install
- Make sure you have Node.js installed on your machine.
- Run the following command in your terminal:
npx skills add AbsolutelySkilled/AbsolutelySkilled --skill real-time-streaming
- The real-time-streaming skill is now available in your AI coding agent (Claude Code, Gemini CLI, OpenAI Codex, etc.).
Overview
A practitioner's guide to building and operating real-time data pipelines. This skill covers the full stack of stream processing - from ingestion (Kafka producers, CDC with Debezium) through processing (Kafka Streams, Apache Flink) to materialization (sinks, materialized views, event-sourced stores). The focus is on production-grade patterns: exactly-once semantics, backpressure handling, state management, and failure recovery. Designed for engineers who understand distributed systems basics and need concrete guidance on building streaming pipelines that run reliably at scale.
Tags
kafka flink cdc stream-processing event-sourcing real-time
Platforms
- claude-code
- gemini-cli
- openai-codex
- mcp
Frequently Asked Questions
What is real-time-streaming?
Use this skill when building real-time data pipelines, stream processing jobs, or change data capture systems. Triggers on tasks involving Apache Kafka (producers, consumers, topics, partitions, consumer groups, Connect, Streams), Apache Flink (DataStream API, windowing, checkpointing, stateful processing), event sourcing implementations, CDC with Debezium, stream processing patterns (windowing, watermarks, exactly-once semantics), and any pipeline that processes unbounded data in motion rather than data at rest.
How do I install real-time-streaming?
Run npx skills add AbsolutelySkilled/AbsolutelySkilled --skill real-time-streaming in your terminal. The skill will be immediately available in your AI coding agent.
What AI agents support real-time-streaming?
This skill works with claude-code, gemini-cli, openai-codex, and mcp. Install it once and use it across any supported AI coding agent.
Maintainers
Generated from AbsolutelySkilled
SKILL.md
Real-Time Streaming
A practitioner's guide to building and operating real-time data pipelines. This skill covers the full stack of stream processing - from ingestion (Kafka producers, CDC with Debezium) through processing (Kafka Streams, Apache Flink) to materialization (sinks, materialized views, event-sourced stores). The focus is on production-grade patterns: exactly-once semantics, backpressure handling, state management, and failure recovery. Designed for engineers who understand distributed systems basics and need concrete guidance on building streaming pipelines that run reliably at scale.
When to use this skill
Trigger this skill when the user:
- Sets up or configures Kafka topics, producers, or consumers
- Writes a Flink job (DataStream or Table API, windowing, state)
- Implements change data capture (CDC) from a database to a streaming pipeline
- Designs a stream processing topology (joins, aggregations, windowing)
- Debugs consumer lag, rebalancing storms, or backpressure issues
- Implements exactly-once or at-least-once delivery guarantees
- Builds an event sourcing system with streaming infrastructure
- Needs to choose between Kafka Streams, Flink, or Spark Streaming
Do NOT trigger this skill for:
- General event-driven architecture decisions (use event-driven-architecture skill)
- Batch ETL pipelines with no real-time component (use a data-engineering skill)
Key principles
Treat streams as the source of truth - In a streaming architecture, the log (Kafka topic) is the authoritative record. Databases, caches, and search indexes are derived views. Design from the stream outward, not from the database outward.
Partition for parallelism, key for correctness - Partitioning determines your maximum parallelism. Key selection determines ordering guarantees. Choose partition keys based on your highest-volume access pattern. Events that must be processed in order must share a key (and therefore a partition).
Exactly-once is a system property, not a component property - No single component delivers exactly-once alone. It requires idempotent producers, transactional writes, and consumer offset management working together end-to-end. Understand where your guarantees break down.
Backpressure is a feature, not a bug - When a consumer cannot keep up with a producer, the system must signal this. Design pipelines with explicit backpressure handling rather than unbounded buffering. Flink handles this natively; Kafka consumers need careful tuning of max.poll.records and max.poll.interval.ms.
Late data is inevitable - Real-world events arrive out of order. Use watermarks to define "how late is too late," allowed lateness windows to handle stragglers, and side outputs for events that arrive after the window closes.
Core concepts
The streaming stack has three layers. The transport layer (Kafka, Pulsar, Kinesis) provides durable, ordered, partitioned logs. The processing layer (Flink, Kafka Streams, Spark Structured Streaming) reads from the transport, applies transformations, and writes results. The materialization layer (databases, search indexes, caches) serves the processed data to applications.
Kafka's core model centers on topics divided into partitions. Producers write to partitions (by key hash or round-robin). Consumer groups read partitions in parallel - each partition is assigned to exactly one consumer in the group. Offsets track progress. Consumer group rebalancing redistributes partitions when consumers join or leave.
Flink's execution model is based on dataflow graphs. A job is a DAG of operators (sources, transformations, sinks). Flink manages state via checkpointing - periodic snapshots of operator state to durable storage. On failure, Flink restores from the last checkpoint and replays from the source offset, achieving exactly-once processing.
Change data capture (CDC) turns database changes into a stream of events. Debezium reads the database's transaction log (WAL for Postgres, binlog for MySQL) and publishes change events to Kafka. Each event contains before/after snapshots of the row, enabling downstream consumers to reconstruct the full change history.
Common tasks
Set up a Kafka topic with proper configuration
Choose partition count based on target throughput and consumer parallelism. Set replication factor to at least 3 for production.
kafka-topics.sh --create \
--topic orders \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=delete \
--config min.insync.replicas=2 \
--bootstrap-server localhost:9092
Start with partitions = 2x your expected max consumer count. You can increase partitions later but never decrease them. Adding partitions changes the key-to-partition mapping for new records, so key-based ordering is not preserved across the change.
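To make the partition-count caveat concrete, here is a minimal sketch of key-hash partitioning. It assumes a simplified partitioner (String.hashCode() modulo the partition count); Kafka's default partitioner hashes the serialized key with murmur2, so real partition numbers will differ, but the effect of changing the modulus is the same. The class and method names are hypothetical.

```java
import java.util.List;

class PartitionMappingSketch {
    // Simplified stand-in for a key-hash partitioner (assumption: not Kafka's murmur2).
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // Same keys, before and after growing the topic from 12 to 16 partitions.
        for (String key : List.of("order-1", "order-2", "customer-42")) {
            System.out.printf("%s: 12 partitions -> p%d, 16 partitions -> p%d%n",
                key, partitionFor(key, 12), partitionFor(key, 16));
        }
    }
}
```

Any key whose partition changes has its older events on one partition and its newer events on another, which is why ordering across the change is not guaranteed.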
Write an idempotent Kafka producer (Java)
Enable idempotent production to prevent duplicates on retries.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", orderId, orderJson), (metadata, ex) -> {
if (ex != null) log.error("Send failed for order {}", orderId, ex);
});
With enable.idempotence=true, the broker deduplicates retries using sequence numbers. This requires acks=all and allows up to 5 in-flight requests while maintaining ordering per partition.
Write a Flink windowed aggregation
Count events per key in tumbling 1-minute windows with late data handling.
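// Side-output tag for events that arrive after allowedLateness has expired
final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-events") {};
DataStream<Event> events = env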
.addSource(new FlinkKafkaConsumer<>("clicks", new EventSchema(), kafkaProps))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getTimestamp()));
SingleOutputStreamOperator<WindowResult> result = events
.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.minutes(5))
.sideOutputLateData(lateOutputTag)
.aggregate(new CountAggregator());
result.addSink(new JdbcSink<>(...));
result.getSideOutput(lateOutputTag).addSink(new LateDataSink<>());
Set forBoundedOutOfOrderness to the maximum expected event delay. Events arriving within allowedLateness after the window fires trigger a re-computation. Events arriving after that go to the side output.
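The three outcomes described above (on time, late but within allowedLateness, routed to the side output) can be sketched in plain Java. This is not Flink code; it is a simplified model of event-time tumbling windows under the assumption that a window fires once the watermark reaches its last timestamp (window end minus 1 ms) and that its state is dropped once the watermark passes that point by allowedLateness. All names are hypothetical.

```java
class WindowLatenessSketch {
    enum Outcome { ON_TIME, LATE_REFIRE, SIDE_OUTPUT }

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Bounded out-of-orderness watermark: highest timestamp seen, minus the bound, minus 1 ms.
    static long watermark(long maxSeenTimestampMs, long outOfOrdernessMs) {
        return maxSeenTimestampMs - outOfOrdernessMs - 1;
    }

    static Outcome classify(long eventTsMs, long currentWatermarkMs,
                            long windowSizeMs, long allowedLatenessMs) {
        long maxTimestamp = windowStart(eventTsMs, windowSizeMs) + windowSizeMs - 1;
        if (currentWatermarkMs < maxTimestamp) {
            return Outcome.ON_TIME;      // window has not fired yet
        }
        if (currentWatermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_REFIRE;  // window fired, state still kept: result is recomputed
        }
        return Outcome.SIDE_OUTPUT;      // window state already cleaned up
    }
}
```

With a 1-minute window and 5 minutes of allowed lateness, an event stamped 00:00:30 is on time while the watermark is below 00:00:59.999, triggers a re-computation until the watermark reaches 00:05:59.999, and goes to the side output after that.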
Configure CDC with Debezium and Kafka Connect
Deploy a Debezium PostgreSQL connector to stream table changes.
{
"name": "orders-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db-primary",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${env:CDC_DB_PASSWORD}",
"database.dbname": "commerce",
"topic.prefix": "cdc",
"table.include.list": "public.orders,public.order_items",
"plugin.name": "pgoutput",
"slot.name": "debezium_orders",
"publication.name": "dbz_orders_pub",
"snapshot.mode": "initial",
"transforms": "route",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "cdc\\.public\\.(.*)",
"transforms.route.topic.replacement": "cdc.$1"
}
}
Always set slot.name explicitly to avoid orphaned replication slots. Use snapshot.mode=initial for the first deployment to capture existing data, then switch to snapshot.mode=no_data for redeployments.
Implement exactly-once with Kafka transactions
Use transactions to atomically write to multiple topics and commit offsets.
producer.initTransactions();
try {
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
String result = process(record);
producer.send(new ProducerRecord<>("output-topic", record.key(), result));
}
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
producer.close(); // fatal, must restart
} catch (KafkaException e) {
producer.abortTransaction();
}
Consumers of transactionally written topics must set isolation.level=read_committed to avoid reading uncommitted or aborted records. This adds latency of up to the transaction duration.
Build a stream-table join in Kafka Streams
Enrich a stream of orders with customer data from a compacted topic.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orders = builder.stream("orders");
KTable<String, Customer> customers = builder.table("customers");
KStream<String, EnrichedOrder> enriched = orders.join(
customers,
(order, customer) -> new EnrichedOrder(order, customer),
Joined.with(Serdes.String(), orderSerde, customerSerde)
);
enriched.to("enriched-orders");
The KTable is backed by a local RocksDB state store. Ensure the customers topic uses cleanup.policy=compact so the table always has the latest value per key. Monitor state store size - it can consume significant disk on the Streams instance.
Handle consumer lag and rebalancing
Monitor and tune consumer performance to prevent lag buildup.
# Check consumer lag per partition
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --describe
# Key tuning parameters
max.poll.records=500 # records per poll batch
max.poll.interval.ms=300000 # max time between polls before rebalance
session.timeout.ms=45000 # heartbeat timeout
heartbeat.interval.ms=15000 # heartbeat frequency (1/3 of session timeout)
If processing takes longer than max.poll.interval.ms, the consumer is evicted and triggers a rebalance. Reduce max.poll.records or increase the interval. Use cooperative sticky rebalancing (partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor) to minimize rebalance disruption.
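The relationship between batch size, per-record processing time, and the poll interval is simple arithmetic, sketched below. The helper names and the headroom factor are assumptions for illustration; the lag formula is the usual log-end offset minus committed offset.

```java
class PollBudgetSketch {
    // Lag for one partition: how far the committed offset trails the log end offset.
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    // Largest max.poll.records that keeps one batch within a fraction (headroom)
    // of max.poll.interval.ms, given a pessimistic per-record processing time.
    static int maxSafePollRecords(long maxPollIntervalMs, double slowRecordMs, double headroom) {
        return (int) Math.floor(maxPollIntervalMs * headroom / slowRecordMs);
    }

    public static void main(String[] args) {
        // Example: 5-minute interval, 1 s per slow record, use at most half the interval.
        System.out.println(maxSafePollRecords(300_000, 1000.0, 0.5));
        System.out.println(lag(1_500, 1_200));
    }
}
```

With the values shown earlier (max.poll.records=500, max.poll.interval.ms=300000), a batch stays inside the interval only if records average under 600 ms each.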
Anti-patterns / common mistakes
| Mistake | Why it's wrong | What to do instead |
|---|---|---|
| Using a single partition for ordering | Destroys parallelism, creates a bottleneck | Partition by entity key; only events for the same entity need ordering |
| Unbounded state in stream processing | Memory grows until OOM; checkpoint sizes explode | Use TTL on state, windowed aggregations, or incremental cleanup |
| Ignoring consumer group rebalancing | Rebalance storms cause duplicate processing and lag spikes | Use cooperative sticky assignor, tune session/poll timeouts |
| CDC without monitoring replication slots | Orphaned slots cause WAL bloat and disk exhaustion on the database | Alert on slot lag, set max_replication_slots conservatively, and cap retained WAL with max_slot_wal_keep_size (PostgreSQL 13+) |
| Polling Kafka in a tight loop without backoff | Wastes CPU when topic is empty, causes unnecessary broker load | Use poll(Duration.ofMillis(100)) or longer; tune fetch.min.bytes |
| Skipping schema evolution | Breaking consumer deserialization on producer-side changes | Use a schema registry (Avro/Protobuf) with compatibility checks |
| Processing without idempotency | At-least-once delivery causes duplicate side effects | Make sinks idempotent (upserts, dedup keys, conditional writes) |
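The last row of the table, idempotent sinks, can be illustrated with a small in-memory sketch that deduplicates by event ID before applying a side effect. It assumes every event carries a unique ID; the class and its methods are hypothetical, and a production sink would persist the applied IDs atomically with the write (for example in the same database transaction) rather than keep them in memory.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class IdempotentSinkSketch {
    private final Map<String, Long> balances = new HashMap<>();
    private final Set<String> appliedEventIds = new HashSet<>();

    // Applies the delta once per event ID; redelivered events are ignored.
    // Returns true if the event was applied, false if it was a duplicate.
    boolean apply(String eventId, String account, long delta) {
        if (!appliedEventIds.add(eventId)) {
            return false;
        }
        balances.merge(account, delta, Long::sum);
        return true;
    }

    long balance(String account) {
        return balances.getOrDefault(account, 0L);
    }
}
```

Under at-least-once delivery the same event may arrive twice; with the dedup key in place the second delivery leaves the balance unchanged.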
Gotchas
Orphaned Postgres replication slots from CDC - When a Debezium connector is paused, deleted, or loses connectivity, the replication slot on the database side continues to accumulate WAL. This can exhaust disk and bring down the primary. Always monitor pg_replication_slots for active = false slots and alert on slot lag. Drop slots explicitly when decommissioning a connector.
Consumer group rebalance triggered by slow processing - If a consumer's processing loop exceeds max.poll.interval.ms, Kafka evicts it and triggers a rebalance. This causes duplicate processing and lag spikes. Reduce max.poll.records to keep processing within the interval, or increase the interval - but don't increase it blindly without understanding the processing time distribution.
Increasing Kafka partition count breaks key ordering - Partitions can be added but never removed. Adding partitions after data exists changes the key-to-partition mapping, meaning events for the same key may now land on different partitions. Never increase partition count on a topic where key-based ordering is a correctness requirement.
Flink checkpoint interval too aggressive - Very frequent checkpoints (e.g., every 10 seconds) increase checkpoint overhead and can starve actual processing throughput. Start with 1-5 minute intervals and tune down only if recovery time is unacceptably long.
Transactional consumer not setting isolation.level=read_committed - Without this setting, consumers read records from open and aborted transactions, so they can act on data that is later rolled back. Any consumer of a transactionally-produced topic must set isolation.level=read_committed, accepting the added latency.
References
For detailed patterns and implementation guidance on specific streaming domains,
read the relevant file from the references/ folder:
- references/kafka-operations.md - topic management, broker tuning, monitoring, security setup
- references/flink-patterns.md - checkpointing, savepoints, state backends, complex event processing
- references/cdc-debezium.md - connector configuration, schema evolution, snapshot strategies, MySQL/Postgres specifics
- references/stream-processing-patterns.md - windowing strategies, join types, deduplication, watermark tuning
Only load a references file if the current task requires it - they are long and will consume context.
References
cdc-debezium.md
CDC with Debezium
Architecture Overview
Debezium runs as a Kafka Connect source connector. It reads the database's transaction log and publishes change events to Kafka topics (one topic per table by default).
Database (WAL/binlog) -> Debezium Connector -> Kafka Topics -> Consumers
Each change event contains:
- before: row state before the change (null for inserts)
- after: row state after the change (null for deletes)
- op: operation type (c=create, u=update, d=delete, r=read/snapshot)
- source: metadata (database, table, LSN/binlog position, timestamp)
- ts_ms: event timestamp
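A consumer that maintains a derived view from these events essentially switches on op. The sketch below assumes a heavily simplified event shape (op, primary key, and the after row as a string) rather than Debezium's actual envelope classes; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ChangeEventApplySketch {
    // Applies one simplified change event to a key -> row view.
    static void apply(Map<String, String> view, String op, String key, String after) {
        switch (op) {
            case "c": // create
            case "u": // update
            case "r": // snapshot read
                view.put(key, after);
                break;
            case "d": // delete: 'after' is null, so remove the row
                view.remove(key);
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        Map<String, String> orders = new HashMap<>();
        apply(orders, "c", "42", "{\"status\":\"NEW\"}");
        apply(orders, "u", "42", "{\"status\":\"PAID\"}");
        System.out.println(orders.get("42"));
        apply(orders, "d", "42", null);
        System.out.println(orders.containsKey("42"));
    }
}
```

Treating c, u, and r identically as upserts keeps the consumer idempotent when events are replayed after a restart or re-snapshot.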
PostgreSQL Configuration
Database prerequisites
-- Enable logical replication in postgresql.conf
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4
-- Create a dedicated user
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'secret';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
-- Create a publication (pgoutput plugin)
CREATE PUBLICATION dbz_publication FOR TABLE orders, order_items, customers;
Connector configuration
{
"name": "pg-orders-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db-primary.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/secrets/cdc-password.txt:password}",
"database.dbname": "commerce",
"topic.prefix": "cdc",
"table.include.list": "public.orders,public.order_items",
"plugin.name": "pgoutput",
"publication.name": "dbz_publication",
"slot.name": "debezium_orders",
"snapshot.mode": "initial",
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "UPDATE debezium_heartbeat SET ts = NOW()",
"tombstones.on.delete": true,
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}
Replication slot management
Replication slots prevent WAL segments from being recycled until consumed. If the connector falls behind or is removed without cleaning up:
- WAL accumulates on disk, eventually filling the volume
- pg_replication_slots shows active=false for orphaned slots
Monitor and alert on:
SELECT slot_name, active, pg_size_pretty(
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
) AS slot_lag
FROM pg_replication_slots;
Alert when slot lag exceeds a threshold (e.g., 1GB). Drop orphaned slots with:
SELECT pg_drop_replication_slot('debezium_orders');
Heartbeats
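The heartbeat mechanism prevents WAL bloat when the captured tables see little traffic. Without heartbeats, the connector has no events to acknowledge, so the replication slot's confirmed position does not advance and PostgreSQL keeps retaining WAL generated by other tables or databases.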
Set heartbeat.interval.ms=10000 and heartbeat.action.query to a lightweight
UPDATE on a dedicated heartbeat table.
MySQL Configuration
Database prerequisites
-- Enable binlog in my.cnf
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 3
-- Create a dedicated user
CREATE USER 'debezium'@'%' IDENTIFIED BY 'secret';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
GTID mode (recommended)
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-primary",
"database.port": "3306",
"database.user": "debezium",
"database.password": "${file:/secrets/mysql-password.txt:password}",
"database.server.id": "184054",
"topic.prefix": "cdc",
"database.include.list": "commerce",
"table.include.list": "commerce.orders,commerce.customers",
"include.schema.changes": true,
"gtid.source.includes": ".*",
"snapshot.mode": "initial"
}
Use GTIDs for reliable binlog position tracking across failovers. Without GTIDs, a MySQL primary failover can cause the connector to lose its position.
Snapshot Modes
| Mode | Behavior | When to use |
|---|---|---|
| initial | Snapshot existing data, then stream changes | First deployment |
| initial_only | Snapshot only, no streaming | One-time migration |
| no_data | No snapshot, stream changes from current position | Redeployment after initial snapshot |
| when_needed | Snapshot if offsets are missing | Safe default for restarts |
| never | Never snapshot | When you control the starting offset |
After the initial deployment with snapshot.mode=initial, change to when_needed for resilience. If offsets are lost (e.g., Kafka topic deleted), it will automatically re-snapshot.
Schema Evolution
Compatible changes (no connector restart needed)
- Adding a nullable column
- Adding a column with a default value
- Increasing column width (e.g., VARCHAR(50) to VARCHAR(100))
Breaking changes (require planning)
- Renaming a column: Debezium sees this as drop + add. Downstream consumers must handle both field names during transition.
- Changing column type: May break deserialization. Use Avro + schema registry with BACKWARD compatibility to catch issues before deployment.
- Dropping a column: The before image will stop containing the field. Consumers relying on that field will break.
Recommended approach for schema changes
- Deploy consumer code that handles both old and new schema
- Apply the database schema change
- Debezium automatically picks up the new schema from the transaction log
- After all old-schema events are processed, remove legacy handling from consumers
Transforms (SMTs)
Common single-message transforms
{
"transforms": "route,unwrap,filter",
"transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.route.topic.regex": "cdc\\.public\\.(.*)",
"transforms.route.topic.replacement": "cdc.$1",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": false,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,source.ts_ms",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op != 'r'"
}
- ByLogicalTableRouter: Simplify topic names (remove schema prefix).
- ExtractNewRecordState: Flatten the envelope to just the after state. Useful when sinking to databases or search indexes that expect flat records.
- Filter: Drop snapshot records (op=r) or filter by field values.
Monitoring
Key Kafka Connect metrics
| Metric | What it means |
|---|---|
| source-record-poll-total | Total records polled from the database |
| source-record-write-total | Total records written to Kafka |
| source-record-active-count | Records polled but not yet written (backlog) |
| milliseconds-behind-source | How far the connector is behind the database |
| snapshot-completion-pct | Progress of initial snapshot (0-100) |
Debezium-specific metrics (JMX)
debezium.postgres:type=connector-metrics,context=streaming,server=<prefix>
- MilliSecondsBehindSource
- NumberOfEventsFiltered
- LastEvent
- Connected
Alert on MilliSecondsBehindSource trending upward and Connected=false.
flink-patterns.md
Flink Patterns
Checkpointing and Fault Tolerance
Checkpoint configuration
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing every 60 seconds
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
// Checkpoint must complete within 10 minutes or be discarded
env.getCheckpointConfig().setCheckpointTimeout(600000);
// Allow only 1 checkpoint in progress at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Minimum 30 seconds between checkpoint starts
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// Keep checkpoints on cancellation for manual recovery
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Checkpoint interval tuning
- Short interval (10-30s): Lower recovery time but higher I/O overhead. Use for latency-sensitive jobs with small state.
- Medium interval (60-120s): Good default for most production jobs.
- Long interval (5-10min): For jobs with very large state (TBs) where checkpoint I/O is expensive.
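The recovery-time side of this trade-off can be estimated with back-of-the-envelope arithmetic: after a failure, the job replays roughly everything ingested since the last completed checkpoint. The sketch below is a rough upper bound under the assumption of steady throughput and negligible checkpoint duration; the helper name is hypothetical.

```java
class CheckpointReplaySketch {
    // Approximate upper bound on records replayed after a failure:
    // everything ingested since the last completed checkpoint.
    static long worstCaseReplayRecords(long checkpointIntervalMs, long recordsPerSecond) {
        return checkpointIntervalMs * recordsPerSecond / 1000;
    }

    public static void main(String[] args) {
        long throughput = 5_000; // records per second (example value)
        for (long intervalMs : new long[] {10_000, 60_000, 300_000}) {
            System.out.printf("interval %d ms -> up to %d records replayed%n",
                intervalMs, worstCaseReplayRecords(intervalMs, throughput));
        }
    }
}
```

Dividing the replay volume by the job's catch-up throughput gives a rough recovery time, which is the number to weigh against checkpoint I/O cost.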
Savepoints vs checkpoints
| Feature | Checkpoint | Savepoint |
|---|---|---|
| Purpose | Automatic fault tolerance | Manual operational snapshot |
| Triggered by | Flink runtime | Operator (CLI or REST API) |
| Format | Incremental, optimized | Full, portable |
| Use case | Crash recovery | Job upgrades, migration, A/B testing |
| Retained on cancel | Configurable | Always retained |
Always take a savepoint before: upgrading job code, changing parallelism, migrating clusters, or modifying state schema.
# Trigger savepoint
flink savepoint <jobId> s3://flink-savepoints/job-name/
# Restore from savepoint
flink run -s s3://flink-savepoints/job-name/savepoint-abc123 job.jar
State Backends
RocksDB (recommended for production)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental checkpoints
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/job-name/");
- Supports state larger than available memory (spills to disk).
- Incremental checkpoints: only uploads changed SST files.
- Tune state.backend.rocksdb.block.cache-size (default 8MB, increase to 256MB+).
- Monitor rocksdb_compaction_pending - high values indicate I/O bottleneck.
HashMap (in-memory)
env.setStateBackend(new HashMapStateBackend());
- Faster for small state (fits in JVM heap).
- Full checkpoint every time (no incremental).
- Use only when state fits comfortably in memory with headroom.
Windowing Strategies
Window types
| Type | Behavior | Use case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping | Hourly aggregations, batch-like processing |
| Sliding | Fixed-size, overlapping | Moving averages, rolling metrics |
| Session | Gap-based, variable size | User session analysis, activity grouping |
| Global | One window per key, custom triggers | Custom aggregation logic |
Event time vs processing time
Always prefer event time for correctness. Processing time is simpler but produces inconsistent results during backfill or replay.
// Event time with bounded out-of-orderness
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((event, ts) -> event.getEventTime())
.withIdleness(Duration.ofMinutes(1)); // handle idle partitions
The withIdleness call is critical for topics with uneven partition throughput.
Without it, an idle partition holds back the watermark for the entire operator,
stalling all windows.
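The stall happens because an operator's watermark is the minimum of the watermarks of its inputs. A plain-Java sketch of that rule, with idle inputs excluded from the minimum, is shown below. It is a simplified model rather than Flink's implementation, and the names are hypothetical.

```java
import java.util.OptionalLong;

class IdleWatermarkSketch {
    // Combined watermark = minimum over partitions that are not marked idle.
    // Returns empty when every partition is idle (the watermark does not advance).
    static OptionalLong combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle partitions no longer hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, partitionWatermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With watermarks {100, 20, 90}, the combined watermark is 20 while the slow partition counts as active and jumps to 90 once it is marked idle.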
Custom triggers
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
Fires intermediate results every 30 seconds within a 5-minute window. Useful for dashboards that need frequent updates before the window closes.
Complex Event Processing (CEP)
Detect patterns in event streams using Flink's CEP library.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
public boolean filter(Event e) { return e.getType().equals("LOGIN_FAILED"); }
})
.timesOrMore(3)
.within(Time.minutes(5))
.followedBy("success")
.where(new SimpleCondition<Event>() {
public boolean filter(Event e) { return e.getType().equals("LOGIN_SUCCESS"); }
});
PatternStream<Event> patternStream = CEP.pattern(events.keyBy(Event::getUserId), pattern);
patternStream.select(new PatternSelectFunction<Event, Alert>() {
public Alert select(Map<String, List<Event>> matches) {
return new Alert("BRUTE_FORCE_DETECTED", matches.get("start").get(0).getUserId());
}
});
CEP patterns are stateful and consume memory proportional to the number of in-progress pattern matches. Set within() to bound memory usage.
Table API and SQL
For analysts and simpler transformations, Flink SQL is often more productive.
CREATE TABLE orders (
order_id STRING,
amount DECIMAL(10, 2),
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'broker:9092',
'format' = 'json'
);
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM orders
GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
Flink SQL windows use the same watermark mechanism as the DataStream API. Ensure WATERMARK FOR is defined in the table DDL for event-time processing.
Performance Tuning
Parallelism
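- Set job-level default: env.setParallelism(N).
- Override per operator for hotspots: stream.keyBy(...).window(...).aggregate(...).setParallelism(32). Parallelism is set on the operator the window function produces, not on the windowed stream itself.
- Source parallelism above the partition count of the source topic leaves the extra source subtasks idle; downstream operators can still run at higher parallelism.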
Network buffers
taskmanager.memory.network.fraction: 0.15
taskmanager.memory.network.min: 256mb
taskmanager.memory.network.max: 1gb
Increase for high-throughput jobs or when seeing backpressure at network level.
Managed memory
taskmanager.memory.managed.fraction: 0.4
RocksDB state backend uses managed memory for its block cache and write buffers. Increase this fraction for state-heavy jobs.
kafka-operations.md
Kafka Operations
Topic Management
Creation best practices
- Partition count: Target 2x your maximum expected consumer parallelism. 12-24 partitions is a reasonable default for most workloads. High-throughput topics may need 50-100+.
- Replication factor: Always 3 in production. Never 1, even in staging.
- min.insync.replicas: Set to 2 with replication factor 3. With acks=all on the producer, this ensures writes are durable on at least 2 replicas before acknowledging.
- Retention: retention.ms for time-based, retention.bytes for size-based. Use cleanup.policy=compact for KTable-backing topics, delete for event streams, and compact,delete for compacted topics with a retention window.
Key configuration parameters
| Parameter | Default | Recommendation | Why |
|---|---|---|---|
| num.partitions | 1 | 12+ | Default is too low for any real workload |
| default.replication.factor | 1 | 3 | Durability requires replication |
| min.insync.replicas | 1 | 2 | Prevents data loss on broker failure |
| unclean.leader.election.enable | false | false | Never enable - causes data loss |
| message.max.bytes | 1MB | 1MB | Increase only if truly needed; large messages are an anti-pattern |
Compaction
Compacted topics retain only the latest value per key. Essential for:
- KTable changelog topics (Kafka Streams)
- CDC snapshot topics
- Configuration distribution
Set min.cleanable.dirty.ratio=0.5 and segment.ms=3600000 (1 hour) to control
compaction frequency. Lower dirty ratio = more frequent compaction = less disk but
more I/O.
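What compaction leaves behind can be shown with a small simulation: replay the log in order and keep only the last value per key, treating a null value as a tombstone. This is a simplified model (real compaction works on closed segments and retains tombstones for a configurable period); the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CompactionSketch {
    // Each record is a {key, value} pair; a null value is a tombstone that deletes the key.
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : log) {
            String key = record[0];
            String value = record[1];
            if (value == null) {
                latest.remove(key);
            } else {
                latest.put(key, value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = List.of(
            new String[] {"customer-1", "v1"},
            new String[] {"customer-2", "v1"},
            new String[] {"customer-1", "v2"},
            new String[] {"customer-2", null});
        System.out.println(compact(log));
    }
}
```

A consumer that reads a compacted topic from the beginning ends up with the same final state as one that read every historical record, which is what makes such topics suitable for backing KTables.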
Broker Tuning
Memory
- JVM heap: 6GB is sufficient for most brokers. Do not exceed 8GB (GC pressure).
- Page cache: The real performance driver. Ensure the OS has 32-64GB+ of free RAM for page cache. Kafka reads/writes are sequential and rely heavily on OS cache.
- socket.send.buffer.bytes / socket.receive.buffer.bytes: Set to 1MB+ for cross-datacenter replication.
Disk
- Use dedicated disks for Kafka log directories (not shared with OS).
- XFS or ext4 filesystem. Mount with noatime.
- Stripe across multiple disks using log.dirs=/disk1/kafka,/disk2/kafka.
- Monitor the under-replicated partitions metric - if non-zero, a broker is falling behind.
Network
- num.network.threads: Set to number of CPU cores (handles socket I/O).
- num.io.threads: Set to 2x number of disks (handles log read/write).
- num.replica.fetchers: Increase to 2-4 for high-partition-count clusters.
Monitoring
Critical metrics to alert on
| Metric | Threshold | Action |
|---|---|---|
| UnderReplicatedPartitions | > 0 for 5 min | Broker falling behind; check disk I/O and network |
| ActiveControllerCount | != 1 across cluster | Controller election issue; check ZK/KRaft |
| OfflinePartitionsCount | > 0 | Data unavailable; check broker health |
| RequestHandlerAvgIdlePercent | < 0.3 | Broker overloaded; scale or reduce load |
| NetworkProcessorAvgIdlePercent | < 0.3 | Network thread saturation |
| LogFlushRateAndTimeMs | p99 > 100ms | Disk performance degradation |
| Consumer group lag | Growing trend | Consumers not keeping up; scale or tune |
JMX metrics collection
Export via JMX exporter to Prometheus. Key bean paths:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
kafka.controller:type=KafkaController,name=ActiveControllerCount
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*
Security
Authentication
- SASL/SCRAM: Recommended for most deployments. Supports dynamic credential management without broker restart.
- mTLS: Use for service-to-service in zero-trust environments. More operational overhead (certificate management).
- SASL/OAUTHBEARER: For environments with existing OAuth infrastructure.
Authorization
Use Kafka ACLs with a deny-by-default policy:
kafka-acls.sh --bootstrap-server localhost:9092 \
  --add --allow-principal User:order-service \
  --operation Read --operation Write \
  --topic orders \
  --group order-processor-group
Encryption
- In-transit: Enable TLS on all listeners (SSL or SASL_SSL security protocol) with ssl.protocol=TLSv1.3.
- At-rest: Use filesystem-level encryption (LUKS, dm-crypt) or cloud-managed encrypted volumes. Kafka does not provide native at-rest encryption.
KRaft Migration
Kafka 3.3+ supports KRaft (Kafka Raft) mode, removing the ZooKeeper dependency.
- New clusters: Always use KRaft mode.
- Existing clusters: Migrate with the ZooKeeper-to-KRaft migration procedure described in the Kafka documentation (KIP-866). Test thoroughly in staging first.
- KRaft benefits: Faster controller failover, simplified operations, no ZK dependency.
- KRaft controllers need dedicated nodes in large clusters (100+ brokers).
stream-processing-patterns.md
Stream Processing Patterns
Windowing Deep Dive
Tumbling windows
Non-overlapping, fixed-size windows. Every event belongs to exactly one window.
Time: |---W1---|---W2---|---W3---|
Events: e1 e2 e3 e4 e5 e6
Best for: periodic aggregations (per-minute counts, hourly summaries).
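Because every event belongs to exactly one tumbling window, window assignment reduces to rounding the timestamp down to a multiple of the window size. The sketch below shows that arithmetic and a per-window count. It assumes windows aligned to epoch 0 with no offset; `TumblingWindowSketch` and its methods are hypothetical names, not a Flink or Kafka Streams API.

```java
import java.util.Map;
import java.util.TreeMap;

/** Tumbling window assignment, assuming windows aligned to epoch 0 (hypothetical helper). */
final class TumblingWindowSketch {

    /** Inclusive start of the single window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Counts events per window, keyed by window start. */
    static Map<Long, Integer> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }
}
```

For one-minute windows, an event at 125 000 ms lands in the window starting at 120 000 ms, and an event at exactly 60 000 ms opens the second window rather than closing the first.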
Sliding windows
Fixed-size windows that advance by a slide interval. Events belong to multiple windows.
Window size: 10min, Slide: 5min
Time: |---W1 (0-10)---|
           |---W2 (5-15)---|
                |---W3 (10-20)---|
Best for: moving averages, rolling metrics, trend detection.
Sliding windows with small slide intervals create many overlapping windows and multiply state size. A 1-hour window with 1-second slides creates 3600 windows per key. Use sparingly.
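The state multiplication is easier to see when the window assignment is spelled out. The following sketch lists the start of every sliding window that contains a given timestamp. It assumes windows aligned to epoch 0 and half-open intervals `[start, start + size)`; `SlidingWindowSketch` is a hypothetical name used for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Sliding window assignment, assuming epoch-aligned, half-open windows (hypothetical helper). */
final class SlidingWindowSketch {

    /** Returns the start of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window that can contain the event starts at the slide boundary at or before it
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 10-minute window and 5-minute slide, each event is assigned to 2 windows. With a 1-hour window and 1-second slide the same call returns 3600 entries, which is the per-event cost behind the warning above.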
Session windows
Variable-size windows defined by an inactivity gap. A new event after the gap starts a new session.
Gap: 5min
Events: e1 e2 [5min gap] e3 e4 e5 [5min gap] e6
Windows: |--S1--| |---S2---| |S3|
Best for: user session analysis, activity grouping, click-stream analysis.
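As a rough illustration of gap-based sessionization, the sketch below groups the timestamps of a single key into sessions. It assumes the input is already sorted by event time (a real engine has to merge sessions when events arrive out of order) and that an event extends the current session when it arrives less than one gap after the previous event. `SessionWindowSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Gap-based sessionization over sorted timestamps for one key (hypothetical helper). */
final class SessionWindowSketch {

    /** Returns sessions as [firstEventMs, lastEventMs] pairs. Input must be sorted ascending. */
    static List<long[]> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            long[] current = result.isEmpty() ? null : result.get(result.size() - 1);
            if (current != null && ts - current[1] < gapMs) {
                current[1] = ts;                      // still within the gap: extend the session
            } else {
                result.add(new long[] {ts, ts});      // gap exceeded: start a new session
            }
        }
        return result;
    }
}
```

For events at 0 s, 60 s, 400 s, 420 s and 1000 s with a 5-minute gap, this yields three sessions, matching the S1/S2/S3 shape in the diagram.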
Global windows with custom triggers
A single window per key that never closes. Use custom triggers to emit results.
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(100)))
Best for: batch-like processing on streams (emit every N elements).
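The behavior of a purging count trigger on a global window can be approximated with a per-key buffer that fires and clears itself every N elements. This is a plain-Java sketch of the idea, not Flink's implementation; `CountTriggerSketch` and its `add` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Per-key "fire and purge every N elements" buffer (hypothetical class, not a Flink API). */
final class CountTriggerSketch<T> {
    private final int batchSize;
    private final Map<String, List<T>> buffers = new HashMap<>();

    CountTriggerSketch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Adds an element; returns the full batch once the count is reached, otherwise empty. */
    Optional<List<T>> add(String key, T element) {
        List<T> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(element);
        if (buffer.size() < batchSize) {
            return Optional.empty();
        }
        buffers.remove(key);   // purge: fired contents are dropped from state
        return Optional.of(buffer);
    }
}
```

Without the purge step the buffer would keep growing, because a global window never closes on its own.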
Join Patterns
Stream-stream joins
Join two event streams within a time window. Both sides are buffered.
orderStream
    .keyBy(Order::getOrderId)
    .intervalJoin(paymentStream.keyBy(Payment::getOrderId))
    .between(Time.seconds(-5), Time.minutes(30))
    .process(new OrderPaymentJoinFunction());
- between(-5s, +30min): Match payments that arrive between 5 seconds before and 30 minutes after the order.
- Both streams are buffered in state for the join window duration.
- State grows with: (event rate) x (window duration) x (key cardinality).
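The matching rule behind an interval join can be written as a single predicate on the two timestamps. The sketch below shows that predicate and a naive nested-loop join over the events of one key. It assumes both bounds are inclusive and uses bare timestamps instead of real order and payment objects; `IntervalJoinSketch` is a hypothetical name, and a real engine would use keyed state with cleanup timers rather than loops.

```java
import java.util.ArrayList;
import java.util.List;

/** Interval join predicate and a naive join for one key (hypothetical helper). */
final class IntervalJoinSketch {

    /** True if rightTs lies in [leftTs + lowerBoundMs, leftTs + upperBoundMs], bounds inclusive. */
    static boolean withinInterval(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }

    /** Returns every [orderTs, paymentTs] pair that satisfies the interval. */
    static List<long[]> join(long[] orderTs, long[] paymentTs, long lowerBoundMs, long upperBoundMs) {
        List<long[]> matches = new ArrayList<>();
        for (long order : orderTs) {
            for (long payment : paymentTs) {
                if (withinInterval(order, payment, lowerBoundMs, upperBoundMs)) {
                    matches.add(new long[] {order, payment});
                }
            }
        }
        return matches;
    }
}
```

For the -5 s / +30 min bounds above, a payment 4 seconds before the order matches and one 6 seconds before does not. Every order must stay buffered until no payment can match it any more, which is where the state growth comes from.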
Stream-table joins (enrichment)
Join a stream against a slowly-changing dimension (lookup table).
Kafka Streams:
KStream<String, Order> orders = builder.stream("orders");
KTable<String, Customer> customers = builder.table("customers");
orders.join(customers, (order, customer) -> enrich(order, customer));
Flink (temporal table join):
SELECT o.*, c.name, c.tier
FROM orders o
JOIN customers FOR SYSTEM_TIME AS OF o.event_time AS c
ON o.customer_id = c.id;
The temporal join uses the version of the customer record valid at the order's event time, not the current version. Essential for correct historical analysis.
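One way to picture the "version valid at event time" lookup is a sorted map of versions per key, queried with a floor lookup. The sketch below is an in-memory illustration under that assumption; `VersionedTableSketch`, `upsert`, and `asOf` are hypothetical names and do not correspond to a Flink or Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** In-memory versioned lookup table (hypothetical class illustrating a temporal join). */
final class VersionedTableSketch<V> {
    private final Map<String, TreeMap<Long, V>> versionsByKey = new HashMap<>();

    /** Records that the key has this value from validFromMs onward. */
    void upsert(String key, long validFromMs, V value) {
        versionsByKey.computeIfAbsent(key, k -> new TreeMap<>()).put(validFromMs, value);
    }

    /** Returns the version that was valid at eventTimeMs, if the key existed by then. */
    Optional<V> asOf(String key, long eventTimeMs) {
        TreeMap<Long, V> versions = versionsByKey.get(key);
        if (versions == null) {
            return Optional.empty();
        }
        Map.Entry<Long, V> entry = versions.floorEntry(eventTimeMs);   // latest version at or before the event
        if (entry == null) {
            return Optional.empty();
        }
        return Optional.of(entry.getValue());
    }
}
```

An order stamped between two customer updates is enriched with the older tier, even if the lookup happens after the customer has been upgraded.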
Table-table joins
Both sides are materialized as tables. Produces a new table that updates when either input changes.
KTable<String, Order> orders = builder.table("orders");
KTable<String, Shipment> shipments = builder.table("shipments");
KTable<String, OrderStatus> status = orders.join(
    shipments,
    (order, shipment) -> new OrderStatus(order, shipment)
);
Deduplication
Idempotent sinks
The simplest approach: make the sink handle duplicates.
- Database upserts: INSERT ... ON CONFLICT DO UPDATE
- Redis: SET is naturally idempotent
- Elasticsearch: Index with a deterministic _id
Stream-level deduplication
Use state to track seen event IDs within a window.
events
    .keyBy(Event::getEventId)
    .process(new DeduplicationFunction(Time.minutes(10)));
// DeduplicationFunction keeps a ValueState<Boolean> per key with TTL
// Emits the event only on first occurrence within the TTL window
Set TTL aggressively. Dedup state grows linearly with unique event IDs. A 24-hour dedup window on a high-cardinality stream can consume TBs of state.
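The `DeduplicationFunction` above is not shown in full, so here is a minimal stand-in for the idea in plain Java: remember when each event ID was first seen and suppress repeats until the TTL expires. `TtlDeduplicatorSketch` is hypothetical, takes the current time as a parameter instead of using timers, and keeps everything in one map, whereas a Flink job would use keyed state with a state TTL configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** TTL-based deduplication over event IDs (hypothetical stand-in for a keyed dedup operator). */
final class TtlDeduplicatorSketch {
    private final long ttlMs;
    private final Map<String, Long> firstSeenAtMs = new HashMap<>();

    TtlDeduplicatorSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Returns true if the event should be emitted, i.e. it is the first occurrence within the TTL. */
    boolean firstOccurrence(String eventId, long nowMs) {
        Long seenAt = firstSeenAtMs.get(eventId);
        if (seenAt != null && nowMs - seenAt < ttlMs) {
            return false;                       // duplicate inside the dedup window
        }
        firstSeenAtMs.put(eventId, nowMs);      // first sighting, or the previous one has expired
        return true;
    }

    /** Drops expired IDs; without this the map grows with every unique ID ever seen. */
    void evictExpired(long nowMs) {
        firstSeenAtMs.values().removeIf(seenAt -> nowMs - seenAt >= ttlMs);
    }

    int trackedIds() {
        return firstSeenAtMs.size();
    }
}
```

`trackedIds()` makes the state cost visible: it is bounded by the number of unique IDs seen within one TTL, which is why a shorter TTL directly shrinks state.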
Kafka-level deduplication
Idempotent producers handle network-retry duplicates automatically. For application-level duplicates (same business event sent twice), use a compacted topic as a dedup buffer:
- Produce to a compacted intermediate topic keyed by dedup ID
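- Consumer reads from the compacted topic (eventually only the latest record per key)
- Compaction removes earlier duplicates over time, so consumers can still see duplicates that have not been compacted yet and should tolerate them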
Watermark Strategies
Bounded out-of-orderness
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
Watermark = max observed event time - 30 seconds. Simple and effective when you know the maximum expected delay.
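The watermark formula can be traced with a few lines of plain Java. The sketch below follows the formula above, with one extra millisecond subtracted the way Flink's built-in bounded-out-of-orderness generator does, since a watermark of t asserts that no further events with timestamp <= t are expected. `BoundedOutOfOrdernessSketch` is a hypothetical class written for illustration, not the Flink implementation.

```java
/** Bounded out-of-orderness watermark arithmetic (hypothetical class, not the Flink generator). */
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Offset the initial value so the subtraction below cannot underflow before the first event
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Watermark = max observed event time - allowed delay - 1 ms. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    /** An event is late when its timestamp is at or below the current watermark. */
    boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

After an event at 100 000 ms with a 30-second bound, an event stamped 70 000 ms is still on time, while one stamped 69 999 ms is late. Older events never move the watermark backwards.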
Custom watermark generator
WatermarkStrategy.<Event>forGenerator(ctx -> new WatermarkGenerator<Event>() {
    // Offset the initial value so subtracting the 30s bound cannot underflow before the first event
    private long maxTimestamp = Long.MIN_VALUE + 30000;
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - 30000));
    }
});
Use custom generators when:
- Different sources have different latency characteristics
- You need to emit watermarks based on external signals (e.g., a "flush" event)
Handling idle sources
.withIdleness(Duration.ofMinutes(1))
If a partition produces no events for 1 minute, it is marked idle and excluded from watermark calculation. Without this, one idle partition blocks all windows.
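The reason one idle partition blocks everything is that the operator's watermark is the minimum across its inputs. The sketch below models that minimum and the idleness exclusion in plain Java. `IdleAwareWatermarkSketch` is a hypothetical class that takes wall-clock time as a parameter; it is a simplification of the behavior described above, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Combined watermark = min over non-idle partitions (hypothetical class for illustration). */
final class IdleAwareWatermarkSketch {
    private final long idleTimeoutMs;
    private final Map<Integer, Long> watermarkByPartition = new HashMap<>();
    private final Map<Integer, Long> lastActivityByPartition = new HashMap<>();

    IdleAwareWatermarkSketch(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** A partition starts with the lowest possible watermark until it reports progress. */
    void register(int partition, long nowMs) {
        watermarkByPartition.put(partition, Long.MIN_VALUE);
        lastActivityByPartition.put(partition, nowMs);
    }

    void onWatermark(int partition, long watermarkMs, long nowMs) {
        watermarkByPartition.merge(partition, watermarkMs, Long::max);
        lastActivityByPartition.put(partition, nowMs);
    }

    /** Minimum watermark over active partitions; empty when every partition is idle. */
    OptionalLong combined(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> entry : watermarkByPartition.entrySet()) {
            boolean idle = nowMs - lastActivityByPartition.get(entry.getKey()) >= idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, entry.getValue());
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

While the silent partition still counts as active, the combined watermark stays at `Long.MIN_VALUE` and no window can fire. Once the timeout passes, the combined value jumps to the watermark of the partition that is actually producing.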
Watermark alignment (Flink 1.15+)
.withWatermarkAlignment("alignment-group", Duration.ofSeconds(20))
Prevents fast sources from advancing the watermark too far ahead of slow sources. Limits the drift between the fastest and slowest source in the alignment group.
Exactly-Once Patterns
End-to-end exactly-once checklist
- Source: Kafka consumer with committed offsets (replay on failure)
- Processing: Flink checkpointing or Kafka Streams state stores
- Sink: One of:
  - Idempotent writes (upserts, conditional writes)
  - Two-phase commit sink (Flink's TwoPhaseCommitSinkFunction)
  - Transactional Kafka producer (atomic offset + output commit)
Two-phase commit sinks (Flink)
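import java.sql.Connection;
import java.sql.SQLException;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
public class ExactlyOnceDatabaseSink extends TwoPhaseCommitSinkFunction<Record, Connection, Void> {
    // Constructor (passing serializers to super) and the dataSource field are omitted for brevity
    @Override
    protected Connection beginTransaction() throws Exception {
        Connection conn = dataSource.getConnection();
        conn.setAutoCommit(false);
        return conn;
    }
    @Override
    protected void invoke(Connection txn, Record value, Context ctx) throws Exception {
        // Write to database within transaction
    }
    @Override
    protected void preCommit(Connection txn) throws Exception {
        // JDBC has no Connection.flush(); execute any batched statements here so all writes are sent
    }
    @Override
    protected void commit(Connection txn) {
        try {
            txn.commit();
            txn.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    protected void abort(Connection txn) {
        try {
            txn.rollback();
            txn.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
Two-phase commit sinks hold open transactions between checkpoints, so each transaction stays open for roughly one checkpoint interval. Set the checkpoint interval low enough to avoid long-running transactions that cause database lock contention.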
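The lifecycle of a two-phase commit sink is easier to follow without a database in the way. The sketch below is a toy in-memory model: writes go into an open transaction, a checkpoint barrier pre-commits it, and the checkpoint-complete notification makes it visible. `TwoPhaseCommitSketch` and its method names are hypothetical, and failure recovery is reduced to discarding the open transaction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the two-phase commit sink lifecycle (hypothetical class, not Flink code). */
final class TwoPhaseCommitSketch {
    private final List<String> committed = new ArrayList<>();
    private final Map<Long, List<String>> preCommitted = new TreeMap<>();
    private List<String> open = new ArrayList<>();

    /** Writes go into the currently open transaction and are not yet visible. */
    void write(String record) {
        open.add(record);
    }

    /** Phase 1, at the checkpoint barrier: seal the open transaction and start a new one. */
    void preCommit(long checkpointId) {
        preCommitted.put(checkpointId, open);
        open = new ArrayList<>();
    }

    /** Phase 2, when the checkpoint completes: make the sealed transaction visible. */
    void commit(long checkpointId) {
        List<String> txn = preCommitted.remove(checkpointId);
        if (txn != null) {
            committed.addAll(txn);
        }
    }

    /** On failure before the next checkpoint: the open transaction is rolled back. */
    void abortOpen() {
        open = new ArrayList<>();
    }

    List<String> visible() {
        return Collections.unmodifiableList(committed);
    }
}
```

Nothing written after the last completed checkpoint ever becomes visible if the job fails, which is what lets the source replay those records without producing duplicates downstream.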
Backpressure Handling
Flink (built-in)
Flink uses credit-based flow control. When a downstream operator is slow:
- It stops granting credits to the upstream operator
- The upstream operator buffers locally until credits are available
- Backpressure propagates upstream to the source
Diagnosing: Check the Flink Web UI's backpressure tab. Look for operators with
HIGH backpressure ratio. The bottleneck is typically the first operator that is
NOT backpressured (it's the slow one causing pressure upstream).
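The three steps above can be sketched as a tiny credit-based channel. This is a single-threaded toy model under simplifying assumptions (one sender, one receiver, one credit per buffer slot); `CreditChannelSketch` is a hypothetical class and does not reflect Flink's network stack in detail.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy credit-based channel between one upstream and one downstream operator (hypothetical). */
final class CreditChannelSketch {
    private final Deque<String> senderBacklog = new ArrayDeque<>();
    private final Deque<String> receiverBuffer = new ArrayDeque<>();
    private int credits;

    /** The receiver initially grants one credit per free buffer slot. */
    CreditChannelSketch(int receiverCapacity) {
        this.credits = receiverCapacity;
    }

    /** Upstream emits a record: sent if a credit is available, otherwise queued locally. */
    void emit(String record) {
        senderBacklog.addLast(record);
        drain();
    }

    /** Downstream finishes one record, freeing a slot and granting one credit back. */
    String consume() {
        String record = receiverBuffer.pollFirst();
        if (record != null) {
            credits++;
            drain();
        }
        return record;
    }

    /** Upstream is backpressured while it holds records it has no credit to send. */
    boolean backpressured() {
        return !senderBacklog.isEmpty();
    }

    private void drain() {
        while (credits > 0 && !senderBacklog.isEmpty()) {
            receiverBuffer.addLast(senderBacklog.pollFirst());
            credits--;
        }
    }
}
```

With a capacity of 2, the third `emit` leaves the sender backpressured until the receiver consumes a record. In a chain of such channels the same stall repeats one hop further upstream each time a backlog fills.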
Kafka consumers
Kafka does not have built-in backpressure. Instead:
- Reduce max.poll.records to process smaller batches
- Increase max.poll.interval.ms to allow longer processing time
- Use pause() / resume() on specific partitions to temporarily stop fetching
consumer.pause(overloadedPartitions);
// Process existing records
consumer.resume(overloadedPartitions);
Event Sourcing with Streams
Pattern: Event store on Kafka
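Command -> Validate -> Event (Kafka topic, key=entityId)
                         |
                         +-> Projector 1 -> Read DB (PostgreSQL)
                         +-> Projector 2 -> Search index (Elasticsearch)
                         +-> Projector 3 -> Analytics (ClickHouse)
- Use a topic with long or unlimited retention for the event store. Compaction keeps only the latest record per key, so a compacted topic keyed by entity ID would discard earlier events; reserve compaction for snapshot topics.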
- Key by entity ID for ordering guarantees per entity
- Projectors are independent Kafka consumers (different consumer groups)
- Each projector materializes a different read model
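As an illustration of "one log, several read models", the sketch below folds the same event list into two unrelated projections. Everything in it is hypothetical: the `Event` shape, the `Deposited` and `Withdrawn` event types, and the `ProjectorSketch` class are made up for the example, and a `List` stands in for the Kafka topic that each projector would consume with its own consumer group.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Two independent projections over one event log (hypothetical event model). */
final class ProjectorSketch {

    static final class Event {
        final String entityId;
        final String type;      // assumed types: "Deposited" or "Withdrawn"
        final long amount;

        Event(String entityId, String type, long amount) {
            this.entityId = entityId;
            this.type = type;
            this.amount = amount;
        }
    }

    /** Read model 1: current balance per entity. */
    static Map<String, Long> balanceByEntity(List<Event> log) {
        Map<String, Long> balances = new HashMap<>();
        for (Event event : log) {
            long delta = event.type.equals("Withdrawn") ? -event.amount : event.amount;
            balances.merge(event.entityId, delta, Long::sum);
        }
        return balances;
    }

    /** Read model 2: number of events per type, e.g. for analytics. */
    static Map<String, Integer> countByType(List<Event> log) {
        Map<String, Integer> counts = new HashMap<>();
        for (Event event : log) {
            counts.merge(event.type, 1, Integer::sum);
        }
        return counts;
    }
}
```

Because each projection is a pure fold over the log, a new read model can be added later by replaying the topic from the beginning without touching the existing ones.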
Snapshots for long event histories
When replaying hundreds of thousands of events per entity is too slow:
- Periodically write a snapshot to a separate compacted topic
- On recovery, load the latest snapshot, then replay events after the snapshot offset
- Snapshot frequency: every N events or every T minutes per entity
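The recovery steps above can be sketched as follows. The example assumes a deliberately simple entity whose state is a running sum of deltas, with the array index standing in for the event offset; `SnapshotRecoverySketch` and `Snapshot` are hypothetical names, and a real implementation would read both the snapshot and the events from Kafka topics.

```java
/** Snapshot plus replay recovery for a running-sum entity (hypothetical model). */
final class SnapshotRecoverySketch {

    static final class Snapshot {
        final long state;
        final int lastOffset;   // offset of the last event folded into the snapshot

        Snapshot(long state, int lastOffset) {
            this.state = state;
            this.lastOffset = lastOffset;
        }
    }

    /** Folds events 0..upToOffset (inclusive) into a snapshot. */
    static Snapshot takeSnapshot(long[] deltas, int upToOffset) {
        long state = 0;
        for (int offset = 0; offset <= upToOffset; offset++) {
            state += deltas[offset];
        }
        return new Snapshot(state, upToOffset);
    }

    /** Loads the snapshot if present, then replays only the events after its offset. */
    static long recover(Snapshot snapshot, long[] deltas) {
        long state = snapshot == null ? 0 : snapshot.state;
        int from = snapshot == null ? 0 : snapshot.lastOffset + 1;
        for (int offset = from; offset < deltas.length; offset++) {
            state += deltas[offset];
        }
        return state;
    }
}
```

Recovery with and without a snapshot must give the same state; the snapshot only changes how many events are replayed to get there.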