commit()yml
parent
25bc628f8f
commit
6f52789cc0
|
@ -92,5 +92,9 @@
|
||||||
<artifactId>iotdb-session</artifactId>
|
<artifactId>iotdb-session</artifactId>
|
||||||
<version>0.13.1</version>
|
<version>0.13.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.muyu</groupId>
|
||||||
|
<artifactId>cloud-common-rabbit</artifactId>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package com.muyu.carData;
|
package com.muyu.carData;
|
||||||
|
|
||||||
|
import com.muyu.carData.listener.MyListener;
|
||||||
import com.muyu.common.security.annotation.EnableMyFeignClients;
|
import com.muyu.common.security.annotation.EnableMyFeignClients;
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
@ -19,7 +20,8 @@ import javax.swing.*;
|
||||||
public class CarDataApplication {
|
public class CarDataApplication {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(CarDataApplication.class,args);
|
SpringApplication application = new SpringApplication(CarDataApplication.class);
|
||||||
System.out.println("caused: Handler dispatch failed; nested exception is java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;;caused: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;");
|
application.addListeners(new MyListener());
|
||||||
|
application.run(args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ public class KafkaConfig {
|
||||||
//生产者可用于缓冲等待发送到服务器的记录的总内存字节数
|
//生产者可用于缓冲等待发送到服务器的记录的总内存字节数
|
||||||
configs.put("buffer-memory",3554432);
|
configs.put("buffer-memory",3554432);
|
||||||
/**
|
/**
|
||||||
*生产者producer要求leader节点在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化
|
* ,用于控制发送记录在服务端的持久化
|
||||||
*acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送
|
*acks=0,设置为0,则生产者producer将不会等待来自服务器的任何确认.该记录将立即添加到套接字(socket)缓冲区并视为已发送
|
||||||
* .在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1.
|
* .在这种情况下,无法保证服务器已收到记录,并且重试配置(retries)将不会生效(因为客户端通常不会知道任何故障),每条记录返回的偏移量始终设置为-1.
|
||||||
*acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,
|
*acks=1,设置为1,leader节点会把记录写入本地日志,不需要等待所有follower节点完全确认就会立即应答producer.在这种情况下,
|
||||||
|
|
|
@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
/**
|
/**卡夫卡消费者
|
||||||
* @Author:张腾
|
* @Author:张腾
|
||||||
* @Package:com.muyu.carData.consumer
|
* @Package:com.muyu.carData.consumer
|
||||||
* @Project:cloud-server-8
|
* @Project:cloud-server-8
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.carData.testcontroller;
|
package com.muyu.carData.controller;
|
||||||
|
|
||||||
import com.github.benmanes.caffeine.cache.Cache;
|
import com.github.benmanes.caffeine.cache.Cache;
|
||||||
import com.muyu.carData.config.cacheconfig.CaffeineConfig;
|
import com.muyu.carData.config.cacheconfig.CaffeineConfig;
|
|
@ -1,4 +1,4 @@
|
||||||
package com.muyu.carData.testcontroller;
|
package com.muyu.carData.controller;
|
||||||
|
|
||||||
import com.muyu.carData.config.lotdbconfig.IotDBSessionConfig;
|
import com.muyu.carData.config.lotdbconfig.IotDBSessionConfig;
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
@ -44,11 +44,13 @@ public class IotDBController {
|
||||||
@GetMapping("/insertData/{insertSize}/{count}")
|
@GetMapping("/insertData/{insertSize}/{count}")
|
||||||
public String insert(@PathVariable(name = "insertSize") int insertSize,@PathVariable(name = "count") int count) throws IoTDBConnectionException, StatementExecutionException {
|
public String insert(@PathVariable(name = "insertSize") int insertSize,@PathVariable(name = "count") int count) throws IoTDBConnectionException, StatementExecutionException {
|
||||||
Session session = iotDBSessionConfig.iotSession();
|
Session session = iotDBSessionConfig.iotSession();
|
||||||
|
//schemaList 属性及类型
|
||||||
List<MeasurementSchema> schemaList = new ArrayList<>();
|
List<MeasurementSchema> schemaList = new ArrayList<>();
|
||||||
schemaList.add(new MeasurementSchema("id", TSDataType.INT32));
|
schemaList.add(new MeasurementSchema("id", TSDataType.INT32));
|
||||||
schemaList.add(new MeasurementSchema("name", TSDataType.TEXT));
|
schemaList.add(new MeasurementSchema("name", TSDataType.TEXT));
|
||||||
schemaList.add(new MeasurementSchema("sex", TSDataType.TEXT));
|
schemaList.add(new MeasurementSchema("sex", TSDataType.TEXT));
|
||||||
|
|
||||||
|
//tablet 封装数据
|
||||||
Tablet tablet = new Tablet("root.yang.baling", schemaList);
|
Tablet tablet = new Tablet("root.yang.baling", schemaList);
|
||||||
|
|
||||||
//以当前时间戳作为插入的起始时间戳
|
//以当前时间戳作为插入的起始时间戳
|
|
@ -1,7 +1,6 @@
|
||||||
package com.muyu.carData.testcontroller;
|
package com.muyu.carData.controller;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
import com.muyu.carData.pojo.Student;
|
|
||||||
import lombok.extern.log4j.Log4j2;
|
import lombok.extern.log4j.Log4j2;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
@ -25,21 +24,15 @@ public class KafkaProducerController {
|
||||||
@Autowired
|
@Autowired
|
||||||
private KafkaProducer kafkaProducer;
|
private KafkaProducer kafkaProducer;
|
||||||
|
|
||||||
private final String topicName = "test";
|
private final String topicName = "carJsons";
|
||||||
|
|
||||||
@GetMapping("/produceTest")
|
@GetMapping("/producer")
|
||||||
public String produceTest() {
|
public String produceTest(JSONObject data) {
|
||||||
try {
|
try {
|
||||||
Student stu = Student.builder().id(2)
|
|
||||||
.name("杨闪闪")
|
|
||||||
.sex("男")
|
|
||||||
.build();
|
|
||||||
String stuStr = JSONObject.toJSONString(stu);
|
|
||||||
log.info("Topic:{}", topicName);
|
log.info("Topic:{}", topicName);
|
||||||
log.info("Java对象:{}",stu);
|
log.info("转换为JSON:{}",data);
|
||||||
log.info("转换为JSON:{}",stuStr);
|
|
||||||
//使用KafkaProducer发送消息
|
//使用KafkaProducer发送消息
|
||||||
ProducerRecord<String, String> stringProducerRecord = new ProducerRecord<>(topicName, stuStr);
|
ProducerRecord<String, String> stringProducerRecord = new ProducerRecord(topicName, data);
|
||||||
kafkaProducer.send(stringProducerRecord);
|
kafkaProducer.send(stringProducerRecord);
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.error("Producer写入Topic异常,异常信息是:{}",e.getMessage());
|
log.error("Producer写入Topic异常,异常信息是:{}",e.getMessage());
|
||||||
|
@ -47,4 +40,6 @@ public class KafkaProducerController {
|
||||||
return "消息发送成功";
|
return "消息发送成功";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.muyu.carData.event;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import org.springframework.context.ApplicationEvent;
|
||||||
|
|
||||||
|
/**自定义事件
|
||||||
|
* @Author:张腾
|
||||||
|
* @Package:com.muyu.carData.event
|
||||||
|
* @Project:cloud-server-8
|
||||||
|
* @name:EsSaveEvent
|
||||||
|
* @Date:2024/9/29 21:15
|
||||||
|
*/
|
||||||
|
public class EsSaveEvent extends ApplicationEvent {
|
||||||
|
|
||||||
|
private JSONObject data;
|
||||||
|
|
||||||
|
|
||||||
|
public EsSaveEvent(JSONObject source) {
|
||||||
|
super(source);
|
||||||
|
this.data = source;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,22 @@
|
||||||
|
package com.muyu.carData.listener;
|
||||||
|
|
||||||
|
import com.muyu.carData.event.EsSaveEvent;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @Author:张腾
|
||||||
|
* @Package:com.muyu.carData.listener
|
||||||
|
* @Project:cloud-server-8
|
||||||
|
* @name:CustomEventListener
|
||||||
|
* @Date:2024/9/29 23:49
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class CustomEventListener {
|
||||||
|
|
||||||
|
@EventListener
|
||||||
|
public void handMyEvent(EsSaveEvent event){
|
||||||
|
//处理事件详情
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
package com.muyu.carData.listener;
|
||||||
|
|
||||||
|
import com.muyu.carData.event.EsSaveEvent;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
|
||||||
|
/**自定义监听器
|
||||||
|
* @Author:张腾
|
||||||
|
* @Package:com.muyu.carData.listener
|
||||||
|
* @Project:cloud-server-8
|
||||||
|
* @name:MyListener
|
||||||
|
* @Date:2024/9/29 21:18
|
||||||
|
*/
|
||||||
|
@Log4j2
|
||||||
|
public class MyListener implements ApplicationListener<EsSaveEvent> {
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(EsSaveEvent event) {
|
||||||
|
log.info("监听到自定义事件........");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package com.muyu.carData.pulisher;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.muyu.carData.event.EsSaveEvent;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.extern.log4j.Log4j2;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**事件发布测试
|
||||||
|
* @Author:张腾
|
||||||
|
* @Package:com.muyu.carData.pulisher
|
||||||
|
* @Project:cloud-server-8
|
||||||
|
* @name:CustomEventPublisher
|
||||||
|
* @Date:2024/9/29 23:51
|
||||||
|
*/
|
||||||
|
@Log4j2
|
||||||
|
@Component
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class CustomEventPublisher {
|
||||||
|
|
||||||
|
private ApplicationEventPublisher applicationEventPublisher;
|
||||||
|
|
||||||
|
public void publish(JSONObject data){
|
||||||
|
EsSaveEvent esSaveEvent = new EsSaveEvent(data);
|
||||||
|
applicationEventPublisher.publishEvent(esSaveEvent);
|
||||||
|
log.info("事件发布成功 - 消息是:{}",data);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue