package com.tencent.qqlive.modules.vb.datacenter.impl.datacenter;

import android.os.Handler;
import android.os.SystemClock;
import android.text.TextUtils;
import com.sogou.bu.input.deletekey.h;
import com.sogou.bu.input.deletekey.i;
import com.tencent.qqlive.modules.vb.datacenter.impl.DataCenterError;
import com.tencent.qqlive.modules.vb.datacenter.impl.DataCenterInitTask;
import com.tencent.qqlive.modules.vb.datacenter.impl.DataCenterLog;
import com.tencent.qqlive.modules.vb.datacenter.impl.DataCenterServiceProxy;
import com.tencent.qqlive.modules.vb.datacenter.impl.datacenter.eventtrigger.AbsDSLEventTrigger;
import com.tencent.qqlive.modules.vb.datacenter.impl.datacenter.monitor.EventPipelineMonitor;
import com.tencent.qqlive.modules.vb.datacenter.impl.datacenter.monitor.EventQueueMonitor;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;

/* compiled from: SogouSource */
/* loaded from: classes6.dex */
public class EventAsyncPipeline extends AbsEventPipeline implements Runnable, DataCenterInitTask.IInitCallback {
    private ExecutorService mExecutorService;
    private EventAsyncPipelineConfig mPipelineConfig;
    private long mPipelineStartTs;
    private long mPipelineTimeOut;
    private String mPipelineWaitingTriggerId;
    private long mQueueTimeOut;
    private final PriorityBlockingQueue<AbsEvent> mEventQueue = new PriorityBlockingQueue<>();
    private final EventQueueMonitor mQueueMonitor = new EventQueueMonitor();
    private final long DEFAULT_PIPELINE_TIMEOUT = 5000;
    private ResetCountDownLatch mCountDownLatch = new ResetCountDownLatch(1);
    private volatile boolean isEnable = true;
    private volatile Runnable mPipelineEnableRunnable = new h(this, 5);
    private volatile Runnable mWaitTimeoutRunnable = new i(this, 6);

    public EventAsyncPipeline(EventAsyncPipelineConfig eventAsyncPipelineConfig) {
        this.mPipelineConfig = eventAsyncPipelineConfig;
        if (eventAsyncPipelineConfig != null) {
            this.mPipelineWaitingTriggerId = eventAsyncPipelineConfig.getPipelineWaitingTriggerId();
            this.mPipelineTimeOut = this.mPipelineConfig.getPipelineTimeOut();
            this.mPipelineId = this.mPipelineConfig.getPipelineId();
        }
        if (TextUtils.isEmpty(this.mPipelineId)) {
            this.mPipelineId = UUID.randomUUID().toString();
        }
        StringBuilder sb = new StringBuilder("创建异步管道:");
        sb.append(this.mPipelineId);
        sb.append(", 等待管道id:");
        sb.append(TextUtils.isEmpty(this.mPipelineWaitingTriggerId) ? "无" : this.mPipelineWaitingTriggerId);
        sb.append(", 等待管道超时:");
        sb.append(this.mPipelineTimeOut);
        sb.append(", 数据中心是否初始化:");
        sb.append(DataCenterInitTask.isInit());
        logD(sb.toString());
        if (DataCenterInitTask.isInit()) {
            onInitFinished();
        } else {
            DataCenterInitTask.addInitCallback(this);
        }
    }

    public /* synthetic */ void lambda$new$0() {
        logD("启用异步管道:" + this.mPipelineId + ", 等待的触发器已加入");
        setPipelineEnable();
    }

    public /* synthetic */ void lambda$new$1() {
        logD("启用异步管道:" + this.mPipelineId + ", 事件消费等待已超时");
        setPipelineEnable();
    }

    private void lockNotify() {
        this.mCountDownLatch.countDown();
    }

    private void lockReset() {
        this.mCountDownLatch.reset();
    }

    private void logD(String str) {
        DataCenterLog.d(DataCenterLog.TAG_EVENT, str);
    }

