feat kafka的分区

new-master
rouchen 2024-06-12 14:30:40 +08:00
parent 481f9e3e3b
commit 54a49a3481
5 changed files with 160 additions and 145 deletions

View File

@ -1,6 +0,0 @@
#FileLock
#Mon Jun 10 14:45:14 CST 2024
server=10.3.44.57\:60299
hostName=xiaoxin
method=file
id=19000e4d2757c72203e914587eab1e9cbbfe1286271

View File

@ -1,70 +1,70 @@
// This file is auto-generated, don't edit it. Thanks.
package com.muyu;
import com.aliyun.ecs20140526.models.RunInstancesResponse;
import com.aliyun.tea.*;
public class Sample {
/**
* 使AK&SKClient
* @return Client
* @throws Exception
*/
public static com.aliyun.ecs20140526.Client createClient() throws Exception {
// 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
// 建议使用更安全的 STS 方式更多鉴权访问方式请参见https://help.aliyun.com/document_detail/378657.html。
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
.setAccessKeyId("LTAI5tBnh5t91qYtNK53Zf3M")
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
.setAccessKeySecret("DNTCIVeIWuWVZkwHVYOlqXF42RH8ja");
// Endpoint 请参考 https://api.aliyun.com/product/Ecs
config.endpoint = "ecs-cn-hangzhou.aliyuncs.com";
return new com.aliyun.ecs20140526.Client(config);
}
public static void main(String[] args_) throws Exception {
java.util.List<String> args = java.util.Arrays.asList(args_);
com.aliyun.ecs20140526.Client client = Sample.createClient();
com.aliyun.ecs20140526.models.RunInstancesRequest.RunInstancesRequestSystemDisk systemDisk = new com.aliyun.ecs20140526.models.RunInstancesRequest.RunInstancesRequestSystemDisk()
.setSize("20");
com.aliyun.ecs20140526.models.RunInstancesRequest runInstancesRequest = new com.aliyun.ecs20140526.models.RunInstancesRequest()
.setRegionId("cn-hangzhou")
.setInstanceType("ecs.e-c1m2.xlarge")
.setLaunchTemplateId("lt-bp13nqg5cfc8ju0nkhgt")
.setImageId("m-bp1d61lpyg1wospcguj4")
.setVSwitchId("vsw-bp1pp6pd19ko18jzgmjpz")
.setUniqueSuffix(true)
.setInternetChargeType("PayByTraffic")
.setSystemDisk(systemDisk)
.setSecurityGroupIds(java.util.Arrays.asList(
"sg-bp16xo23jf7ppzgrsiqt"
));
com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
try {
// 复制代码运行请自行打印 API 的返回值
RunInstancesResponse runInstancesResponse = client.runInstancesWithOptions(runInstancesRequest, runtime);
System.out.println("--------------------查询实例--------------------");
//打印响应结果
System.out.println(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(runInstancesResponse)));
} catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
} catch (Exception _error) {
TeaException error = new TeaException(_error.getMessage(), _error);
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
}
}
}
//// This file is auto-generated, don't edit it. Thanks.
//package com.muyu;
//
//import com.aliyun.ecs20140526.models.RunInstancesResponse;
//import com.aliyun.tea.*;
//
//public class Sample {
//
// /**
// * 使用AK&SK初始化账号Client
// * @return Client
// * @throws Exception
// */
// public static com.aliyun.ecs20140526.Client createClient() throws Exception {
// // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
// // 建议使用更安全的 STS 方式更多鉴权访问方式请参见https://help.aliyun.com/document_detail/378657.html。
// com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
// // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
// .setAccessKeyId("LTAI5tBnh5t91qYtNK53Zf3M")
// // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
// .setAccessKeySecret("DNTCIVeIWuWVZkwHVYOlqXF42RH8ja");
// // Endpoint 请参考 https://api.aliyun.com/product/Ecs
// config.endpoint = "ecs-cn-hangzhou.aliyuncs.com";
// return new com.aliyun.ecs20140526.Client(config);
// }
//
//
// public static void main(String[] args_) throws Exception {
// java.util.List<String> args = java.util.Arrays.asList(args_);
// com.aliyun.ecs20140526.Client client = Sample.createClient();
// com.aliyun.ecs20140526.models.RunInstancesRequest.RunInstancesRequestSystemDisk systemDisk = new com.aliyun.ecs20140526.models.RunInstancesRequest.RunInstancesRequestSystemDisk()
// .setSize("20");
// com.aliyun.ecs20140526.models.RunInstancesRequest runInstancesRequest = new com.aliyun.ecs20140526.models.RunInstancesRequest()
// .setRegionId("cn-hangzhou")
// .setInstanceType("ecs.e-c1m2.xlarge")
// .setLaunchTemplateId("lt-bp13nqg5cfc8ju0nkhgt")
// .setImageId("m-bp1d61lpyg1wospcguj4")
// .setVSwitchId("vsw-bp1pp6pd19ko18jzgmjpz")
// .setUniqueSuffix(true)
// .setInternetChargeType("PayByTraffic")
// .setSystemDisk(systemDisk)
// .setSecurityGroupIds(java.util.Arrays.asList(
// "sg-bp16xo23jf7ppzgrsiqt"
// ));
// com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
// try {
// // 复制代码运行请自行打印 API 的返回值
// RunInstancesResponse runInstancesResponse = client.runInstancesWithOptions(runInstancesRequest, runtime);
// System.out.println("--------------------查询实例--------------------");
// //打印响应结果
// System.out.println(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(runInstancesResponse)));
//
// } catch (TeaException error) {
// // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// // 错误 message
// System.out.println(error.getMessage());
// // 诊断地址
// System.out.println(error.getData().get("Recommend"));
// com.aliyun.teautil.Common.assertAsString(error.message);
// } catch (Exception _error) {
// TeaException error = new TeaException(_error.getMessage(), _error);
// // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// // 错误 message
// System.out.println(error.getMessage());
// // 诊断地址
// System.out.println(error.getData().get("Recommend"));
// com.aliyun.teautil.Common.assertAsString(error.message);
// }
// }
//}

