京东:三季度归属母公司净亏损28亿元,去年同期净利润76亿元
06-18
背景互联网现在已经进入AI驱动业务发展的阶段。传统的机器学习开发流程基本上是以下步骤:数据收集->特征工程->模型训练->模型评估->并在线使用训练好的有效模型进行预测。
该方法主要存在两个瓶颈:模型更新周期慢,无法有效反映线上变化。最快的模型更新需要每小时级别,通常是每日级别甚至每周级别。
二是模型参数较小,预测效果较差;当模型参数较多时,在线预测需要大量内存,QPS无法保证。 一般来说,解决这些问题的方法有两种:一是使用On-line-learning算法,二是使用一些优化方法,在保证精度的情况下尽可能得到稀疏解,从而减少模型参数。
数量。传统的训练方法在模型训练上线后一般是静态的,与线上情况没有任何交互。
预测错误是添加的,只能在下次更新时修正,但这次更新一般需要很长时间。 现实中,为了及时响应市场变化,越来越多的企业开始选择在线学习的方式,直接处理流数据,实时训练,实时更新模型。
在线学习 在线学习算法的特点是:每来一个训练样本,就用样本产生的损失和梯度迭代一次模型,逐一进行训练。可以根据在线反馈数据实时快速调整模型。
这使得模型能够及时反映在线变化,提高在线预测的准确性。因此,它可以处理大数据量训练和在线训练。
常用的方法包括在线梯度下降(OGD)和随机梯度下降(SGD)。 Online Learning的优化目标是最小化整体损失函数,这需要快速求解目标函数的最优解。
逻辑回归经常用于在线学习和点击率。 FTRL(Follow-the-regularized-Leader)算法从理论研究到实际工程实现,谷歌花了三年(年年)时间。
在凸优化问题上表现非常好,例如具有非平滑正则化项(例如 1 范数、模型复杂性控制和稀疏化)的逻辑回归。 FTRL简介及FTRL的工程实现 FTR是FTRL的前身。
其思想是每次找到使之前所有样本的损失函数之和最小的参数。FTRL,即跟随正则化领导者,源自之前几部作品中经典的TG、OGD、L1-FOBOS和L1-RDA。
主要出发点是提高稀疏性并满足精度要求。基于FTL的优化目标,FTRL增加了正则化来防止过拟合。
FTRL的损失函数一般不容易求解。在这种情况下,一般需要找到代理损失函数。
代理损失函数需要满足以下条件:代理损失函数比较容易求解,最好有解析解。代理损失函数得到的解与原函数解的差异越小越好。
为了衡量条件2下两种解的差异,引入了寄存器的概念。如果一个在线学习算法能够保证其regret是t的次线性函数,那么随着训练样本数量的增加,在线学习到的模型将无限接近最优模型。
也就是说,随着训练样本数量的增加,代理损失函数计算出的参数与原始损失函数之间的实际损失值差异变得越来越小。毫不奇怪,FTRL 满足了这一特性。
另一方面,现实中也很看重稀疏性,即模型的稀疏性。拥有数亿个特征并不罕见。
模型越复杂,所需的存储和时间资源就越高。稀疏模型将大大减少预测的内存和复杂性。
另外,稀疏模型相对来说更容易解释,这也是通常所说的L1正则化的优点。逻辑回归下每坐标FTRL_Proximal的工程实现伪代码如下: 实现时,可以根据公式表达式进行一些变换,并在实际数据集上使用分布式并行加速。
四个参数的设置是根据论文中的指导和反复的实验测试,你可以找到一组适合你的问题的参数。上面所谓的每坐标是指FTRL针对w的每个维度分别进行训练和更新。
每个维度使用不同的学习率,也就是上面代码中lambda2之前的那一项。与对w的所有特征维度使用统一的学习率相比,该方法考虑了训练样本本身在不同特征上的不均匀分布。
如果包含w某维特征的训练样本很少,那么每个样本都是珍贵的。 ,那么这个特征维度对应的训练率就可以单独维持一个比较大的值。
每出现一个包含这个特征的样本,就可以让该样本的梯度前进一大步,而不用强制其他特征维度的前进。保持一致。
开源实现 FTRL目前有很多开源实现,包括多线程版本、基于参数服务器和MPI的分布式版本,可以运行在yarn等资源管理平台上。另外,据研究,一线互联网已经采用了实时计算引擎。
Flink的Alink实现了在线学习。例如: 带参数服务器的分布式 FM 和 LR:coding=utf-8import numpy as npclass LR(object): @staticmethod def fn(w, x): '''决策函数是 sigmoid 函数''' return 1.0 / (1.0 + np.exp(-w.dot(x))) @staticmethod def loss(y, y_hat): '''交叉熵损失函数''' return np.sum(np.nan_to_num(-y * np. log(y_hat) ) - (1 - y) * np.log(1 - y_hat))) @staticmethod def grad(y, y_hat, x): '''交叉熵损失函数相对于权重 w''' return (y_hat - y) * xclass FTRL(object): def __init__(self, dim, l1, l2, alpha, beta, DecisionFunc=LR): self.dim = dim self.decisionFunc = DecisionFunc self .z = np.zeros(dim) self.n = np.zeros(dim) self.w = np.zeros(dim) self.l1 = l1 self.l2 = l2 self.alpha = alpha self.beta = beta def预测(自我,x):返回自我.decisionFunc.fn(自我。
w, x) def update(self, x, y): self.w = np.array([0 if np.abs(self.z[i]) <= self.l1 else (np.sign( self.z [i]) * self.l1 - self.z[i]) / (self.l2 + (self.beta + np.sqrt(self.n[i])) / self.alpha) for i in xrange(self .dim)]) y_hat = self.predict(x) g = self.decisionFunc.grad(y, y_hat, x) sigma = (np.sqrt(self.n + g * g) - np.sqrt(self.n) )) / self.alpha self.z += g - sigma * self.w self.n += g * g return self.decisionFunc.loss(y, y_hat) def train(self, trainSet, verbos=False, max_itr= 0, eta=0.01, epochs=): itr = 0 n = 0 while True: 对于 trainSet 中的 x, y: loss = self.update(x, y) if verbos: print "itr=" + str(n) + "\tloss=" + str(loss) if loss < eta: itr += 1 else: itr = 0 if itr >= epochs: # 损失函数已连续epochs次迭代小于eta print "loss has less than", eta, " Continuous for ", itr, "iterations" return n += 1 if n >= max_itr: print "reach max iteration", max_itr returnclass TestData(object): def __init__(self, file, d): self.d = d self.file = file def __iter__(self): with open(self.file, 'r') as f_in: for line in f_in: arr = line.strip ().split() if len(arr) >= (self.d + 1): yield (np.array([float(x) for x in arr[0:self.d]]), float(arr[ self.d]))if __name__ == '__main__': d = 4 testData = TestData("train.txt", d) ftrl = FTRL(dim=d, l1=1.0, l2=1.0, alpha=0.1, beta =1.0) ftrl.train(testData, verbos=False, max_itr=00, eta=0.01, epochs=) w = ftrl.w 打印 w 正确 = 0 wrong = 0 for x, y in testData: y_hat = 1.0 if ftrl.predict(x) > 0.5 else 0.0 if y == y_hat: 正确 += 1 else: 错误 += 1 print "正确比率", 1.0 * 正确 / (正确+错误)基于Flink实现Alink是阿里巴巴基于实时计算引擎Flink开发的新一代机器学习算法平台。它是业界第一个同时支持批处理算法和流式算法的机器学习平台。
Alink提供在线学习。 Alink 中实现 FTRL 算法的主要流程如下: 具体代码实现逻辑如下: ● 建立特征处理管道,包括 StandardScaler 和 FeatureHasher,进行标准化缩放和特征哈希,最终得到特征向量代码语言: javascript copy Pipeline featurePipeline = new Pipeline().add(new StandardScaler().setSelectedCols(numericalColNames)).add(new FeatureHasher().setSelectedCols(selectedColNames).setCategoricalCols(categoryColNames).setOutputCol(vecColName).setNumFeatures( numHashFeatures));//拟合特征管道模型//构建特征工程管道PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData);●这里准备数据集,用于构建kafka等流数据,实时切分得到原始数据训练数据和原始预测数据,代码语言:javascript copy // 准备流数据集 CsvSourceStreamOp data = new CsvSourceStreamOp().setFilePath(" 这里可以使用kafaka数据源 KafkaSourceStreamOp soure = new KafkaSourceStreamOp() .setBootstrapServers("localhost:" ) .setTopic("train_data_topic") .setStartupMode("EARLIEST") .setGroupId("");//对流数据源进行实时分割,得到原始训练数据和原始预测数据 SplitStreamOp splitter = new SplitStreamOp( ).setFraction(0.5). linkFrom(data); ● 训练逻辑回归模型作为FTRL算法的初始模型,这是系统冷启动所需的。
代码语言: javascript copy LogisticRegressionTrainBatchOp lr = new LogisticRegressionTrainBatchOp().setVectorCol(vecColName).setLabelCol(labelColName).setWithIntercept(true).setMaxIter(10);BatchOperator> initModel = featurePipelineModel.transform(trainBatchData).link(lr) ;● 基于初始模型进行FTRL在线训练;代码语言:javascript copy // 根据初始模型进行FTRL在线训练 FtrlTrainStreamOp model = new FtrlTrainStreamOp(initModel).setVectorCol(vecColName).setLabelCol(labelColName).setWithIntercept(true ).setAlpha(0.1).setBeta(0.1). setL1(0.01).setL2(0.01).setTimeInterval(10).setVectorSize(numHashFeatures).linkFrom(featurePipelineModel.transform(splitter));● 基于FTRL在线模型,连接预测数据进行预测; / 代码语言: javascript copy FtrlPredictStreamOp PredictResult = new FtrlPredictStreamOp(initModel) .setVectorCol(vecColName) .setPredictionCol("pred") .setReservedCols(new String[]{labelColName}) .setPredictionDetailCol("details" ) .linkFrom(model, featurePipelineModel .transform(splitter.getSideOutput(0)));● 评估预测结果流 代码语言: javascript copy // 评估预测结果流 PredictResult.link( new EvalBinaryClassStreamOp().setLabelCol(labelColName).setPredictionCol("pred") . setPredictionDetailCol( "details").setTimeInterval(10)).link(new JsonValueStreamOp() .setSelectedCol("Data") .setReservedCols(new String[]{"Statistics"}).setOutputCols(new String[]{"Accuracy" , " AUC", "ConfusionMatrix"}).setJsonPath(new String[]{".AUC", "$.ConfusionMatrixx"}) ) .print();StreamOperator.execute();开发时编译打包成jar包时我在做的过程中,遇到了两个问题。一是依赖包没有导入到jar中。
当提交到flink集群时,任务找不到相关的类。另外就是打包时包含了flink相关的包,导致flink出现问题。
lib中的jar包冲突,所以要注意maven打包,排除flink包,以免出错。部署flink集群代码语言:javascript copy wget -xf flink-1.13.0-bin-scala_2.11.tgz && cd flink-1.13.0./ bin/start-cluster.sh 提交flink任务代码语言:javascript copy./ bin/flink run -p 1 -c org.example.FTRLExample FlinkOnlineProject-1.0-SNAPSHOT.jar 查看任务状态 将任务提交给 f链路聚类后,可以通过flink web ui查看任务状态。
一般情况下,如果运行在本地模式下,通过在浏览器中输入就可以看到提交到flink集群的所有状态,以及checkpoint、back压等,如下图所示。运行状态:工程优化就是内存优化。
内存节省主要分为两部分:预测和训练。可以使用以下方法: 预测期间节省内存:L1 范数加策略。
训练结果w非常稀疏,使用w进行预测时可以节省时间。内存训练时节省内存:在线丢弃训练数据中很少出现的特征(概率特征包含)、浮点数、对几个相似模型进行重新编码和训练,确保相关特征单值结构可以部分共享,并且多个模型共享一个共同的特征存储,并同时更新这个共同的特征结构。
利用正负样本的数量计算梯度并对训练集进行采样,选择更有价值的样本。总结 以上是对在线学习相关知识的总结和整理。
随着大数据时代的到来和人工智能的兴起,机器学习能够处理的场景更加广泛和多样。为了满足实时性要求,还需要对流数据进行实时直接预测。
在线培训已经成为一种趋势,国外也一直在探索相关的事情。目前,flink已经成为事实上的标准。
在线学习也得到了生产计算框架alink的支持。它利用flink丰富的连接器、可扩展的算子以及分布式部署能力,快速实现在线学习。
它已经被许多主要的互联网公司使用。 多语言支持,Flink是使用Java语言开发的。
根据目前的研究,除了基于 JPython 的 Python 语言支持之外,Flink 还没有针对其他语言的开发。除了Flink之外,还有一些公司为了性能而使用C++和LLVM来实现高性能。
通过Flink Native执行引擎等方式优化性能,通过Java调用底层C++等实现。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件 举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签:
相关文章
06-18
06-18
06-18
06-18
06-17
06-17
06-21
最新文章
【玩转GPU】ControlNet初学者生存指南
【实战】获取小程序中用户的城市信息(附源码)
包雪雪简单介绍Vue.js:开学
Go进阶:使用Gin框架简单实现服务端渲染
线程池介绍及实际案例分享
JMeter 注释 18 - JMeter 常用配置组件介绍
基于Sentry的大数据权限解决方案
【云+社区年度征文集】GPE监控介绍及使用