Compare commits

..

No commits in common. "server_five" and "server_five_liuyunhu" have entirely different histories.

13 changed files with 148 additions and 119 deletions

View File

@ -6,7 +6,6 @@ import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.List;
/**
* @author fufanrui
@ -21,7 +20,7 @@ import java.util.List;
public class RealTimeDataRequest implements Serializable {
private List<Long> userId;
private Long userId;
private String vin;

View File

@ -23,6 +23,7 @@ public class RedisService {
public RedisTemplate redisTemplate;
// ... 其他已有方法 ...
/**
@ -64,6 +65,7 @@ public class RedisService {
*
* @param key Redis
* @param timeout
*
* @return true=false=
*/
public boolean expire (final String key, final long timeout) {
@ -76,6 +78,7 @@ public class RedisService {
* @param key Redis
* @param timeout
* @param unit
*
* @return true=false=
*/
public boolean expire (final String key, final long timeout, final TimeUnit unit) {
@ -86,6 +89,7 @@ public class RedisService {
*
*
* @param key Redis
*
* @return
*/
public long getExpire (final String key) {
@ -96,6 +100,7 @@ public class RedisService {
* key
*
* @param key
*
* @return true false
*/
public Boolean hasKey (String key) {
@ -106,6 +111,7 @@ public class RedisService {
*
*
* @param key
*
* @return
*/
public <T> T getCacheObject (final String key) {
@ -126,6 +132,7 @@ public class RedisService {
*
*
* @param collection
*
* @return
*/
public boolean deleteObject (final Collection collection) {
@ -137,6 +144,7 @@ public class RedisService {
*
* @param key
* @param dataList List
*
* @return
*/
public <T> long setCacheList (final String key, final List<T> dataList) {
@ -148,6 +156,7 @@ public class RedisService {
* list
*
* @param key
*
* @return
*/
public <T> List<T> getCacheList (final String key) {
@ -159,6 +168,7 @@ public class RedisService {
*
* @param key
* @param dataSet
*
* @return
*/
public <T> BoundSetOperations<String, T> setCacheSet (final String key, final Set<T> dataSet) {
@ -169,12 +179,12 @@ public class RedisService {
}
return setOperation;
}
/**
* Set
*
* @param key
* @param setValue
*
* @return
*/
public <T> BoundSetOperations<String, T> setCacheSet (final String key, final T setValue) {
@ -182,12 +192,12 @@ public class RedisService {
setOperation.add(setValue);
return setOperation;
}
/**
* Set
*
* @param key
* @param setValue
*
* @return
*/
public <T> void deleteSet(String key, String setValue) {
@ -195,11 +205,11 @@ public class RedisService {
BoundSetOperations setOperations = redisTemplate.boundSetOps(key);
setOperations.remove(setValue);
}
/**
* set
*
* @param key
*
* @return
*/
public <T> Set<T> getCacheSet (final String key) {
@ -207,6 +217,7 @@ public class RedisService {
}
/**
* Map
*
@ -223,6 +234,7 @@ public class RedisService {
* Map
*
* @param key
*
* @return
*/
public <T> Map<String, T> getCacheMap (final String key) {
@ -245,6 +257,7 @@ public class RedisService {
*
* @param key Redis
* @param hKey Hash
*
* @return Hash
*/
public <T> T getCacheMapValue (final String key, final String hKey) {
@ -257,6 +270,7 @@ public class RedisService {
*
* @param key Redis
* @param hKeys Hash
*
* @return Hash
*/
public <T> List<T> getMultiCacheMapValue (final String key, final Collection<Object> hKeys) {
@ -268,6 +282,7 @@ public class RedisService {
*
* @param key Redis
* @param hKey Hash
*
* @return
*/
public boolean deleteCacheMapValue (final String key, final String hKey) {
@ -278,6 +293,7 @@ public class RedisService {
*
*
* @param pattern
*
* @return
*/
public Collection<String> keys (final String pattern) {
@ -286,7 +302,7 @@ public class RedisService {
public void setVinAndUserId(RealTimeDataRequest realTimeDataRequest) {
String key = "vin:"+realTimeDataRequest.getVin();
redisTemplate.opsForSet().add(key, realTimeDataRequest);
redisTemplate.opsForValue().set(key, realTimeDataRequest);
}
public void stopViewingData(String vin) {

View File

@ -1,7 +1,5 @@
package com.couplet.analyze.msg;
import com.couplet.analyze.msg.model.ModelsKafkaMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
@ -20,8 +18,5 @@ public class CoupletMsgApplication {
public static void main(String[] args) {
SpringApplication.run(CoupletMsgApplication.class);
System.out.println("解析系统启动成功");
new ModelsKafkaMessage().initKafkaConsumer();
}
}

View File

@ -1,6 +1,5 @@
package com.couplet.analyze.msg.model;
import com.couplet.analyze.common.event.AnalyzeEventCache;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService;
@ -11,7 +10,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -38,7 +36,7 @@ import static java.lang.Thread.sleep;
@Component
@Slf4j
public class ModelsKafkaMessage {
private static final String TOPIC_NAME = "topic_lyh";
private static final String TOPIC_NAME = "lyh";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
@ -52,16 +50,13 @@ public class ModelsKafkaMessage {
private AnalyzeEventCache analyzeEventCache;
//kafka消费者初始化
@PostConstruct
public void initKafkaConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "lll");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
@ -91,7 +86,7 @@ public class ModelsKafkaMessage {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
log.info("接收到的数据:" + record.value());
System.out.println("接收到的数据:" + record.value());
String str = hexToString(record.value());
List<CoupletMsgData> coupletMsgDataList = sendMsg(str);
for (CoupletMsgData msgData : coupletMsgDataList) {

View File

@ -5,7 +5,6 @@ import com.couplet.analyze.common.contents.AnalyzeEventContents;
import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.common.core.text.Convert;
import com.couplet.common.core.utils.StringUtils;
import com.couplet.common.domain.Fence;
import com.couplet.common.domain.request.FenceAndLogeRequest;
import com.couplet.common.redis.service.RedisService;
@ -47,9 +46,12 @@ public class ElectronicFenceServiceImpl implements IncidentService {
if (redisService.hasKey(fenceKey)) {
Set<Fence> cacheSet = redisService.getCacheSet(fenceKey);
log.info("电子围栏事件redis存在.......");
// jingdu;
// longitude;
// weidu;
// latitude;
for (Fence fence : cacheSet) {
String fenceLongitudeLatitude = fence.getFenceLongitudeLatitude();
if (StringUtils.isEmpty(fenceLongitudeLatitude)){
/**
*
*/
@ -75,7 +77,6 @@ public class ElectronicFenceServiceImpl implements IncidentService {
}
}
}
}
log.info("电子围栏事件结束.......");
}
log.info("电子围栏事件结束.......");

View File

@ -5,7 +5,6 @@ import com.couplet.analyze.msg.domain.CoupletMsgData;
import com.couplet.analyze.msg.mapper.IncidentMapper;
import com.couplet.analyze.msg.service.IncidentService;
import com.couplet.analyze.msg.service.impl.realTimeData.RealTimeJudge;
import com.couplet.common.core.utils.StringUtils;
import com.couplet.common.domain.request.RealTimeDataRequest;
import com.couplet.common.redis.service.RedisService;
import com.couplet.remote.RemoteRealTimeService;
@ -13,8 +12,6 @@ import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -26,6 +23,12 @@ import java.util.concurrent.TimeUnit;
@Log4j2
public class RealTimeDataServiceImpl implements IncidentService {
/**
*
*/
@Autowired
private IncidentMapper incidentMapper;
@Autowired
private RedisService redisService;
@ -37,16 +40,28 @@ public class RealTimeDataServiceImpl implements IncidentService {
@Override
public void incident(CoupletMsgData coupletMsgData) {
log.info("实时数据事件开始.....");
if (redisService.hasKey("实时轨迹vin:"+ coupletMsgData.getVin())){
boolean a= ("实时轨迹vin:" + coupletMsgData.getVin()).equals("实时轨迹vin:"+ coupletMsgData.getVin());
if (a){
log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据一致,开始传输实时数据", coupletMsgData.getVin(), coupletMsgData);
RealTimeDataRequest cacheObject = redisService.getCacheObject("vin:" + coupletMsgData.getVin());
// //判断是否有缓存数据
// if (redisService.hasKey("vin:query:" + coupletMsgData.getVin())){
// redisService.deleteObject("vin:query:" + coupletMsgData.getVin());
// }
// if (RealTimeJudge.isJudge(coupletMsgData.getVin())) {
if (coupletMsgData.getVin().equals(cacheObject.getVin())){
// log.info("有实时数据,值为:[{}]开始传输实时数据", coupletMsgData.getVin());
//判断数据是否一致,
// if (RealTimeJudge.addRealTime(cacheObject)) {
log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject);
redisService.setCacheSet("vin:query:" + coupletMsgData.getVin(), coupletMsgData);
redisService.expire("vin:"+coupletMsgData.getVin(),10, TimeUnit.MINUTES);
// } else {
// log.info("[{}]有缓存数据,值为:[{}],且缓存数据与实时数据不一致,开始传输实时数据", coupletMsgData.getVin(), cacheObject);
// }
}
}
log.info("[{}]开始传输实时数据", coupletMsgData.getVin());
log.info("实时数据事件结束.....");
}
/**

View File

@ -37,7 +37,7 @@ public class RealTimeJudge {
userIds = new HashSet<>();
setMap.put(realTimeDataRequest.getVin(),userIds);
}
// userIds.add(realTimeDataRequest.getUserId());
userIds.add(realTimeDataRequest.getUserId());
return true;
}

View File

@ -44,12 +44,12 @@ public class FenceServiceImpl extends ServiceImpl<FenceMapper, Fence> implements
*/
@Autowired
private StringRedisTemplate redisTemplate;
/**
*
*/
// @Autowired
// private RemoteFenceService remoteFenceService;
@Override
public List<Fence> pageQuery(FenceConfig fenceConfig) {
List<Fence> list = fenceMapper.pageQuery(fenceConfig);
@ -70,26 +70,36 @@ public class FenceServiceImpl extends ServiceImpl<FenceMapper, Fence> implements
// remoteFenceService.fenceQueue(fenceUpdateRequest);
}
@Override
public void fenceInsert(FenceRequest fenceRequest) {
}
/**
* :
*
* @param
* @param request
* @param fenceRequest
*/
@Override
public void fenceInsert(FenceRequest fenceRequest) {
String username = SecurityUtils.getUsername();
fenceRequest.setCrateName(username);
fenceRequest.setMaintainerName(username);
//先添加围栏
fenceMapper.insertFence(fenceRequest);
fenAndLogoService.addBach(fenceRequest.getFenceId(), fenceRequest.getLogoIds());
/**
*
*/
redisTemplate.opsForValue().set("fenceInsert", JSON.toJSONString(fenceRequest), 10, TimeUnit.MINUTES);
}
// @Override
// public void fenceInsert(HttpServletRequest request, FenceRequest fenceRequest) {
// String username = SecurityUtils.getUsername();
// fenceRequest.setCrateName(username);
// //先添加围栏
// fenceMapper.insertFence(fenceRequest);
// String[] logoIds = fenceRequest.getLogoIds();
// String[] parts = new String[0];
// for (String logoId : logoIds) {
// //把前台传入的字符串分割成数组
// parts = logoId.split(",");
// //再添加围栏和标识中间表
// fenAndLogoService.addBach(fenceRequest.getFenceId(), parts);
// }
// /**
// * 电子围栏发送改变
// */
// redisTemplate.opsForValue().set("fenceInsert", JSON.toJSONString(fenceRequest), 10, TimeUnit.MINUTES);
// }
@Override
public void removeByFenceId(Long fenceId) {

View File

@ -21,8 +21,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import springfox.documentation.spring.web.json.Json;
import java.lang.reflect.Array;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@ -53,26 +53,21 @@ public class VehicleDetectionServiceImpl implements VehicleDetectionService{
public List<CoupletMsgData> monitorinDataList(String vin) {
String key = "vin:query:" + vin;
log.info("key为:"+key);
Set<CoupletMsgData> cacheSet = redisService.getCacheSet(key);
ArrayList<CoupletMsgData> coupletMsgData = new ArrayList<>(cacheSet);
return coupletMsgData;
CoupletMsgData coupletMsgData = redisService.getCacheObject(key);
ArrayList<CoupletMsgData> coupletMsgDataArrayList = new ArrayList<>();
coupletMsgDataArrayList.add(coupletMsgData);
return coupletMsgDataArrayList;
}
@Override
public void monitorinData(String vin) {
//创建对象
RealTimeDataRequest realTimeDataRequest = new RealTimeDataRequest();
//获取用户id
Long userId = SecurityUtils.getUserId();
//设置车辆vin
realTimeDataRequest.setVin(vin);
//创建hashSet集合
HashSet<Long> objects = new HashSet<>();
//添加车辆id
objects.add(userId);
//把对象放入hashSet集合中
//存储的对象是:key:业务+vin value: hashSet集合类型的对象
redisService.setCacheSet("实时轨迹vin:"+ vin,objects);
realTimeDataRequest.setUserId(userId);
// analyzeEventCache.queryEvent("查询实时数据"+vin,realTimeDataRequest);
// redisService.expire("查询实时数据"+vin,4,TimeUnit.MINUTES);
redisService.setVinAndUserId(realTimeDataRequest);
}
@Override

View File

@ -60,7 +60,8 @@
INSERT INTO `couplet-cloud`.`couplet_fence_info`
(`fence_name`, `fence_longitude_latitude`, `fence_description`, `is_delete`, `fence_state`, `create_time`,
`update_time`, `create_name`, `maintainer_name`, `alarm_status`,`fence_condition`)
VALUES (#{fenceName}, NULL, #{fenceDescription} , 0, 0, now(), NULL, #{crateName}, #{maintainerName}, 0, 1)
VALUES
(#{fenceName}, null, #{fenceDescription}, 0, 0, now(), null, null, #{maintainerName}, 0 ,0)
</insert>

View File

@ -74,7 +74,7 @@ public class MqController {
RealTimeDataRequest realTimeDataRequest = new RealTimeDataRequest();
realTimeDataRequest.setVin(vin);
Long userId = SecurityUtils.getUserId();
realTimeDataRequest.setUserId(userId);
rabbitTemplate.convertAndSend(RabbitMQConfig.VinExchangeName, RabbitMQConfig.VinRoutingKey, realTimeDataRequest,
message -> {
message.getMessageProperties().setMessageId(IdUtils.randomUUID());

View File

@ -76,7 +76,7 @@ public class MqttMonitor {
//Kafka生产者配置
private static final String TOPIC_NAME = "topic_lhy";
private static final String TOPIC_NAME = "lyh";
private static final String BOOTSTRAP_SERVERS = "39.103.133.136:9092";
//线程池,用于异步处理消息到来时的业务逻辑

View File

@ -15,9 +15,11 @@ spring:
discovery:
# 服务注册地址
server-addr: 121.89.211.230:8848
namespace: 172469
config:
# 配置中心地址
server-addr: 121.89.211.230:8848
namespace: 172469
# 配置文件格式
file-extension: yml
# 共享配置
@ -36,7 +38,7 @@ mqtt:
# broker: mqtt://115.159.47.13:1883
username:
password:
clientId: aaaaaad
clientId: liuyunhu
qos: 0
topic: xiaoYao
topic: liuyunhu