View File

@ -1,60 +1,60 @@
// This file is auto-generated, don't edit it. Thanks.
package com.muyu;
import com.aliyun.ecs20140526.models.DescribeInstanceStatusResponse;
import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
import com.aliyun.tea.*;
import lombok.extern.log4j.Log4j2;
@Log4j2
public class Sample1 {
/**
* 使AK&SKClient
* @return Client
* @throws Exception
*/
public static com.aliyun.ecs20140526.Client createClient() throws Exception {
// 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
// 建议使用更安全的 STS 方式更多鉴权访问方式请参见https://help.aliyun.com/document_detail/378657.html。
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
.setAccessKeyId(System.getenv("LTAI5t9jSmMArEckX7ZxbVAE"))
// 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
.setAccessKeySecret(System.getenv("1EtjgU7DnKkiVBE7rHLYDwkwgf7lI6"));
// Endpoint 请参考 https://api.aliyun.com/product/Ecs
config.endpoint = "ecs.cn-shanghai.aliyuncs.com";
return new com.aliyun.ecs20140526.Client(config);
}
public static void main(String[] args_) throws Exception {
java.util.List<String> args = java.util.Arrays.asList(args_);
com.aliyun.ecs20140526.Client client = Sample.createClient();
com.aliyun.ecs20140526.models.DescribeInstanceStatusRequest describeInstanceStatusRequest = new com.aliyun.ecs20140526.models.DescribeInstanceStatusRequest()
.setRegionId("cn-shanghai")
.setInstanceId(java.util.Arrays.asList(
"i-uf65ppbzjylqikhn2euq"
));
com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
try {
// 复制代码运行请自行打印 API 的返回值
DescribeInstanceStatusResponse describeInstanceStatusResponse = client.describeInstanceStatusWithOptions(describeInstanceStatusRequest, runtime);
System.out.println(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(describeInstanceStatusResponse)));
} catch (TeaException error) {
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
} catch (Exception _error) {
TeaException error = new TeaException(_error.getMessage(), _error);
// 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// 错误 message
System.out.println(error.getMessage());
// 诊断地址
System.out.println(error.getData().get("Recommend"));
com.aliyun.teautil.Common.assertAsString(error.message);
}
}
}
//// This file is auto-generated, don't edit it. Thanks.
//package com.muyu;
//
//import com.aliyun.ecs20140526.models.DescribeInstanceStatusResponse;
//import com.aliyun.ecs20140526.models.DescribeInstancesResponse;
//import com.aliyun.tea.*;
//import lombok.extern.log4j.Log4j2;
//
//@Log4j2
//public class Sample1 {
//
// /**
// * 使用AK&SK初始化账号Client
// * @return Client
// * @throws Exception
// */
// public static com.aliyun.ecs20140526.Client createClient() throws Exception {
// // 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考。
// // 建议使用更安全的 STS 方式更多鉴权访问方式请参见https://help.aliyun.com/document_detail/378657.html。
// com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
// // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。
// .setAccessKeyId(System.getenv("LTAI5t9jSmMArEckX7ZxbVAE"))
// // 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
// .setAccessKeySecret(System.getenv("1EtjgU7DnKkiVBE7rHLYDwkwgf7lI6"));
// // Endpoint 请参考 https://api.aliyun.com/product/Ecs
// config.endpoint = "ecs.cn-shanghai.aliyuncs.com";
// return new com.aliyun.ecs20140526.Client(config);
// }
//
// public static void main(String[] args_) throws Exception {
// java.util.List<String> args = java.util.Arrays.asList(args_);
// com.aliyun.ecs20140526.Client client = Sample.createClient();
// com.aliyun.ecs20140526.models.DescribeInstanceStatusRequest describeInstanceStatusRequest = new com.aliyun.ecs20140526.models.DescribeInstanceStatusRequest()
// .setRegionId("cn-shanghai")
// .setInstanceId(java.util.Arrays.asList(
// "i-uf65ppbzjylqikhn2euq"
// ));
// com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
// try {
// // 复制代码运行请自行打印 API 的返回值
// DescribeInstanceStatusResponse describeInstanceStatusResponse = client.describeInstanceStatusWithOptions(describeInstanceStatusRequest, runtime);
// System.out.println(com.aliyun.teautil.Common.toJSONString(TeaModel.buildMap(describeInstanceStatusResponse)));
// } catch (TeaException error) {
// // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// // 错误 message
// System.out.println(error.getMessage());
// // 诊断地址
// System.out.println(error.getData().get("Recommend"));
// com.aliyun.teautil.Common.assertAsString(error.message);
// } catch (Exception _error) {
// TeaException error = new TeaException(_error.getMessage(), _error);
// // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
// // 错误 message
// System.out.println(error.getMessage());
// // 诊断地址
// System.out.println(error.getData().get("Recommend"));
// com.aliyun.teautil.Common.assertAsString(error.message);
// }
// }
//}

