
[Feature Request] Support raw Kafka record (ConsumeResult<TKey, TValue>) in .NET Isolated Worker #612

@TsuyoshiUshio

Description


Summary

Add support for binding to raw Kafka records with full metadata access in the .NET Isolated Worker model. Currently, the KafkaTrigger only supports binding to string, byte[], or our abstracted KafkaEventData types, which limits access to important Kafka message metadata.

Problem Statement

Customer Scenario

Customers using the .NET Isolated Worker (the recommended hosting model) need access to the complete Kafka message metadata, similar to what Confluent.Kafka.ConsumeResult<TKey, TValue> provides. Current limitations:

| Available Today | Missing |
| --- | --- |
| Key, Value | Full Message<TKey, TValue> object |
| Offset, Partition, Topic | IsPartitionEOF flag |
| Timestamp | LeaderEpoch |
| Headers (copied) | Original Confluent.Kafka.Headers instance |
| | TopicPartitionOffset as a single object |

Why This Matters

  1. Custom header processing - Correlation IDs, trace context, custom routing metadata
  2. Precise offset management - Manual commit scenarios, idempotency patterns
  3. Partition-aware logic - Sharding strategies based on partition info
  4. Integration with existing code - Organizations often have internal libraries that expect ConsumeResult types
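To make these scenarios concrete, here is how a function body could use the full record. This is a sketch only: KafkaConsumeResult<string, Order> is the type proposed below (not an existing API), and the topic, header name, Order type, and shard count are placeholders.

```csharp
[Function("OrderProcessor")]
public void Run(
    [KafkaTrigger("BrokerList", "orders", ConsumerGroup = "order-processors")]
    KafkaConsumeResult<string, Order> record)
{
    // (1) Custom header processing: read a correlation ID for distributed tracing.
    var correlation = record.Message.Headers
        .FirstOrDefault(h => h.Key == "x-correlation-id");

    // (2) Precise offset management: persist the exact position for idempotency checks.
    var position = $"{record.Topic}/{record.Partition}@{record.Offset}";

    // (3) Partition-aware logic: route work to a shard derived from the partition.
    var shard = record.Partition % 4;
}
```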

Comparison with Other Triggers

EventHubs and ServiceBus extensions already support binding to their raw SDK types:

| Trigger | Abstracted Type | Raw SDK Type | Supported? |
| --- | --- | --- | --- |
| EventHubs | Custom data types | Azure.Messaging.EventHubs.EventData | ✅ |
| ServiceBus | Custom data types | Azure.Messaging.ServiceBus.ServiceBusReceivedMessage | ✅ |
| Kafka | KafkaEventData<TKey, TValue> | Confluent.Kafka.ConsumeResult<TKey, TValue> | ❌ (this request) |
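For reference, the equivalent raw-type binding already works for Event Hubs in the isolated worker with recent versions of Microsoft.Azure.Functions.Worker.Extensions.EventHubs (hub and connection names below are placeholders):

```csharp
[Function("EventHubsRawBinding")]
public void Run(
    [EventHubTrigger("my-hub", Connection = "EventHubConnection")]
    EventData[] events)
{
    foreach (var e in events)
    {
        // Raw SDK metadata is available directly on Azure.Messaging.EventHubs.EventData.
        var sequenceNumber = e.SequenceNumber;
        var systemProperties = e.SystemProperties;
    }
}
```

This issue asks for the same level of fidelity for Kafka.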

Proposed Architecture

Option A: Custom KafkaConsumeResult Type (Recommended for Initial Implementation)

Since Confluent.Kafka.ConsumeResult<TKey, TValue> lacks a factory method to reconstruct an instance from serialized bytes (unlike the Azure SDK message types, which can be rehydrated from their AMQP wire representation), we propose creating a new type that mirrors its structure.

┌─────────────────────────────────────────────────────────────────────────┐
│                        HOST PROCESS (WebJobs Extension)                 │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │  KafkaListener                                                   │   │
│  │  ┌─────────────────┐    ┌──────────────────────────────────┐   │   │
│  │  │ Consume()       │───►│ ConsumeResult<TKey, TValue>      │   │   │
│  │  │ (librdkafka)    │    │ (Confluent.Kafka)                │   │   │
│  │  └─────────────────┘    └──────────────┬───────────────────┘   │   │
│  │                                         │                       │   │
│  │                         ┌───────────────▼───────────────────┐   │   │
│  │                         │ Serialize to ModelBindingData    │   │   │
│  │                         │ (MessagePack/Proto format)        │   │   │
│  │                         │ - All metadata preserved          │   │   │
│  │                         └───────────────┬───────────────────┘   │   │
│  └─────────────────────────────────────────┼───────────────────────┘   │
│                                             │ gRPC                      │
└─────────────────────────────────────────────┼───────────────────────────┘
                                              │
┌─────────────────────────────────────────────┼───────────────────────────┐
│                     WORKER PROCESS (Isolated Extension)                 │
│                                             │                           │
│  ┌──────────────────────────────────────────▼──────────────────────┐   │
│  │  KafkaConsumeResultConverter : IInputConverter                   │   │
│  │  [SupportsDeferredBinding]                                       │   │
│  │  [SupportedTargetType(typeof(KafkaConsumeResult<,>))]           │   │
│  │                                                                   │   │
│  │  ┌─────────────────┐    ┌──────────────────────────────────┐    │   │
│  │  │ ModelBindingData│───►│ Deserialize to                   │    │   │
│  │  │ (bytes)         │    │ KafkaConsumeResult<TKey, TValue> │    │   │
│  │  └─────────────────┘    └──────────────────────────────────┘    │   │
│  └──────────────────────────────────────────┬──────────────────────┘   │
│                                              │                          │
│  ┌───────────────────────────────────────────▼─────────────────────┐   │
│  │  User Function                                                   │   │
│  │                                                                   │   │
│  │  [Function("KafkaTrigger")]                                      │   │
│  │  public void Run(                                                │   │
│  │      [KafkaTrigger(...)]                                         │   │
│  │      KafkaConsumeResult<string, MyEvent> record)                 │   │
│  │  {                                                               │   │
│  │      var offset = record.Offset;                                 │   │
│  │      var partition = record.Partition;                           │   │
│  │      var headers = record.Message.Headers;                       │   │
│  │      var value = record.Message.Value;                           │   │
│  │      // Full metadata access!                                    │   │
│  │  }                                                               │   │
│  └──────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘
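A possible shape for the worker-side types in the diagram above, pending design review (names mirror Confluent.Kafka.ConsumeResult; the exact namespace and members are open questions, not a committed API):

```csharp
// Proposed API surface (sketch only): mirrors Confluent.Kafka.ConsumeResult
// without taking a dependency on the Confluent.Kafka package in the worker.
public sealed class KafkaConsumeResult<TKey, TValue>
{
    public string Topic { get; init; }
    public int Partition { get; init; }
    public long Offset { get; init; }
    public int? LeaderEpoch { get; init; }
    public bool IsPartitionEOF { get; init; }
    public KafkaMessage<TKey, TValue> Message { get; init; }
}

public sealed class KafkaMessage<TKey, TValue>
{
    public TKey Key { get; init; }
    public TValue Value { get; init; }
    public KafkaHeaders Headers { get; init; }
    public KafkaTimestamp Timestamp { get; init; }
}
```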

Option B: Upstream Confluent.Kafka Enhancement (Long-term)

There are active proposals in the Confluent.Kafka repository that could simplify this:

| Issue | Description | Status |
| --- | --- | --- |
| confluentinc/confluent-kafka-dotnet#2157 | Factory pattern for custom consume result types | Open (Dec 2023) |
| confluentinc/confluent-kafka-dotnet#2543 | Allocation-free API with MessageReader | Open (Nov 2025) |

If #2157 is adopted:

// In WebJobs Extension - could directly create our type without intermediate serialization
TConsumeResult Consume<TConsumeResult>(
    int millisecondsTimeout, 
    IConsumeResultFactory<TKey, TValue, TConsumeResult> factory);

This would eliminate the need for custom serialization, as we could implement IConsumeResultFactory to create KafkaConsumeResult directly.
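For illustration, the extension side might then look like the sketch below. IConsumeResultFactory does not exist in Confluent.Kafka today; both the interface shape and the Create member here are hypothetical, extrapolated from the #2157 proposal.

```csharp
// Illustrative only: assumes the factory interface sketched in
// confluentinc/confluent-kafka-dotnet#2157 lands in roughly this shape.
internal sealed class KafkaConsumeResultFactory<TKey, TValue>
    : IConsumeResultFactory<TKey, TValue, KafkaConsumeResult<TKey, TValue>>
{
    // Hypothetical factory member: build our type straight from the consumed
    // record, skipping the intermediate serialization round-trip entirely.
    public KafkaConsumeResult<TKey, TValue> Create(
        string topic, int partition, long offset,
        Message<TKey, TValue> message, bool isPartitionEOF, int? leaderEpoch)
        => new()
        {
            Topic = topic,
            Partition = partition,
            Offset = offset,
            IsPartitionEOF = isPartitionEOF,
            LeaderEpoch = leaderEpoch,
            // Message, Headers, and Timestamp would be mapped here as well.
        };
}
```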


Required Changes

Repository: Azure/azure-functions-kafka-extension (WebJobs Extension)

| File/Area | Change |
| --- | --- |
| KafkaEventData.cs | Add serialization support for full metadata |
| Trigger/KafkaTriggerBindingStrategy.cs | Support ModelBindingData creation with full record |
| New: Serialization/KafkaRecordSerializer.cs | MessagePack/Proto serializer for ConsumeResult |
| Microsoft.Azure.WebJobs.Extensions.Kafka.csproj | Add serialization dependencies |

Repository: Azure/azure-functions-dotnet-worker (Isolated Worker Extension)

| File/Area | Change |
| --- | --- |
| New: KafkaConsumeResult.cs | New type mirroring ConsumeResult<TKey, TValue> structure |
| New: KafkaMessage.cs | New type mirroring Message<TKey, TValue> |
| New: KafkaConsumeResultConverter.cs | IInputConverter implementation |
| New: KafkaHeaders.cs | Headers collection type |
| Worker.Extensions.Kafka.csproj | Add serialization dependencies |

Shared: Serialization Schema

Define a shared schema (MessagePack or Protocol Buffers):

// Example Proto definition
message KafkaConsumeResultProto {
    string topic = 1;
    int32 partition = 2;
    int64 offset = 3;
    optional int32 leader_epoch = 4;
    KafkaMessageProto message = 5;
    bool is_partition_eof = 6;
}

message KafkaMessageProto {
    bytes key = 1;
    bytes value = 2;
    repeated KafkaHeaderProto headers = 3;
    KafkaTimestampProto timestamp = 4;
}

message KafkaHeaderProto {
    string key = 1;
    bytes value = 2;
}

message KafkaTimestampProto {
    int64 unix_timestamp_ms = 1;
    int32 type = 2;  // CreateTime, LogAppendTime, etc.
}
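On the worker side, the converter in the diagram could consume this schema roughly as follows. This is a sketch: IInputConverter, ConverterContext, ConversionResult, and ModelBindingData are the real isolated-worker converter APIs, but the "KafkaExtension" source identifier, the generated KafkaConsumeResultProto parser, and the MapToTargetType helper are assumptions for illustration.

```csharp
[SupportsDeferredBinding]
[SupportedTargetType(typeof(KafkaConsumeResult<,>))]
internal sealed class KafkaConsumeResultConverter : IInputConverter
{
    public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
    {
        // Only handle ModelBindingData emitted by the Kafka extension
        // ("KafkaExtension" is a placeholder source identifier).
        if (context.Source is not ModelBindingData binding
            || binding.Source != "KafkaExtension")
        {
            return new ValueTask<ConversionResult>(ConversionResult.Unhandled());
        }

        // Parse the proto payload defined by the schema above
        // (generated protobuf parser assumed).
        var proto = KafkaConsumeResultProto.Parser.ParseFrom(binding.Content.ToArray());

        // MapToTargetType (hypothetical helper) would deserialize the Key/Value
        // bytes according to the TKey/TValue arguments of the function parameter.
        object result = MapToTargetType(proto, context.TargetType);
        return new ValueTask<ConversionResult>(ConversionResult.Success(result));
    }
}
```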

User Experience

New API (Non-Breaking Addition)

Users can opt-in to the new type while existing code continues to work:

// ✅ Existing (continues to work)
[Function("ExistingTrigger")]
public void Run([KafkaTrigger(...)] string message) { }

[Function("ExistingTriggerBytes")]  
public void Run([KafkaTrigger(...)] byte[] message) { }

// ✅ NEW: Full record access
[Function("NewTrigger")]
public void Run([KafkaTrigger(...)] KafkaConsumeResult<string, MyEvent> record)
{
    // Access everything
    Console.WriteLine($"Topic: {record.Topic}");
    Console.WriteLine($"Partition: {record.Partition}");
    Console.WriteLine($"Offset: {record.Offset}");
    Console.WriteLine($"LeaderEpoch: {record.LeaderEpoch}");
    Console.WriteLine($"Key: {record.Message.Key}");
    Console.WriteLine($"Value: {record.Message.Value}");
    Console.WriteLine($"Timestamp: {record.Message.Timestamp}");
    
    // Headers with full API
    foreach (var header in record.Message.Headers)
    {
        Console.WriteLine($"Header: {header.Key} = {Encoding.UTF8.GetString(header.GetValueBytes())}");
    }
}

// ✅ NEW: Batch mode
[Function("BatchTrigger")]
public void Run([KafkaTrigger(...)] KafkaConsumeResult<string, MyEvent>[] records)
{
    foreach (var record in records)
    {
        // Process each record with full metadata
    }
}

Breaking Changes

None - This is an additive feature. All existing binding types (string, byte[], KafkaEventData<T>) continue to work.


Estimated Effort

| Task | Estimate |
| --- | --- |
| Serialization schema design | 3 days |
| WebJobs Extension changes | 5 days |
| Isolated Worker Extension changes | 5 days |
| End-to-end testing | 5 days |
| Documentation | 2 days |
| Total | ~20 working days (~4 weeks) |

Next Steps

  1. Engage with Confluent - Comment on #2157 to express Azure Functions' interest in the factory pattern. If adopted, it would significantly simplify our implementation.

  2. Design Review - Finalize serialization format (MessagePack vs Proto) and KafkaConsumeResult API surface.

  3. Prototype - Create a proof-of-concept in a feature branch to validate the approach.

  4. Customer Validation - Share the proposed API with customers to confirm it meets their requirements.

  5. Implementation - Proceed with full implementation following design review approval.


Related Issues

/cc @Azure/azure-functions-kafka
