基于MongoDB的实时数据仓库实现

发布于:2024-10-24 编辑:匿名 来源:网络

1.概述公司线下数据仓库的现状,数据仓库部门每天凌晨之后才处理昨天的线上业务数据,所以业务人员只能看到下一次的报表日,数据为T -1,因此数据具有滞后性。尤其是在互联网金融公司,业务人员需要对信用风险进行管理,及时调整一些风控规则和策略。

不过效果并不能立即看到,需要等到第二天。天才可以看到调整的效果,所以需要实时数据仓库。

线上业务数据基本存储在Mysql和MongoDB数据库中,因此实时数据仓库将基于这两个工作流程来实现。本文重点介绍基于MongoDB的实时数据仓库的架构。

由于线上MongoDB是Sharding模式,规模中等,由于数据量比较大,集群的IO一直处于高负载状态,查询功能无法真正开放给业务人员-时间查询。期间,由于某业务部分的查询条件键值不正确,导致全库扫描(COLLSCAN),导致业务出现大量Slow-Query。

因此,在线集群不再提供个人查询需求。根据目前的情况,我们基础设施部门研究并提出了基于MongoDB的实时数据仓库的技术解决方案。

2、具体实现步骤 2.1 架构图 a) 架构图中的“绿”线是为风控业务人员提供实时查询策略效果的流程图。由于服务器资源有限,从在线MongoDB-Sharding到离线MongoDB-RS(副本)实时同步,所以不可能保存所有数据,而且保存数据的有效期也有限制。

早期实施规划中,实时数据默认保留14天(需要在离线mongodb数据库的数据表中添加过期索引) b) 架构图 提供了中间的“蓝色”线实时数据仓库并保留历史数据。 2.2 Debezium CDC实现流程 mongodb同步工具:mongo-kafka官方提供的jar包,有Source和Sink功能,但不支持CDC。

无法从在线MongoDB库同步到离线MongoDB库。最初选择Confluence工具是因为它集成了多个同步组件,是目前比较流行的同步工具。

它也是一个可靠的高性能流处理平台。但由于MongoDB同步需求的变化,需要选择一款支持CDC的同步工具——Debezium。

Debezium-MongoDB 连接器可以监视 MongoDB 副本集或 MongoDB 分片集群中的数据库和集合中的文档更改,并将这些更改作为事件记录在 Kafka 主题中。连接器自动处理分片集群中分片的添加或删除、每个副本集的成员资格更改、每个副本集中的选举以及通信问题的待解决。

当前方案:使用 Debezium Souce 将 mongo 数据同步到 Kafka,然后使用 Mongo-Kafka Sink 功能将 Kafka 数据同步到离线 MongoDB 库。这不仅可以解决Kafka从数据仓库实时读取的问题,还可以解决政审部门查询离线MongoDB库的问题。

2.2.1 工具集成代码语言:javascript 复制 1)下载源代码 地址:业务需求 为每一条更新/删除数据记录添加oid标识,提供数据仓库的可追溯性。 3) 要实现该方法,请打开 debezium/RecordMakers.java::createRecords() 并添加 value.put("objectid", objId); 4)编译命令:mvn install -pl debezium-connector-mongodb -Ddocker.skip.build=true -Ddocker.skip.run=true -DskipITs=true5)构建新的docker镜像并复制编译好的包:debezium-connector- mongodb/target/debezium-connector-mongodb-0.10.0.Final.jar 到 debezium/connect: 0.10 Docker 容器内。

重新提交并推送到测试环境。 6) 封装Sink函数。

将编译好的Mongo-Kafka jar包(mongo-kafka-0.3-SNAPSHOT-all.jar)复制到debezium/connect:0.10 Docker容器中的/kafka/connect/mongodb-kafka-connect目录下。需要提前创建mongodb-kafka-connect目录。

重新提交并将映像推送到测试环境。7)容器中的目录结构 [kafka@deb-connect ~]$ ls -l connect/total 8drwxr-xr-x 1 kafka kafka 52 Dec 1 16:18 debezium-connector-mongodbdrwxr-xr-x 1 kafka kafka Oct 2 00 :52 debezium-connector-mysqldrwxr-xr-x 1 kafka kafka 10 月 2 日 00:52 debezium-connector-oracledrwxr-xr-x 1 kafka kafka 10 月 2 日 00:52 debezium-connector-postgresdrwxr-xr-x 1 kafka kafka 10 月2 00:52 debezium-connector-sqlserverdrwxrwxr-x 1 kafka kafka 46 Nov 28 08:27 mongodb-kafka-connect 复制代码 2.2.2 Debezium在线部署代码语言:javascript 复制 # 由于需要提供Source和Sink功能,根据同步库的数量,适当增加Docker的数量,保证任务的正常高效执行。

