Skip to content

Commit

Permalink
program added
Browse files Browse the repository at this point in the history
  • Loading branch information
Your Name committed Oct 12, 2024
1 parent f0324b4 commit 53cc69f
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.stream.error.handle.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.BeanUtil;
import com.stream.error.handle.dto.UserRequest;
import com.stream.error.handle.util.CsvReaderUtils;
import com.stream.error.handle.model.User;
import com.stream.error.handle.producer.KafkaMessageProducer;

import org.apache.commons.beanutils.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
Expand All @@ -18,15 +23,27 @@ public class EventController {
private KafkaMessageProducer producer;

@Autowired
public EventController(KafkaMessageProducer publisher) {
this.producer = publisher;
public EventController(KafkaMessageProducer producer) {
this.producer = producer;
}

@PostMapping("/publish")
public ResponseEntity<?> publishEvent() {
@PostMapping("/publish/user")
public ResponseEntity<?> publishUserEvent(@RequestBody UserRequest userRequest) {


try {
User user = User.builder().id(userRequest.getId()).email(userRequest.getEmail()).gender(userRequest.getGender()).firstName(userRequest.getFirstName()).lastName(userRequest.getLastName()).ipAddress(userRequest.getIpAddress()).build();
producer.sendUserEvent(user);
return ResponseEntity.ok("Message published successfully");
} catch (Exception exception) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
@PostMapping("/publish/csv")
public ResponseEntity<?> publishEventFromCsvUser() {
try {
List<User> users = CsvReaderUtils.readDataFromCsv();
users.forEach(usr -> producer.sendEvents(usr));

producer.publishCsvUserEvent();
return ResponseEntity.ok("Message published successfully");
} catch (Exception exception) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.stream.error.handle.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class User {

private int id;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.stream.error.handle.producer;

import com.stream.error.handle.model.User;
import com.stream.error.handle.util.CsvReaderUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CompletableFuture;

@Service
Expand All @@ -25,13 +30,12 @@ public KafkaMessageProducer(@Qualifier("kafkaTemplateUser") KafkaTemplate<String



public void sendEvents(User user) {
public void sendUserEvent(User user) {
try {
CompletableFuture<SendResult<String, User>> future = kafkaTemplate.send(topicName, user);
future.whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Sent message=[" + user.toString() +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
//System.out.println("Sent message=[" + user.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]");

System.out.println("Sent record to topic " + result.getRecordMetadata().topic() +
" partition " + result.getRecordMetadata().partition() +
Expand All @@ -46,5 +50,15 @@ public void sendEvents(User user) {
}
}

public ResponseEntity<String> publishCsvUserEvent(){
try {
List<User> users = CsvReaderUtils.readDataFromCsv();
users.forEach(usr -> kafkaTemplate.send((Message<?>) usr));
return ResponseEntity.ok("Message published successfully");
} catch (Exception exception) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}


}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 53cc69f

Please sign in to comment.