    private void parseConfig() {
        this.mPipelineStartTs = SystemClock.elapsedRealtime();
        EventAsyncPipelineConfig eventAsyncPipelineConfig = this.mPipelineConfig;
        if (eventAsyncPipelineConfig != null) {
            long queueTimeOut = eventAsyncPipelineConfig.getQueueTimeOut();
            this.mQueueTimeOut = queueTimeOut;
            this.mQueueMonitor.setQueueWaitingTime(queueTimeOut);
            this.mPipelineWaitingTriggerId = this.mPipelineConfig.getPipelineWaitingTriggerId();
            this.mPipelineTimeOut = this.mPipelineConfig.getPipelineTimeOut();
            if (this.mPipelineEnableRunnable == null || TextUtils.isEmpty(this.mPipelineWaitingTriggerId)) {
                return;
            }
            this.isEnable = false;
            if (this.mPipelineTimeOut == 0) {
                this.mPipelineTimeOut = 5000L;
            }
            EventPipelineMonitor.getInstance().getHandler().postDelayed(this.mPipelineEnableRunnable, this.mPipelineTimeOut);
        }
    }

    private void process(AbsEvent absEvent) {
        if (absEvent.getSeqNum() >= this.mQueueMonitor.getWaitSeqNum()) {
            this.mQueueMonitor.updateWaitState();
            this.mQueueMonitor.setWaitSeqNum(absEvent.getSeqNum() + 1);
        }
        absEvent.getEventMonitorInfo().setEventQueueMonitor(this.mQueueMonitor);
        AbsEvent peek = this.mEventQueue.peek();
        long seqNum = peek != null ? peek.getSeqNum() : -1L;
        long currentTimeMillis = System.currentTimeMillis();
        logD("管道:" + this.mPipelineId + " 开始消费事件, 页面id:" + absEvent.getEventPageId() + ", 类型:" + absEvent.getEventType() + ", 目标元素:" + absEvent.getEventTarget() + ", 序号:" + absEvent.getSeqNum() + ", 队头事件序号:" + seqNum + ", 队列事件数:" + this.mEventQueue.size() + ", 等待事件序号:" + this.mQueueMonitor.getWaitSeqNum());
        processEvent(absEvent);
        logD("管道:" + this.mPipelineId + " 结束消费事件, 页面id:" + absEvent.getEventPageId() + ", 类型:" + absEvent.getEventType() + ", 目标元素:" + absEvent.getEventTarget() + ", 序号:" + absEvent.getSeqNum() + ", 队列事件数:" + this.mEventQueue.size() + ", 队头事件序号:" + seqNum + ", 等待事件序号:" + this.mQueueMonitor.getWaitSeqNum() + ", 消费耗时:" + (System.currentTimeMillis() - currentTimeMillis) + "ms");
    }

    private void setPipelineEnable() {
        this.isEnable = true;
        lockNotify();
    }

    @Override // com.tencent.qqlive.modules.vb.datacenter.impl.datacenter.AbsEventPipeline
    public DataCenterError addEventTrigger(AbsDSLEventTrigger absDSLEventTrigger) {
        DataCenterError addEventTrigger = super.addEventTrigger(absDSLEventTrigger);
        if (!TextUtils.isEmpty(this.mPipelineWaitingTriggerId)) {
            if (this.mPipelineWaitingTriggerId.equals(absDSLEventTrigger.getEventTriggerId())) {
                if (DataCenterInitTask.isInit()) {
                    logD("启用异步管道:" + this.mPipelineId + ", 等待的触发器已加入");
                    EventPipelineMonitor.getInstance().getHandler().removeCallbacks(this.mPipelineEnableRunnable);
                    setPipelineEnable();
                }
                this.mPipelineEnableRunnable = null;
            }
        }
        return addEventTrigger;
    }

    public EventToken createToken() {
        return getTokenGenerator().createToken();
    }

