首席数据官

Hi, 请登录

实战▍一个完整的电信客服分析平台大数据项目:架构、实现、数据

作者|黑泽明军 编辑|丹顶鹤5号 电信客服分析平台(附代码)

编者按:

很难见到这种一个完整的大数据项目,从项目背景、项目架构到项目实现都有大量的实例,包括数据存储和数据采集和各个模块的运行设置等等。

通信运营商的数据量存储量是最多的,也是数据化较为完善的领域,数据价值也非常高,如何从电信运营商的数据里「挖掘」到金子?如何把数据转化为生产力是电信运营商最为紧迫的需求,相信这篇干货满满的文章能够给你带来新的思路。

文章目录

一、项目背景

二、项目架构

三、项目实现

一、项目背景

通信运营商每时每刻会产生大量的通信数据,例如:通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如:当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等。我们以此为背景,寻找一个切入点,学习其中的方法论。

二、项目架构

三、项目实现

系统环境:

开发工具:

尖叫提示:idea2017.2.5 必须使用 maven3.3.9,不要使用 maven3.5,有部分兼容性问题。

集群环境(CDH版):

尖叫提示:学习的时候使用的普通版本的,企业开发中使用的是 CDH 版本的。

硬件环境:

3.1、数据生产

此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。

3.1.1、数据结构

我们将在 HBase 中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个 flag 作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。如下图所示:

数据结构如下:

3.1.2、编写代码

思路:

新建 module 项目:ct_producer

pom.xml 文件配置

 properties>        project.build.sourceEncoding>UTF-8project.build.sourceEncoding>    properties>
dependencies> dependency> groupId>junitgroupId> artifactId>junitartifactId> version>4.12version> scope>testscope> dependency> dependencies>
build> plugins> plugin> groupId>org.apache.maven.pluginsgroupId> artifactId>maven-surefire-pluginartifactId> version>2.12.4version> configuration> skipTests>trueskipTests> configuration> plugin> plugins> build>

1) 随机输入一些手机号码以及联系人,保存于 Java 的集合中。

新建类:ProductLog

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }

2) 创建随机生成通话时间的方法:randomBuildTime()

该时间生成后的格式为:yyyy-MM-dd HH:mm:ss,并使之可以根据传入的起始时间和结束时间来随机生成。

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }

3) 创建生产日志一条日志的方法:productLog()

随机抽取两个电话号码,随机产生通话建立时间,随机通话时长,将这几个字段拼接成一个字符串,然后return,便可以产生一条通话的记录。需要注意的是,如果随机出的两个电话号码一样,需要重新随机(随机过程可优化,但并非此次重点)。通话时长的随机为30分钟以内,即:60秒 * 30,并格式化为4位数字,例如:0600(10分钟)。

 /**     * 生产数据的形式:13651311090,18611213803,2017-10-17 08:15:20,0360     */    public String productLog() {
String caller = null; String callee = null;
String callerName = null; String calleeName = null;
// 随机获取主叫手机号 int callerIndex = (int) (Math.random() * phoneList.size()); // [0, 20) caller = phoneList.get(callerIndex); callerName = phoneNameMap.get(caller);
// 随机获取被叫手机号 while (true) { int calleeIndex = (int) (Math.random() * phoneList.size()); // [0, 20) callee = phoneList.get(calleeIndex); calleeName = phoneNameMap.get(callee);
if (!caller.equals(callee)) { break; } }
// 随机获取通话建立的时间 String buildTime = randomBuildTime(startTime, endTime);
// 随机获取通话的时长 DecimalFormat df = new DecimalFormat("0000"); String duration = df.format((int) (30 * 60 * Math.random()));
StringBuilder sb = new StringBuilder(); sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append(duration); return sb.toString();
// System.out.println(caller + "," + callerName + "," + callee + "," + calleeName + "," + buildTime + "," + duration); }

4) 创建写入日志方法:writeLog()

productLog() 方法每产生一条日志,便将日志写入到本地文件中,所以建立一个专门用于日志写入的方法,需要涉及到 IO 操作,需要注意的是,输出流每次写一条日之后需要 flush,不然可能导致积攒多条数据才输出一次。最后需要将 productLog() 方法放置于 while 死循环中执行。

