KAFKA PRODUCER/CONSUMER 쌤플

Posted by Albert 803Day 19Hour 23Min 7Sec ago [2023-02-06]

테스트 환경

INTELIJ MAVE JAVA

VirtualBox Centos KAFKA설치


kafka server.config 도메인 설정 필요(않하면 오류)

kafka 설치 폴더 이동

[kafka@localhost kafka_2.13-3.3.2]$ vi ./config/server.properties

기존 'listeners=PLAINTEXT://:9092 찾아서 주석줄 풀고 포트 앞 kafka 서버 주소 넣어주면 됨

==> listeners=PLAINTEXT://192.168.200.176:9092


MAVEN KAFKA dependency항목

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
</dependency>

</dependencies>

PRODUCER JAVA코드

package org.visionboy.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class kafkaProducer {

private static final Logger log = LoggerFactory.getLogger(kafkaProducer.class);

public static void main(String[] args) {

log.info("hello albert");

String bootstrapServer = "192.168.200.176:9092";

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// create a producer record
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("albert-topic-1", "hello world2222");

producer.send(producerRecord);

producer.flush();

producer.close();
}

}


CONSUMER JAVA코드

package org.visionboy.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class kafkaConsumer {

private static final Logger log = LoggerFactory.getLogger(kafkaConsumer.class);


public static void main(String[] args) {

String bootstrapServers = "192.168.200.176:9092";
String groupId = "my-fourth-application";
String topic = "albert-topic-1";

// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

// get a reference to the current thread
final Thread mainThread = Thread.currentThread();

// adding the shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
log.info("Detected a shutdown, let's exit by calling consumer.wakeup()...");
consumer.wakeup();

// join the main thread to allow the execution of the code in the main thread
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

try {

// subscribe consumer to our topic(s)
consumer.subscribe(Arrays.asList(topic));

// poll for new data
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
log.info("Key: " + record.key() + ", Value: " + record.value());
log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
}
}

} catch (WakeupException e) {
log.info("Wake up exception!");
// we ignore this as this is an expected exception when closing a consumer
} catch (Exception e) {
log.error("Unexpected exception", e);
} finally {
consumer.close(); // this will also commit the offsets if need be.
log.info("The consumer is now gracefully closed.");
}
}

}

GITHUB 링크 





LIST

Copyright © 2014 visionboy.me All Right Reserved.