网站开发架构,天津武清网站开发,网站的页面动态需要哪些方法做,企业网站 域名注册大数据挑战 在公司需要处理不断增长的数据量的各个领域中#xff0c;对大数据的概念有不同的理解。 在大多数这些情况下#xff0c;需要以某种方式设计所考虑的系统#xff0c;以便能够处理该数据#xff0c;而不会随着数据大小的增加而牺牲吞吐量。 从本质上讲#xff0c… 大数据挑战 在公司需要处理不断增长的数据量的各个领域中对大数据的概念有不同的理解。 在大多数这些情况下需要以某种方式设计所考虑的系统以便能够处理该数据而不会随着数据大小的增加而牺牲吞吐量。 从本质上讲这导致需要构建高度可伸缩的系统以便可以根据在给定时间点需要处理的数据量来分配更多资源。 构建这样的系统是一项耗时且复杂的活动因此可以使用第三方框架和库来提供现成的可伸缩性要求。 在Java应用程序中已经有很多不错的选择本文将简要讨论一些最受欢迎的选择 行动框架 我们将通过实现一个简单的管道来处理每个设备的数据以测量给定区域的空气质量指数从而演示每个框架。 为简单起见我们假定来自设备的数字数据是分批接收或以流方式接收的。 在整个示例中我们将使用THRESHOLD常量表示该值在该值之上我们认为一个区域被污染。 阿帕奇火花 在Spark中我们需要先将数据转换为正确的格式。 我们将使用数据集但我们也可以选择数据帧或RDD弹性分布式数据集作为数据表示的替代方法。 然后我们可以应用许多Spark转换和操作以便以分布式方式处理数据。 public long countPollutedRegions(String[] numbers) { // runs a Spark master that takes up 4 cores SparkSession session SparkSession.builder(). appName( AirQuality ). master( local[4] ). getOrCreate(); // converts the array of numbers to a Spark dataset Dataset numbersSet session.createDataset(Arrays.asList(numbers), Encoders.STRING()); // runs the data pipeline on the local spark long pollutedRegions numbersSet.map(number - Integer.valueOf(number), Encoders. INT ()) .filter(number - number THRESHOLD).count(); return pollutedRegions; } 如果要更改上述应用程序以从外部源读取数据写入外部数据源并在Spark集群而不是本地Spark实例上运行我们将具有以下执行流程 Spark驱动程序可以是单独的实例也可以是Spark群集的一部分。 Apache Flink 与Spark相似我们需要在Flink DataSet中表示数据然后对其应用必要的转换和操作 public long countPollutedRegions(String[] numbers) throws Exception { // creates a Flink execution environment with proper configuration StreamExecutionEnvironment env StreamExecutionEnvironment. createLocalEnvironment(); // converts the array of numbers to a Flink dataset and creates // the data pipiline DataStream stream env.fromCollection(Arrays.asList(numbers)). map(number - Integer.valueOf(number)) .filter(number - number THRESHOLD).returns(Integer. class ); long pollutedRegions 0; Iterator numbersIterator DataStreamUtils.collect(stream); while (numbersIterator.hasNext()) { pollutedRegions; numbersIterator.next(); } return pollutedRegions; } 如果要更改上述应用程序以从外部源读取数据写入外部数据源并在Flink群集上运行我们将具有以下执行流程 将应用程序提交到Flink群集的Flink客户端是Flink CLI实用程序或JobManager的UI。 阿帕奇风暴 在Storm中数据管道被创建为Spouts数据源和Bolts数据处理单元的拓扑。 由于Storm通常会处理无限制的数据流因此我们会将空气质量指数编号数组的处理模拟为有限制的流 public void countPollutedRegions(String[] numbers) throws Exception { // builds the topology as a combination of spouts and bolts TopologyBuilder builder new TopologyBuilder(); builder.setSpout( numbers-spout StormAirQualitySpout(numbers)); numbers-spout , new StormAirQualitySpout(numbers)); builder.setBolt( number-bolt , new StormAirQualityBolt()). shuffleGrouping( numbers-spout shuffleGrouping( numbers-spout ); // prepares Storm conf and along with the topology submits it for // execution to a local Storm cluster Config conf new Config(); conf.setDebug( true ); LocalCluster localCluster null; try { localCluster new LocalCluster(); localCluster.submitTopology( airquality-topology , conf, builder.createTopology()); Thread.sleep(10000); localCluster.shutdown(); } catch (InterruptedException ex) { localCluster.shutdown(); } } 我们有一个喷嘴可以为空气质量指数编号的数组提供数据源还有一个仅过滤指示污染区域的螺栓 public class StormAirQualitySpout extends BaseRichSpout { private boolean emitted false ; private SpoutOutputCollector collector; private String[] numbers; public StormAirQualitySpout(String[] numbers) { this .numbers numbers; } Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( number )); } Override public void open(Map paramas, TopologyContext context, SpoutOutputCollector collector) { this .collector collector; } Override public void nextTuple() { // we make sure that the numbers array is processed just once by // the spout if (!emitted) { for (String number : numbers) { collector.emit( new Values(number)); } emitted true ; } } } public class StormAirQualityBolt extends BaseRichBolt { private static final int THRESHOLD 10; private int pollutedRegions 0; Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( number )); } Override public void prepare(Map params, TopologyContext context, OutputCollector collector) { } Override public void execute(Tuple tuple) { String number tuple.getStringByField( number ); Integer numberInt Integer.valueOf(number); if (numberInt THRESHOLD) { pollutedRegions; } } } 我们正在使用LocalCluster实例提交到本地Storm集群这对于开发很方便但是我们想将Storm拓扑提交到生产集群。 在这种情况下我们将具有以下执行流程 阿帕奇点燃 在Ignite中我们需要先将数据放入分布式缓存中然后再运行数据处理管道该管道是在Ignite群集上以分布式方式执行的SQL查询的前者 public long countPollutedRegions(String[] numbers) { IgniteConfiguration igniteConfig new IgniteConfiguration(); CacheConfiguration cacheConfig new CacheConfiguration(); // cache key is number index in the array and value is the number cacheConfig.setIndexedTypes(Integer. class , String. class ); cacheConfig.setName(NUMBERS_CACHE); igniteConfig.setCacheConfiguration(cacheConfig); try (Ignite ignite Ignition.start(igniteConfig)) { IgniteCache cache ignite.getOrCreateCache(NUMBERS_CACHE); // adds the numbers to the Ignite cache try (IgniteDataStreamer streamer ignite.dataStreamer(cache.getName())) { int key 0; for (String number : numbers) { streamer.addData(key, number); } } // performs an SQL query over the cached numbers SqlFieldsQuery query new SqlFieldsQuery( select * from String where _val THRESHOLD); FieldsQueryCursorList cursor cache.query(query); int pollutedRegions cursor.getAll().size(); return pollutedRegions; } } 如果我们要在Ignite群集中运行应用程序它将具有以下执行流程 榛树喷射机 Hazelcast Jet在Hazelcast IMDG之上运行并且与Ignite相似如果我们要处理数据我们需要首先将其放入Hazelcast IMDG群集中 public long countPollutedRegions(String[] numbers) { // prepares the Jet data processing pipeline Pipeline p Pipeline.create(); p.drawFrom(Sources.list( numbers )). map(number - Integer.valueOf((String) number)) .filter(number - number THRESHOLD).drainTo(Sinks.list( filteredNumbers )); JetInstance jet Jet.newJetInstance(); IList numbersList jet.getList( numbers ); numbersList.addAll(Arrays.asList(numbers)); try { // submits the pipeline in the Jet cluster jet.newJob(p).join(); // gets the filtered data from Hazelcast IMDG List filteredRecordsList jet.getList( filteredNumbers ); int pollutedRegions filteredRecordsList.size(); return pollutedRegions; } finally { Jet.shutdownAll(); } } 但是请注意Jet还提供集成而无需外部数据源并且不需要将数据存储在IMDG群集中。 您也可以在不首先将数据存储到列表中的情况下进行聚合查看Github中包含改进版本的完整示例。 感谢Hazelcast工程团队的Jaromir和Can的宝贵意见。 如果我们要在Hazelcast Jet集群中运行该应用程序它将具有以下执行流程 卡夫卡流 Kafka Streams是一个客户端库使用Kafka主题作为数据处理管道的源和接收器。 为了在我们的方案中使用Kafka Streams库我们将把空气质量指数数字放入数字 Kafka主题中 public long countPollutedRegions() { List result new LinkedList(); // key/value pairs contain string items final Serde stringSerde Serdes.String(); // prepares and runs the data processing pipeline final StreamsBuilder builder new StreamsBuilder(); builder.stream( numbers , Consumed.with(stringSerde, stringSerde)) .map((key, value) - new KeyValue(key, Integer.valueOf(value))). filter((key, value) - value THRESHOLD) .foreach((key, value) - { result.add(value.toString()); }); final Topology topology builder.build(); final KafkaStreams streams new KafkaStreams(topology, createKafkaStreamsConfiguration()); streams.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } int pollutedRegions result.size(); System.out.println( Number of severely polluted regions: pollutedRegions); streams.close(); return pollutedRegions; } private Properties createKafkaStreamsConfiguration() { Properties props new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, text-search-config ); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092 ); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return props; } 我们的Kafka Stream应用程序实例将具有以下执行流程 脉冲星函数 Apache Pulsar函数是轻量级的计算过程可与Apache Pulsar集群一起以无服务器的方式工作。 假设我们在Pulsar集群中传输空气质量指数我们可以编写一个函数来计算超出给定阈值的指数数量并将结果写回到Pulsar如下所示 public class PulsarFunctionsAirQualityApplication implements Function { private static final int HIGH_THRESHOLD 10; Override public Void process(String input, Context context) throws Exception { int number Integer.valueOf(input); if (number HIGH_THRESHOLD) { context.incrCounter( pollutedRegions , 1); } return null; } } 该函数以及Pulsar集群的执行流程如下 Pulsar函数可以在Pulsar群集中运行也可以作为单独的应用程序运行。 摘要 在本文中我们简要回顾了一些可用于在Java中实现大数据处理系统的最受欢迎的框架。 所提供的每个框架都相当大值得单独发表一篇文章。 尽管非常简单但我们的空气质量指数数据管道却展示了这些框架的运行方式您可以以此为基础来扩展您可能会进一步感兴趣的每个框架中的知识。 您可以在此处查看完整的代码示例。 翻译自: https://www.javacodegeeks.com/2019/12/popular-frameworks-for-big-data-processing-in-java.html