1

I'm new to Dapr as well as Kafka. I want to implement Dapr with Kafka(KRaft) for my .NET 8 desktop app. I'm working on POC, there I'm facing issue:

'Dapr.DaprException: 'Publish operation failed: the Dapr endpoint indicated a failure'. ({"Status(StatusCode="Unavailable", Detail="Error connecting to subchannel.", DebugException="System.Net.Sockets.SocketException: No connection could be made because the target machine actively refused it.")"}).

docker-compose.yaml for Kafka image in Docker

version: "3.8"

services:
  kafka:
    image: docker.io/bitnami/kafka:3.7
    container_name: kafka
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
      - "./kafka.keystore.jks:/bitnami/kafka/config/certs/kafka.keystore.jks"
      - "./kafka.truststore.jks:/bitnami/kafka/config/certs/kafka.truststore.jks"
    environment:
      # KRaft settings
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Additional settings
      - KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
      - KAFKA_CREATE_TOPICS="kafka-topic:1:1"

volumes:
  kafka_data:
    driver: local

Program.cs

using Dapr.Client;

class Program
{
    static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        ConfigureServices(services);
        var serviceProvider = services.BuildServiceProvider();
        var daprClient = serviceProvider.GetRequiredService<DaprClient>();

        var message = new HelloMessage { Text = "Hello, Kafka!" };
        await PublishMessageEvent(daprClient, message);
    }

    //const string topicName = "kafka-topic";

    const string PUBSUB_NAME = "kafka-pubsub";
    const string TOPIC_NAME = "kafka-topic";
    static async Task PublishMessageEvent(DaprClient daprClient, HelloMessage message)
    {
        try
        {
            await daprClient.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, message);

            Console.WriteLine("Event published successfully.");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error publishing event: {ex.Message}");
        }
    }
    static void ConfigureServices(IServiceCollection services)
    {
        string daprPort = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT")!;
        var grpcEndpoint = $"http://localhost:{daprPort}";
        //var grpcEndpoint = $"http://localhost:52540";

        // var channel = GrpcChannel.ForAddress("http://localhost:5009/");
        // Register DaprClient instance using gRPC channel and DaprClientBuilder
        var daprClient = new DaprClientBuilder()
            .UseGrpcEndpoint(grpcEndpoint) // Specify the gRPC endpoint
            .Build();

        services.AddSingleton(daprClient);
    }
}

public class HelloMessage
{
    public string Text { get; set; } = default!;
}

Kafka pubsub yaml (Dapr configuration)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "localhost:9092"
  - name: consumerGroup
    value: "my-group"
  - name: authRequired
    value: "false"
  - name: disableTls
    value: "true"

I have turned it off firewall. What am I really missing here?

1 Answer 1

0

The issue you are experiencing is because your Dapr component is pointing to a Kafka instance hosted at localhost when in reality, your app is running a containerised environment alongside another container named kafka.

So, if you'd like to get this working, you'd update your Dapr component as follows:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "kafka:9092"
  - name: consumerGroup
    value: "my-group"
  - name: authRequired
    value: "false"
  - name: disableTls
    value: "true"

Then simply compose down and back up again and your app should connect just fine :)

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.