基于相同GROUP_ID的集群支持负载均衡。默认数据格式为:Avro。

# 依赖的环境变量如下: GROUP_ID: "DW-MongoToKafka" KAFKA_HEAP_OPTS: "-Xms2G -Xmx8G" SERVICE_3_NAME: "dw-mongo-connect" SANITIZE_FIELD_NAMES: "true" CONNECT_PRODUCER_MAX_REQUEST_SIZE: KAFKA_PRODUCER_MAX_REQUEST_SIZE: STATUS_STORAGE_TOP IC:“debezium_connect_status” CONFIG_STORAGE_TOPIC:“debezium_connect_configs” OFFSET_STORAGE_TOPIC:“debezium_connect_offsets” KEY_CONVERTER:“io.confluence.connect.avro.AvroConverter” VALUE_CONVERTER:“io.confluence.connect.avro.AvroConverter” INTERNAL_KEY_CONVERTER:“org.apache.kafka.connect.json.JsonConverter” INTERNAL_VALUE_CONVERTER:“org .apache.kafka.connect.json.JsonConverter" CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: " CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: " BOOTSTRAP_SERVERS: "dn5.infra.app:, dn6.infra.app:, dn7.infra.app: "复制代码 2.2.3 创建源连接器 代码语言: javascript 复制 # 使用API??创建源连接器,实现MongoDB-Sharding数据实时同步到Kafka Topiccurl -X POST -H "Content-Type : application/ json" --data'{ "name": "debezium-source-table name", "config": { "connector.class":"io.debezium.connector.mongodb.MongoDbConnector", "sanitize.field .names" :"true", "tasks.max":"1", "mongodb.hosts":"mongos 地址:端口", "mongodb.user":"用户名", "mongodb.password":"密码" , " mongodb.name":"datawarehouse.mongo.debezium", "database.whitelist":"库名称", "collection.whitelist":"库名称.表名称", "max.request.size":"" , " database.history.kafka.bootstrap.servers":"dn5.infra.app:" }}' 创建 Sink Connector 代码语言: javascript copy # 使用 API 创建 Sink Connector,实现Kafka数据实时增量同步到离线MongoDB-RS库curl -X POST -H "Content-Type: application/json" --data'{ "name": "debezium-sink-table name", "config": { "tasks.max":"1", "database ":"目标库", "topics":"填写源连接器同步的主题", "connection.uri":"用户名:密码@IP:PORT/库名", "collection": “表名称”,“connector.class”:“com.mongodb.kafka.connect.MongoSinkConnector”,“change.data.capture.handler”:“com.mongodb.kafka.connect.sink.cdc.debezium.mongodb。 MongoDbHandler" }}' 主题数据保留时间 代码语言: javascript copy # 由于kafka服务器存储有限,请根据业务数据需求修改主题。

保留过期时间为3天 kafka-topics --zookeeper zk 地址: --alter --topic TopicName --config retention.ms=0 复制代码 2.2.6 检查 Debezium 同步数据的效果 A) 查看 Prometheus 监控的 Dashboard kafka B) 查看离线MongoDB-RS库下的数据 2.2.7 提问&记录 代码语言:javascript 复制 # 由于在线Mongo-Sharding集群对DataBase有严格的权限管理,所以创建连接器后,通常会出现权限拒绝的问题。错误信息如下 [11-30 16:49:52, ERROR MongoDB|datawarehouse.mongo.debezium|confrs Error while attempts to get oplogposition: ExceptionauthentiatingMongoCredential{mechanism=SCRAM-SHA-1, userName='synchronization user',source='admin',password=,mechanismProperties={}} [io.debezium.connector.mongodb.Replicator]com.mongodb.MongoSecurityException:验证 MongoCredential{mechanism=SCRAM-SHA-1,userName 时出现异常='Sync User',source='admin',password=,mechanismProperties={}}]使用Debezium Source连接器同步Mongo-sharding数据时,需要启用的权限为:admin的读取权限mongos进入后的库 mongos> show users ;{"_id" : "admin.Sync user","userId" : UUID("fb11-c41b8-8a9f-9bac30c28"),"user" : "Sync user","db" :“管理员”,“角色”:[{“角色”:“读取”,“db”:“风险”},{“角色”:“读取”,“db”:"admin"},{"role" : "read","db" : "config"}],"mechanisms" : ["SCRAM-SHA-1","SCRAM-SHA-"]}输入每个副本 Next ,为管理员和本地库创建读取权限s5rs:PRIMARY> 显示用户;{"_id" : "admin.Sync user","userId" : UUID("b99bddc9c-4f67-b78d63c1"),"user" : "同步用户","db" : "admin" ,“角色”:[{“角色”:“读取”,“数据库”:“本地”},{“角色”:“读取”,“数据库”:“管理员”}],“机制”:[“SCRAM” -SHA-1","SCRAM-SHA-"]} 使用 Mongo-Kakfa Sink 连接器离线操作 Mongodb 时,需要启用权限:riskPoolRs:PRIMARY> show users;{"_id" : "risk.synchronization users" ," userId" : UUID("9f5ef-af-8b54fea2"),"user" : "同步用户","db" : "库名称","roles" : [{"role" : "readWrite","db " : "risk"},{"role" : "read","db" : "admin"},{"role" : "clusterAdmin","db" : "admin"}],"mechanisms" : ["SCRAM-SHA-1","SCRAM-SHA-"]} 复制代码 debezium源连接器同步数据大小限制默认在1M以内,同步mongo大数据时需要修改该参数。