/**     * 将数据写入到文件中     */    public void writeLog(String filePath) {        try {            OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(filePath), "UTF-8");
while (true) { Thread.sleep(200);
String log = productLog(); System.out.println(log);
osw.write(log + "\n"); osw.flush(); // 一定要手动flush,这样能确保每条数据写入到文件一次 }
} catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }

5) 在主函数中初始化以上逻辑客服数据分析,并测试:

  public static void main(String[] args) throws InterruptedException {
if (args == null || args.length 0) { System.out.println("No arguments"); return; }
ProductLog productLog = new ProductLog(); productLog.initPhone(); productLog.writeLog(args[0]);
// 测试 // String logPath = "d:\\temp\\ct_log\\log.csv"; // productLog.writeLog(logPath); }

3.1.3、打包测试

1) 打包方式

如果在 eclipse 中,则需要如下 maven 参数进行打包:

-P local clean package:不打包第三方依赖-P dev clean package install:打包第三方依赖

如果在 idea 中,则需要在 maven project 视图中一次选择如下按钮进行打包:详细操作请参看课堂演示:

LifeCycle --> package(双击)

分别在 Windows 上和 Linux 中进行测试:

Windows:
java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /本地目录/callLog.csv

2) 为日志生成任务编写 bash 脚本:productLog.sh,文件内容如下,该文件放在 /opt/module/flume/job/ct/ 目录下,并授予执行权限。

#!/bin/bashjava -cp /opt/module/flume/job/ct/ct_producer-1.0-SNAPSHOT.jar com.china.producer.ProductLog /opt/module/flume/job/ct/calllog.csv

3.2、数据采集/消费(存储)

欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架 flume 和 kafka 的定位是什么。我们在此需要将实时数据通过 flume 采集到 kafka 然后供给给 hbase 消费。

flume:Cloudera 公司研发

kafka:Linkedin 公司研发

HBase:实时保存一条一条流入的数据(万金油)

情景:

因此我们常用的一种模型是:

消费存储模块流程图:

公司中的业务情景:

1、公司已经设计好架构了,耐心了解每一个框架应对的是哪一个业务的功能,之后按照框架进行分层。

2、公司没有架构,需要自己搭建,需要按照客户的需求,先对需求进行分层,根据需求用对应的框架实现,之后对框架进行分层。(架构师的思想:宏观格局,5万的月薪,这样才刺激!)

3.2.1、数据采集:采集实时产生的数据到 kafka 集群

思路:

1) 配置 kafka

使用新版本 kafka_2.11-0.11.0.2,不使用老版本 kafka_2.10-0.8.2.1。

新旧版本的区别:

server.properties

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }0

配置环境变量,并使得配置后的环境变量生效

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }1

分发安装包或者同步复制到 hadoop103 和 hadoop104

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }2

注意:分发之后记得配置其他机器的环境变量。

分别在 hadoop103 和 hadoop104 上修改配置文件 /opt/module/kafka/config/server.properties 中的 broker.id=1、broker.id=2

注意:broker.id 不得重复。

2) 先启动 zookeeper 集群 (kafka 集群 依赖于 zookeeper 集群),再启动 kafka 集群(即启动 3 台 kafka 的 broker 服务)

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }3

3) 创建 kafka 主题

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }4

检查一下是否创建主题成功:

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }5

删除topic

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }6

注意:需要 server.properties 中设置 ic.enable=true 否则只是标记删除或者直接重启。

4) 启动 kafka 控制台消费者,等待 flume 信息的输入

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }7

5) 配置 flume(flume-kafka.conf)

在 hadoop102 的 /opt/module/flume/job 目录下创建一个 ct 文件夹,进入该文件夹,创建一个文件 flume-kafka.conf,文件内容如下:

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }8

注意:需要使用新版本的 flume 的配置文件参考案列。(版本:apache-flume-1.7.0)

6) 进入 flume 根目录下,启动 flume

