From daffa478a27e05f1342dc244fbe15bdedb4e2e6c Mon Sep 17 00:00:00 2001 From: rouchen <3133657697@qq.com> Date: Mon, 17 Jun 2024 22:31:07 +0800 Subject: [PATCH] =?UTF-8?q?fix=20kafka=E7=9A=84=E6=B6=88=E8=B4=B9=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- logs/vehicle.log.2024-06-16.0.gz | Bin 0 -> 4320 bytes .../java/com/muyu/event/common/CarEvent.java | 24 +++++ .../event/controller/EventController.java | 29 ++++++ .../com/muyu/event/mapper/EventMapper.java | 19 ++++ .../com/muyu/event/service/EventService.java | 16 ++++ .../event/service/impl/EventServiceImpl.java | 25 +++++ .../com/muyu/kafka/SimpleKafkaConsumer1.java | 88 ++++++++++++++++++ src/main/resources/mapper/EventMapper.xml | 9 ++ 8 files changed, 210 insertions(+) create mode 100644 logs/vehicle.log.2024-06-16.0.gz create mode 100644 src/main/java/com/muyu/event/common/CarEvent.java create mode 100644 src/main/java/com/muyu/event/controller/EventController.java create mode 100644 src/main/java/com/muyu/event/mapper/EventMapper.java create mode 100644 src/main/java/com/muyu/event/service/EventService.java create mode 100644 src/main/java/com/muyu/event/service/impl/EventServiceImpl.java create mode 100644 src/main/java/com/muyu/kafka/SimpleKafkaConsumer1.java create mode 100644 src/main/resources/mapper/EventMapper.xml diff --git a/logs/vehicle.log.2024-06-16.0.gz b/logs/vehicle.log.2024-06-16.0.gz new file mode 100644 index 0000000000000000000000000000000000000000..8b3027ee5bfdb5e070cb348734e08433cf16c4a3 GIT binary patch literal 4320 zcmV<65FhU!iwFP!00000|Lt8%kK;&kK6`-q4~g9ZN&|GI_2EZs=D^Dz}^~b!|-9T2R~sLhJ7*Y(;n;v_RnlL_O^e)5t*b!iKI$W z=}}$71{x)i$%ro_BO)V{A1Osrv?Iwlk`1CL9ZB!V8r4*Tyc#-w_?nzQKl_RhHyYC6 z_-0Jcqx5O_cr*%p*Gc^-?C3f+VFWq95?l^7#YP;&kq<$So(NCzCYx-IT-=$;2&+#_L!eSoKa|Wfj5VZ~+EqG(t2*#23u- zlPNGCz@r)&R5YyIE0^0oa)z`pE#CxQMA>ro?rG^8qLbm0?238QPXe z^T=Qpn#tr}jK3z~rYq!4rqjhaQ2u-4h!mG{J470F=Z=K8Ap9dpw&m=z%07g{W4y#ShG zU=^)4BjA1*&a9Y=Hg33c%#1D@LARH&o1gNYhar3VUu9i1nibCEkEh5Dh{@2V5{_6xM&C=foZY@Lj0FO|`fxOvGXG zN?>}uDRrk|-w#XL5JbSh>f$VUbH71n(s%5x5DNaxcMT6(Sbt0P{4>u6tGg! z4AV{s5f>L-V($dXW61jhuxiSNBk+05EMgMq#m0l=t$=cSweq3|TseJopN};Qb{DVWI==Y+^f@*6O>B zvd@d63w=K05^?WosvPhava@iEM4+Y_`qy~n473^Zq=T4UPvO-Ap3xDHTyWKwPoIEo zLb2j0AT2Zp@qRC*P*#Kq&Up0Y%lu>Eg6|;WXvk8Ui7T9%DnR!c!<64aU&CY7zHdm@ z%1KJqtJ2|MoO;o9n7K6Osk(SGAf2wRQ(2(%w%lmQ^SN==njjLJa?8{NsLrZP#WcVn z^L*|AS57eIPK$eRar$M{hl#(R7jzw2#ftIMPd@!bL~oK83gv`Dp-3Q}pNslh^K&*Zuu;-e=0j}>vr%R# ztTE<&V-&OLjKe0jOj--B;W%u4(b#6|oU%`sisE!jwQ#8OeXm3;7$*pBUqDU}EYVb?I&EQmRn}xvC8aZ5rg;aM8Mr#pnj)8Q9K?RDae#SZSGjew?d++yW;>EkMM#=Mt!2Y~fw z%Ba?&c49oO)#t-Il&w5X!D^hyHu~QXOu@CmxD5R?#U zD$Zt*Ruo#xL6PNN*Y<3Qitkh12XgBtecV04?Wdn^THV3HnUkcPb2QhYrz^5;^uWU5 zRrH2cfT%1<`hnl2{GJ*W%dHpuvk-DQ9l#^yPs z;>@B_OEe&&#YSYdYR`)z%3MxMb^{sgQLkxXn=E6Lwc)403HuXO;^JIr_D$5Y#_B7I z%}t5Amd{d(wAw7S(`l!y>@srSfLNYJ^DkMbs&zRSr_`6H5K*b`o?$1w3FJ&r&B<(~ zv-<3{cskT`+pGE#w!e!hr>>cdLmK)KbqDmhSRP@k+k&+Es@C(|=(=gIe$B-3tFQ4R zF|0h&CRfiK6>{vF07P41(X&+(YVp2DXB9v}fu)2WD1r zV75xVx7KIgbHvv4zU=*H-gCsZCjDTT_lBu5$I-NXQ~kj37L}A;Frps<^FA|I1vjf< z-Y;Xg?LJ!bV43&Adh-uNHq84S55P_yPz?*JLQ>j+pmComx?(e1yz< z-WhxTA<-={??IVOWvOo4=r&~EQf#8$%YrwdcUKaf)fr_<1||K4@d7+tb*ZCGyNT{)A3@9$rP+4j@d#C}0^1 z?ezA>d4PpL00En_f?=LVB!OH$bl?y6Jl}_acnNZSha3a;wjONj(dC;gu}+rQ0wFs$_e#88*^<^L zxo(>7Y%FH@HFW2O?z{uK^SXUfHsdl4g}b3}Hx%xM!d*$>-cL<}-G(N-#+bxUZMDbe z9ihB+dt=c}WnoHNQ9qX6H>)C$Zgu+A5uj~X#9`m+e=ES4)`a=lWnq{bwMH*x@$?qU zB}lZFtDc+?CN#9zMcx84pi5i-`TjG_jD%R$CS6N@w z-SV#HweF911xREnSKaKshrFu{Y{k9hJ@c*vFzs%@R_9&W>+`NSVnw$<9^Ms4tVutd zH=bHu!|pl8-g#HT@OH(G-iCKoZ?5Kzr)wEbmv+I#c!0bsVLf>lw4a7|RX+gwj)+7b zJ?|)Xwi|MI(S@UB3ULM6LmGh!>et4v)*b@vhSu6Sp(`9}n{#Jdv8>~)nb%ew+K zDwUNV_KQ^;M#TnrSHhFj)ft_4;9X@pSLds)&ASpBOOLZ{i+7c&t9oDi$IH7Cotkpj z8s60d&Jy?N)=k5^YIs)-@2cTlHN2~ach&H&8s1gIyJ~n>4e#oq@~&jLquA7%f8u)E zyen1Gw2kwwvc62WZFj=EDrwyt?+TEp64mD)7rvjos~qgze$jagysHdY)jijH%DWPX z4ddf^-<3dIzV-HSc~@OaH%-sl_b*Kg!`o8xzN?kY)x7U&O~WXGj-j%?hSfv2p7w@VT&@Opbs#e9j63`62 zZi`@}j2@xPoPQi=(~KTeqfp(ddcD}Pfu|_aH1P7>3b1Ka$cbRg={)>y_hE?~l7p(K zHXRx}-L5}*Pde6qC*bF}z?dA#ikFVTEq*NcKNDo8LejE-)Y zN@av7)UYd-Xe)uy!KIIgxclpokX@7MRUIM(m9F}+?y8bxdL5}PersXc;%BBRdy=QS z2dcklv6<}oo;!!m06Ya6Qg4XOFPJXDc)`-1<; zr;yXo8%0o5P7=nCr6alIx5Mw#ls-%PUqCJkxbt98fPenxckkc*&xhZ=`|zv({`2qu zUNfmILB_%d`5y5v3RPqzw`g^M%?$w-{E}M0s#FI2{@p*l|Bt_Z|2Mz=;~)MNzIL9$ zHPafd)r}*V!yl(E4oT?re0>orICzzx#*x|M$Nl?{((QwQrc4aH2^>Y~PJcd_mfM9zRt< zmY9uiM!q{tWiH9nAZwicFwQfWX`MdHMNBU37#imIZyZ0(g95cPsE%Rg5x4V86McqU zv+j;u-V1VkkKJ97D|> bindingEvent() { + return Result.success(eventService.getCarEventList()); + } +} diff --git a/src/main/java/com/muyu/event/mapper/EventMapper.java b/src/main/java/com/muyu/event/mapper/EventMapper.java new file mode 100644 index 0000000..3b059b4 --- /dev/null +++ b/src/main/java/com/muyu/event/mapper/EventMapper.java @@ -0,0 +1,19 @@ +package com.muyu.event.mapper; + +import com.muyu.event.common.CarEvent; +import org.apache.ibatis.annotations.Mapper; + +import java.util.List; + +/** + * EventMapper + * + * @author Yangle + * Date 2024/6/17 20:00 + */ +@Mapper +public interface EventMapper { + List getCarEventList(); + + +} diff --git a/src/main/java/com/muyu/event/service/EventService.java b/src/main/java/com/muyu/event/service/EventService.java new file mode 100644 index 0000000..ee17f5d --- /dev/null +++ b/src/main/java/com/muyu/event/service/EventService.java @@ -0,0 +1,16 @@ +package com.muyu.event.service; + +import com.muyu.event.common.CarEvent; + +import java.util.List; + +/** + * EventService + * + * @author Yangle + * Date 2024/6/17 19:57 + */ +public interface EventService { + List getCarEventList(); + +} diff --git a/src/main/java/com/muyu/event/service/impl/EventServiceImpl.java b/src/main/java/com/muyu/event/service/impl/EventServiceImpl.java new file mode 100644 index 0000000..03605e4 --- /dev/null +++ b/src/main/java/com/muyu/event/service/impl/EventServiceImpl.java @@ -0,0 +1,25 @@ +package com.muyu.event.service.impl; + +import com.muyu.event.common.CarEvent; +import com.muyu.event.mapper.EventMapper; +import com.muyu.event.service.EventService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * EventServiceImpl + * + * @author Yangle + * Date 2024/6/17 19:57 + */ +@Service +public class EventServiceImpl implements EventService { + @Autowired + private EventMapper eventMapper; + @Override + public List getCarEventList() { + return eventMapper.getCarEventList(); + } +} diff --git a/src/main/java/com/muyu/kafka/SimpleKafkaConsumer1.java b/src/main/java/com/muyu/kafka/SimpleKafkaConsumer1.java new file mode 100644 index 0000000..fc72089 --- /dev/null +++ b/src/main/java/com/muyu/kafka/SimpleKafkaConsumer1.java @@ -0,0 +1,88 @@ +//package com.muyu.kafka; +// +//import com.muyu.iotDB.service.IotDbServer; +//import com.muyu.mqtt.dao.MessageData; +//import lombok.extern.log4j.Log4j2; +//import org.apache.iotdb.rpc.IoTDBConnectionException; +//import org.apache.iotdb.rpc.StatementExecutionException; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.clients.consumer.ConsumerRecord; +//import org.apache.kafka.clients.consumer.ConsumerRecords; +//import org.apache.kafka.clients.consumer.KafkaConsumer; +//import org.apache.kafka.common.TopicPartition; +//import org.apache.kafka.common.serialization.StringDeserializer; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.scheduling.annotation.Scheduled; +//import org.springframework.stereotype.Component; +// +//import javax.annotation.Resource; +//import java.rmi.ServerException; +//import java.time.Duration; +//import java.util.ArrayList; +//import java.util.Collections; +//import java.util.List; +//import java.util.Properties; +// +///** +// * 定时器 SimpleKafkaConsumer +// * +// * @author Yangle +// * Date 2024/6/16 22:18 +// */ +//@Component +//@Log4j2 +//public class SimpleKafkaConsumer1 { +// +// @Autowired +// private RedisTemplate redisTemplate; +// +// +// public void consumer1() { +// System.out.println(123); +// } +// +//// public void consumer() { +//// List dataArrayList = new ArrayList<>(); +//// log.info("添加到reids定时器开启"); +//// // 配置Kafka消费者属性 +//// Properties props = new Properties(); +//// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); +//// props.put(ConsumerConfig.GROUP_ID_CONFIG, "Partitions"); +//// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); +//// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); +//// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); +//// +//// // 创建Kafka消费者实例 +//// KafkaConsumer consumer = new KafkaConsumer<>(props); +//// +//// // 订阅主题 +//// TopicPartition topicPartition = new TopicPartition("test1", 0); +//// consumer.assign(Collections.singletonList(topicPartition)); +//// +//// log.info("定时器结束"); +//// // 持续消费消息 +//// while (true) { +//// ConsumerRecords records = consumer.poll(Duration.ofMillis(10)); +//// for (ConsumerRecord record : records) { +//// System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); +//// String value = record.value(); +//// log.info("value:{}", value); +//// MessageData messageData1 = com.alibaba.fastjson2.JSONObject.parseObject(value, MessageData.class); +//// log.info("messageData1:{}", messageData1); +//// dataArrayList.add(messageData1); +//// } +//// if (dataArrayList.size() >= 10) { +//// for (MessageData messageData : dataArrayList) { +//// // 将数据添加到Redis中,这里以messageData的id作为key,messageData对象序列化为JSON字符串作为value +//// redisTemplate.opsForList().rightPush(messageData.getVin(), messageData.toString()); +//// } +//// dataArrayList.clear(); +//// } +//// +//// } +// +// +// +//} +// diff --git a/src/main/resources/mapper/EventMapper.xml b/src/main/resources/mapper/EventMapper.xml new file mode 100644 index 0000000..03a1663 --- /dev/null +++ b/src/main/resources/mapper/EventMapper.xml @@ -0,0 +1,9 @@ + + + + + + +