Spark Streaming + Kafka + OpenCV + Face Recognizer + HDFS Sequence File + MySQL
阿新 • • 發佈:2019-01-25
<pre name="code" class="java">/** * Created by lwc on 6/17/16. */ import java.io.*; import java.sql.*; import java.util.*; import kafka.serializer.DefaultDecoder; import kafka.serializer.StringDecoder; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.opencv.core.*; import org.opencv.face.Face; import org.opencv.face.FaceRecognizer; import org.opencv.imgproc.Imgproc; import scala.Tuple2; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka.KafkaUtils; import org.opencv.imgcodecs.Imgcodecs; import org.opencv.objdetect.CascadeClassifier; import org.apache.spark.streaming.Durations; class GlobleData implements Serializable { private static final long serialVersionUID = 1L; public Map<Integer, String> idToNameMapping; // public FaceRecognizer faceRecognizer; // public Map<String, Mat> lableMat = new HashMap<String, Mat>(); public Map<String, String> lableMat = new HashMap<String, String>(); } public class AppMatSeq { static Map<Integer, String> idToNameMapping; static FaceRecognizer faceRecognizer; static MatOfInt labelsBuf; static List<Mat> mats; static Map<String, String> lableMat = new HashMap<String, String>(); static String fzString; static GlobleData globleData = new GlobleData(); @SuppressWarnings("rawtypes") public static void train() throws Exception { String uri = "hdfs://10.75.161.88/newfaces.seq"; mats = new ArrayList<Mat>(); idToNameMapping = new HashMap<Integer, String>(); Configuration conf = new Configuration(); Path path = new Path(uri); 
System.out.println("0"); SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path)); System.out.println("1"); Map<Text, OutputStream> keyStream = new HashMap<Text, OutputStream>(); Text key = new Text(); Text value = new Text(); int count = 0; while (reader.next(key, value)) { if (!idToNameMapping.containsValue(key.toString().split("_")[0])) { idToNameMapping.put(count++, key.toString().split("_")[0]); } if (key.toString().trim() != null && !keyStream.containsKey(key)) { keyStream.put(new Text(key), new ByteArrayOutputStream(1024)); } keyStream.get(key).write(value.getBytes(), 0, value.getLength()); } Map<String, Integer> nameToId = new HashMap<String, Integer>(); for (Map.Entry entry : idToNameMapping.entrySet()) { nameToId.put((String) entry.getValue(), (Integer) entry.getKey()); } Mat mat; ByteArrayOutputStream bs = null; int counter = 0; labelsBuf = new MatOfInt(new int[keyStream.size()]); for (Map.Entry out : keyStream.entrySet()) { bs = ((ByteArrayOutputStream) out.getValue()); bs.flush();//Imgcodecs.CV_LOAD_IMAGE_GRAYSCALE mat = Imgcodecs.imdecode(new MatOfByte(bs.toByteArray()), Imgcodecs.CV_IMWRITE_JPEG_OPTIMIZE); Mat matSave = Imgcodecs.imdecode(new MatOfByte(bs.toByteArray()), Imgcodecs.CV_LOAD_IMAGE_COLOR); mats.add(mat.clone()); int labelId = nameToId.get(out.getKey().toString().split("_")[0]); // lableMat.put(out.getKey().toString().split("_")[0], matSave.clone()); lableMat.put(out.getKey().toString().split("_")[0], matToJson(matSave.clone())); labelsBuf.put(counter++, 0, labelId); } IOUtils.closeStream(bs); IOUtils.closeStream(reader); faceRecognizer = Face.createFisherFaceRecognizer(); // FaceRecognizer faceRecognizer = Face.createEigenFaceRecognizer(); // FaceRecognizer faceRecognizer = Face.createLBPHFaceRecognizer(); faceRecognizer.train(mats, labelsBuf); if (faceRecognizer == null) { System.out.println("in the static after tain, face rec is null"); } else { System.out.println("!!!!!!!!face rec is not null"); } // 
globleData.faceRecognizer = faceRecognizer; globleData.idToNameMapping = idToNameMapping; globleData.lableMat = lableMat; } @SuppressWarnings("serial") public static void main(String[] args) throws Exception { System.loadLibrary(Core.NATIVE_LIBRARY_NAME);//54 System.out.println("train before"); train(); System.out.println("train after"); String brokers = args[0]; String topics = args[1]; // Create context with a 2 seconds batch interval SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaVideoData"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); final Broadcast<GlobleData> bcVar = jsc.broadcast(globleData); JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(2)); // for graceful shutdown of the application ... // Runtime.getRuntime().addShutdownHook(new Thread() { // @Override // public void run() { // System.out.println("Shutting down streaming app..."); // if (producer != null) // producer.close(); // jssc.stop(true, true); // System.out.println("Shutdown of streaming app complete."); // } // }); HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", brokers); kafkaParams.put("group.id", "groupid"); kafkaParams.put("consumer.id", "consumerid"); // Create direct kafka stream with brokers and topics JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream( jssc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topicsSet ); JavaDStream<String> content = messages.map(new Function<Tuple2<String, byte[]>, String>() { //@Override public String call(Tuple2<String, byte[]> tuple2) throws IOException { System.loadLibrary(Core.NATIVE_LIBRARY_NAME); if ((tuple2 == null) || (tuple2._2().length < 1000)) return null; Mat image = new Mat(new Size(640, 480), 16); image.put(0, 0, tuple2._2()); List<Mat> detectResults = detectFace(image); if 
(detectResults.size() == 0) return null; Mat person = detectResults.get(0); FaceRecognizer fz = Face.createFisherFaceRecognizer(); fz.load("/tmp/faceRec.yml"); GlobleData gd = bcVar.value(); Map<Integer, String> id2Name = gd.idToNameMapping; Map<String, String> lm = gd.lableMat; int[] label = new int[1]; double[] confidence = new double[1]; fz.predict(person, label, confidence); fz = null; System.gc(); System.out.println("confidence: " + confidence[0]); int predictedLabel = label[0]; System.out.println("Predicted label: " + id2Name.get(predictedLabel)); System.out.println("**********"); System.out.println("**********"); System.out.println("**********"); try { Class.forName("com.mysql.jdbc.Driver"); Connection connection = DriverManager.getConnection("jdbc:mysql://10.75.161.87/", "devnet", "devnet"); Statement statement = connection.createStatement(); statement.executeUpdate("use images;"); PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO faces VALUES (NULL, ?, ?, ?, ?, ?)"); Imgcodecs.imwrite("/tmp/1.jpg", image); preparedStatement.setBinaryStream(1, new FileInputStream("/tmp/1.jpg")); preparedStatement.setString(2, String.valueOf(System.currentTimeMillis())); preparedStatement.setString(3, String.valueOf(confidence[0])); preparedStatement.setString(4, id2Name.get(predictedLabel)); Imgcodecs.imwrite("/tmp/2.jpg", matFromJson(lm.get(id2Name.get(predictedLabel)))); preparedStatement.setBinaryStream(5, new FileInputStream("/tmp/2.jpg")); preparedStatement.execute(); connection.close(); System.out.println("sql insert, name = " + id2Name.get(predictedLabel)); } catch (Exception e) { System.out.println(e.toString()); } return id2Name.get(predictedLabel); } }); content.count().print(); // Start the computation jssc.start(); jssc.awaitTermination(); } public static List<Mat> detectFace(Mat inputFrame) { List<Mat> detectedElements = new ArrayList<Mat>(); Mat mRgba = new Mat(); Mat mGrey = new Mat(); inputFrame.copyTo(mRgba); 
inputFrame.copyTo(mGrey); MatOfRect results = new MatOfRect(); Imgproc.cvtColor(mRgba, mGrey, Imgproc.COLOR_BGR2GRAY); Imgproc.equalizeHist(mGrey, mGrey); CascadeClassifier cascadeClassifier = new CascadeClassifier(GetResourceFilePath("/haarcascade_frontalface_alt.xml").toString()); cascadeClassifier.detectMultiScale(mGrey, results); Rect[] classifiedElements = results.toArray(); for (Rect rect : classifiedElements) { Mat face = new Mat(mGrey, rect); Mat resizedImage = new Mat(); Imgproc.resize(face, resizedImage, new Size((double) 92, (double) 112)); face.release(); detectedElements.add(resizedImage); } mRgba.release(); mGrey.release(); results.release(); System.out.println("Get face: " + detectedElements.size()); return detectedElements; } public static String GetResourceFilePath(String filename) { System.loadLibrary(Core.NATIVE_LIBRARY_NAME); InputStream inputStream = null; OutputStream outputStream = null; String tempFilename = "/tmp" + filename; try { // read this file into InputStream inputStream = BigData.KafkaSpark.App.class.getResourceAsStream(filename); if (inputStream == null) System.out.println("empty streaming"); // write the inputStream to a FileOutputStream outputStream = new FileOutputStream(tempFilename); int read; byte[] bytes = new byte[102400]; while ((read = inputStream.read(bytes)) != -1) { outputStream.write(bytes, 0, read); // System.out.println("read bytes is " + Integer.toString(read)); } outputStream.flush(); System.out.println("Load XML file, Done!"); } catch (IOException e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (outputStream != null) { try { // outputStream.flush(); outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return tempFilename; } public static String matToJson(Mat mat) { JsonObject obj = new JsonObject(); if (mat.isContinuous()) { int cols = mat.cols(); int rows = mat.rows(); int elemSize = (int) 
mat.elemSize(); byte[] data = new byte[cols * rows * elemSize]; mat.get(0, 0, data); obj.addProperty("rows", mat.rows()); obj.addProperty("cols", mat.cols()); obj.addProperty("type", mat.type()); // We cannot set binary data to a json object, so: // Encoding data byte array to Base64. String dataString = new String(Base64.encodeBase64(data)); obj.addProperty("data", dataString); Gson gson = new Gson(); String json = gson.toJson(obj); return json; } return "{}"; } public static Mat matFromJson(String json) { JsonParser parser = new JsonParser(); JsonObject JsonObject = parser.parse(json).getAsJsonObject(); int rows = JsonObject.get("rows").getAsInt(); int cols = JsonObject.get("cols").getAsInt(); int type = JsonObject.get("type").getAsInt(); String dataString = JsonObject.get("data").getAsString(); byte[] data = Base64.decodeBase64(dataString.getBytes()); Mat mat = new Mat(rows, cols, type); mat.put(0, 0, data); return mat; } }
import java.io.*; import java.sql.*; import java.util.*; import kafka.serializer.DefaultDecoder; import kafka.serializer.StringDecoder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.opencv.core.*; import org.opencv.face.Face; import org.opencv.face.FaceRecognizer; import org.opencv.imgproc.Imgproc; import scala.Tuple2; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; /*import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords;*/ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka.KafkaUtils; import org.opencv.imgcodecs.Imgcodecs; import org.opencv.objdetect.CascadeClassifier; import org.apache.spark.streaming.Durations; public class App { static Producer<String, String> producer; static Map<Integer, String> idToNameMapping; static FaceRecognizer faceRecognizer; static MatOfInt labelsBuf; static List<Mat> mats; static Map<String, Mat> lableMat = new HashMap<String, Mat>(); @SuppressWarnings("serial") public static class ConvertToWritableTypes implements PairFunction<Tuple2<String, byte[]>, Text, BytesWritable> { @SuppressWarnings({"unchecked", "rawtypes"}) public Tuple2<Text, BytesWritable> call(Tuple2<String, byte[]> record) { return new Tuple2(new Text(record._1), new BytesWritable(record._2)); } } public static void train(){ try { System.loadLibrary(Core.NATIVE_LIBRARY_NAME); String uri = 
"hdfs://10.75.161.88/stars.seq"; // String uri = "hdfs://10.75.161.242:9000/all.seq"; mats = new ArrayList<Mat>(); idToNameMapping = new HashMap<Integer, String>(); Configuration conf = new Configuration(); Path path = new Path(uri); SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path)); // System.out.println("1"); Map<Text, OutputStream> keyStream = new HashMap<Text, OutputStream>(); Text key = new Text(); Text value = new Text(); int count = 0; while (reader.next(key, value)) { if (!idToNameMapping.containsValue(key.toString().split("_")[0])) { idToNameMapping.put(count++, key.toString().split("_")[0]); } if (key.toString().trim() != null && !keyStream.containsKey(key)) { keyStream.put(new Text(key), new ByteArrayOutputStream(1024)); } keyStream.get(key).write(value.getBytes(), 0, value.getLength()); } Map<String, Integer> nameToId = new HashMap<String, Integer>(); for (Map.Entry entry : idToNameMapping.entrySet()) { nameToId.put((String) entry.getValue(), (Integer) entry.getKey()); } Mat mat; ByteArrayOutputStream bs = null; int counter = 0; labelsBuf = new MatOfInt(new int[keyStream.size()]); for (Map.Entry out : keyStream.entrySet()) { bs = ((ByteArrayOutputStream) out.getValue()); bs.flush();//Imgcodecs.CV_LOAD_IMAGE_GRAYSCALE mat = Imgcodecs.imdecode(new MatOfByte(bs.toByteArray()), Imgcodecs.CV_IMWRITE_JPEG_OPTIMIZE); Mat matSave = Imgcodecs.imdecode(new MatOfByte(bs.toByteArray()), Imgcodecs.CV_LOAD_IMAGE_COLOR); mats.add(detectElements(mat.clone()).get(0)); Imgcodecs.imwrite("/tmp/" + new Random().nextInt(100) + ".jpg",detectElements(mat.clone()).get(0)); int labelId = nameToId.get(out.getKey().toString().split("_")[0]); lableMat.put(out.getKey().toString().split("_")[0], matSave.clone()); labelsBuf.put(counter++, 0, labelId); } IOUtils.closeStream(bs); IOUtils.closeStream(reader); faceRecognizer = Face.createFisherFaceRecognizer(); // FaceRecognizer faceRecognizer = Face.createEigenFaceRecognizer(); // FaceRecognizer 
faceRecognizer = Face.createLBPHFaceRecognizer(); faceRecognizer.train(mats, labelsBuf); if(faceRecognizer == null) { System.out.println("in the static after tain, face rec is null"); } else { System.out.println("!!!!!!!!face rec is not null"); } } catch (Exception e) { System.out.println(e.toString()); System.exit(100); } } @SuppressWarnings("serial") public static void main(String[] args) throws Exception{ System.loadLibrary(Core.NATIVE_LIBRARY_NAME);//54 System.out.println("train after"); String brokers = "10.75.161.54:9092"; // String brokers = "172.16.1.11:9092"; String Stringbrokers = "10.75.161.88:9092"; // String brokers = "10.140.92.221:9092"; String topics = "ddp_video_source"; // Create context with a 2 seconds batch interval SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaVideoData"); sparkConf.setMaster("local"); sparkConf.set("spark.testing.memory", "2147480000"); final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); // for graceful shutdown of the application ... 
Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { System.out.println("Shutting down streaming app..."); if (producer != null) producer.close(); jssc.stop(true, true); System.out.println("Shutdown of streaming app complete."); } }); HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", brokers); kafkaParams.put("group.id", "groupid"); kafkaParams.put("consumer.id", "consumerid"); // kafkaParams.put("bootstrap.servers", "cdhmanage"); kafkaParams.put("zookeeper.connect", "cdh3:2181/kafka"); // Create direct kafka stream with brokers and topics JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream( jssc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topicsSet ); train(); JavaDStream<String> content = messages.map(new Function<Tuple2<String, byte[]>, String>() { //@Override public String call(Tuple2<String, byte[]> tuple2) throws IOException { System.loadLibrary(Core.NATIVE_LIBRARY_NAME); String lable = null; if (tuple2 == null) { System.out.println("null"); return null; } else { if (tuple2._2().length > 1000) { Mat image = new Mat(new Size(640, 480), 16); image.put(0, 0, tuple2._2()); // Imgcodecs.imwrite("/tmp/test"+ new Random().nextInt(100) +".jpg", image); System.out.println("tuple2._2().length > 1000"); if (detectElements(image.clone()).size() > 0) { Mat person = detectElements(image.clone()).get(0); System.out.println(person.width() + "person.width"); System.out.println(person.height() + "person.height"); if(faceRecognizer == null) { System.out.println("after tain, face rec is null"); } if(person == null) { System.out.println("person is null"); } int predictedLabel = faceRecognizer.predict(person); // Imgcodecs.imwrite("/home/test/person"+ new Random().nextInt(100) +".pgm", person); System.out.println("Predicted label: " + 
idToNameMapping.get(predictedLabel)); System.out.println("**********"); System.out.println("**********"); System.out.println("**********"); System.out.println("**********"); try { Class.forName("com.mysql.jdbc.Driver"); Connection connection = DriverManager.getConnection("jdbc:mysql://localhost/", "root", "root"); Statement statement = connection.createStatement(); statement.executeUpdate("CREATE DATABASE IF NOT EXISTS images;"); statement.executeUpdate("use images;"); statement.executeUpdate("CREATE TABLE IF NOT EXISTS faces (\n" + " id INT NOT NULL AUTO_INCREMENT,\n" + " originalImage MediumBlob NOT NULL,\n" + " timeLabel VARCHAR(100) NOT NULL,\n" + " matchedImage MediumBlob NOT NULL,\n" + " PRIMARY KEY (id)\n" + ")\n" + " ENGINE = InnoDB;"); PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO faces VALUES (NULL, ?, ?, ?)"); Imgcodecs.imwrite("/tmp/1.jpg", image); preparedStatement.setBinaryStream(1, new FileInputStream("/tmp/1.jpg")); // new BufferedInputStream(new FileInputStream(new File(""))); preparedStatement.setString(2, "Time:" + System.nanoTime() + ", name:" + idToNameMapping.get(predictedLabel)); /*for (Map.Entry kv: idToNameMapping.entrySet() ) { System.out.println(kv.getKey().toString() + " id"); System.out.println(kv.getValue().toString() + " value"); } for (Map.Entry kv: lableMat.entrySet() ) { System.out.println(kv.getKey() + " key"); System.out.println("/tmp/value"); Imgcodecs.imwrite("/tmp/" + kv.getKey() + ".pgm", (Mat)kv.getValue()); }*/ Imgcodecs.imwrite("/tmp/2.jpg", lableMat.get(idToNameMapping.get(predictedLabel))); preparedStatement.setBinaryStream(3, new FileInputStream("/tmp/2.pgm")); preparedStatement.execute(); connection.close(); System.out.println("sql insert, name = " + idToNameMapping.get(predictedLabel)); } catch (Exception e) { System.out.println(e.toString()); } } else { System.out.println("NO person"); } } else { System.out.println("tuple2._2().length < 1000"); } return lable; } } }); 
content.count().print(); // Start the computation jssc.start(); jssc.awaitTermination(); } public static List<Mat> detectElements(Mat inputFrame) { // Imgcodecs.imwrite("/tmp/" + new Random().nextInt(100) + ".pgm", inputFrame); System.loadLibrary(Core.NATIVE_LIBRARY_NAME); List<Mat> detectedElements = new ArrayList<Mat>(10); Mat mRgba = new Mat(); Mat mGrey = new Mat(); inputFrame.copyTo(mRgba); inputFrame.copyTo(mGrey); MatOfRect results = new MatOfRect(); Imgproc.cvtColor( mRgba, mGrey, Imgproc.COLOR_BGR2GRAY); Imgproc.equalizeHist( mGrey, mGrey ); CascadeClassifier cascadeClassifier = new CascadeClassifier(GetResourceFilePath("/haarcascade_frontalface_alt.xml")); cascadeClassifier.detectMultiScale(mGrey, results); Rect[] classifiedElements = results.toArray(); System.out.println("Dectected person: " + classifiedElements.length); for (Rect rect : classifiedElements) { // and adds it to the Mat convert = new Mat(); Mat face = new Mat(mRgba.clone(), rect); Imgproc.cvtColor(face, face, Imgproc.COLOR_BGR2GRAY); face.convertTo(convert, Imgproc.COLOR_BGR2GRAY); detectedElements.add(resizeFace(convert)); // Imgcodecs.imwrite("/tmp/face" + new Random().nextInt(10)+ ".pgm", resizeFace(convert)); } System.out.println("Get fave: " + detectedElements.size()); return detectedElements; } public static Mat resizeFace(Mat originalImage) { System.loadLibrary(Core.NATIVE_LIBRARY_NAME); Mat resizedImage = new Mat(); Imgproc.resize(originalImage, resizedImage, new Size((double)92,(double)112)); return resizedImage; } public static String GetResourceFilePath(String filename) { InputStream inputStream = null; OutputStream outputStream = null; String tempFilename = "/tmp" + filename; try { // read this file into InputStream inputStream = App.class.getResourceAsStream(filename); if (inputStream == null) System.out.println("empty streaming"); // write the inputStream to a FileOutputStream outputStream = new FileOutputStream(tempFilename); int read; byte[] bytes = new byte[102400]; while 
((read = inputStream.read(bytes)) != -1) { outputStream.write(bytes, 0, read); // System.out.println("read bytes is " + Integer.toString(read)); } outputStream.flush(); System.out.println("Load XML file, Done!"); } catch (IOException e) { e.printStackTrace(); } finally { if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (outputStream != null) { try { // outputStream.flush(); outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } return tempFilename; } }
package kafkaCamera;

/**
 * Created by lwc on 5/31/16.
 */
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.*;
import org.opencv.core.Core;
import org.opencv.core.Size;
import org.opencv.core.Mat;
import org.opencv.videoio.VideoCapture;

/**
 * Captures frames from a camera and publishes the raw pixel bytes of each frame to
 * a Kafka topic, one frame every two seconds.
 */
public class KafkaDistributor implements Runnable {
    /** How often opening the camera is attempted before giving up. */
    private static final int MAX_OPEN_ATTEMPTS = 10;
    /** Pause between two attempts to open the camera, in milliseconds. */
    private static final long OPEN_RETRY_DELAY_MS = 500L;

    private boolean tracking = true;
    /** Camera URL set through {@link #Init}; when unset, local device 0 is used. */
    private String cameraUrl;
    private String brokerUrl = "10.75.161.54:9092";
    private String topic = "ddp_video_source";

    /**
     * Overrides the default camera, broker and topic.
     *
     * @param cameraUrlIn URL or file name of the video source; {@code null} or empty
     *                    selects local device 0
     * @param brokerUrlIn Kafka broker list
     * @param topicIn     topic the frames are published to
     */
    public void Init(String cameraUrlIn, String brokerUrlIn, String topicIn) {
        cameraUrl = cameraUrlIn;
        brokerUrl = brokerUrlIn;
        topic = topicIn;
    }

    /**
     * Opens the camera and publishes frames until the thread is interrupted.
     * Returns without publishing when the camera cannot be opened.
     */
    public void run() {
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);

        VideoCapture capture = openCapture();
        if (capture == null) {
            System.out.println("not open the stream!");
            return;
        }

        Mat frame = new Mat(new Size(640, 480), 16);
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        props.put("metadata.broker.list", brokerUrl);
        props.put("acks", "all");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = null;
        try {
            producer = new KafkaProducer<String, byte[]>(props);
            long frameCount = 0;
            while (tracking) {
                // A failed grab leaves no usable pixel data, so nothing is sent for it.
                if (capture.read(frame) && !frame.empty()) {
                    byte[] frameArray = new byte[(int) frame.total() * frame.channels()];
                    frame.get(0, 0, frameArray);
                    System.out.println("FrameSize:" + frameArray.length);
                    System.out.println("Mat:height " + frame.height());
                    System.out.println("Mat: width " + frame.width());
                    System.out.println("channels: " + frame.channels());
                    System.out.println("type: " + frame.type());

                    Future<RecordMetadata> a = producer.send(
                            new ProducerRecord<String, byte[]>(topic, Long.toString(frameCount), frameArray));
                    System.out.println("Send one frame" + a.isDone());
                    // The frame number is the record key; it was never advanced before,
                    // so every record carried the key "0".
                    frameCount++;
                } else {
                    System.out.println("could not read a frame");
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the
                    // loop so that the producer and the camera are released below.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            if (producer != null) {
                producer.close();
            }
            frame.release();
            capture.release();
        }
    }

    /**
     * Tries to open the configured video source a bounded number of times.
     *
     * @return the opened capture, or {@code null} when every attempt failed or the
     *         thread was interrupted while waiting
     */
    private VideoCapture openCapture() {
        for (int attempt = 1; attempt <= MAX_OPEN_ATTEMPTS; attempt++) {
            VideoCapture capture = new VideoCapture();
            boolean isOpen = (cameraUrl == null || cameraUrl.isEmpty())
                    ? capture.open(0)
                    : capture.open(cameraUrl);
            System.out.println("Try to open times: " + attempt);
            if (isOpen) {
                return capture;
            }
            // Free the native handle of the failed attempt before retrying.
            capture.release();
            try {
                Thread.sleep(OPEN_RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }

    /**
     * Starts a distributor with the default camera, broker and topic on its own thread.
     *
     * @param args unused
     * @throws UnsupportedEncodingException never thrown by the current code; kept for
     *                                      interface compatibility
     */
    public static void main(String[] args) throws UnsupportedEncodingException {
        KafkaDistributor distributor = new KafkaDistributor();
        // distributor.Init(URLDecoder.decode(args[0], "UTF-8"), args[1], args[2]);
        // distributor.Init(args[0], args[1], args[2]);
        Thread producerProcess = new Thread(distributor);
        producerProcess.start();
    }
}
package opencvImageSeq;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
/**
* Created by lwc on 5/23/16.
*/
/**
 * Packs every file of a local directory into one HDFS sequence file.
 *
 * <p>Each file is split into chunks of at most 1024 bytes; every chunk is appended
 * as one record whose key is the file name and whose value is the chunk.
 */
public class ImageSeqWriter {
    /**
     * Writes all regular files in {@code /home/test/faceData} to the sequence file
     * and prints the names of the files written.
     *
     * @param args unused
     * @throws Exception if the input directory is missing or unreadable, or if
     *                   reading a file or writing the sequence file fails
     */
    public static void main(String[] args) throws Exception {
        File inputDir = new File("/home/test/faceData");
        if (!inputDir.isDirectory()) {
            throw new Exception("input dir is wrong");
        }
        File[] inputFiles = inputDir.listFiles();
        if (inputFiles == null) {
            // listFiles() returns null when the directory cannot be read.
            throw new IOException("cannot list input dir: " + inputDir);
        }
        List<String> imageNames = new ArrayList<>();
        // String uri = "hdfs://localhost:9000/all.seq";
        String uri = "hdfs://10.75.161.88/stars.seq";
        // String uri = "hdfs://10.75.161.242:9000/all.seq";
        Configuration conf = new Configuration();
        Path path = new Path(uri);

        // try-with-resources closes the writer on every path and lets a failing
        // close() surface instead of being swallowed.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class))) {
            byte[] buffer = new byte[1024];
            Text value = new Text();
            for (File file : inputFiles) {
                if (!file.isFile()) {
                    // Sub-directories cannot be opened as a stream.
                    continue;
                }
                String imageName = file.getName();
                imageNames.add(imageName);
                Text key = new Text(imageName);
                try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file))) {
                    int bytesRead;
                    while ((bytesRead = inputStream.read(buffer)) != -1) {
                        // Store only the bytes actually read; writing the whole buffer
                        // would append stale bytes from the previous chunk.
                        value.set(buffer, 0, bytesRead);
                        // Append each record to the end of the SequenceFile.Writer instance.
                        writer.append(key, value);
                    }
                }
            }
        }
        for (String name : imageNames) {
            System.out.println(name);
        }
    }
}