/** * @author chenmingjun * 2019-03-13 13:35 */public class ProductLog {
/** * 生产数据 */
private String startTime = "2017-01-01"; private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话 private List phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射 private Map phoneNameMap = new HashMap();
/** * 初始化随机的电话号码和姓名 */ public void initPhone() { phoneList.add("13242820024"); phoneList.add("14036178412"); phoneList.add("16386074226"); phoneList.add("13943139492"); phoneList.add("18714767399"); phoneList.add("14733819877"); phoneList.add("13351126401"); phoneList.add("13017498589"); phoneList.add("16058589347"); phoneList.add("18949811796"); phoneList.add("13558773808"); phoneList.add("14343683320"); phoneList.add("13870632301"); phoneList.add("13465110157"); phoneList.add("15382018060"); phoneList.add("13231085347"); phoneList.add("13938679959"); phoneList.add("13779982232"); phoneList.add("18144784030"); phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁"); phoneNameMap.put("14036178412", "卫艺"); phoneNameMap.put("16386074226", "仰莉"); phoneNameMap.put("13943139492", "陶欣悦"); phoneNameMap.put("18714767399", "施梅梅"); phoneNameMap.put("14733819877", "金虹霖"); phoneNameMap.put("13351126401", "魏明艳"); phoneNameMap.put("13017498589", "华贞"); phoneNameMap.put("16058589347", "华啟倩"); phoneNameMap.put("18949811796", "仲采绿"); phoneNameMap.put("13558773808", "卫丹"); phoneNameMap.put("14343683320", "戚丽红"); phoneNameMap.put("13870632301", "何翠柔"); phoneNameMap.put("13465110157", "钱溶艳"); phoneNameMap.put("15382018060", "钱琳"); phoneNameMap.put("13231085347", "缪静欣"); phoneNameMap.put("13938679959", "焦秋菊"); phoneNameMap.put("13779982232", "吕访琴"); phoneNameMap.put("18144784030", "沈丹"); phoneNameMap.put("18637946280", "褚美丽"); }9

7) 运行生产日志的任务脚本,观察 kafka 控制台消费者是否成功显示产生的数据

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }0

3.2.2、编写代码:数据消费(HBase)

如果以上操作均成功,则开始编写操作 HBase 的代码,用于消费数据,将产生的数据实时存储在 HBase 中。

思路:

创建新的 module 项目:ct_consumer

pom.xml 文件配置:

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }1

1) 新建类:HBaseConsumer

该类主要用于读取 kafka 中缓存的数据,然后调用 HBase API,持久化数据。

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }2

2) 新建类:PropertiesUtil

该类主要用于将常用的项目所需的参数外部化,解耦,方便配置。

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }3

3) 创建 kafka.properties 文件,并放置于 resources 目录下

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }4

4) 将 hdfs-site.xml、core-site.xml、hbase-site.xml、log4j.properties 放置于 resources 目录

5) 新建类:HBaseUtil

该类主要用于封装一些 HBase 的常用操作,比如:创建命名空间、创建表等等。

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }5

工作经验小结

6) 新建类:HBaseDAO(完成以下内容后,考虑数据 put 的效率如何优化)

该类主要用于执行具体的保存数据的操作,rowkey 的生成规则等等。

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }6

注意:生成的时间戳要是 string 类型的。不能是 long 类型的。

注意:"xxx".getBytes(); 与 Bytes.toBytes("xxx"); 有区别,

Bytes.toBytes("xxx"); 的底层默认是 "xxx".getBytes(UTF8_CHARSET);,

而 "xxx".getBytes(); 底层默认是 "xxx".getBytes(ISO-8859-1_CHARSET);

二者编码不一样,混着用,就会出现中文乱码!!!

3.2.3、编写测试单元:范围查找数据(本方案已弃用,但需掌握)

使用 scan 查看 HBase 中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时间点的数据。进行该单元测试前,需要先运行数据采集任务,确保 HBase 中已有数据存在。

新建工具过滤器工具类:HBaseFilterUtil

新建单元测试类:HBaseScanTest1(这是个当前情景被废弃的方案,现用方案:HBaseScanTest2 后续讲解)

3.2.4、运行测试:HBase 消费数据

尖叫提示:请将 Linux 允许打开的文件个数和进程数进行优化,优化 RegionServer 与 Zookeeper 会话的超时时间。

项目成功后,则将项目打包后在 linux 中运行测试。

1) 打包 HBase 消费者代码

a) 在 windows 中,进入工程的 pom.xml 所在目录下(建议将该工程的 pom.xml 文件拷贝到其他临时目录中,例如我把 pom.xml 文件拷贝到了 C:\Users\bruce\Desktop\maven-lib 目录下),然后使用 mvn 命令下载工程所有依赖的 jar 包

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }7