    @Override // com.tencent.qqlive.modules.vb.datacenter.impl.datacenter.AbsEventPipeline
    void onEvent(AbsEvent absEvent) {
        this.mQueueMonitor.increaseEventCount();
        absEvent.getEventMonitorInfo().onPushed(false, absEvent.getSeqNum());
        this.mEventQueue.put(absEvent);
        if (this.isEnable && DataCenterInitTask.isInit()) {
            AbsEvent peek = this.mEventQueue.peek();
            long seqNum = peek != null ? peek.getSeqNum() : -1L;
            StringBuilder sb = new StringBuilder("管道:");
            sb.append(this.mPipelineId);
            sb.append(" 接收事件, 页面id:");
            sb.append(absEvent.getEventPageId());
            sb.append(", 类型:");
            sb.append(absEvent.getEventType());
            sb.append(", 目标元素:");
            sb.append(absEvent.getEventTarget());
            sb.append(", 序号:");
            sb.append(absEvent.getSeqNum());
            sb.append(", 当前队列事件数:");
            sb.append(this.mEventQueue.size());
            sb.append(", 队头事件:");
            sb.append(seqNum == -1 ? "空" : Long.valueOf(seqNum));
            logD(sb.toString());
            lockNotify();
            return;
        }
        logD("管道:" + this.mPipelineId + " 接收事件, 页面id:" + absEvent.getEventPageId() + ", 类型:" + absEvent.getEventType() + ", 目标元素:" + absEvent.getEventTarget() + ", 序号:" + absEvent.getSeqNum() + ", 进入阻塞状态, 数据中心是否初始化:" + DataCenterInitTask.isInit() + ", 等待管道id:" + this.mPipelineWaitingTriggerId + ", 等待管道超时:" + this.mPipelineTimeOut + ", 已等待时间:" + (SystemClock.elapsedRealtime() - this.mPipelineStartTs));
    }

    @Override // com.tencent.qqlive.modules.vb.datacenter.impl.DataCenterInitTask.IInitCallback
    public void onInitFinished() {
        parseConfig();
        if (this.mExecutorService == null) {
            this.mExecutorService = DataCenterServiceProxy.getThreadProxy().getSingleThreadPool(this.mPipelineId);
        }
        this.mExecutorService.execute(this);
    }

    @Override // java.lang.Runnable
    public void run() {
        AbsEvent absEvent;
        while (true) {
            if (!this.isEnable || !DataCenterInitTask.isInit()) {
                logD("管道:" + this.mPipelineId + " 消费事件, 进入阻塞状态, 数据中心是否初始化:" + DataCenterInitTask.isInit() + ", 等待管道id:" + this.mPipelineWaitingTriggerId + ", 等待管道超时:" + this.mPipelineTimeOut + ", 已等待时间:" + (SystemClock.elapsedRealtime() - this.mPipelineStartTs) + ", 管道可用状态:" + this.isEnable);
                lockReset();
            }
            try {
                this.mCountDownLatch.await();
                absEvent = this.mEventQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
                absEvent = null;
            }
            if (absEvent != null) {
                if (absEvent.getSeqNum() <= this.mQueueMonitor.getWaitSeqNum()) {
                    process(absEvent);
                } else if (this.mQueueMonitor.isNeedWait()) {
                    this.mEventQueue.put(absEvent);
                    long remainingWaitTime = this.mQueueMonitor.getRemainingWaitTime();
                    logD("管道:" + this.mPipelineId + " 消费事件, 进入阻塞状态, 页面id:" + absEvent.getEventPageId() + ", 类型:" + absEvent.getEventType() + ", 目标元素:" + absEvent.getEventTarget() + ", 序号:" + absEvent.getSeqNum() + ", 队列事件数:" + this.mEventQueue.size() + ", 等待序号:" + this.mQueueMonitor.getWaitSeqNum() + ", 等待超时配置:" + this.mEventQueue.size() + ", 等待超时配置:" + this.mQueueMonitor.getQueueWaitingTime() + ", 是否已超时:" + this.mQueueMonitor.isWaitTimeout() + ", 仍需等待:" + remainingWaitTime + "ms");
                    Handler handler = EventPipelineMonitor.getInstance().getHandler();
                    handler.removeCallbacks(this.mWaitTimeoutRunnable);
                    StringBuilder sb = new StringBuilder("resume process after ");
                    sb.append(remainingWaitTime);
                    sb.append("ms");
                    logD(sb.toString());
                    handler.postDelayed(this.mWaitTimeoutRunnable, remainingWaitTime);
                    lockReset();
                } else {
                    process(absEvent);
                }
            }
        }
    }
}
