
    .Ph3                        d dl mZmZmZ d dlZd dlZd dlZd dlZd dl	Z	d dl
mZ ddlmZmZ ddlmZmZmZ 	 d dlZd dlZn# e$ r dZdZY nw xY we;e9d dlmZmZmZmZmZ d dlmZ d d	lmZ d d
l m!Z! 	 d dl m"Z# n# e$ r	 d dl$m"Z# Y nw xY wd Z% G d d          Z&d Z'd Z( G d d          Z)d Z* G d dee          Z+dS )    )absolute_importdivisionprint_functionN)uuid4   )*_retrieve_traceback_capturing_wrapped_call_TracebackCapturingWrapper)AutoBatchingMixinParallelBackendBaseparallel_config)Clientas_completed
get_clientrejoinsecede)sizeof)funcname)thread_state)TimeoutErrorc                 R    	 t          j        |            dS # t          $ r Y dS w xY w)NTF)weakrefref	TypeError)objs    L/var/www/html/test/jupyter/venv/lib/python3.11/site-packages/joblib/_dask.pyis_weakrefabler   +   s>    Ct   uus    
&&c                   0    e Zd ZdZd Zd Zd Zd Zd ZdS )_WeakKeyDictionarya  A variant of weakref.WeakKeyDictionary for unhashable objects.

    This datastructure is used to store futures for broadcasted data objects
    such as large numpy arrays or pandas dataframes that are not hashable and
    therefore cannot be used as keys of traditional python dicts.

    Furthermore using a dict with id(array) as key is not safe because the
    Python is likely to reuse id of recently collected arrays.
    c                     i | _         d S N_dataselfs    r   __init__z_WeakKeyDictionary.__init__>   s    


    c                 v    | j         t          |                   \  }} |            |urt          |          |S r    )r"   idKeyError)r$   r   r   vals       r   __getitem__z_WeakKeyDictionary.__getitem__A   s:    :bgg&S3553--
r&   c                      t          |          	  j                 \  }} |            |urt          |          n+# t          $ r  fd}t          j        ||          }Y nw xY w||f j        <   d S )Nc                     j         = d S r    r!   )_keyr$   s    r   
on_destroyz2_WeakKeyDictionary.__setitem__.<locals>.on_destroyS   s    JsOOOr&   )r(   r"   r)   r   r   )r$   r   valuer   r.   r0   r/   s   `     @r   __setitem__z_WeakKeyDictionary.__setitem__H   s    gg	/Z_FCsuuCsmm#    	/ 	/ 	/$ $ $ $ $ $ +c:..CCC	/ u*
3s   +? %A'&A'c                 *    t          | j                  S r    )lenr"   r#   s    r   __len__z_WeakKeyDictionary.__len__Y   s    4:r&   c                 8    | j                                          d S r    )r"   clearr#   s    r   r7   z_WeakKeyDictionary.clear\   s    
r&   N)	__name__
__module____qualname____doc__r%   r+   r2   r5   r7    r&   r   r   r   3   si             % % %"      r&   r   c                     	 t          | t                    r| d         d         } n# t          $ r Y nw xY wt          |           S )Nr   )
isinstancelist	Exceptionr   )xs    r   	_funcnamerB   `   sS    a 	!QA   A;;s   #& 
33c                     d | D             }t          |          dk    rd}nd}t          |           |t          |           fS )z8Summarize of list of (func, args, kwargs) function callsc                     h | ]\  }}}|	S r<   r<   ).0funcargskwargss       r   	<setcomp>z&_make_tasks_summary.<locals>.<setcomp>k   s    999/T4D999r&   r   FT)r4   rB   )tasksunique_funcsmixeds      r   _make_tasks_summaryrM   i   sO    995999L
<Au::ui....r&   c                   &    e Zd ZdZd ZddZd ZdS )Batchz6dask-compatible wrapper that executes a batch of tasksc                 J    t          |          \  | _        | _        | _        d S r    )rM   
_num_tasks_mixedrB   )r$   rJ   s     r   r%   zBatch.__init__w   s#     8K57Q7Q4dnnnr&   Nc           	          g }t          d          5  |D ]!\  }}}|                     ||i |           "|cd d d            S # 1 swxY w Y   d S )Ndask)backend)r   append)r$   rJ   resultsrF   rG   rH   s         r   __call__zBatch.__call__|   s    V,,, 	 	&+ 6 6"dFttT4V445555	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   &AA
A
c                 D    d| j          d| j         d}| j        rd|z   }|S )N	batch_of_r.   _callsmixed_)rB   rQ   rR   )r$   descrs     r   __repr__zBatch.__repr__   s7    DDNDDT_DDD; 	%u$Er&   r    )r8   r9   r:   r;   r%   rX   r^   r<   r&   r   rO   rO   t   sO        @@R R R
       r&   rO   c                      d S r    r<   r<   r&   r   _joblib_probe_taskr`      s    Dr&   c                        e Zd ZdZdZdZdZ	 	 	 	 	 d fd	Zd Zd	 Z	d
 Z
