o :_d@s@ddlZddlZddlZddlZGdddZGdddZdS)Nc@sfeZdZdZdZdeddfddZddZd d Zd e fd d Z ddZ ddZ ddZ ddZdS)TimeoutIteratora Wrapper class to add timeout feature to synchronous iterators - timeout: timeout for next(). Default=ZERO_TIMEOUT i.e. no timeout or blocking calls to next. Updated using set_timeout() - sentinel: the object returned by iterator when timeout happens - reset_on_next: if set to True, timeout is reset to the value of ZERO_TIMEOUT on each iteration TimeoutIterator uses a thread internally. The thread stops once the iterator exhausts or raises an exception during iteration. Any exceptions raised within the wrapped iterator are propagated as it is. Exception is raised when all elements generated by the actual iterator before exception have been consumed Timeout can be set dynamically before going for iteration FTcCsR||_||_||_||_||_d|_d|_t|_ t j |j d|_ |j dS)NF)target) _iterator_timeout _sentinel_reset_on_next_raise_on_exception _interrupt_donequeueQueue_buffer threadingThread_TimeoutIterator__lookahead_threadstart)selfiteratortimeoutsentinel reset_on_nextraise_on_exceptionr./home/jon/h2ogpt/iterators/timeout_iterator.py__init__s zTimeoutIterator.__init__cC|jSNrrrrr get_sentinel$zTimeoutIterator.get_sentinelcC ||_dSrrrrrrrset_reset_on_next' z!TimeoutIterator.set_reset_on_nextrcCs ||_dS)z0 Set timeout for next iteration Nrrrrrr set_timeout*s zTimeoutIterator.set_timeoutcCs d|_dS)z interrupt and stop the underlying thread. the thread actually dies only after interrupt has been set and the underlying iterator yields a value after that. TNr r rrr interrupt0s zTimeoutIterator.interruptcC|Srrr rrr__iter__8zTimeoutIterator.__iter__cCs|jrt|j}z*z|j|jkr|jj|jd}n|j}Wn tjy)YnwW|j r2|j|_n|j r:|j|_wt |t rhd|_t |trJ|d t |j}tdt|t|fdd|jrf||S|S)z yield the result from iterator if timeout > 0: yield data if available. otherwise yield sentinal )rTzGeneration Failed: %s %s)flush)r StopIterationrr ZERO_TIMEOUTrgetr Emptyr isinstance BaseExceptionjoin traceback format_tb __traceback__printstrr )rdataexrrr__next__;s6     zTimeoutIterator.__next__c CsVz |jt|j|jrtqty*}z |j|WYd}~dSd}~wwr)rputnextrr r2r7)rerrr __lookahead`szTimeoutIterator.__lookaheadN)__name__ __module__ __qualname____doc__r3objectrr!r&floatr*r,r.r@rrrrrrs   %rc@sdeZdZdZdZdedfddZddZdd Zd e fd d Z d dZ ddZ ddZ ddZdS)AsyncTimeoutIteratorzW Async version of TimeoutIterator. See method documentation of TimeoutIterator rFcCsF||_||_||_||_d|_d|_t|_t | |_ dS)NF) rrrrr r asyncior rget_event_loop create_task _AsyncTimeoutIterator__lookahead_task)rrrrrrrrrps zAsyncTimeoutIterator.__init__cCrrrr rrrr!{r"z!AsyncTimeoutIterator.get_sentinelcCr#rr$r%rrrr&~r'z&AsyncTimeoutIterator.set_reset_on_nextrcCr#rr(r)rrrr*r'z AsyncTimeoutIterator.set_timeoutcCs d|_dSNTr+r rrrr,r'zAsyncTimeoutIterator.interruptcCr-rrr rrr __aiter__r/zAsyncTimeoutIterator.__aiter__cs|jrt|j}z2z|j|jkrt|j|jIdH}n|jIdH}Wn tj y2YnwW|j r;|j|_n|j rC|j|_wt |t rNd|_||SrQ) r StopAsyncIterationrrr3rLwait_forrr4 TimeoutErrorrr6r7)rr>rrr __anext__s,   zAsyncTimeoutIterator.__anext__c snz |jIdH}|j|IdH|jrtqty6}z|j|IdHWYd}~dSd}~wwr)rrVrrAr rSr7)rr>rCrrrrDs z AsyncTimeoutIterator.__lookaheadN)rErFrGrHr3rIrr!r&rJr*r,rRrVrOrrrrrKjs  rK)r rLrr9rrKrrrrs c