programing

Spring Kafka 클래스가 신뢰할 수 있는 패키지에 없습니다.

madecode 2023. 3. 14. 22:02
반응형

Spring Kafka 클래스가 신뢰할 수 있는 패키지에 없습니다.

업데이트 Boot에서 Spring Boot/Kafka 클래스를 했습니다.org.telegram.telegrambots.api.objects.Update카프카 토픽에 글을 올리기 위해서입니다.는 지금 다음과 것을 하고 있습니다.org.telegram.telegrambots.meta.api.objects.Update을 사용법

애플리케이션을 재기동한 후, 다음의 문제가 발생했습니다.

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

설정은 다음과 같습니다.

@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);

        return factory;
    }

}

application.properties

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

어떻게 하면 이 문제를 해결하고 Kafka가 오래된 메시지를 새로운 메시지로 역직렬화 할 수 있을까요?

갱신했다

이쪽은 제 청취자입니다.

@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {

        //do some logic here

        ack.acknowledge();
    }

}

메뉴얼을 참조해 주세요.

버전 2.1부터는 레코드 헤더에서 유형 정보를 전달할 수 있어 여러 유형을 처리할 수 있습니다.또한 시리얼라이저/디시리얼라이저는 Kafka 속성을 사용하여 구성할 수 있습니다.

Json Serializer 입니다.ADD_TYPE_INFO_HEADers(기본값 true). JsonSerializer에서 이 기능을 비활성화하려면 false로 설정합니다(addTypeInfo 속성을 설정합니다).

JsonDeserializer 입니다.KEY_DEFAULT_TYPE. 헤더 정보가 존재하지 않는 경우 키를 역직렬화하는 폴백타입.

JsonDeserializer 입니다.VALUE_DEFAULT_TYPE. 헤더 정보가 존재하지 않는 경우 값을 역직렬화하는 폴백타입.

JsonDeserializer 입니다.TRUSTED_PACKAGES(기본값 java.util, java.lang).직렬화 해제에 사용할 수 있는 패키지 패턴의 쉼표 목록.*는 모든 직렬화를 해제함을 의미합니다.

기본적으로는 시리얼라이저는 헤더에 유형 정보를 추가합니다.

부트 메뉴얼을 참조해 주세요.

마찬가지로 헤더에 유형 정보를 보내는 JsonSerializer 기본 동작을 비활성화할 수 있습니다.

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

또는 인바운드 메시지 변환기에 유형 매핑을 추가하여 소스 유형을 대상 유형에 매핑할 수 있습니다.

편집

그런데 어떤 버전을 사용하시나요?

두 가지 핵심 사항을 언급해야 합니다.

  1. Producer와 Consumer를 위한 두 개의 분리된 프로젝트가 있습니다.
  2. 그러면 메시지(값)를 보내는 것은 개체 유형으로 다소 원시적인 유형입니다.

문제는 생산 메시지오브젝트가 2개의 개별 프로젝트이기 때문에 사용자 측에서 사용할 수 없다는 것입니다.

이 문제를 해결하려면 다음 Spring Boot Producer 어플리케이션과 Consumer 어플리케이션에서 설명하는 절차를 따르십시오.

-----------------------------------------------------

** 프로듀서 구성 클래스 **

import com.kafka.producer.models.Container;    
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

@Bean
public ProducerFactory<String, Container> producerFactory(){

    Map<String, Object> config = new HashMap<>();

config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory(config);
}

@Bean
public KafkaTemplate<String, Container> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
}
}

주의: 컨테이너는 카프카 토픽에 게시되는 커스텀오브젝트입니다.


** 프로듀서 클래스 **

import com.kafka.producer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class Producer {

private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "final-topic";

@Autowired
private KafkaTemplate<String, Container> kafkaTemplate;

public void sendUserMessage(Container msg) {
    LOGGER.info(String.format("\n ===== Producing message in JSON ===== \n"+msg));
    Message<Container> message = MessageBuilder
            .withPayload(msg)
            .setHeader(KafkaHeaders.TOPIC, TOPIC)
            .build();
    this.kafkaTemplate.send(message);
}
}

** 생산자 컨트롤러 **

import com.kafka.producer.models.Container;
import com.kafka.producer.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/message")
public class MessageController {

@Autowired
private Producer producer;

@PostMapping(value = "/publish")
public String sendMessageToKafkaTopic(@RequestBody Container containerMsg) {
    this.producer.sendUserMessage(containerMsg);
    return "Successfully Published !!";
}
}

