服装设计网站模板下载,专业版式设计网站,互联网公司介绍,wordpress打赏赚钱 基于持久化的wordCount程序！中途遇到了一个坑！ 自己手动封装一个静态连接池，使用RDD的foreachPartition操作，并且在该操作内部，从静态连接池中，通过静态方法，获取一个连接，使用之后… 基于持久化的wordCount程序，中途遇到了一个坑。 自己手动封装一个静态连接池，使用RDD的foreachPartition操作，并且在该操作内部，从静态连接池中，通过静态方法，获取一个连接，使用之后再还回去。这样的话，在多个RDD的partition之间也可以复用连接了，而且可以让连接池采取懒创建的策略，并且空闲一段时间后将其释放掉。 代码 package com.bynear.spark_Streaming;
import com.bynear.tool.ConnectionPool;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;/* 2018/5/16* 11:30* 基于持久化的wordcount程序*/
public class PersisWordCount {public static void main(String[] args) {final SparkConf conf new SparkConf().setAppName(persiswordcount).setMaster(local[2]);JavaSparkContext jsc new JavaSparkContext(conf);JavaStreamingContext jssc new JavaStreamingContext(jsc, Durations.seconds(5));jssc.checkpoint(hdfs://Spark01:9000/zjs/chepoint);JavaReceiverInputDStreamString lines jssc.socketTextStream(localhost, 9999);JavaDStreamString words lines.flatMap(new FlatMapFunctionString, String() {Overridepublic IterableString call(String line) throws Exception {return Arrays.asList(line.split( ));}});JavaPairDStreamString, Integer pairs words.mapToPair(new PairFunctionString, String, Integer() {Overridepublic Tuple2String, Integer call(String word) throws Exception {return new Tuple2String, Integer(word, 1);}});final JavaPairDStreamString, Integer wordcount pairs.updateStateByKey(new Function2ListInteger, OptionalInteger, OptionalInteger() {Overridepublic OptionalInteger call(ListInteger values, OptionalInteger state) throws Exception {Integer newValue 0;if (state.isPresent()) {newValue state.get();}for (Integer value : values) {newValue value;}return Optional.of(newValue);}});wordcount.foreachRDD(new FunctionJavaPairRDDString, Integer, Void() {Overridepublic Void call(JavaPairRDDString, Integer wordCountsRDD) throws Exception {wordCountsRDD.foreachPartition(new VoidFunctionIteratorTuple2String, Integer() {Overridepublic void call(IteratorTuple2String, Integer wordcounts) throws Exception {Connection conn ConnectionPool.getConection();Tuple2String, Integer wordcount null;while (wordcounts.hasNext()) {wordcount wordcounts.next();String sql insert into word (word,count) values ( wordcount._1 , wordcount._2 );System.out.println(sqlconnYES);Statement stmt conn.createStatement();stmt.executeUpdate(sql);}ConnectionPool.returnConnection(conn);}});return null;}});jssc.start();jssc.awaitTermination();jssc.stop();}
}手动搭建的线程池
package com.bynear.tool;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
/*** 2018/5/16* 12:24*/
/**
 * Minimal static JDBC connection pool shared by every caller in the same JVM.
 *
 * <p>Connections are created lazily, two at a time, the first time the pool is
 * found empty. Borrow with {@link #getConection()} and hand the connection back
 * with {@link #returnConnection(Connection)} when done.
 *
 * <p>Both methods are synchronized on the class, so they may be called from
 * several threads.
 *
 * <p>NOTE(review): the JDBC URL and credentials are hard-coded; consider moving
 * them to configuration.
 */
public class ConnectionPool {

    /** Idle connections waiting to be borrowed; created on first use. */
    public static LinkedList<Connection> connectionQueue;

    // Load the MySQL JDBC driver once, when the class is initialized.
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Borrows a connection, opening two new ones first if no idle connection is available.
     *
     * <p>The queue is created only once. Re-creating it on every call would throw away
     * every connection that had been returned and open new ones for each borrower.
     *
     * @return an idle connection, or {@code null} if none was available and opening
     *         new connections failed (the failure is printed to stderr)
     */
    public synchronized static Connection getConection() {
        if (connectionQueue == null) {
            connectionQueue = new LinkedList<Connection>();
        }
        try {
            if (connectionQueue.isEmpty()) {
                for (int i = 0; i < 2; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://192.168.2.10:3306/testdb", "root", "123456");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    /**
     * Puts a borrowed connection back into the pool so that it can be reused.
     *
     * @param conn the connection to return; {@code null} is ignored
     */
    public synchronized static void returnConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        if (connectionQueue == null) {
            connectionQueue = new LinkedList<Connection>();
        }
        connectionQueue.push(conn);
    }
}
/*
 * Author's notes: the first version of this pool used
 *
 *     if (connectionQueue == null) {
 *         for (int i = 0; i < 2; i++) {
 *             Connection conn = DriverManager.getConnection(
 *                     "jdbc:mysql://192.168.2.10:3306/testdb", "root", "123456");
 *             connectionQueue.push(conn);
 *         }
 *     }
 *
 * and kept failing with a NullPointerException once the job was submitted to the
 * cluster. Printing the connection afterwards with
 * System.out.println(sql + conn + "YES"), where
 * conn = ConnectionPool.getConection(), gave
 *
 *     insert into wordcount (word,count) values ('heool,word',1)nullYES
 *
 * i.e. conn was null.
 */
跑成功的代码：

if (connectionQueue.isEmpty()) {
    for (int i = 0; i < 2; i++) {
        Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb", "root", "123456");
        connectionQueue.push(conn);
    }
}

输出结果，在SQL中查询：

mysql> select * from word;
+----+---------------------+------------+-------+
| id | updated_time        | word       | count |
+----+---------------------+------------+-------+
|  1 | 2018-05-16 01:11:10 | ???,??     |     1 |
|  2 | 2018-05-16 01:11:15 | ???,??     |     1 |
|  3 | 2018-05-16 01:13:00 | hello,word |     1 |
|  4 | 2018-05-16 01:16:00 | hello      |     1 |
|  5 | 2018-05-16 01:16:00 | word       |     1 |
|  6 | 2018-05-16 01:16:05 | hello      |     1 |
|  7 | 2018-05-16 01:16:05 | word       |     1 |
+----+---------------------+------------+-------+
7 rows in set (0.00 sec)

完美成功！