kafka传输图片demo
因为项目推进,需要在框架里实现新的功能,将已经写好的利用kafka实现图片的生产消费demo在博客中存档
VideoEventGenerator类
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
import org.opencv.core.Core;
import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.OutputStream;
import java.sql.Timestamp;

/**
 * 图片序列化发送实现类
 *
 * @author zhoutao
 * @date 2019/8/3
 */
public class VideoEventGenerator implements Runnable {//记录日志private static final Logger logger = Logger.getLogger(VideoEventGenerator.class);private String cameraId;private String url;private Producer<String,byte[]> producer;private String topic;//构造方法public VideoEventGenerator(String cameraId, String url, Producer<String,byte[]> producer, String topic) {this.cameraId = cameraId;this.url = url;this.producer = producer;this.topic = topic;}//加载opencv库
// static {
// System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
// }//run方法public void run(){logger.info("开始处理相机:"+cameraId+"它的url是"+url);try{videoprocess(cameraId,url,producer,topic);}catch(Exception e){logger.error(e.getMessage());}}//将图片利用BufferedImage转为字节数组private void videoprocess(String cameraId,String url,Producer<String,byte[]> producer, String topic)throws Exception{String timestamp = new Timestamp(System.currentTimeMillis()).toString();BufferedImage image = ImageIO.read(new FileInputStream(url));OutputStream bOut = new ByteArrayOutputStream();ImageIO.write(image,"jpg",bOut);byte[]data=((ByteArrayOutputStream) bOut).toByteArray();ProducerRecord<String, byte[]> datas = new ProducerRecord<String, byte[]>(topic, cameraId,data);try {producer.send(datas);
// logger.info("Generated events for cameraId="+cameraId+" timestamp="+timestamp);}catch(Exception e) {e.printStackTrace();}finally{logger.info("Generated events for cameraId="+cameraId+" timestamp="+timestamp);}}
}
VideoStreamCollector类
import java.awt.image.BufferedImage;
import java.io.*;
import java.util.Collections;
import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.log4j.Logger;
import javax.imageio.ImageIO;

/**
 * 1.创建kafka生产者
 * 2.采集图片多线程发送
 *
 * @author zhoutao
 * @date 2019/8/2
 */
public class VideoStreamCollector {private static final Logger logger = Logger.getLogger(VideoEventGenerator.class);public static void main(String[] args) throws Exception {//将从配置文件读取到的配置加入Properties类中Properties prop = PropertyFileReader.readPropertyFile();Properties properties = new Properties();Properties properties2=new Properties();properties.put("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"));properties.put("acks", prop.getProperty("kafka.acks"));properties.put("retries", prop.getProperty("kafka.retries"));properties.put("batch.size", prop.getProperty("kafka.batch.size"));properties.put("linger.ms", prop.getProperty("kafka.linger.ms"));properties.put("max.request.size", prop.getProperty("kafka.max.request.size"));properties.put("compression.type", prop.getProperty("kafka.compression.type"));properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");properties2.put("bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"));properties2.put("group.id", "video");properties2.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");//创建生产者Producer<String, byte[]> producer = new KafkaProducer<String,byte[]>(properties);generateIoTEvent(producer,prop.getProperty("kafka.topic"),prop.getProperty("camera.id"),prop.getProperty("camera.url"));//创建消费者KafkaConsumer<String,byte[]> consumer=new KafkaConsumer<String,byte[]>(properties2);//订阅主题consumer.subscribe(Collections.singletonList("kafkatest"));//接收数据acceptData(consumer);}//创立图片事件private static void generateIoTEvent(Producer<String, byte[]> producer, String topic, String camId, String videoUrl) throws Exception {String[] urls = videoUrl.split(",");String[] ids = camId.split(",");if(urls.length != ids.length){throw new 
Exception("相机数与图片数应相同");}logger.info("图片路数:"+urls.length);//多线程调用发送实现类for(int i=0;i<urls.length;i++){Thread t = new Thread(new VideoEventGenerator(ids[i].trim(),urls[i].trim(),producer,topic));t.start();}}//接收数据方法private static void acceptData( KafkaConsumer<String,byte[]> consumer)throws Exception{try{while(true){ConsumerRecords<String,byte[]> records=consumer.poll(100);for(ConsumerRecord<String,byte[]> record:records){System.out.println("获取数据");byte[] data=record.value();ByteArrayInputStream in = new ByteArrayInputStream(data);BufferedImage image = ImageIO.read(in);Long time=System.currentTimeMillis();OutputStream bOut = new FileOutputStream("C:/Users/Administrator.000/Desktop/"+time.toString()+".jpg");ImageIO.write(image,"jpg",bOut);}}}catch(Exception ex){ex.printStackTrace();}finally {consumer.close();}}}
PropertyFileReader类
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * 读取kafka配置文件
 *
 * @author zhoutao
 * @date 2019/8/2
 */
public class PropertyFileReader {private static final org.apache.log4j.Logger logger = Logger.getLogger(PropertyFileReader.class);private static Properties prop = new Properties();public static Properties readPropertyFile() throws Exception {if (prop.isEmpty()) {//实现获取在classpath路径下的资源文件的输入流。InputStream input = PropertyFileReader.class.getClassLoader().getResourceAsStream("stream-collector.properties");try {prop.load(input);} catch (IOException e) {logger.error(e);throw e;} finally {if (input != null) {input.close();}}}return prop;}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>edu.zt</groupId>
    <artifactId>mavenDemo_idea</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- kafka -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <!-- gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- commons-lang -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <!-- log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- opencv -->
        <!-- https://mvnrepository.com/artifact/opencv/opencv -->
        <dependency>
            <groupId>org.bytedeco</groupId>
            <artifactId>opencv-platform</artifactId>
            <version>4.1.0-1.5.1</version>
        </dependency>
    </dependencies>
</project>
log4j.properties
# Root logger option
log4j.rootLogger=INFO, file, stdout
# Direct log messages to a log file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=F:/tmp/stream-collector.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
stream-collector.properties
# Kafka properties
kafka.bootstrap.servers=10.34.51.202:9092
kafka.acks=1
kafka.retries=1
# 20 MB
kafka.batch.size=20971520
kafka.linger.ms=5
kafka.compression.type=gzip
# 2 MB
kafka.max.request.size=2097152
kafka.topic=kafkatest
# url和id以逗号分隔,顺序必须对应
camera.url=C:/Users/Administrator.000/Desktop/timg.jpg,C:/Users/Administrator.000/Desktop/timg2.jpg
camera.id=cam1,cam2
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!