주의: Container 유형의 메시지는 kafka topic name:final-topic에 JSON 메시지로 게시됩니다.

===============================================================================

-- 컨슈머 앱 --

** 컨피규레이션클래스 **

import com.kafka.consumer.models.Container;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerOneConfig {

@Bean
public ConsumerFactory<String, Container> consumerFactory(){
    JsonDeserializer<Container> deserializer = new JsonDeserializer<>(Container.class);
    deserializer.setRemoveTypeHeaders(false);
    deserializer.addTrustedPackages("*");
    deserializer.setUseTypeMapperForKey(true);

    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Container> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, Container> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;

}
}

주의: 여기에서는 기본 JsonDeserializer()를 사용하는 대신 커스텀 JsonDeserializer를 사용하여 최종 토픽(토픽 이름)에서 컨테이너 개체 유형 Json Messages를 소비해야 합니다.


** 개인고객서비스 **

import com.kafka.consumer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;

@Service
public class ConsumerOne {

private final Logger LOGGER = LoggerFactory.getLogger(ConsumerOne.class);

@KafkaListener(topics = "final-topic", groupId = "group_one", containerFactory = "kafkaListenerContainerFactory")
public void consumeUserMessage(@Payload Container msg, @Headers MessageHeaders headers) throws IOException {
    System.out.println("received data in Consumer One ="+ msg.getMessageTypes());
}
}

이 방법에는 deserializer와 application.yml의 두 가지 방법이 있습니다.

디시리얼라이저 중 하나

중인 디세리얼라이저에서는인 디세리얼라이저DefaultKafkaConsumerFactory(고객용 팩토리를 만듭니다). Let's say you want to make a 예를 들어, 당신이 어떤 것을 만들고 싶다고 가정해 봅시다.ConsumerFactory<String, Foo> with 와 함께Foo모델/POJJJJ는 KA 메시지에 사용할 수 있습니다.카프카 메시지에 사용할 모델/POJO가 됩니다.

할 ?요 to다 need you있가?addTrustedPackages부에서JsonDeserializer코토린에서 예를 들어 있지만 자바에서 거의 동일한 구문입니다.코틀린 자바

 val deserializer = JsonDeserializer<Foo>()
 deserializer.addTrustedPackages("com.example.entity.Foo") // Adding Foo to our trusted packages

 val consumerFactory = DefaultKafkaConsumerFactory(
      consumerConfigs(),  // your consumer config
      StringDeserializer(), 
      deserializer // Using our newly created deserializer
 )

또는 어플리케이션.yml에 있습니다.

spring-kafka 명령어 뒤에 있는 application.yml 파일.com.example.entity에서 Foo 클래스를 추가합니다.다음을 사용하여 신뢰할 수 있는 저장소의 Foo 패키지:

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.example.entity.Foo"

★★★★★★★★★★★★★★★★ spring.json.trusted.packages패키지 배열을 수락합니다.패키지를 , 「」를 사용할 수도 있습니다.*를 참조해 주세요.는 꼭 됩니다.deserializerDefaultKafkaConsumerFactory()컨슈머 컨피규레이션에서만 가능합니다.

jsonDeserializer.addTrustedPackages("*");

봄 카프카-2.2.8 문제를 해결했습니다.


application.properties:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

중요사항:

Kafka Consumer 및 Kafka Producer에 대해 Serializer 및 Deserializer 인스턴스를 각각 제공한 경우 이러한 인스턴스는 효과가 없습니다.

고고: :
[1] https://docs.spring.io/spring-kafka/reference/html/ #json-serde
[2] https://github.com/spring-projects/spring-kafka/issues/535

저도 이 문제에 직면해 있습니다만, 위의 해결방법은 효과가 없었습니다.그러나 Kafka 컨슈머 팩토리를 다음과 같이 구성한 것이 문제였습니다.

props.put(JsonDeserializer.TRUSTED_PACKAGES, "your.package.name");

Kafka가 2개의 다른 디시리얼라이저를 초기화하기 때문에 스트림 관련 작업을 할 때 이 문제에 직면한 사용자는 소비자와 스트림 모두에 대해 신뢰할 수 있는 패키지를 지정해야 합니다.

