o Xf'+@s&dZddlZddlZddlZddlmZmZddlZddlZddl Z ddl Z ddl m Z m Z ddlZddlmZmZddlmZddlZddlZddlZddlmZmZmZmZddlmZed d ZGd d d eZ ej!Gd ddZ"ddZ#GdddZ$eZ%e%&ddefddZ'e%&dddZ(e%&dddZ)e%&ddefddZ*e%&d defd!d"Z+e%&d#defd$d%Z,e%&d&defd'd(Z-e%.d)defd*d(Z-d+d,Z/e0d-kre/\Z1Z2e1j3rej4e%e1j5e1j6d.e j7d/e j7d0d1dSej4e%e1j5e1j6d.d2dSdS)3zQ A controller manages distributed workers. It sends worker addresses to clients. N)Enumauto)ListUnion)FastAPIRequest)StreamingResponse) CONTROLLER_HEART_BEAT_EXPIRATIONWORKER_API_TIMEOUT ErrorCodeSERVER_ERROR_MSG) build_logger controllerzcontroller.logc@s$eZdZeZeZeddZdS)DispatchMethodcCs$|dkr|jS|dkr|jStd)Nlotteryshortest_queuezInvalid dispatch method)LOTTERYSHORTEST_QUEUE ValueError)clsnamerM/mnt/public/algm/yuantao_home/workspace/fastchat/fastchat/serve/controller.pyfrom_str&s zDispatchMethod.from_strN)__name__ __module__ __qualname__rrr classmethodrrrrrr"s rc@s:eZdZUeeed<eed<eed<eed<eed<dS) WorkerInfo model_namesspeed queue_lengthcheck_heart_beatlast_heart_beatN)rrrrstr__annotations__intboolrrrrr0s  rcCs tt|qN)timesleepr "remove_stale_workers_by_expiration)rrrrheart_beat_controller9s r,c@seZdZdefddZdededefddZdefd d Zdefd d Z d dZ ddZ defddZ dede fddZddZddZddZddZdd Zd!S)" Controllerdispatch_methodcCs2i|_t||_tjt|fd|_|jdS)N)targetargs) worker_inforrr. threadingZThreadr,Zheart_beat_threadstart)selfr.rrr__init__@s  zController.__init__ worker_namer" worker_statuscCs||jvrtd|ntd||s||}|s!dSt|d|d|d|t|j|<td|d|d S) NzRegister a new worker: zRegister an existing worker: Frr r!zRegister done: , T)r1loggerinfoget_worker_statusrr))r4r6r"r7rrrregister_workerJs   zController.register_workerc Csz tj|ddd}Wn tjjy+}ztd|d|WYd}~dSd}~ww|jdkr>td|d|dS|S)N/worker_get_status)timeoutzGet status fails: r8)requestspost exceptionsRequestExceptionr9errorZ status_codejson)r4r6rerrrr;bs zController.get_worker_statuscCs |j|=dSr()r1)r4r6rrr remove_workeros zController.remove_workercCsFt|j}i|_|D]\}}|||jds td|q dS)NzRemove stale worker: )dictr1itemsr<r"r9r:)r4Zold_infow_namew_inforrrrefresh_all_workersrs zController.refresh_all_workerscCs.t}|jD] \}}||jqt|Sr()setr1rKupdaterlist)r4rrLrMrrr list_modelszszController.list_models model_namec CsR|jtjkrOg}g}|jD]\}}||jvr#||||jqtj |tj d}t |}|dkr7dS||} tj j tt||d}||}|S|jtjkrg}g} |jD]\}}||jvru||| |j|jq^t|dkr~dSt| } || }|j|jd7_td|d | d ||Std |j) N)Zdtypeg-C6?T)prznames: z, queue_lens: z, ret: zInvalid dispatch method: )r.rrr1rKrappendr npZarrayZfloat32sumZrandomZchoiceZarangelenr;rIrr!Zargminr9r:r) r4rSZ worker_namesZ worker_speedsrLrMZnormZptr6Z worker_qlenZ min_indexrrrget_worker_addresssF          zController.get_worker_addressr!cCsN||jvrtd|dS||j|_t|j|_td|dS)NzReceive unknown heart beat. FzReceive heart beat. T)r1r9r:r!r)r#)r4r6r!rrrreceive_heart_beats  zController.receive_heart_beatcCsVtt}g}|jD]\}}|jr|j|kr||q |D]}||q!dSr()r)r r1rKr"r#rWrI)r4ZexpireZ to_deleter6rMrrrr+s   z-Controller.remove_stale_workers_by_expirationcCs2td|dttjd}t|dS)Nz no worker: modeltextZ error_code)r9r:r r ZCONTROLLER_NO_WORKERrFdumpsencode)r4paramsretrrrhandle_no_workers zController.handle_no_workercCs.td|ttjd}t|dS)Nzworker timeout: r^r`)r9r:r r ZCONTROLLER_WORKER_TIMEOUTrFrarb)r4Zworker_addressrdrrrhandle_worker_timeouts z Controller.handle_worker_timeoutcCsjt}d}d}|jD]}||}|dur(||d||d7}||d7}q tt|}|||dS)Nrrr r!)rr r!)rOr1r;rPsortedrQ)r4rr r!rLr7rrrworker_api_get_statuss     z Controller.worker_api_get_statusc cs||d}|s||Vztj|d|dtd}|jdddD] }|r,|dVq#WdStjjyJ}z ||VWYd}~dSd}~ww)Nr]/worker_generate_streamT)rFstreamr?Fr`)Zdecode_unicodeZ delimiter) r[rerArBr Z iter_linesrCrDrf)r4rcZ worker_addrZresponsechunkrHrrrworker_api_generate_streams(  z%Controller.worker_api_generate_streamN)rrrr$r5r'rJr<r;rIrNrRr[r&r\r+rerfrhrlrrrrr-?s(   5   r-z/register_workerrequestcs2|IdH}t|d|d|dddS)Nr6r"r7)rFrr<get)rmdatarrrr<s r<z/refresh_all_workerscst}dSr()rrNmodelsrrrrNs rNz /list_modelscst}d|iS)Nrq)rrRrprrrrRsrRz/get_worker_addresscs&|IdH}t|d}d|iS)Nr]Zaddress)rFrr[)rmroZaddrrrrr[sr[z/receive_heart_beatcs,|IdH}t|d|d}d|iS)Nr6r!exist)rFrr\)rmrorrrrrr\ sr\rics"|IdH}t|}t|Sr()rFrrlr)rmrc generatorrrrrl's rlr=cs tSr()rrhrmrrrrh.srhz/test_connectioncsdS)NZsuccessrrtrrrrh3scCs|t}|jdtdd|jdtdd|jdtddgdd |jd d d d d d|}td|t|j }||fS)Nz--hostZ localhost)typedefaultz--porti Rz--dispatch-methodrr)ruZchoicesrvz--sslZ store_trueFzOEnable SSL. Requires OS Environment variables 'SSL_KEYFILE' and 'SSL_CERTFILE'.)actionZrequiredrvhelpzargs: ) argparseZArgumentParserZ add_argumentr$r&Z parse_argsr9r:r-r.)Zparserr0rrrrcreate_controller8s( rz__main__r:Z SSL_KEYFILEZ SSL_CERTFILE)hostport log_levelZ ssl_keyfileZ ssl_certfile)r|r}r~)8__doc__ryZasyncioZ dataclassesenumrrrFZloggingosr)typingrrr2ZfastapirrZfastapi.responsesrZnumpyrXrAZuvicornZfastchat.constantsr r r r Zfastchat.utilsr r9rZ dataclassrr,r-ZapprBr<rNrRr[r\rlrhrnrzrr0rZsslrunr|r}environrrrrsp   E