/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.twitter.processor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.TwitterConfiguration;
import org.apache.streams.twitter.TwitterStreamConfiguration;
import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
import org.apache.streams.twitter.pojo.Delete;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.provider.TwitterEventClassifier;
import org.apache.streams.twitter.provider.TwitterProviderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.TwitterObjectFactory;
import twitter4j.conf.ConfigurationBuilder;

/**
 * Stream processor that re-fetches the tweet referenced by a Twitter activity and
 * refreshes the activity in place with the freshly fetched data.
 *
 * <p>Only documents that are {@link Activity} instances whose provider id equals the
 * Twitter provider id are touched; every other activity is passed through unchanged.
 * After a successful refresh the activity's original id is restored.
 *
 * <p>When Twitter reports that the rate limit was exceeded, the processor sleeps on the
 * calling thread with a linearly growing backoff ({@link #BACKOFF} times the attempt
 * number) and retries, up to {@code MAX_ATTEMPTS} times, after which the document is let
 * through as-is.
 *
 * <p>NOTE(review): retry state is kept in a plain instance field with no synchronization;
 * confirm that a single instance is not shared between threads.
 */
public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
    private static final String PROVIDER_ID = TwitterActivityUtil.getProvider().getId();
    private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class);

    /** Maximum number of rate-limit retries before the document is allowed through. */
    private static final int MAX_ATTEMPTS = 5;

    /** Base backoff in milliseconds (4 minutes); the n-th retry sleeps for n * BACKOFF. */
    public static final int BACKOFF = 240000;

    /** Prefix removed from the activity object's id to obtain the numeric tweet id. */
    private static final String TWEET_ID_PREFIX = "id:twitter:tweets:";

    private static final long MILLIS_PER_MINUTE = 60000L;

    private final TwitterConfiguration config;
    private Twitter client;
    private ObjectMapper mapper;

    /** Number of consecutive rate-limit retries performed for the document in flight. */
    private int retryCount;

    /**
     * Creates a processor using the configuration detected under the {@code "twitter"}
     * path of {@link StreamsConfigurator#config}.
     */
    public FetchAndReplaceTwitterProcessor() {
        this((TwitterStreamConfiguration)new ComponentConfigurator(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter"));
    }

    /**
     * Creates a processor using the supplied configuration.
     *
     * @param config configuration providing the OAuth credentials and the REST base URL
     */
    public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) {
        this.config = config;
    }

    /**
     * Returns the id of the Twitter provider.
     *
     * @return the Twitter provider id
     */
    public String getId() {
        return PROVIDER_ID;
    }

    /**
     * Refreshes the activity carried by {@code entry} if it originates from Twitter.
     *
     * @param entry datum whose document must be an {@link Activity}
     * @return a single-element list containing {@code entry} itself (its activity is
     *         updated in place when it was refreshed)
     * @throws IllegalStateException if the document is not an {@link Activity}
     */
    public List<StreamsDatum> process(StreamsDatum entry) {
        if (!(entry.getDocument() instanceof Activity)) {
            throw new IllegalStateException("Requires an activity document");
        }
        Activity doc = (Activity) entry.getDocument();
        String originalId = doc.getId();
        // An activity without a provider cannot be a Twitter activity; pass it through
        // instead of failing with a NullPointerException.
        if (doc.getProvider() != null && PROVIDER_ID.equals(doc.getProvider().getId())) {
            this.fetchAndReplace(doc, originalId);
        }
        return Lists.newArrayList(entry);
    }

    /**
     * Initializes the Twitter client and the JSON mapper.
     *
     * @param configurationObject ignored by this implementation
     */
    public void prepare(Object configurationObject) {
        this.client = this.getTwitterClient();
        this.mapper = StreamsJacksonMapper.getInstance();
    }

    /** No resources to release. */
    public void cleanUp() {
    }

    /**
     * Fetches the tweet behind {@code doc}, applies it to {@code doc}, and restores the
     * activity's original id.
     *
     * <p>Rate-limit failures are retried through {@link #sleepAndTryAgain}; any other
     * failure is logged and the activity is left as it is at the point of failure.
     *
     * @param doc        activity to refresh in place
     * @param originalId id to put back on the activity after a successful refresh
     */
    protected void fetchAndReplace(Activity doc, String originalId) {
        try {
            String json = this.fetch(doc);
            this.replace(doc, json);
            doc.setId(originalId);
            this.retryCount = 0;
        } catch (TwitterException tw) {
            if (tw.exceededRateLimitation()) {
                this.sleepAndTryAgain(doc, originalId);
            } else {
                // Not retried; reset so a stale count does not inflate the next document's backoff.
                this.retryCount = 0;
                LOGGER.warn("Twitter error fetching tweet for activity {}", originalId, tw);
            }
        } catch (Exception e) {
            this.retryCount = 0;
            // Passing the exception as the last argument makes SLF4J log its stack trace.
            LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId(), e);
        }
    }

    /**
     * Deserializes {@code json} into the type detected by {@link TwitterEventClassifier}
     * and applies it to {@code doc}. Tweets, retweets, and deletes are supported; any
     * other detected type is only logged.
     *
     * @param doc  activity to update in place
     * @param json raw JSON returned by Twitter
     * @throws IOException                 if the JSON cannot be deserialized
     * @throws ActivityConversionException if the activity update fails
     */
    protected void replace(Activity doc, String json) throws IOException, ActivityConversionException {
        Class<?> documentSubType = TwitterEventClassifier.detectClass(json);
        Object object = this.mapper.readValue(json, documentSubType);
        if (documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) {
            TwitterActivityUtil.updateActivity((Tweet) object, doc);
        } else if (documentSubType.equals(Delete.class)) {
            TwitterActivityUtil.updateActivity((Delete) object, doc);
        } else {
            LOGGER.info("Could not determine the correct update method for {}", documentSubType);
        }
    }

    /**
     * Retrieves the raw JSON of the status referenced by the activity's object id.
     *
     * @param doc activity whose object id has the form {@code id:twitter:tweets:<number>}
     * @return the raw JSON of the fetched status
     * @throws TwitterException if the Twitter call fails
     */
    protected String fetch(Activity doc) throws TwitterException {
        String id = doc.getObject().getId();
        LOGGER.debug("Fetching status from Twitter for {}", id);
        long tweetId = Long.parseLong(id.replace(TWEET_ID_PREFIX, ""));
        Status status = this.getTwitterClient().showStatus(tweetId);
        return TwitterObjectFactory.getRawJSON(status);
    }

    /**
     * Returns the Twitter client, building it from the configuration on first use.
     *
     * @return the Twitter client held by this processor
     */
    protected Twitter getTwitterClient() {
        if (this.client == null) {
            String baseUrl = TwitterProviderUtil.baseUrl(this.config);
            ConfigurationBuilder builder = new ConfigurationBuilder()
                    .setOAuthConsumerKey(this.config.getOauth().getConsumerKey())
                    .setOAuthConsumerSecret(this.config.getOauth().getConsumerSecret())
                    .setOAuthAccessToken(this.config.getOauth().getAccessToken())
                    .setOAuthAccessTokenSecret(this.config.getOauth().getAccessTokenSecret())
                    .setIncludeEntitiesEnabled(true)
                    .setJSONStoreEnabled(true)
                    .setAsyncNumThreads(1)
                    .setRestBaseURL(baseUrl)
                    .setIncludeMyRetweetEnabled(true)
                    .setPrettyDebugEnabled(true);
            this.client = new TwitterFactory(builder.build()).getInstance();
        }
        return this.client;
    }

    /**
     * Sleeps for the next backoff interval and retries {@link #fetchAndReplace}, or gives
     * up once {@code MAX_ATTEMPTS} retries have been made.
     *
     * <p>The sleep blocks the calling thread. If the thread is interrupted while waiting,
     * the retry is abandoned and the interrupt flag is restored for the caller.
     *
     * @param doc        activity to refresh in place
     * @param originalId id to put back on the activity after a successful refresh
     */
    protected void sleepAndTryAgain(Activity doc, String originalId) {
        if (this.retryCount >= MAX_ATTEMPTS) {
            LOGGER.warn("Giving up on activity {} after {} rate-limited attempts", originalId, MAX_ATTEMPTS);
            this.retryCount = 0;
            return;
        }
        ++this.retryCount;
        long backoffMillis = (long) BACKOFF * this.retryCount;
        LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", backoffMillis / MILLIS_PER_MINUTE);
        try {
            Thread.sleep(backoffMillis);
        } catch (InterruptedException e) {
            LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
            this.retryCount = 0;
            // Preserve the interrupt so the owning executor/runtime can react to it.
            Thread.currentThread().interrupt();
            return;
        }
        this.fetchAndReplace(doc, originalId);
    }
}