ddZd Zd Zd Zd ZddZd ZddZej        d             Z xZS )DaskDistributedBackendg?g      ?TN
   c                    t                                                       t          d}t          |          |I|rt	          ||d          }n4	 t                      }n$# t          $ r}d}t          |          |d }~ww xY w|| _        |@t          |t          t          f          s$t          dt          |          j        z            |ct          |          dk    rPt          |          | _        | j                            |d          }	d	 t!          ||	          D             | _        ng | _        i | _        || _        || _        t)          g |j        dd
          | _        i | _        i | _        d S )Nz{You are trying to use 'dask' as a joblib parallel backend but dask is not installed. Please install dask to fix this error.F)loopset_as_defaultzTo use Joblib with Dask first create a Dask Client

    from dask.distributed import Client
    client = Client()
or
    client = Client('scheduler-address:8786')z&scatter must be a list/tuple, got `%s`r   T)	broadcastc                 4    i | ]\  }}t          |          |S r<   )r(   )rE   rA   fs      r   
<dictcomp>z3DaskDistributedBackend.__init__.<locals>.<dictcomp>   s$     N N NdaA N N Nr&   )rf   with_resultsraise_errors)superr%   distributed
ValueErrorr   r   clientr>   r?   tupler   typer8   r4   _scatterscatterzipdata_futureswait_for_workers_timeoutsubmit_kwargsr   rf   waiting_futures_results
_callbacks)r$   scheduler_hostru   rq   rf   rx   ry   msge	scattered	__class__s             r   r%   zDaskDistributedBackend.__init__   s    	% 
 S//!> 1T%PPP1'\\FF! 	1 	1 	1H  %S//q0	1 z'D%='I'I84==;QQ   3w<<!#3#3 MMDM++Gt+DDI N Nc'96M6M N N NDDM "D(@%*+V[t% 
  
  
 s   A! !
B+A==Bc                 z  K   | j         r| j        2 3 d {V \  }}| j                            |          }| j                            |          }|j        dk    r|\  }}}|                    |           e|                    |            ||           6 t          j	        d           d {V  | j         d S d S )Nerror{Gz?)
	_continuerz   r{   popr|   statusset_exception
set_resultasynciosleep)r$   futureresult	cf_futurecallbacktypexctbs           r   _collectzDaskDistributedBackend._collect   s     n 
	&(,(< % % % % % % %nff M--f55	?..v66=G++#)LCb++C0000((000HV$$$$ )= -%%%%%%%%% n 
	& 
	& 
	& 
	& 
	&s   Bc                     t           dfS )Nr<   )rb   r#   s    r   
__reduce__z!DaskDistributedBackend.__reduce__   s    &++r&   c                 0    t          | j                  dfS )N)rq   rc   )rb   rq   r#   s    r   get_nested_backendz)DaskDistributedBackend.get_nested_backend   s    %T[9992==r&   r   c                 :    || _         |                     |          S r    )paralleleffective_n_jobs)r$   n_jobsr   backend_argss       r   	configurez DaskDistributedBackend.configure   s     $$V,,,r&   c                     d| _         | j        j                            | j                   t                      | _        d S )NT)r   rq   rf   add_callbackr   r   call_data_futuresr#   s    r   
start_callz!DaskDistributedBackend.start_call   s8    %%dm444!3!5!5r&   c                 n    d| _         t          j        d           | j                                         d S )NFr   )r   timer   r   r7   r#   s    r   	stop_callz DaskDistributedBackend.stop_call   s8      	
4$$&&&&&r&   c           	         t          | j                                                                                  }|dk    s| j        s|S 	 | j                            t                                        | j                   nS# t          $ rF}d	                    | j        t          dd| j        z                      }t          |          |d }~ww xY wt          | j                                                                                  S )Nr   )timeoutzDaskDistributedBackend has no worker after {} seconds. Make sure that workers are started and can properly connect to the scheduler and increase the joblib/dask connection timeout with:

parallel_config(backend='dask', wait_for_workers_timeout={})rd      )sumrq   ncoresvaluesrx   submitr`   r   _TimeoutErrorformatmaxr   )r$   r   r   r   	error_msgs        r   r   z'DaskDistributedBackend.effective_n_jobs   s   t{1133::<<==q  (E ##
	1K122995 :      	1 	1 	1O
 f-BD99::   y))q0	1 4;%%''..00111s   	8B 
CACCc           
         K   t                      t           dd            fd}g }|j        D ]\  }}}t           ||           d {V           }t          t	          |                                 ||                                           d {V                     }|                    |||f           t          |          |fS )Nr   c                   K   g }| D ] }t          |          }|v r|                    |                    2	j                            |d           }|	 |          d {V }n# t          $ r Y nw xY w|`t          |          rQt          |          dk    r>	j                            |dd          }t          j
        |          }||<   | d {V }||                    |           |                    |           |S )Ng     @@TF)asynchronoushash)r(   rV   rw   getr)   r   r   rq   ru   r   Task)
rG   outargarg_idrj   _corotr   itemgettersr$   s
          r   maybe_to_futuresz>DaskDistributedBackend._to_func_args.<locals>.maybe_to_futures  sT     C &$ &$C[((JJ{62333%))&$779!2!>"3C"8888888#   y)#.. (6#;;3D3D %)K$7$7 #$U %8 % %E !(U 3 3A56-c2&'A=JJqMMMMJJsOOOOJs   A))
A65A6)	dictgetattritemsr?   rv   keysr   rV   rO   )	r$   rF   r   rJ   rj   rG   rH   r   r   s	   `      @@r   _to_func_argsz$DaskDistributedBackend._to_func_args  s     ff $D*=tDD)	 )	 )	 )	 )	 )	 )	V #z 	, 	,OAtV..t4444444455D#fkkmm3C3CFMMOO3T3T-T-T-T-T-T-TUUVVFLL!T6*++++ee$$r&   c                      t           j                                        j        _         fd} j        j                            |||           S )Nc                 >  K                        |            d {V \  }}t          |           dt                      j         } j        j        t          |          f||dj        }j        	                    |           |j
        |<   j        |<   d S )N-)rJ   r/   )r   reprr   hexrq   r   r	   ry   rz   addr|   r{   )rF   r   batchrJ   r/   dask_futurer   r$   s         r   rj   z-DaskDistributedBackend.apply_async.<locals>.fN  s      !%!3!3D!9!9999999LE5%[[00577;00C,$+,*511  $	 K  $$[111+3DOK()2DM+&&&r&   )
concurrentfuturesFuturer   r   rq   rf   r   )r$   rF   r   rj   r   s   `   @r   apply_asyncz"DaskDistributedBackend.apply_asyncJ  se    &--//	!(		3 	3 	3 	3 	3 	3 	%%ax888r&   c                      t          |          S r    )r   )r$   r   s     r   retrieve_result_callbackz/DaskDistributedBackend.retrieve_result_callback`  s    9#>>>r&   c                 @   | j         j        5  | j         j                                         | j         j                                        s<| j         j                                         | j         j                                        <ddd           dS # 1 swxY w Y   dS )zTell the client to cancel any task submitted via this instance

        joblib.Parallel will never access those results
        N)rz   lockr   r7   queueemptyr   )r$   ensure_readys     r   abort_everythingz'DaskDistributedBackend.abort_everythingc  s    
 !& 	1 	1 (..000*06688 1$*..000 *06688 1	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1s   A9BBBc              #      K   t          t          d          rt                       dV  t          t          d          rt                       dS dS )zOverride ParallelBackendBase.retrieval_context to avoid deadlocks.

        This removes thread from the worker's thread pool (using 'secede').
        Seceding avoids deadlock in nested parallelism settings.
        execution_stateN)hasattrr   r   r   r#   s    r   retrieval_contextz(DaskDistributedBackend.retrieval_contextm  sW       <!233 	HHH<!233 	HHHHH	 	r&   )NNNNrd   )r   Nr    )T)r8   r9   r:   MIN_IDEAL_BATCH_DURATIONMAX_IDEAL_BATCH_DURATIONsupports_retrieve_callbackdefault_n_jobsr%   r   r   r   r   r   r   r   r   r   r   r   
contextlibcontextmanagerr   __classcell__)r   s   @r   rb   rb      s:       ""!%N !#9 9 9 9 9 9v& & &, , ,> > >- - - -6 6 6
' ' '2 2 248% 8% 8%t   ,? ? ?1 1 1 1       r&   rb   ),
__future__r   r   r   r   concurrent.futuresr   r   r   r   uuidr   _utilsr   r	   r   r
   r   r   rT   ro   ImportErrordask.distributedr   r   r   r   r   dask.sizeofr   
dask.utilsr   distributed.utilsr   r   r   tornado.genr   r   rB   rM   rO   r`   rb   r<   r&   r   <module>r      s   @ @ @ @ @ @ @ @ @ @                         N M M M M M M M M MKKK   DKKK /              #"""""######......> 	DCCCCCC > > >========>  * * * * * * * *Z  / / /       ,	 	 	
n n n n n.0C n n n n ns#   A 	AA5A< <B
	B
