数据关联

在task之间进行有逻辑的并发,并善用该功能进行异步阻塞。

实际上,并发的方式虽然可以帮助我们提升效率,但是因为并发是无序的,这就使并发变得毫无逻辑。所谓百万并发也就仅限于处理没有逻辑的GET请求了。

为了解决这种问题,我们分别在task之间和event之间设立了数据关联和数据互补系统。

数据关联和数据互补,无一例外都是将一个task的返回值传送至另一个task中,然后该task继承这个返回值,参考其作为args和kwargs,执行自身的func。但是数据关联只可以在一个事件流内进行交换数据,数据互补可以将数据传输至指定事件流中

那么。我们开始吧。

数据关联

想要彻底掌握好数据关联,就必须了解其底层实现。

首先,如果两个task或多个task想要进行互相数据关联,就必须有一个它们返回值的载体。否则零零散散的几个task根本找不到对面在哪。而这个载体就是event了。事件流中设有returns字典和anonymousReturns序列,它们都用于在并发操作时接收task的返回值(如果它们的event成员正确指向了这个事件流的话)。然后task在进行传输数据时,可以通过event成员直接访问到自己所属的事件流,将自己的返回值经过returnFunc处理加入所属事件流的returns字典(建为task的名称)。另外一个task进行接收数据时,则可以直接通过刚才的方法,在returns中查找数据。而查找到需要的数据的最快方法,就是直接查找需要关联的task的键,这样就可以在字典中直接找到值。

在传输数据的task在returns字典中放入数据时,需要先执行自身的returnFunc对自身原本的返回值进行一次处理。传输完毕后,接收数据的task就先对其进行针对于自身的二次处理数据,然后将其最终得到的结果作为自己的args和kwargs对func进行传参,最后运行自身func。

传输方的task不需要任何成员来指明它要和某一个task进行关联,每一个有名字的task都会将自己的func返回值自动放入所属事件流的returns当中。

接收方的task用association成员来指代另一个task的名称,在开始并发时,先不断在self.event.returns中搜寻association这个键,搜寻到后,拿到数据,开始处理,然后运行函数。也就是说在传输方task函数还没执行完毕之前,接收方task都处于阻塞状态。

对于returnFunc和associationFunc的传参和返回,有着严格的规定:第一个参数是需要处理的数据,第二个参数是目前处理该参数的task的self。除此之外不能有其他的必填参数。这两个函数的返回值也有严格的规定。在处理数据时最终得出的接收方task的args和kwargs,用一个列表封装,格式为[args , kwargs](如果传输的数据可以在接收时被task二次处理,可以先不用这种格式,但二次处理时必须要)

通用流程

了解了数据关联的底层实现后,以下是常用的进行数据关联的流程代码。

import turingAPI     #引入turingAPI
user = turingAPI.icodeUser('a cookie')  #登录
ev = turingAPI.event()      #创建事件流

def data(value , self):    #task1的处理数据函数(处理API返回值)
    return [[value[0]['id'],'test'],{}]   #按照格式返回内容

turingAPI.addTask(ev,user.getPersonWorks,'task1',[user.info['userId'],1,1,True],{},data)
#增加第一个task,设置传参内容
turingAPI.addTask(ev,user.comment,'task2',association='task1')
#增加第二个task,与task1设置关联
ev.run()  #并发

值得一提的是,returnFunc也可以当做一个阻塞函数来使用。如,在returnFunc的函数中添加一个time.sleep(5)

在将returnFunc执行完毕后,task才会将最终得出的数据放入self.event.returns中。

Last updated