[Feature Request] Support raw Kafka record (ConsumeResult<TKey, TValue>) in .NET Isolated Worker
Summary
Add support for binding to raw Kafka records with full metadata access in the .NET Isolated Worker model. Currently, the KafkaTrigger only supports binding to string, byte[], or our abstracted KafkaEventData types, which limits access to important Kafka message metadata.
Problem Statement
Customer Scenario
Customers using the .NET Isolated Worker (the recommended hosting model) need access to the complete Kafka message metadata, similar to what Confluent.Kafka.ConsumeResult<TKey, TValue> provides. Current limitations:
| Available Today | Missing |
|---|---|
| Key, Value | Full Message<TKey, TValue> object |
| Offset, Partition, Topic | IsPartitionEOF flag |
| Timestamp | LeaderEpoch |
| Headers (copied) | Original Confluent.Kafka.Headers instance |
| | TopicPartitionOffset as a single object |
Why This Matters
- Custom header processing - Correlation IDs, trace context, custom routing metadata
- Precise offset management - Manual commit scenarios, idempotency patterns
- Partition-aware logic - Sharding strategies based on partition info
- Integration with existing code - Organizations often have internal libraries that expect ConsumeResult types
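To make the header-processing and offset-management scenarios concrete, a function body might look like the following sketch. KafkaConsumeResult and its members are the proposed API from this issue, not an existing type, and the header name and broker settings are illustrative:

```csharp
// Hypothetical usage of the proposed KafkaConsumeResult type: extract a
// correlation ID header and build an idempotency key from the record's
// coordinates. None of these Kafka-specific types exist yet.
[Function("OrderProcessor")]
public void Run(
    [KafkaTrigger("BrokerList", "orders", ConsumerGroup = "orders-group")]
    KafkaConsumeResult<string, byte[]> record,
    ILogger log)
{
    // Custom header processing: pull a correlation ID for distributed tracing
    var header = record.Message.Headers
        .FirstOrDefault(h => h.Key == "x-correlation-id");
    var correlationId = header is null
        ? Guid.NewGuid().ToString()
        : Encoding.UTF8.GetString(header.GetValueBytes());

    // Precise offset management: topic/partition/offset uniquely identifies
    // the record, so it can serve as an idempotency key in a dedup store
    var idempotencyKey = $"{record.Topic}/{record.Partition}/{record.Offset}";
    log.LogInformation("Processing {Key} ({CorrelationId})", idempotencyKey, correlationId);
}
```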
Comparison with Other Triggers
EventHubs and ServiceBus extensions already support binding to their raw SDK types:
| Trigger | Abstracted Type | Raw SDK Type (Supported) |
|---|---|---|
| EventHubs | Custom data types | Azure.Messaging.EventHubs.EventData ✅ |
| ServiceBus | Custom data types | Azure.Messaging.ServiceBus.ServiceBusReceivedMessage ✅ |
| Kafka | KafkaEventData<TKey, TValue> | Confluent.Kafka.ConsumeResult<TKey, TValue> ❌ |
Proposed Architecture
Option A: Custom KafkaConsumeResult Type (Recommended for Initial Implementation)
Since Confluent.Kafka.ConsumeResult<TKey, TValue> lacks a factory method to reconstruct from serialized bytes (unlike Azure SDK types which use AMQP), we propose creating a new type that mirrors its structure.
┌─────────────────────────────────────────────────────────────────────────┐
│ HOST PROCESS (WebJobs Extension) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ KafkaListener │ │
│ │ ┌─────────────────┐ ┌──────────────────────────────────┐ │ │
│ │ │ Consume() │───►│ ConsumeResult<TKey, TValue> │ │ │
│ │ │ (librdkafka) │ │ (Confluent.Kafka) │ │ │
│ │ └─────────────────┘ └──────────────┬───────────────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────▼───────────────────┐ │ │
│ │ │ Serialize to ModelBindingData │ │ │
│ │ │ (MessagePack/Proto format) │ │ │
│ │ │ - All metadata preserved │ │ │
│ │ └───────────────┬───────────────────┘ │ │
│ └─────────────────────────────────────────┼───────────────────────┘ │
│ │ gRPC │
└─────────────────────────────────────────────┼───────────────────────────┘
│
┌─────────────────────────────────────────────┼───────────────────────────┐
│ WORKER PROCESS (Isolated Extension) │
│ │ │
│ ┌──────────────────────────────────────────▼──────────────────────┐ │
│ │ KafkaConsumeResultConverter : IInputConverter │ │
│ │ [SupportsDeferredBinding] │ │
│ │ [SupportedTargetType(typeof(KafkaConsumeResult<,>))] │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌──────────────────────────────────┐ │ │
│ │ │ ModelBindingData│───►│ Deserialize to │ │ │
│ │ │ (bytes) │ │ KafkaConsumeResult<TKey, TValue> │ │ │
│ │ └─────────────────┘ └──────────────────────────────────┘ │ │
│ └──────────────────────────────────────────┬──────────────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────▼─────────────────────┐ │
│ │ User Function │ │
│ │ │ │
│ │ [Function("KafkaTrigger")] │ │
│ │ public void Run( │ │
│ │ [KafkaTrigger(...)] │ │
│ │ KafkaConsumeResult<string, MyEvent> record) │ │
│ │ { │ │
│ │ var offset = record.Offset; │ │
│ │ var partition = record.Partition; │ │
│ │ var headers = record.Message.Headers; │ │
│ │ var value = record.Message.Value; │ │
│ │ // Full metadata access! │ │
│ │ } │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
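The worker-side KafkaConsumeResult type referenced in the diagram would mirror the Confluent.Kafka shape without taking a dependency on that SDK. A minimal sketch of the proposed surface (all member names are proposals, not a final API):

```csharp
// Proposed worker-side types (Option A). Names intentionally mirror
// Confluent.Kafka.ConsumeResult<TKey, TValue> and Message<TKey, TValue>,
// but these are new, SDK-independent types; everything here is a sketch.
public sealed class KafkaConsumeResult<TKey, TValue>
{
    public string Topic { get; init; }
    public int Partition { get; init; }
    public long Offset { get; init; }
    public int? LeaderEpoch { get; init; }
    public bool IsPartitionEOF { get; init; }
    public KafkaMessage<TKey, TValue> Message { get; init; }
}

public sealed class KafkaMessage<TKey, TValue>
{
    public TKey Key { get; init; }
    public TValue Value { get; init; }
    public KafkaHeaders Headers { get; init; }     // see New: KafkaHeaders.cs below
    public DateTimeOffset Timestamp { get; init; }
}
```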
Option B: Upstream Confluent.Kafka Enhancement (Long-term)
There are active proposals in the Confluent.Kafka repository that could simplify this:
If #2157 is adopted:
// In WebJobs Extension - could directly create our type without intermediate serialization
TConsumeResult Consume<TConsumeResult>(
int millisecondsTimeout,
IConsumeResultFactory<TKey, TValue, TConsumeResult> factory);
This would eliminate the need for custom serialization, as we could implement IConsumeResultFactory to create KafkaConsumeResult directly.
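If the upstream proposal lands, the WebJobs extension could plug in a factory along these lines. The IConsumeResultFactory interface and its Create signature are assumptions based on the #2157 proposal, not a published API:

```csharp
// Sketch of a factory the WebJobs extension could supply (Option B).
// IConsumeResultFactory's exact shape is an assumption from the proposal;
// KafkaConsumeResult is the type proposed in this issue.
internal sealed class KafkaConsumeResultFactory<TKey, TValue>
    : IConsumeResultFactory<TKey, TValue, KafkaConsumeResult<TKey, TValue>>
{
    public KafkaConsumeResult<TKey, TValue> Create(
        string topic, Partition partition, Offset offset,
        Message<TKey, TValue> message, bool isPartitionEof)
    {
        // Build our type directly from the consumer's deserialized data,
        // skipping the intermediate Confluent ConsumeResult allocation
        return new KafkaConsumeResult<TKey, TValue>
        {
            Topic = topic,
            Partition = partition.Value,
            Offset = offset.Value,
            IsPartitionEOF = isPartitionEof,
            Message = KafkaMessageAdapter.From(message) // adapter is hypothetical
        };
    }
}
```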
Required Changes
Repository: Azure/azure-functions-kafka-extension (WebJobs Extension)
| File/Area | Change |
|---|---|
| KafkaEventData.cs | Add serialization support for full metadata |
| Trigger/KafkaTriggerBindingStrategy.cs | Support ModelBindingData creation with full record |
| New: Serialization/KafkaRecordSerializer.cs | MessagePack/Proto serializer for ConsumeResult |
| Microsoft.Azure.WebJobs.Extensions.Kafka.csproj | Add serialization dependencies |
Repository: Azure/azure-functions-dotnet-worker (Isolated Worker Extension)
| File/Area | Change |
|---|---|
| New: KafkaConsumeResult.cs | New type mirroring ConsumeResult<TKey, TValue> structure |
| New: KafkaMessage.cs | New type mirroring Message<TKey, TValue> |
| New: KafkaConsumeResultConverter.cs | IInputConverter implementation |
| New: KafkaHeaders.cs | Headers collection type |
| Worker.Extensions.Kafka.csproj | Add serialization dependencies |
Shared: Serialization Schema
Define a shared schema (MessagePack or Protocol Buffers):
// Example Proto definition
message KafkaConsumeResultProto {
  string topic = 1;
  int32 partition = 2;
  int64 offset = 3;
  optional int32 leader_epoch = 4;
  KafkaMessageProto message = 5;
  bool is_partition_eof = 6;
}

message KafkaMessageProto {
  bytes key = 1;
  bytes value = 2;
  repeated KafkaHeaderProto headers = 3;
  KafkaTimestampProto timestamp = 4;
}

message KafkaHeaderProto {
  string key = 1;
  bytes value = 2;
}

message KafkaTimestampProto {
  int64 unix_timestamp_ms = 1;
  int32 type = 2; // CreateTime, LogAppendTime, etc.
}
User Experience
New API (Non-Breaking Addition)
Users can opt-in to the new type while existing code continues to work:
// ✅ Existing (continues to work)
[Function("ExistingTrigger")]
public void Run([KafkaTrigger(...)] string message) { }

[Function("ExistingTriggerBytes")]
public void Run([KafkaTrigger(...)] byte[] message) { }

// ✅ NEW: Full record access
[Function("NewTrigger")]
public void Run([KafkaTrigger(...)] KafkaConsumeResult<string, MyEvent> record)
{
    // Access everything
    Console.WriteLine($"Topic: {record.Topic}");
    Console.WriteLine($"Partition: {record.Partition}");
    Console.WriteLine($"Offset: {record.Offset}");
    Console.WriteLine($"LeaderEpoch: {record.LeaderEpoch}");
    Console.WriteLine($"Key: {record.Message.Key}");
    Console.WriteLine($"Value: {record.Message.Value}");
    Console.WriteLine($"Timestamp: {record.Message.Timestamp}");

    // Headers with full API
    foreach (var header in record.Message.Headers)
    {
        Console.WriteLine($"Header: {header.Key} = {Encoding.UTF8.GetString(header.GetValueBytes())}");
    }
}

// ✅ NEW: Batch mode
[Function("BatchTrigger")]
public void Run([KafkaTrigger(...)] KafkaConsumeResult<string, MyEvent>[] records)
{
    foreach (var record in records)
    {
        // Process each record with full metadata
    }
}
Breaking Changes
None - This is an additive feature. All existing binding types (string, byte[], KafkaEventData<T>) continue to work.
Estimated Effort
| Task | Estimate |
|---|---|
| Serialization schema design | 3 days |
| WebJobs Extension changes | 5 days |
| Isolated Worker Extension changes | 5 days |
| End-to-end testing | 5 days |
| Documentation | 2 days |
| Total | ~4 weeks |
Next Steps
1. Engage with Confluent - Comment on #2157 to express Azure Functions' interest in the factory pattern. If adopted, it would significantly simplify our implementation.
2. Design Review - Finalize serialization format (MessagePack vs Proto) and KafkaConsumeResult API surface.
3. Prototype - Create a proof-of-concept in a feature branch to validate the approach.
4. Customer Validation - Share the proposed API with customers to confirm it meets their requirements.
5. Implementation - Proceed with full implementation following design review approval.
Related Issues
/cc @Azure/azure-functions-kafka