" max.request.size ":"" 修改为16M2.3,连接Presto这一步比较简单,根据presto官方提供的配置说明2.3.1,添加配置文件代码语言:javascript copy #创建mongodb.propertiesconnector.name=etc/catalog下的mongodbmongodb.seeds=IP:7mongodb.credentials=用户名:密码@库名mongodb.schema-collection=presto_mongomongodb.socket-keep-alive=true复制代码2.3.2重启presto代码语言: javascript 复制 bin/launcher stopbin/launcher start 复制代码 2.3.3 问题&记录 问题:连接mongo从presto读取数据时,发现所有字段没有显示? 解决方案:查询mongo库中的schema数据并发现有些字段值缺失。登录mongo手动更新schema数据,添加指定的domain值的显示定义为varchar类型。

修改前 修改后 2.4 连接 SuperSet 打开 superset 界面,选择添加数据源,打开 SQL 编辑器,即可实时查询 mongo 数据 3.准实时报表 结构图“蓝色”线实现过程比较简单。基于Flume对接Kafka并写入Hive,这是数据仓库平台上的定时任务。

实现比较简单,数据实时同步。但基于数据仓库的特点,无法做到分钟级的报表,但是可以做到。

到每小时的水平。如果需要准实时的报表,则需要基于Druid或者Kylin等分析引擎来处理数据。

这个解决方案将在后面的博文中介绍。4.总结在实现mongodb实时数仓架构的过程中,由于环境的不同,在部署过程中会遇到很多问题,但是不用害怕。

正是因为这些问题,才能让你对各个模块的内部实现原理和实现有更深入的了解。机制,耐心点,最终会解决的。

另外,上述基于MongoDB的实时数据仓库架构并不是最优的。主要是根据公司目前的业务结构以及各种系统、网络等环境的限制,在研究的基础上提出的实时解决方案。

基于MongoDB的实时数据仓库实现

站长声明

版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。

标签:

相关文章

  • 小米14即将推出高通骁龙8 Gen3,与之前的8系列旗舰芯片有点不同

    小米14即将推出高通骁龙8 Gen3,与之前的8系列旗舰芯片有点不同

    虽然在本次高通骁龙峰会上,PC芯片骁龙X Elite抢了很多骁龙8 Gen3的风头,但安卓手机唯一的旗舰芯片,加上小米14系列即将推出以及性能限制的解除,也帮助骁龙8 Gen3重新获得了人气。 步伐先进,人工智能主导。 今年的骁龙8 Gen3确实与之前的8系列旗舰芯片有些不同。 今年,高

    06-21

  • 新消息!成大生物、中数智汇科创板IPO提交注册

    新消息!成大生物、中数智汇科创板IPO提交注册

    4月7日,获悉,辽宁成大生物科技股份有限公司、北京中数智汇科技有限公司在科创板提交注册。 科创板IPO。

    06-17

  • 法律重回春天,房产投资价值愈发凸显 -歌斐地产4月观察

    法律重回春天,房产投资价值愈发凸显 -歌斐地产4月观察

    要点概述:根据中国人民银行4月份发布的城镇居民家庭资产负债调查报告,居民资产主要是实物资产,其中70%是住房;每户拥有1.02套住房,近60%的家庭拥有一套住房(因此未来改善需求将占主导地位);家庭负债率总体稳定在45%,但流动性不好(实物资产占比高,金融资产占比低)。

    06-18

  • Masdar赢得亚美尼亚另一个200MW太阳能项目

    Masdar赢得亚美尼亚另一个200MW太阳能项目

    阿联酋政府拥有的可再生能源公司Masdar将在亚美尼亚实施另一个MW太阳能项目。 该协议是在亚美尼亚总统阿尔门萨尔基相和马斯达尔首席执行官穆罕默德贾米尔拉马希会晤时达成的。 会议讨论了可再生能源、最新技术、科教领域的合作前景。 萨尔基相总统强调,在亚美尼亚可再生能源

    06-08

  • 关于 Google X 实验室的现在与未来,“登月队长”  Astro Teller 是这样说的

    关于 Google X 实验室的现在与未来,“登月队长” Astro Teller 是这样说的

    关于谷歌的现在和未来,我们都无法回避一个名字——Astro Teller。 这位被称为“登月船长”的传奇人物已在这个实验室与知名员工一起工作了五年。 泰尔的简历就像一个疯狂的科学家。 他在卡内基梅隆大学获得人工智能博士学位,创立了健康体征数据公司 Body Media,后被 Jawbon

    06-17

  • 神州云动cloudcc:利用CRM大数据技术帮助企业脱颖而出

    神州云动cloudcc:利用CRM大数据技术帮助企业脱颖而出

    在当前的“互联网+”时代,客户已经成为企业最宝贵的资源,也是决定企业生存的最关键因素。 为了争夺客户,许多企业使出浑身解数,利用一切营销渠道。 网络广告、移动营销、搜索引擎营销、网络视频等数字营销渠道已占据当今企业营销的主流。 然而,往往即使有这些营销努力,带

    06-18

  • 小红书和抖音上那些让你拥有“财务自由”的副业,如果赚不到钱,可能就要付出

    小红书和抖音上那些让你拥有“财务自由”的副业,如果赚不到钱,可能就要付出

    年轻人这次特别“摇摆”。 他们孤独、渴望爱情,但也害怕婚姻和养育孩子的压力;他们讨厌熟悉钓鱼,但又想利用业余时间经营副业;他们想要生活幸福,但他们会焦虑,深夜无法入睡。 年轻人很难,但困难并不是不收获的理由。 在这个“副业刚需”的时代,有人趁着年轻人的焦虑,

    06-21

  • 酒店云PMS厂商“绿云”完成1亿元D轮融资

    酒店云PMS厂商“绿云”完成1亿元D轮融资

    据投资界(ID:pedaily)3月9日消息,杭州绿云软件有限公司(以下简称“绿云”)绿云”)宣布完成D轮融资,金额为1亿元人民币。 本轮投资方为海港天成基金,由同程旅游(.HK)、海港集团和青岛市、区引导基金共同发起设立。 鲁云董事长及CEO杨铭魁表示:D轮融资将用于行业数字

    06-18

  • AI增强夜视公司“知未来”获数千万元A+轮融资

    AI增强夜视公司“知未来”获数千万元A+轮融资

    投资界(ID:pedaily)3月19日消息,AI增强夜视公司“知未来”近期完成数十轮融资数百万元A+轮融资,本轮融资由景泰资本投资。 融资资金将用于技术研发和全球市场开拓。 此前,智造未来已完成梅花创投、大辰资本、国宏嘉信等机构的多轮融资。 智富未来成立于2007年,主要技术

    06-18

  • 东车日报 理想L9官方外观图片发布-特斯拉、比亚迪再次涨价-丰田、日产部分工厂因地震停产

    东车日报 理想L9官方外观图片发布-特斯拉、比亚迪再次涨价-丰田、日产部分工厂因地震停产

    介绍 理想L9官方外观图片发布 AITO文杰M5四驱旗舰版将上市从北京车展公开订购时需显示ID。 Aero概念车、奔驰发布EQS SUV内饰、官图、上汽荣威发布SUV新车型:鲸鱼、特斯拉上调Model Y后驱版价格、比亚迪发布第二次调价,郭明錤解释:苹果车团队必须重组现代汽车在印尼建厂生产

    06-21

  • 三迭纪完成Pre-C轮1.5亿元融资,国信投资

    三迭纪完成Pre-C轮1.5亿元融资,国信投资

    投资圈(ID:pedaily)领投 据9月28日消息,南京三迭纪医药科技有限公司(以下简称“三迭纪”) :三迭纪)宣布完成1.5亿元Pre-C轮融资,主要用于加速3D打印药物管线的临床研究以及3D打印药物技术的商业化进程。 本轮融资由国信投资领投,高脉联合家族办公室及老股东东富龙科

    06-18

  • 招人“难”吗?用人“贵”吗?制造业就业的AI机会来了

    招人“难”吗?用人“贵”吗?制造业就业的AI机会来了

    为什么劳务行业没有自己的“海底捞”和“三只松鼠”? “事实上,他们抓住了内部管理之外的服务升级,创造了自己独特的品牌价值,以低廉的价格吸引了更多的客户,从而扩大了规模经济;在不断加大研发投入的同时,也提高了公司的核心竞争力。 服务升级、管理升级、品牌升级是劳

    06-18