IAST重构实录(一)

最近接了个新的坑,准备重构某个IAST的项目。由于以前针对集群和大数据的接入使用,都是对二次封装平台接触的多。然鹅这次需要自己上服务器完整趟坑,故而记录下流水账,给连续加的班留个纪念。

redis脏数据的问题

在接入redis集群时,开始测的时候只做了单元测试,处理时基本没遇到啥问题。

但在整体联动试运行时,发现原来服务器集群上留存有原来测试脚本和agent,在往redis集群里打脏数据,格式与现有的不一致,导致连连报错。

后来通过定位crontab和现有进程,找出了运行的几个测试脚本和agent,将它们干掉做了缓释。

redis集群配置问题

我们知道,redis集群上执行一些命令,如flushallkeys*等等,对线上环境可能会造成影响,所以一般会重命名,也就会用到rename-command

但是我搞的时候不太熟,结果整的无论是redis-cli练连上去,还是开发急脚本用库去连,都不好使会显示:command not found

所以后来我在redis配置文件里找到重命名的命令后,直接在任务函数里封装了一层远程cli命令,动态配置账密和rename以后的命令,引用配置来写死来执行危险操作(屏蔽了logging),如:

redis-cli -h x.x.x.x -p xxx -a xxx commandxxx

redis持久化异常

一个异常提示的是: MISCONF Redis is configured to save RDB snapshots...,这个其实就是持久化问题,一般不建议配置太长时间,该清理就清理,或者设置好expire时间。

还有个提示是redis达到max records还是啥玩意儿,提示的大概是 OOM command not allowed when used memory > 'maxmemory'

大概说是达到上限了,连接集群时,我尝试捕获错误也没捕获到。所以也不好根据这个来判定特征,对方只是直接拒绝连接了。

我处理这类情况一般是,针对键值配置好足额的expire时间,然后定期去判定dbszie,设定一个肩部峰值,暂定的是70%-80%左右。最后,再加上我之前cli版本的主动清理,事实上还是比较好处理这类问题的。

kafka rebalancing解决

注意,kafka如果读取时切换任务比较快,可以设置较短的时间避免rebalancing。

但是如果本身消费的时候,需要进行延时逻辑判定,或者需要等待kafka消息流等情况,就需要其他情况来控制了。

相关配置参数如下(python版本):

session_timeout_ms(会话池过期时间,维持活跃)
heartbeat_interval_ms(心跳时间,涉及rebalancing)
max_poll_interval_ms(总过期时间,设置过短容易rebalancing)

kafka时间段跳空后开

我遇到的情况是,同一个groupid的情况下,在kafka topicA读数据流,选取了一个权重最大的时间点作为锚点。

然后我又以此时间点,去kafka topicB读取数据存入redis缓存,作为基准分析数据。

等我回过头再去kafka topicA读数据流时,发现新读取的数据流时间点,居然比存入缓存的topicB时间更晚。

我仔细想了想,是不是因为在我读取kafka topicB去存的时候产生了延时,然后kafka topicA数据流的offset其实依然在跑。

但是我消费的时候,设置了auto commit和手动commit和close,kafka topicA的二次锚点还是会跳空比缓存数据后开,这个问题目前偶尔会跳出来,留待之后观察。

批量插入和消费问题

主要聊聊kafka消费和redis的读插,两个比较简单的问题,放在一起聊了。

kafka读取

本身kafka属于流数据,通过offset控制读取的点位,竞争消费的问题由groupid控制解决。

但是在竞争读取数据时,要知道redis缓存上限原来是按单进程进行窗口结算的。

我在开多线程以后,如果从某个线程读取窗口数据,mset压入redis集群触发存储告警上限。

这时候我们需要主动杀死线程,这样可能会丢失另外几个相当多的正在压入redis的数据。

解决方案的话,我这边考虑的是多起几个节点和或者线程,减少单个单线程和节点的数据集。

一旦停止,能通过少量时间的time-sleep来缓释节点的数据处理压力,也能避免max_poll_interval_ms or session_timeout_ms设置的时间过长的问题。

redis读写之殇

在取redis数据的时候,读的动作和速度肯定是延后于写的。本身这事儿就不应该用消息队列型存储来搞,在数据量堆积到时候读压力巨tm大。

实时规则处理可以用spark或者flink框架,离线的话完全可以采用hive(kafka2hive)存储kafka结果,定时拉取表对比生成临时hive表,再用脚本规则过滤去输出结果。

这里因为一些限制借助不了大数据平台的能力,也不能指望我为一个项目搞个一套平台框架。

所以呢,还是沿用的原来的思路,借助redis缓存,形成时间窗口。

然后依据单个kafka数据的index_id和时间点作为锚定标准,去对比分析另一个kafka数据流,最后完成阶段性循环。

但是在读redis的时候,批量mget数据意义不大,貌似提速不了多少。

我这边是根据两个kafka队列的index_id去做判定的。

如果一次性mget多个数据,通过多线程去做index_id的关联逻辑判定时,如果规则判定本身不怎么耗时的话,你会发现并没有快多少。

所以结论是,大头还是落在kafka读取的压力上,取了以后就能去做分布式匹配。

结语

本小节只简单聊了下之前踩过的准备坑,针对IAST本身没有聊太多,后面的续篇会简单介绍下。