b) idea 中使用 maven 打包工程

c) 测试执行该 jar 包(在两种环境下测试)

方案一:推荐使用 * 通配符,将所有依赖加入到 classpath 中,不可使用 *.jar的方式。

尖叫提示:如果是在 Linux 中测试运行,注意文件夹之间的分隔符。自己的工程要单独在 cp 中指定,不要直接放在依赖的 /lib 目录下(即在 Linux 环境下,工程 ct_consumer-1.0-SNAPSHOT.jar 与所依赖的 jar 不能放在同一的目录中)。

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }8

方案二:最最推荐,使用 java.ext.dirs 参数将所有依赖的目录添加进 classpath 中。

注意:在 Linux 环境下:-Djava.ext.dirs=属性后边的路径必须使用绝对路径。

 /**     * 根据传入的时间区间,在此范围内随机产生通话建立的时间     * 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()     *     * @param startTime     * @param endTime     * @return     */    public String randomBuildTime(String startTime, String endTime) {        try {            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");            Date startDate = sdf1.parse(startTime);            Date endDate = sdf1.parse(endTime);
if (endDate.getTime() return null; }
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS); SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) { e.printStackTrace(); }
return null; }9

方案三:不推荐,将所有依赖的 jar 包直接以绝对路径的方式添加进 classpath 中,以下 为 windows 中的示例,linux 中需要把分号替换为冒号。

 /**     * 生产数据的形式:13651311090,18611213803,2017-10-17 08:15:20,0360     */    public String productLog() {
String caller = null; String callee = null;
String callerName = null; String calleeName = null;
// 随机获取主叫手机号 int callerIndex = (int) (Math.random() * phoneList.size()); // [0, 20) caller = phoneList.get(callerIndex); callerName = phoneNameMap.get(caller);
// 随机获取被叫手机号 while (true) { int calleeIndex = (int) (Math.random() * phoneList.size()); // [0, 20) callee = phoneList.get(calleeIndex); calleeName = phoneNameMap.get(callee);
if (!caller.equals(callee)) { break; } }
// 随机获取通话建立的时间 String buildTime = randomBuildTime(startTime, endTime);
// 随机获取通话的时长 DecimalFormat df = new DecimalFormat("0000"); String duration = df.format((int) (30 * 60 * Math.random()));
StringBuilder sb = new StringBuilder(); sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append(duration); return sb.toString();
// System.out.println(caller + "," + callerName + "," + callee + "," + calleeName + "," + buildTime + "," + duration); }0

3.2.5、编写代码:优化数据存储方案

现在我们要使用 HBase 查找数据时,尽可能的使用 rowKey 去精准的定位数据位置,而非使用 ColumnValueFilter 或者 SingleColumnValueFilter,按照单元格 Cell 中的 Value 过滤数据,这样做在数据量巨大的情况下,效率是极低的!如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非 ColumnValueFilter 就无用武之地。现在,我们将使用协处理器,将数据一分为二。

思路:

编码:

1) 新建协处理器类:CalleeWriteObserver,并覆写 postPut() 方法,该方法会在数据成功插入之后被回调

协处理器的使用步骤:

在执行代码之前,我们先手动删除 hbase 上的表 和 命名空间,命令如下:

 /**     * 生产数据的形式:13651311090,18611213803,2017-10-17 08:15:20,0360     */    public String productLog() {
String caller = null; String callee = null;
String callerName = null; String calleeName = null;
// 随机获取主叫手机号 int callerIndex = (int) (Math.random() * phoneList.size()); // [0, 20) caller = phoneList.get(callerIndex); callerName = phoneNameMap.get(caller);
// 随机获取被叫手机号 while (true) { int calleeIndex = (int) (Math.random() * phoneList.size()); // [0, 20)
试看结束,如继续查看请付费↓↓↓↓
打赏0.5元才能查看本内容,立即打赏

来源【首席数据官】,更多内容/合作请关注「辉声辉语」公众号,送10G营销资料!

版权声明:本文内容来源互联网整理,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 jkhui22@126.com举报,一经查实,本站将立刻删除。

相关推荐

二维码
评论