spring:
  kafka:
    bootstrap-servers: localhost:9092
  producer:
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

  consumer:
...
    properties:
       spring.json.trusted.packages:  "YOUR ENTTIES PACKAGE"
  streams:
...
    properties:
      spring.json.trusted.packages: "YOUR ENTTIES PACKAGE"

컨슈머 앱에서 메시지를 소비하려고 할 때 두 가지 오류가 발생하는 유사한 문제가 있었습니다.

1-The class 'someClass' is not in the trusted packages: [java.util, java.lang,If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*)

2-org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition partion at offset 9902. If needed, please seek past the record to continue consumption.

이 속성을 추가하면 해결할 수 있습니다(JsonDeserializer).TRUSTED_PACKAGE)를 KafkaConfig 클래스의 config 생성 메서드(makeConfig)로 변환하여 이 접근법으로 설정을 소비합니다.이 문제는 해결되었습니다.

private Map<String, Object> makeConfig(ServiceMessagePriority input)
{
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.core.model.ServiceMsgDTO");
    configProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
    configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return configProps;
}

제 spring-kafka 버전은 2.2.11 이며, 이 에러도 발생했습니다.

이 에러는, 같은 카프타 토픽에서 다른 구성으로 2개의 컨슈머를 설정했기 때문에 발생합니다.그 중 하나는 Consumer Factory <String, Order DTO>이고 다른 하나는 Consumer Factory <String, String>입니다.

한 컨슈머의 설정이 잘못되어 변경된 오류를 해결했습니다.

토픽의 소비자를 체크하기만 하면 됩니다.

문제를 해결하려면 두 가지 방법이 있습니다.

  1. 생산자에서 유형 머리글 사용 안 함
  2. 소비자 내 신뢰할 수 있는 생산자 목록 추가

생산자에서 유형 머리글 사용 안 함

생산자 속성 파일은 다음과 같습니다.

spring:
  profiles: dev
  kafka:
    producer:
      bootstrap-servers: localhost:9092, localhost:9093
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        acks: 1
        spring:
          json:
            add:
              type:
                headers: false

그리고 당신의 소비자 재산 파일은 다음과 같아야 합니다.

spring:
  profiles: dev
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: consumer-group-1
      properties:
        spring:
          json:
            value:
              default:
                type: 'com.kafka.consumer.Message'

소비자 내 신뢰할 수 있는 생산자 목록 추가

생산자 속성 파일에서 아무것도 변경하지 않으려면 다음과 같이 소비자 속성 파일을 업데이트하십시오.

spring:
  profiles: dev
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: consumer-group-1
      properties:
        spring:
          json:
            value:
              default:
                type: 'com.kafka.consumer.Message'
            type:
              mapping: 'com.kafka.producer.Message:com.kafka.consumer.Message'
            trusted:
              packages: 'com.kafka.producer'

인마이Messageclass, noArgsConstructor와 함께 세터와 게터가 있는지 확인합니다.그렇지 않으면 동작하지 않습니다.

@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public
class Message {

    Integer id;

    String name;

    String address;

    String phone;

    boolean isActive;
}

컨슈머 리스너:

@Component
@Slf4j
public class KafkaMessageListener {

    @KafkaListener(topics = {"test_json"})
    public void OnMessage(Message rc){
        log.info(rc.getName());
    }
}

주의: My Producer와 Consumer는 두 개의 다른 프로젝트이며 패키지 이름은 다음과 같습니다.

제작자:com.kafka.producer.Message

개인 사용자:com.kafka.consumer.Message

저도 같은 문제가 있었는데, 알고 보니 제 application.yml 파일에 한 개의 들여쓰기가 너무 많았습니다.나는 Springboot에 완전히 익숙하지 않다.

이전:

  port: 5000

spring:
  data:
    mongodb:
      host: localhost
      port: 27017
      database: bankaccount
    kafka:
      producer:
        bootstrap-servers: localhost:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

그 후:

  port: 5000

spring:
  data:
    mongodb:
      host: localhost
      port: 27017
      database: bankaccount
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

소비자 및 생산자 모두에 동일한 패키지 이름을 사용합니다. 그것은 나의 오류를 해결했습니다.

언급URL : https://stackoverflow.com/questions/51688924/spring-kafka-the-class-is-not-in-the-trusted-packages

반응형