网站统计分析,重庆市渝兴建设投资有限公司网站,用DW做的网站生成链接,学院网站建设策划书这是我在上一个JavaOne上的会议之一。 这篇文章将扩展主题并使用Batch JSR-352 API进入一个实际的应用程序。 此应用程序与MMORPG 魔兽世界集成。 由于JSR-352是Java EE世界中的新规范#xff0c;所以我认为许多人不知道如何正确使用它。 确定本规范适用的用例也可能是一个挑… 这是我在上一个JavaOne上的会议之一。 这篇文章将扩展主题并使用Batch JSR-352 API进入一个实际的应用程序。 此应用程序与MMORPG 魔兽世界集成。 由于JSR-352是Java EE世界中的新规范所以我认为许多人不知道如何正确使用它。 确定本规范适用的用例也可能是一个挑战。 希望该示例可以帮助您更好地理解用例。 抽象 《魔兽世界》是一款全球超过800万玩家玩的游戏。 该服务按地区提供美国US 欧洲EU 中国和韩国。 每个区域都有一组称为Realm的服务器您可以使用这些服务器进行连接以玩游戏。 对于此示例我们仅研究美国和欧盟地区。 该游戏最有趣的功能之一是允许您使用拍卖行买卖称为“ 物品”的游戏内商品。 每个领域都有两个拍卖行 。 平均每个领域交易约70.000 项 。 让我们计算一些数字 512 境界 美国和欧盟 每个领域 7万个 物品 整个商品超过3500万 数据 《魔兽世界》的另一个有趣之处在于开发人员提供了REST API来访问大多数游戏内信息包括拍卖行的数据。 在此处检查完整的API。 拍卖行的数据分两步获得。 首先我们需要查询对应的Auction House Realm REST端点以获取对JSON文件的引用。 接下来我们需要访问该URL并下载包含所有拍卖行 物品信息的文件。 这是一个例子 http://eu.battle.net/api/wow/auction/data/aggra-portugues 应用程序 我们的目标是建立一个下载拍卖行的应用程序对其进行处理并提取指标。 这些指标将建立商品价格随时间变化的历史记录。 谁知道 也许借助这些信息我们可以预测价格波动并在最佳时间购买或出售商品 。 设置 对于设置我们将在Java EE 7中使用一些其他功能 Java EE 7 角JS 角度ng-grid UI引导程序 谷歌图表 野蝇 职位 批处理JSR-352作业将执行主要工作。 作业是封装整个批处理过程的实体。 作业将通过作业规范语言连接在一起。 使用JSR-352 作业只是这些步骤的容器。 它组合了逻辑上属于流程的多个步骤。 我们将把业务登录分为三个工作 准备 –创建所需的所有支持数据。 列出领域 创建文件夹以复制文件。 文件 –查询领域以检查是否有新文件要处理。 处理 –下载文件处理数据提取指标。 编码 后端–具有Java 8的Java EE 7 大多数代码将在后端。 我们需要Batch JSR-352 但我们还将使用Java EE的许多其他技术例如JPA JAX-RS CDI和JSON-P 。 由于“ 准备工作”仅用于初始化应用程序资源以进行处理因此我将跳过它而深入到最有趣的部分。 文件作业 文件作业是AbstractBatchlet的实现。 批处理是批处理规范中可用的最简单的处理样式。 这是一个面向任务的步骤其中任务被调用一次执行并返回退出状态。 对于执行各种非面向项目的任务例如执行命令或执行文件传输此类型最有用。 在这种情况下我们的Batchlet将在每个Realm上对每个域发出REST请求以进行迭代并使用包含要处理的数据的文件检索URL。 这是代码 LoadAuctionFilesBatchlet Named
public class LoadAuctionFilesBatchlet extends AbstractBatchlet {Injectprivate WoWBusiness woWBusiness;InjectBatchProperty(name region)private String region;InjectBatchProperty(name target)private String target;Overridepublic String process() throws Exception {ListRealm realmsByRegion woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region));realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation);return COMPLETED;}void getRealmAuctionFileInformation(Realm realm) {try {Client client ClientBuilder.newClient();Files files client.target(target realm.getSlug()).request(MediaType.TEXT_PLAIN).async().get(Files.class).get(2, TimeUnit.SECONDS);files.getFiles().forEach(auctionFile - createAuctionFile(realm, auctionFile));} catch (Exception e) {getLogger(this.getClass().getName()).log(Level.INFO, Could not get files for realm.getRealmDetail());}}void createAuctionFile(Realm realm, AuctionFile auctionFile) {auctionFile.setRealm(realm);auctionFile.setFileName(auctions. auctionFile.getLastModified() .json);auctionFile.setFileStatus(FileStatus.LOADED);if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) {woWBusiness.createAuctionFile(auctionFile);}}
} 关于此的一个很酷的事情是Java 8的使用parallelStream()一次调用多个REST请求很容易 您真的可以注意到其中的区别。 如果您想尝试一下只需运行示例然后用stream()替换parallelStream() stream()并检出即可。 在我的机器上使用parallelStream()可使任务执行速度提高约5或6倍。 更新资料 通常我不会使用这种方法。 我这样做了因为部分逻辑涉及调用慢速的REST请求而parallelStreams确实在这里闪耀。 可以使用批处理分区执行此操作但是很难实现。 我们还需要每次都在服务器池中收集新数据因此如果跳过一个或两个文件这并不可怕。 请记住如果您不想错过任何一条记录块处理样式将更适合。 感谢Simon Simonelli引起我的注意。 由于美国和欧盟的领域要求调用不同的REST端点因此它们非常适合分区。 分区意味着该任务将运行到多个线程中。 每个分区一个线程。 在这种情况下我们有两个分区。 要完成作业定义我们需要提供一个JoB XML文件。 这需要放置在META-INF/batch-jobs目录中。 这是此作业的files-job.xml files-job.xml job idloadRealmAuctionFileJob xmlnshttp://xmlns.jcp.org/xml/ns/javaee version1.0step idloadRealmAuctionFileStepbatchlet refloadAuctionFilesBatchletpropertiesproperty nameregion value#{partitionPlan[region]}/property nametarget value#{partitionPlan[target]}//properties/batchletpartitionplan partitions2properties partition0property nameregion valueUS/property nametarget valuehttp://us.battle.net/api/wow/auction/data///propertiesproperties partition1property nameregion valueEU/property nametarget valuehttp://eu.battle.net/api/wow/auction/data///properties/plan/partition/step
/job 在files-job.xml我们需要定义我们Batchlet在batchlet元素。 对于分区只需定义partition元素并为每个plan分配不同的properties 。 然后可以使用这些properties使用表达式#{partitionPlan[region]}和#{partitionPlan[target]}将值后期绑定到LoadAuctionFilesBatchlet 。 这是一种非常简单的表达式绑定机制仅适用于简单的属性和字符串。 处理作业 现在我们要处理领域拍卖数据文件。 使用上一份工作中的信息我们现在可以下载文件并对数据进行某些处理。 JSON文件具有以下结构 item-auctions-sample.json {realm: {name: Grim Batol,slug: grim-batol},alliance: {auctions: [{auc: 279573567, // Auction Iditem: 22792, // Item for sale Idowner: Miljanko, // Seller NameownerRealm: GrimBatol, // Realmbid: 3800000, // Bid Valuebuyout: 4000000, // Buyout Valuequantity: 20, // Numbers of items in the AuctiontimeLeft: LONG, // Time left for the Auctionrand: 0,seed: 1069994368},{auc: 278907544,item: 40195,owner: Mongobank,ownerRealm: GrimBatol,bid: 38000,buyout: 40000,quantity: 1,timeLeft: VERY_LONG,rand: 0,seed: 1978036736}]},horde: {auctions: [{auc: 278268046,item: 4306,owner: Thuglifer,ownerRealm: GrimBatol,bid: 570000,buyout: 600000,quantity: 20,timeLeft: VERY_LONG,rand: 0,seed: 1757531904},{auc: 278698948,item: 4340,owner: Celticpala,ownerRealm: Aggra(Português),bid: 1000000,buyout: 1000000,quantity: 10,timeLeft: LONG,rand: 0,seed: 0}]}
} 该文件包含从其下载的领域的拍卖列表。 在每个记录中我们可以检查待售物品价格卖方和拍卖结束前的剩余时间。 拍卖的算法按拍卖行类型进行汇总 Alliance和Horde 。 对于process-job我们要读取JSON文件转换数据并将其保存到数据库。 这可以通过块处理来实现。 块是一种ETL提取–转换–加载样式的处理适合处理大量数据。 块一次读取一个数据并在事务内创建要写出的块。 从ItemReader读入一项交给ItemProcessor并进行聚合。 一旦读取的项目数等于提交间隔就通过ItemWriter写入整个块然后提交事务。 ItemReader 实际文件太大以致无法将它们完全加载到内存中否则可能会耗尽它。 相反我们使用JSON-P API以流方式解析数据。 AuctionDataItemReader Named
public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader {private JsonParser parser;private AuctionHouse auctionHouse;Injectprivate JobContext jobContext;Injectprivate WoWBusiness woWBusiness;Overridepublic void open(Serializable checkpoint) throws Exception {setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP))));AuctionFile fileToProcess getContext().getFileToProcess();fileToProcess.setFileStatus(FileStatus.PROCESSING);woWBusiness.updateAuctionFile(fileToProcess);}Overridepublic void close() throws Exception {AuctionFile fileToProcess getContext().getFileToProcess();fileToProcess.setFileStatus(FileStatus.PROCESSED);woWBusiness.updateAuctionFile(fileToProcess);}Overridepublic Object readItem() throws Exception {while (parser.hasNext()) {JsonParser.Event event parser.next();Auction auction new Auction();switch (event) {case KEY_NAME:updateAuctionHouseIfNeeded(auction);if (readAuctionItem(auction)) {return auction;}break;}}return null;}Overridepublic Serializable checkpointInfo() throws Exception {return null;}protected void updateAuctionHouseIfNeeded(Auction auction) {if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) {auctionHouse AuctionHouse.ALLIANCE;} else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) {auctionHouse AuctionHouse.HORDE;} else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) {auctionHouse AuctionHouse.NEUTRAL;}auction.setAuctionHouse(auctionHouse);}protected boolean readAuctionItem(Auction auction) {if (parser.getString().equalsIgnoreCase(auc)) {parser.next();auction.setAuctionId(parser.getLong());parser.next();parser.next();auction.setItemId(parser.getInt());parser.next();parser.next();parser.next();parser.next();auction.setOwnerRealm(parser.getString());parser.next();parser.next();auction.setBid(parser.getInt());parser.next();parser.next();auction.setBuyout(parser.getInt());parser.next();parser.next();auction.setQuantity(parser.getInt());return true;}return false;}public void setParser(JsonParser parser) {this.parser parser;}
} 要打开JSON Parse流我们需要Json.createParser并传递输入流的引用。 要读取元素我们只需要调用hasNext()和next()方法。 这将返回一个JsonParser.Event 它使我们能够检查解析器在流中的位置。 从Batch API ItemReader的readItem()方法中读取并返回元素。 当没有更多元素可读取时返回null以完成处理。 注意我们还实现了从ItemReader open和close的方法。 这些用于初始化和清理资源。 它们只执行一次。 ItemProcessor ItemProcessor是可选的。 它用于转换读取的数据。 在这种情况下我们需要向竞价添加其他信息。 AuctionDataItemProcessor Named
public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor {Overridepublic Object processItem(Object item) throws Exception {Auction auction (Auction) item;auction.setRealm(getContext().getRealm());auction.setAuctionFile(getContext().getFileToProcess());return auction;}
} ItemWriter 最后我们只需要将数据写到数据库中即可 AuctionDataItemWriter Named
public class AuctionDataItemWriter extends AbstractItemWriter {PersistenceContextprotected EntityManager em;Overridepublic void writeItems(ListObject items) throws Exception {items.forEach(em::persist);}
} 在我的计算机上具有70 k记录文件的整个过程大约需要20秒。 我确实注意到了一些非常有趣的事情。 在编写此代码之前我使用的是注入的EJB它通过persist操作来调用方法。 这总共花费了30秒因此注入EntityManager并执行持久操作可以直接为我节省三分之一的处理时间。 我只能推测该延迟是由于堆栈调用的增加而造成的其中EJB拦截器位于中间。 这是在Wildfly中发生的。 我将对此进行进一步调查。 要定义块我们需要将其添加到process-job.xml文件中 process-job.xml step idprocessFile nextmoveFileToProcessedchunk item-count100reader refauctionDataItemReader/processor refauctionDataItemProcessor/writer refauctionDataItemWriter//chunk
/step 在item-count属性中我们定义每个处理块中可以容纳多少个元素。 这意味着每100个事务就会提交一次。 这对于保持较小的事务大小和检查数据很有用。 如果我们需要停止然后重新开始操作我们可以这样做而不必再次处理每个项目。 我们必须自己编写逻辑代码。 该示例中不包括此功能但以后会做。 跑步 要运行作业我们需要获得JobOperator的引用。 JobOperator提供了一个界面来管理作业处理的各个方面包括操作命令例如开始重新启动和停止以及与作业存储库相关的命令例如检索作业和步骤执行。 要运行先前的files-job.xml Job我们执行 执行工作 JobOperator jobOperator BatchRuntime.getJobOperator();
jobOperator.start(files-job, new Properties()); 请注意我们使用Job xml文件的名称而没有扩展名到JobOperator 。 下一步 我们仍然需要汇总数据以提取指标并将其显示在网页中。 这篇文章已经很长了因此我将在以后的文章中介绍以下步骤。 无论如何该部分的代码已经在Github存储库中。 检查资源部分。 资源资源 您可以从我的github存储库中克隆完整的工作副本然后将其部署到Wildfly。 您可以在此处找到说明进行部署。 翻译自: https://www.javacodegeeks.com/2014/10/java-ee-7-batch-processing-and-world-of-warcraft-part-1.html