View File

@ -2,14 +2,20 @@ package com.muyu.web.mqtt;
import com.muyu.web.utils.ConversionUtil;
import com.muyu.vehicle.MessageData;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
* MessageCallbackService
*
@ -17,29 +23,44 @@ import org.springframework.stereotype.Service;
* Date 2024/6/6 15:08
*/
@Service
@Log4j2
public class MessageCallbackService implements MqttCallback {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private KafkaAdmin kafkaAdmin;
@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost:"+cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println("topic:"+topic);
System.out.println("Qos:"+mqttMessage.getQos());
System.out.println("message content:"+new String(mqttMessage.getPayload()));
log.info("topic:{}",topic);
log.info("Qos:{}",mqttMessage.getQos());
log.info("message content:{}",new String(mqttMessage.getPayload()));
String s = new String(mqttMessage.getPayload());
MessageData main = ConversionUtil.main(s);
String vin = main.getVin();
ProducerRecord<String, String> stringObjectProducerRecord = new ProducerRecord<>(vin,main.toString());
kafkaTemplate.send(stringObjectProducerRecord);
try {
List<NewTopic> newTopicList = createNewTopics(topic, main.getVin(),main);
for (NewTopic newTopic : newTopicList) {
kafkaAdmin.createOrModifyTopics(newTopic);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private List<NewTopic> createNewTopics(String topic, String vin, MessageData messageData) {
List<NewTopic> newTopics = new ArrayList<>();
String topicName = topic + "-" + vin;
newTopics.add(new NewTopic(topicName, 8, (short) 1));
ProducerRecord<String, Object> stringObjectProducerRecord = new ProducerRecord<>(topicName, messageData.toString());
kafkaTemplate.send(stringObjectProducerRecord);
return newTopics;
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());