网站建设加推广需要多少钱,小程序开发制作软件,南昌定制网站建设,wordpress ad widgetKafka.utils#xff0c;顾名思义#xff0c;就是一个工具套件包#xff0c;里面的类封装了很多常见的功能实现——说到这里#xff0c;笔者有一个感触#xff1a;当初为了阅读Kafka源代码而学习了Scala语言#xff0c;本以为Kafka的实现会用到很多函数编程(Functional Pr…Kafka.utils顾名思义就是一个工具套件包里面的类封装了很多常见的功能实现——说到这里笔者有一个感触当初为了阅读Kafka源代码而学习了Scala语言本以为Kafka的实现会用到很多函数编程(Functional Programming, FP)结果目前来看大部分还是很朴素地以面向对象的方式来实现的只有很少一部分集合的处理使用诸如mapreduce这样的FP方式。不能不说有点小小的遗憾。——当然也许后面Kafka的核心代码中会看到更多FP的身影。 下图就是kafka.utils包的所有代码 因为很难像其他包代码之间有逻辑关系我们就一个一个说吧 一、Annotations.scala 这个源代码文件中定义了3个注释类threadsafe、nonthreadsafe和immutable。它们都继承了StaticAnnotation——Scala提供的StaticAnnotation类似于Java中的Target(ElementType.TYPE)因此主要的作用域是类和接口。具体到这三个元注解(meta-annotation)很容易知道它们的含义分别标记线程安全、非线程安全和不可变性。Kafka开发中常用到的SimpleConsumer类就是被标记为threadsafe的。 二. CommandLineUtils.scala 这个文件使用JOpt Simple库负责解析命令行参数具体使用用法参见官网http://pholser.github.io/jopt-simple/ Kafka在这个文件中提供了一个objectCommandLineUtils。具体包含的方法有 1. printUsageAndDie: 打印命令使用方法并终止程序 2. checkRequiredArgs使用Jopts Simple的API(以下皆同)检查是否缺少必要参数 3. checkInvalidArgs检查指定的参数是否存在不兼容情况即哪些参数不能同时使用 4. parseKeyValueArgs解析keyvalue格式的参数对并返回一个Properties对象 三、Crc32.scala 这个类就是CRC32校验码的实现类来自于Hadoop提供的PureJavaCrc32类——CRC32校验码的纯Java实现版本。这个类很长里面有很多位操作由于CRC32计算不在本次研究范围所以就了解到这吧。 四、DelayedItem.scala 这个类是个泛型类实现了java.util.Delayed接口。用于标记那些在给定延迟时间之后执行的对象。该类接收一个泛型T一个延迟时间以及延迟时间的单位。另外实现这个接口的话必须要实现一个compareTo和getDelay方法。 1. getDelay: 计算距离触发时间还剩下多长时间 2. compareTo: 比较2个Delayed对象的延迟触发时间 五、FileLock.scala 顾名思义FileLock就是一个文件锁它的构造函数接收一个文件对象并总是先尝试创建这个文件(如果不存在的话)然后创建一个FileChannel对象对该文件进行随机读写操作。同时创建一个java.nio.channel.FlieLock文件锁对象用于实现下面的方法 1. lock: 对文件加锁如果该文件上已有锁抛出异常 2. tryLock: 尝试对文件加锁如果成功返回true否则返回false 3. unlock 如果持有锁使用FileLock.release方法释放锁 4. destroy 先释放锁然后调用FileChannel的close方法销毁该channel 六、IteratorTemplate.scala 这个文件视图定义一个迭代器模板主要为遍历消息集合使用。迭代器模板有一个状态字段因此在定义迭代器模板抽象类之前首先定义了一个State状态object以及一组具体的状态object完成(DONE)READY(准备就绪)NOT_READY(未准备)和FAILED(失败)。 之后就是定义IteratorTemplate抽象类了它同时实现了trait Iterator和java Iterator接口——可谓迭代器领域的集大成者:) 如前所述该类有个字段表明了迭代器的状态state还有一个nextItem字段执行遍历中的下一个对象当然初始化为null——说起null想到一个题外话。我很怀疑Kafka的开发人员是深度的Java编程人员亦或是强面向对象开发人员Scala推荐使用Option来代替null的可Kafka的代码中null还是随处可见当然可能也是为了更好更自然地与Java集成。 这个抽象类提供很多方法但似乎只有一个抽象方法makeNext其他全是具体方法 1. next如果迭代器已遍历完并无法找到下一项或下一项为空直接抛出异常否则将状态置为NOT_READY并返回下一项 2. peek只是探查一下迭代器是否遍历完如果是抛出异常否则直接返回下一项并不做非空判断也不做状态设置 3. hasNext: 如果状态为FAILED直接抛出异常如果是DONE返回false如果是READY返回true否则调用maybeComputeNext方法 4. makeNext: 返回下一项这是你需要唯一需要实现的抽象方法。同时你还需要在该方法中对状态字段进行更新 5. maybeComputeNext调用makeNext获取到下一项如果状态是DONE返回false否则返回true并将状态置为READY 6. allDone 将状态置为DONE并返回null 7. resetStatus顾名思义就是重置状态字段为NOT_READY 七、JSON.scala JSON的一个封装类用于JSON到String的相互转换该类不是线程安全的。Scala提供的JSON是将数字型的字符串转化为Double不过该类创建一个简单函数用于将数字型字符串转为换Integer并指定其为JSON.globalNumberParser。该类只有2个方法 1. parseFull: 调用scala JSON的parseFull方法将一个json字符串转化为一个对象如果出错则抛出异常 2. encode 讲一个对象编码成json字符串。这个对象只能是nullBooleanStringNumberMap[String, T]Array[T]或Iterable[T]中的一种否则会报错转载于:https://www.cnblogs.com/huxi2b/p/4378439.html