作者|黑泽明军 编辑|丹顶鹤5号 电信客服分析平台(附代码)
编者按:
很难见到这种一个完整的大数据项目,从项目背景、项目架构到项目实现都有大量的实例,包括数据存储和数据采集和各个模块的运行设置等等。
通信运营商的数据量存储量是最多的,也是数据化较为完善的领域,数据价值也非常高,如何从电信运营商的数据里「挖掘」到金子?如何把数据转化为生产力是电信运营商最为紧迫的需求,相信这篇干货满满的文章能够给你带来新的思路。
文章目录
一、项目背景
二、项目架构
三、项目实现
一、项目背景
通信运营商每时每刻会产生大量的通信数据,例如:通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如:当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等。我们以此为背景,寻找一个切入点,学习其中的方法论。
二、项目架构
三、项目实现
系统环境:
开发工具:
尖叫提示:idea2017.2.5 必须使用 maven3.3.9,不要使用 maven3.5,有部分兼容性问题。
集群环境(CDH版):
尖叫提示:学习的时候使用的普通版本的,企业开发中使用的是 CDH 版本的。
硬件环境:
3.1、数据生产
此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。
3.1.1、数据结构
我们将在 HBase 中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个 flag 作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。如下图所示:
数据结构如下:
3.1.2、编写代码
思路:
新建 module 项目:ct_producer
pom.xml 文件配置
properties>
project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
properties>
dependencies>
dependency>
groupId>junitgroupId>
artifactId>junitartifactId>
version>4.12version>
scope>testscope>
dependency>
dependencies>
build>
plugins>
plugin>
groupId>org.apache.maven.pluginsgroupId>
artifactId>maven-surefire-pluginartifactId>
version>2.12.4version>
configuration>
skipTests>trueskipTests>
configuration>
plugin>
plugins>
build>
1) 随机输入一些手机号码以及联系人,保存于 Java 的集合中。
新建类:ProductLog
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
2) 创建随机生成通话时间的方法:randomBuildTime()
该时间生成后的格式为:yyyy-MM-dd HH:mm:ss,并使之可以根据传入的起始时间和结束时间来随机生成。
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
3) 创建生产日志一条日志的方法:productLog()
随机抽取两个电话号码,随机产生通话建立时间,随机通话时长,将这几个字段拼接成一个字符串,然后return,便可以产生一条通话的记录。需要注意的是,如果随机出的两个电话号码一样,需要重新随机(随机过程可优化,但并非此次重点)。通话时长的随机为30分钟以内,即:60秒 * 30,并格式化为4位数字,例如:0600(10分钟)。
/**
* 生产数据的形式:13651311090,18611213803,2017-10-17 08:15:20,0360
*/
public String productLog() {
String caller = null;
String callee = null;
String callerName = null;
String calleeName = null;
// 随机获取主叫手机号
int callerIndex = (int) (Math.random() * phoneList.size()); // [0, 20)
caller = phoneList.get(callerIndex);
callerName = phoneNameMap.get(caller);
// 随机获取被叫手机号
while (true) {
int calleeIndex = (int) (Math.random() * phoneList.size()); // [0, 20)
callee = phoneList.get(calleeIndex);
calleeName = phoneNameMap.get(callee);
if (!caller.equals(callee)) {
break;
}
}
// 随机获取通话建立的时间
String buildTime = randomBuildTime(startTime, endTime);
// 随机获取通话的时长
DecimalFormat df = new DecimalFormat("0000");
String duration = df.format((int) (30 * 60 * Math.random()));
StringBuilder sb = new StringBuilder();
sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append(duration);
return sb.toString();
// System.out.println(caller + "," + callerName + "," + callee + "," + calleeName + "," + buildTime + "," + duration);
}
4) 创建写入日志方法:writeLog()
productLog() 方法每产生一条日志,便将日志写入到本地文件中,所以建立一个专门用于日志写入的方法,需要涉及到 IO 操作,需要注意的是,输出流每次写一条日之后需要 flush,不然可能导致积攒多条数据才输出一次。最后需要将 productLog() 方法放置于 while 死循环中执行。
/**
* 将数据写入到文件中
*/
public void writeLog(String filePath) {
try {
OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(filePath), "UTF-8");
while (true) {
Thread.sleep(200);
String log = productLog();
System.out.println(log);
osw.write(log + "\n");
osw.flush(); // 一定要手动flush,这样能确保每条数据写入到文件一次
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
5) 在主函数中初始化以上逻辑客服数据分析,并测试:
public static void main(String[] args) throws InterruptedException {
if (args == null || args.length 0) {
System.out.println("No arguments");
return;
}
ProductLog productLog = new ProductLog();
productLog.initPhone();
productLog.writeLog(args[0]);
// 测试
// String logPath = "d:\\temp\\ct_log\\log.csv";
// productLog.writeLog(logPath);
}
3.1.3、打包测试
1) 打包方式
如果在 eclipse 中,则需要如下 maven 参数进行打包:
-P local clean package:不打包第三方依赖
-P dev clean package install:打包第三方依赖
如果在 idea 中,则需要在 maven project 视图中一次选择如下按钮进行打包:详细操作请参看课堂演示:
LifeCycle --> package(双击)
分别在 Windows 上和 Linux 中进行测试:
Windows:
java -cp ct_producer-1.0-SNAPSHOT.jar producer.ProductLog /本地目录/callLog.csv
2) 为日志生成任务编写 bash 脚本:productLog.sh,文件内容如下,该文件放在 /opt/module/flume/job/ct/ 目录下,并授予执行权限。
#!/bin/bash
java -cp /opt/module/flume/job/ct/ct_producer-1.0-SNAPSHOT.jar com.china.producer.ProductLog /opt/module/flume/job/ct/calllog.csv
3.2、数据采集/消费(存储)
欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架 flume 和 kafka 的定位是什么。我们在此需要将实时数据通过 flume 采集到 kafka 然后供给给 hbase 消费。
flume:Cloudera 公司研发
kafka:Linkedin 公司研发
HBase:实时保存一条一条流入的数据(万金油)
情景:
因此我们常用的一种模型是:
消费存储模块流程图:
公司中的业务情景:
1、公司已经设计好架构了,耐心了解每一个框架应对的是哪一个业务的功能,之后按照框架进行分层。
2、公司没有架构,需要自己搭建,需要按照客户的需求,先对需求进行分层,根据需求用对应的框架实现,之后对框架进行分层。(架构师的思想:宏观格局,5万的月薪,这样才刺激!)
3.2.1、数据采集:采集实时产生的数据到 kafka 集群
思路:
1) 配置 kafka
使用新版本 kafka_2.11-0.11.0.2,不使用老版本 kafka_2.10-0.8.2.1。
新旧版本的区别:
server.properties
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
0
配置环境变量,并使得配置后的环境变量生效
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
1
分发安装包或者同步复制到 hadoop103 和 hadoop104
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
2
注意:分发之后记得配置其他机器的环境变量。
分别在 hadoop103 和 hadoop104 上修改配置文件 /opt/module/kafka/config/server.properties 中的 broker.id=1、broker.id=2
注意:broker.id 不得重复。
2) 先启动 zookeeper 集群 (kafka 集群 依赖于 zookeeper 集群),再启动 kafka 集群(即启动 3 台 kafka 的 broker 服务)
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
3
3) 创建 kafka 主题
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
4
检查一下是否创建主题成功:
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
5
删除topic
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
6
注意:需要 server.properties 中设置 ic.enable=true 否则只是标记删除或者直接重启。
4) 启动 kafka 控制台消费者,等待 flume 信息的输入
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
7
5) 配置 flume(flume-kafka.conf)
在 hadoop102 的 /opt/module/flume/job 目录下创建一个 ct 文件夹,进入该文件夹,创建一个文件 flume-kafka.conf,文件内容如下:
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
8
注意:需要使用新版本的 flume 的配置文件参考案列。(版本:apache-flume-1.7.0)
6) 进入 flume 根目录下,启动 flume
/**
* @author chenmingjun
* 2019-03-13 13:35
*/
public class ProductLog {
/**
* 生产数据
*/
private String startTime = "2017-01-01";
private String endTime = "2017-12-31";
// 用于存放待随机的联系人电话
private List
phoneList = new ArrayList();
// 用于存放联系人电话与姓名的映射
private Map
phoneNameMap = new HashMap();
/**
* 初始化随机的电话号码和姓名
*/
public void initPhone() {
phoneList.add("13242820024");
phoneList.add("14036178412");
phoneList.add("16386074226");
phoneList.add("13943139492");
phoneList.add("18714767399");
phoneList.add("14733819877");
phoneList.add("13351126401");
phoneList.add("13017498589");
phoneList.add("16058589347");
phoneList.add("18949811796");
phoneList.add("13558773808");
phoneList.add("14343683320");
phoneList.add("13870632301");
phoneList.add("13465110157");
phoneList.add("15382018060");
phoneList.add("13231085347");
phoneList.add("13938679959");
phoneList.add("13779982232");
phoneList.add("18144784030");
phoneList.add("18637946280");
phoneNameMap.put("13242820024", "李雁");
phoneNameMap.put("14036178412", "卫艺");
phoneNameMap.put("16386074226", "仰莉");
phoneNameMap.put("13943139492", "陶欣悦");
phoneNameMap.put("18714767399", "施梅梅");
phoneNameMap.put("14733819877", "金虹霖");
phoneNameMap.put("13351126401", "魏明艳");
phoneNameMap.put("13017498589", "华贞");
phoneNameMap.put("16058589347", "华啟倩");
phoneNameMap.put("18949811796", "仲采绿");
phoneNameMap.put("13558773808", "卫丹");
phoneNameMap.put("14343683320", "戚丽红");
phoneNameMap.put("13870632301", "何翠柔");
phoneNameMap.put("13465110157", "钱溶艳");
phoneNameMap.put("15382018060", "钱琳");
phoneNameMap.put("13231085347", "缪静欣");
phoneNameMap.put("13938679959", "焦秋菊");
phoneNameMap.put("13779982232", "吕访琴");
phoneNameMap.put("18144784030", "沈丹");
phoneNameMap.put("18637946280", "褚美丽");
}
9
7) 运行生产日志的任务脚本,观察 kafka 控制台消费者是否成功显示产生的数据
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
0
3.2.2、编写代码:数据消费(HBase)
如果以上操作均成功,则开始编写操作 HBase 的代码,用于消费数据,将产生的数据实时存储在 HBase 中。
思路:
创建新的 module 项目:ct_consumer
pom.xml 文件配置:
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
1
1) 新建类:HBaseConsumer
该类主要用于读取 kafka 中缓存的数据,然后调用 HBase API,持久化数据。
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
2
2) 新建类:PropertiesUtil
该类主要用于将常用的项目所需的参数外部化,解耦,方便配置。
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
3
3) 创建 kafka.properties 文件,并放置于 resources 目录下
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
4
4) 将 hdfs-site.xml、core-site.xml、hbase-site.xml、log4j.properties 放置于 resources 目录
5) 新建类:HBaseUtil
该类主要用于封装一些 HBase 的常用操作,比如:创建命名空间、创建表等等。
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
5
工作经验小结
6) 新建类:HBaseDAO(完成以下内容后,考虑数据 put 的效率如何优化)
该类主要用于执行具体的保存数据的操作,rowkey 的生成规则等等。
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
6
注意:生成的时间戳要是 string 类型的。不能是 long 类型的。
注意:"xxx".getBytes(); 与 Bytes.toBytes("xxx"); 有区别,
Bytes.toBytes("xxx"); 的底层默认是 "xxx".getBytes(UTF8_CHARSET);,
而 "xxx".getBytes(); 底层默认是 "xxx".getBytes(ISO-8859-1_CHARSET);
二者编码不一样,混着用,就会出现中文乱码!!!
3.2.3、编写测试单元:范围查找数据(本方案已弃用,但需掌握)
使用 scan 查看 HBase 中是否正确存储了数据,同时尝试使用过滤器查询扫描指定通话时间点的数据。进行该单元测试前,需要先运行数据采集任务,确保 HBase 中已有数据存在。
新建工具过滤器工具类:HBaseFilterUtil
新建单元测试类:HBaseScanTest1(这是个当前情景被废弃的方案,现用方案:HBaseScanTest2 后续讲解)
3.2.4、运行测试:HBase 消费数据
尖叫提示:请将 Linux 允许打开的文件个数和进程数进行优化,优化 RegionServer 与 Zookeeper 会话的超时时间。
项目成功后,则将项目打包后在 linux 中运行测试。
1) 打包 HBase 消费者代码
a) 在 windows 中,进入工程的 pom.xml 所在目录下(建议将该工程的 pom.xml 文件拷贝到其他临时目录中,例如我把 pom.xml 文件拷贝到了 C:\Users\bruce\Desktop\maven-lib 目录下),然后使用 mvn 命令下载工程所有依赖的 jar 包
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
7
b) idea 中使用 maven 打包工程
c) 测试执行该 jar 包(在两种环境下测试)
方案一:推荐使用 * 通配符,将所有依赖加入到 classpath 中,不可使用 *.jar的方式。
尖叫提示:如果是在 Linux 中测试运行,注意文件夹之间的分隔符。自己的工程要单独在 cp 中指定,不要直接放在依赖的 /lib 目录下(即在 Linux 环境下,工程 ct_consumer-1.0-SNAPSHOT.jar 与所依赖的 jar 不能放在同一的目录中)。
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
8
方案二:最最推荐,使用 java.ext.dirs 参数将所有依赖的目录添加进 classpath 中。
注意:在 Linux 环境下:-Djava.ext.dirs=属性后边的路径必须使用绝对路径。
/**
* 根据传入的时间区间,在此范围内随机产生通话建立的时间
* 公式:startDate.getTime() + (endDate.getTime() - startDate.getTime()) * Math.random()
*
* @param startTime
* @param endTime
* @return
*/
public String randomBuildTime(String startTime, String endTime) {
try {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
Date startDate = sdf1.parse(startTime);
Date endDate = sdf1.parse(endTime);
if (endDate.getTime()
return null;
}
long randomTS = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
Date resultDate = new Date(randomTS);
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String resultTimeString = sdf2.format(resultDate);
return resultTimeString;
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
9
方案三:不推荐,将所有依赖的 jar 包直接以绝对路径的方式添加进 classpath 中,以下 为 windows 中的示例,linux 中需要把分号替换为冒号。
/**
* 生产数据的形式:13651311090,18611213803,2017-10-17 08:15:20,0360
*/
public String productLog() {
String caller = null;
String callee = null;
String callerName = null;
String calleeName = null;
// 随机获取主叫手机号
int callerIndex = (int) (Math.random() * phoneList.size()); // [0, 20)
caller = phoneList.get(callerIndex);
callerName = phoneNameMap.get(caller);
// 随机获取被叫手机号
while (true) {
int calleeIndex = (int) (Math.random() * phoneList.size()); // [0, 20)
callee = phoneList.get(calleeIndex);
calleeName = phoneNameMap.get(callee);
if (!caller.equals(callee)) {
break;
}
}
// 随机获取通话建立的时间
String buildTime = randomBuildTime(startTime, endTime);
// 随机获取通话的时长
DecimalFormat df = new DecimalFormat("0000");
String duration = df.format((int) (30 * 60 * Math.random()));
StringBuilder sb = new StringBuilder();
sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append(duration);
return sb.toString();
// System.out.println(caller + "," + callerName + "," + callee + "," + calleeName + "," + buildTime + "," + duration);
}
0
3.2.5、编写代码:优化数据存储方案
现在我们要使用 HBase 查找数据时,尽可能的使用 rowKey 去精准的定位数据位置,而非使用 ColumnValueFilter 或者 SingleColumnValueFilter,按照单元格 Cell 中的 Value 过滤数据,这样做在数据量巨大的情况下,效率是极低的!如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非 ColumnValueFilter 就无用武之地。现在,我们将使用协处理器,将数据一分为二。
思路:
编码:
1) 新建协处理器类:CalleeWriteObserver,并覆写 postPut() 方法,该方法会在数据成功插入之后被回调
协处理器的使用步骤:
在执行代码之前,我们先手动删除 hbase 上的表 和 命名空间,命令如下:
/**
* 生产数据的形式:13651311090,18611213803,2017-10-17 08:15:20,0360
*/
public String productLog() {
String caller = null;
String callee = null;
String callerName = null;
String calleeName = null;
// 随机获取主叫手机号
int callerIndex = (int) (Math.random() * phoneList.size()); // [0, 20)
caller = phoneList.get(callerIndex);
callerName = phoneNameMap.get(caller);
// 随机获取被叫手机号
while (true) {
int calleeIndex = (int) (Math.random() * phoneList.size()); // [0, 20)
来源【首席数据官】,更多内容/合作请关注「辉声辉语」公众号,送10G营销资料!
版权声明:本文内容来源互联网整理,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 jkhui22@126.com举报,一经查实,本站将立刻删除。