小米14即将推出高通骁龙8 Gen3,与之前的8系列旗舰芯片有点不同
06-21
1.概述公司线下数据仓库的现状,数据仓库部门每天凌晨之后才处理昨天的线上业务数据,所以业务人员只能看到下一次的报表日,数据为T -1,因此数据具有滞后性。尤其是在互联网金融公司,业务人员需要对信用风险进行管理,及时调整一些风控规则和策略。
不过效果并不能立即看到,需要等到第二天。天才可以看到调整的效果,所以需要实时数据仓库。
线上业务数据基本存储在Mysql和MongoDB数据库中,因此实时数据仓库将基于这两个工作流程来实现。本文重点介绍基于MongoDB的实时数据仓库的架构。
由于线上MongoDB是Sharding模式,规模中等,由于数据量比较大,集群的IO一直处于高负载状态,查询功能无法真正开放给业务人员-时间查询。期间,由于某业务部分的查询条件键值不正确,导致全库扫描(COLLSCAN),导致业务出现大量Slow-Query。
因此,在线集群不再提供个人查询需求。根据目前的情况,我们基础设施部门研究并提出了基于MongoDB的实时数据仓库的技术解决方案。
2、具体实现步骤 2.1 架构图 a) 架构图中的“绿”线是为风控业务人员提供实时查询策略效果的流程图。由于服务器资源有限,从在线MongoDB-Sharding到离线MongoDB-RS(副本)实时同步,所以不可能保存所有数据,而且保存数据的有效期也有限制。
早期实施规划中,实时数据默认保留14天(需要在离线mongodb数据库的数据表中添加过期索引) b) 架构图 提供了中间的“蓝色”线实时数据仓库并保留历史数据。 2.2 Debezium CDC实现流程 mongodb同步工具:mongo-kafka官方提供的jar包,有Source和Sink功能,但不支持CDC。
无法从在线MongoDB库同步到离线MongoDB库。最初选择Confluence工具是因为它集成了多个同步组件,是目前比较流行的同步工具。
它也是一个可靠的高性能流处理平台。但由于MongoDB同步需求的变化,需要选择一款支持CDC的同步工具——Debezium。
Debezium-MongoDB 连接器可以监视 MongoDB 副本集或 MongoDB 分片集群中的数据库和集合中的文档更改,并将这些更改作为事件记录在 Kafka 主题中。连接器自动处理分片集群中分片的添加或删除、每个副本集的成员资格更改、每个副本集中的选举以及通信问题的待解决。
当前方案:使用 Debezium Souce 将 mongo 数据同步到 Kafka,然后使用 Mongo-Kafka Sink 功能将 Kafka 数据同步到离线 MongoDB 库。这不仅可以解决Kafka从数据仓库实时读取的问题,还可以解决政审部门查询离线MongoDB库的问题。
2.2.1 工具集成代码语言:javascript 复制 1)下载源代码 地址:业务需求 为每一条更新/删除数据记录添加oid标识,提供数据仓库的可追溯性。 3) 要实现该方法,请打开 debezium/RecordMakers.java::createRecords() 并添加 value.put("objectid", objId); 4)编译命令:mvn install -pl debezium-connector-mongodb -Ddocker.skip.build=true -Ddocker.skip.run=true -DskipITs=true5)构建新的docker镜像并复制编译好的包:debezium-connector- mongodb/target/debezium-connector-mongodb-0.10.0.Final.jar 到 debezium/connect: 0.10 Docker 容器内。
重新提交并推送到测试环境。 6) 封装Sink函数。
将编译好的Mongo-Kafka jar包(mongo-kafka-0.3-SNAPSHOT-all.jar)复制到debezium/connect:0.10 Docker容器中的/kafka/connect/mongodb-kafka-connect目录下。需要提前创建mongodb-kafka-connect目录。
重新提交并将映像推送到测试环境。7)容器中的目录结构 [kafka@deb-connect ~]$ ls -l connect/total 8drwxr-xr-x 1 kafka kafka 52 Dec 1 16:18 debezium-connector-mongodbdrwxr-xr-x 1 kafka kafka Oct 2 00 :52 debezium-connector-mysqldrwxr-xr-x 1 kafka kafka 10 月 2 日 00:52 debezium-connector-oracledrwxr-xr-x 1 kafka kafka 10 月 2 日 00:52 debezium-connector-postgresdrwxr-xr-x 1 kafka kafka 10 月2 00:52 debezium-connector-sqlserverdrwxrwxr-x 1 kafka kafka 46 Nov 28 08:27 mongodb-kafka-connect 复制代码 2.2.2 Debezium在线部署代码语言:javascript 复制 # 由于需要提供Source和Sink功能,根据同步库的数量,适当增加Docker的数量,保证任务的正常高效执行。
基于相同GROUP_ID的集群支持负载均衡。默认数据格式为:Avro。
# 依赖的环境变量如下: GROUP_ID: "DW-MongoToKafka" KAFKA_HEAP_OPTS: "-Xms2G -Xmx8G" SERVICE_3_NAME: "dw-mongo-connect" SANITIZE_FIELD_NAMES: "true" CONNECT_PRODUCER_MAX_REQUEST_SIZE: KAFKA_PRODUCER_MAX_REQUEST_SIZE: STATUS_STORAGE_TOP IC:“debezium_connect_status” CONFIG_STORAGE_TOPIC:“debezium_connect_configs” OFFSET_STORAGE_TOPIC:“debezium_connect_offsets” KEY_CONVERTER:“io.confluence.connect.avro.AvroConverter” VALUE_CONVERTER:“io.confluence.connect.avro.AvroConverter” INTERNAL_KEY_CONVERTER:“org.apache.kafka.connect.json.JsonConverter” INTERNAL_VALUE_CONVERTER:“org .apache.kafka.connect.json.JsonConverter" CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: " CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: " BOOTSTRAP_SERVERS: "dn5.infra.app:, dn6.infra.app:, dn7.infra.app: "复制代码 2.2.3 创建源连接器 代码语言: javascript 复制 # 使用API??创建源连接器,实现MongoDB-Sharding数据实时同步到Kafka Topiccurl -X POST -H "Content-Type : application/ json" --data'{ "name": "debezium-source-table name", "config": { "connector.class":"io.debezium.connector.mongodb.MongoDbConnector", "sanitize.field .names" :"true", "tasks.max":"1", "mongodb.hosts":"mongos 地址:端口", "mongodb.user":"用户名", "mongodb.password":"密码" , " mongodb.name":"datawarehouse.mongo.debezium", "database.whitelist":"库名称", "collection.whitelist":"库名称.表名称", "max.request.size":"" , " database.history.kafka.bootstrap.servers":"dn5.infra.app:" }}' 创建 Sink Connector 代码语言: javascript copy # 使用 API 创建 Sink Connector,实现Kafka数据实时增量同步到离线MongoDB-RS库curl -X POST -H "Content-Type: application/json" --data'{ "name": "debezium-sink-table name", "config": { "tasks.max":"1", "database ":"目标库", "topics":"填写源连接器同步的主题", "connection.uri":"用户名:密码@IP:PORT/库名", "collection": “表名称”,“connector.class”:“com.mongodb.kafka.connect.MongoSinkConnector”,“change.data.capture.handler”:“com.mongodb.kafka.connect.sink.cdc.debezium.mongodb。 MongoDbHandler" }}' 主题数据保留时间 代码语言: javascript copy # 由于kafka服务器存储有限,请根据业务数据需求修改主题。
保留过期时间为3天 kafka-topics --zookeeper zk 地址: --alter --topic TopicName --config retention.ms=0 复制代码 2.2.6 检查 Debezium 同步数据的效果 A) 查看 Prometheus 监控的 Dashboard kafka B) 查看离线MongoDB-RS库下的数据 2.2.7 提问&记录 代码语言:javascript 复制 # 由于在线Mongo-Sharding集群对DataBase有严格的权限管理,所以创建连接器后,通常会出现权限拒绝的问题。错误信息如下 [11-30 16:49:52, ERROR MongoDB|datawarehouse.mongo.debezium|confrs Error while attempts to get oplogposition: ExceptionauthentiatingMongoCredential{mechanism=SCRAM-SHA-1, userName='synchronization user',source='admin',password=
" max.request.size ":"" 修改为16M2.3,连接Presto这一步比较简单,根据presto官方提供的配置说明2.3.1,添加配置文件代码语言:javascript copy #创建mongodb.propertiesconnector.name=etc/catalog下的mongodbmongodb.seeds=IP:7mongodb.credentials=用户名:密码@库名mongodb.schema-collection=presto_mongomongodb.socket-keep-alive=true复制代码2.3.2重启presto代码语言: javascript 复制 bin/launcher stopbin/launcher start 复制代码 2.3.3 问题&记录 问题:连接mongo从presto读取数据时,发现所有字段没有显示? 解决方案:查询mongo库中的schema数据并发现有些字段值缺失。登录mongo手动更新schema数据,添加指定的domain值的显示定义为varchar类型。
修改前 修改后 2.4 连接 SuperSet 打开 superset 界面,选择添加数据源,打开 SQL 编辑器,即可实时查询 mongo 数据 3.准实时报表 结构图“蓝色”线实现过程比较简单。基于Flume对接Kafka并写入Hive,这是数据仓库平台上的定时任务。
实现比较简单,数据实时同步。但基于数据仓库的特点,无法做到分钟级的报表,但是可以做到。
到每小时的水平。如果需要准实时的报表,则需要基于Druid或者Kylin等分析引擎来处理数据。
这个解决方案将在后面的博文中介绍。4.总结在实现mongodb实时数仓架构的过程中,由于环境的不同,在部署过程中会遇到很多问题,但是不用害怕。
正是因为这些问题,才能让你对各个模块的内部实现原理和实现有更深入的了解。机制,耐心点,最终会解决的。
另外,上述基于MongoDB的实时数据仓库架构并不是最优的。主要是根据公司目前的业务结构以及各种系统、网络等环境的限制,在研究的基础上提出的实时解决方案。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-17
06-18
06-18
06-18
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用