/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.twitter.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import com.typesafe.config.ConfigParseOptions;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
import org.apache.streams.twitter.provider.TwitterProviderUtil;
import org.apache.streams.twitter.provider.TwitterTimelineProviderTask;
import org.apache.streams.twitter.provider.TwitterUserInformationProvider;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.User;
import twitter4j.conf.ConfigurationBuilder;

/**
 * Streams provider that collects timeline statuses for a configured set of
 * Twitter accounts.
 *
 * <p>Accounts are read from {@link TwitterUserInformationConfiguration#getInfo()}.
 * Entries that parse as a {@code long} are used as user ids directly; all
 * other entries are treated as screen names and resolved to ids through
 * {@code Twitter.lookupUsers} in batches. One {@link TwitterTimelineProviderTask}
 * is submitted per id, and collected datums are handed out through
 * {@link #readCurrent()}.
 *
 * <p>Expected call order: {@link #prepare(Object)}, {@link #startStream()},
 * repeated {@link #readCurrent()} while {@link #isRunning()}, then
 * {@link #cleanUp()}.
 */
public class TwitterTimelineProvider
implements StreamsProvider,
Serializable {
    public static final String STREAMS_ID = "TwitterTimelineProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
    public static final int MAX_NUMBER_WAITING = 10000;
    /** Number of screen names sent to a single {@code lookupUsers} call. */
    private static final int SCREEN_NAME_BATCH_SIZE = 100;
    private TwitterUserInformationConfiguration config;
    /** Guards replacement of {@link #providerQueue}. */
    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
    protected Collection<String[]> screenNameBatches;
    protected Collection<Long> ids;
    protected volatile Queue<StreamsDatum> providerQueue;
    protected int idsCount;
    protected Twitter client;
    protected ListeningExecutorService executor;
    protected DateTime start;
    protected DateTime end;
    protected final AtomicBoolean running = new AtomicBoolean();
    List<ListenableFuture<Object>> futures = new ArrayList<>();
    Boolean jsonStoreEnabled;
    Boolean includeEntitiesEnabled;

    /**
     * Returns the configuration this provider was built with.
     *
     * @return the current configuration
     */
    public TwitterUserInformationConfiguration getConfig() {
        return this.config;
    }

    /**
     * Replaces the configuration used by this provider.
     *
     * @param config the new configuration
     */
    public void setConfig(TwitterUserInformationConfiguration config) {
        this.config = config;
    }

    /**
     * Command-line entry point: runs the provider against a configuration
     * file and writes every collected document as one JSON line to a file.
     *
     * @param args {@code args[0]} is the path of the configuration file,
     *             {@code args[1]} is the path of the output file
     * @throws Exception if the configuration cannot be loaded or the output
     *                   file cannot be opened
     */
    public static void main(String[] args) throws Exception {
        Preconditions.checkArgument(args.length >= 2, "expected arguments: <configfile> <outfile>");
        String configfile = args[0];
        String outfile = args[1];
        Config reference = ConfigFactory.load();
        File confFile = new File(configfile);
        // Explicit check instead of `assert`, which is a no-op unless the JVM runs with -ea.
        Preconditions.checkArgument(confFile.exists(), "configuration file does not exist: %s", configfile);
        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false));
        Config typesafe = testResourceConfig.withFallback((ConfigMergeable)reference).resolve();
        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
        TwitterUserInformationConfiguration config = (TwitterUserInformationConfiguration)new ComponentConfigurator(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
        TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
        List<String> dateFormats = Lists.newArrayList("EEE MMM dd HH:mm:ss Z yyyy");
        StreamsJacksonMapper mapper = new StreamsJacksonMapper(dateFormats);
        // try-with-resources closes (and thereby flushes) the output file on every exit path.
        // UTF-8 is set explicitly so non-ASCII text is not mangled by the platform default charset.
        try (PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)), false, "UTF-8")) {
            provider.prepare(config);
            provider.startStream();
            try {
                do {
                    Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
                    for (StreamsDatum datum : provider.readCurrent()) {
                        try {
                            String json = mapper.writeValueAsString(datum.getDocument());
                            outStream.println(json);
                        }
                        catch (JsonProcessingException e) {
                            LOGGER.error("Failed to serialize document", e);
                        }
                    }
                } while (provider.isRunning());
            }
            finally {
                // Stop worker threads even when the read loop fails.
                provider.cleanUp();
            }
            outStream.flush();
            // PrintStream swallows IOExceptions; surface them here.
            if (outStream.checkError()) {
                LOGGER.error("Errors occurred while writing to {}", outfile);
            }
        }
    }

    /**
     * Creates a provider for the given configuration.
     *
     * @param config configuration holding OAuth credentials and the accounts to collect
     */
    public TwitterTimelineProvider(TwitterUserInformationConfiguration config) {
        this.config = config;
    }

    /**
     * Returns the queue currently receiving datums.
     *
     * @return the current provider queue; {@code null} until {@link #prepare(Object)} has run
     */
    public Queue<StreamsDatum> getProviderQueue() {
        return this.providerQueue;
    }

    /**
     * Returns the identifier of this component.
     *
     * @return {@link #STREAMS_ID}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Creates the provider queue, validates the configuration, resolves all
     * configured accounts to user ids and creates the executor.
     *
     * @param o ignored by this implementation
     * @throws NullPointerException if any OAuth credential or the account list is missing
     */
    public void prepare(Object o) {
        // Acquire outside the try block so the finally never unlocks a lock that was not taken.
        this.lock.writeLock().lock();
        try {
            this.providerQueue = this.constructQueue();
        }
        finally {
            this.lock.writeLock().unlock();
        }
        Preconditions.checkNotNull(this.providerQueue);
        Preconditions.checkNotNull((Object)this.config.getOauth().getConsumerKey());
        Preconditions.checkNotNull((Object)this.config.getOauth().getConsumerSecret());
        Preconditions.checkNotNull((Object)this.config.getOauth().getAccessToken());
        Preconditions.checkNotNull((Object)this.config.getOauth().getAccessTokenSecret());
        Preconditions.checkNotNull(this.config.getInfo());
        this.consolidateToIDs();
        if (this.ids.size() > 1) {
            this.executor = MoreExecutors.listeningDecorator((ExecutorService)TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, this.ids.size()));
        } else {
            this.executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        }
    }

    /**
     * Submits one timeline task per resolved id and then shuts the executor
     * down so that it terminates once all submitted tasks have finished.
     *
     * @throws IllegalArgumentException if no ids were resolved during {@link #prepare(Object)}
     */
    public void startStream() {
        LOGGER.debug("{} startStream", (Object)STREAMS_ID);
        Preconditions.checkArgument(!this.ids.isEmpty());
        this.running.set(true);
        this.submitTimelineThreads(this.ids.toArray(new Long[0]));
        // No further tasks are submitted; already-submitted tasks still run to completion.
        this.executor.shutdown();
    }

    /**
     * Tells whether a page of statuses indicates that more may follow.
     *
     * @param statuses the most recently fetched page, may be {@code null}
     * @return {@code true} if the page is non-null and non-empty
     */
    public boolean shouldContinuePulling(List<Status> statuses) {
        return statuses != null && statuses.size() > 0;
    }

    /**
     * Submits a {@link TwitterTimelineProviderTask} for every id and records
     * the resulting futures in {@link #futures}.
     *
     * @param ids the user ids to collect timelines for
     */
    @SuppressWarnings("unchecked")
    protected void submitTimelineThreads(Long[] ids) {
        Twitter twitterClient = this.getTwitterClient();
        for (Long id : ids) {
            TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, twitterClient, id);
            // submit(Runnable) yields ListenableFuture<?>; the futures are only checked for completion.
            ListenableFuture<Object> future = (ListenableFuture<Object>)this.executor.submit((Runnable)providerTask);
            this.futures.add(future);
            LOGGER.info("submitted {}", (Object)id);
        }
    }

    /**
     * Resolves a batch of screen names to user ids.
     *
     * @param screenNames the screen names to look up
     * @return the ids of the users that were found; empty if the lookup failed
     */
    private Collection<Long> retrieveIds(String[] screenNames) {
        Twitter twitterClient = this.getTwitterClient();
        List<Long> resolved = Lists.newArrayList();
        try {
            for (User user : twitterClient.lookupUsers(screenNames)) {
                resolved.add(user.getId());
            }
        }
        catch (TwitterException e) {
            // Pass the exception itself so the cause and stack trace are logged.
            LOGGER.error("Failure retrieving user details.", e);
        }
        return resolved;
    }

    /**
     * Returns everything collected since the previous call and installs a
     * fresh queue. Marks the provider as no longer running when nothing was
     * returned, the new queue is empty and the executor has terminated.
     *
     * @return a result set wrapping the drained queue
     */
    public StreamsResultSet readCurrent() {
        StreamsResultSet result;
        LOGGER.debug("Providing {} docs", (Object)this.providerQueue.size());
        // Acquire outside the try block so the finally never unlocks a lock that was not taken.
        this.lock.writeLock().lock();
        try {
            result = new StreamsResultSet(this.providerQueue);
            result.setCounter(new DatumStatusCounter());
            this.providerQueue = this.constructQueue();
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (result.size() == 0 && this.providerQueue.isEmpty() && this.executor.isTerminated()) {
            LOGGER.info("Finished.  Cleaning up...");
            this.running.set(false);
            LOGGER.info("Exiting");
        }
        return result;
    }

    /**
     * Creates the queue used to buffer datums between reads.
     *
     * @return a new, empty {@link LinkedBlockingQueue}
     */
    protected Queue<StreamsDatum> constructQueue() {
        return new LinkedBlockingQueue<StreamsDatum>();
    }

    /**
     * Not supported by this provider.
     *
     * @param sequence unused
     * @return never returns normally
     * @throws NotImplementedException always
     */
    public StreamsResultSet readNew(BigInteger sequence) {
        LOGGER.debug("{} readNew", (Object)STREAMS_ID);
        throw new NotImplementedException();
    }

    /**
     * Not supported by this provider.
     *
     * @param start unused
     * @param end unused
     * @return never returns normally
     * @throws NotImplementedException always
     */
    public StreamsResultSet readRange(DateTime start, DateTime end) {
        LOGGER.debug("{} readRange", (Object)STREAMS_ID);
        throw new NotImplementedException();
    }

    /**
     * Populates {@link #ids} from the configured accounts. Numeric entries
     * are added as ids directly; every other entry is treated as a screen
     * name, grouped into batches of at most {@value #SCREEN_NAME_BATCH_SIZE}
     * (stored in {@link #screenNameBatches}) and resolved via
     * {@link #retrieveIds(String[])}.
     */
    protected void consolidateToIDs() {
        List<String> screenNames = Lists.newArrayList();
        this.ids = Lists.newArrayList();
        for (String account : this.config.getInfo()) {
            if (account == null) {
                LOGGER.warn("Skipping null account entry");
                continue;
            }
            try {
                this.ids.add(Long.parseLong(account));
            }
            catch (NumberFormatException notAnId) {
                // Not numeric, so it is a screen name that must be looked up.
                screenNames.add(account);
            }
        }
        this.screenNameBatches = new ArrayList<>();
        for (int from = 0; from < screenNames.size(); from += SCREEN_NAME_BATCH_SIZE) {
            int to = Math.min(from + SCREEN_NAME_BATCH_SIZE, screenNames.size());
            // Zero-length seed array: toArray allocates exactly the right size, no null padding.
            this.screenNameBatches.add(screenNames.subList(from, to).toArray(new String[0]));
        }
        for (String[] batch : this.screenNameBatches) {
            this.ids.addAll(this.retrieveIds(batch));
        }
    }

    /**
     * Builds a new Twitter client from the OAuth credentials and base URL of
     * the current configuration.
     *
     * @return a newly created client instance
     */
    public Twitter getTwitterClient() {
        String baseUrl = TwitterProviderUtil.baseUrl(this.config);
        ConfigurationBuilder builder = new ConfigurationBuilder()
            .setOAuthConsumerKey(this.config.getOauth().getConsumerKey())
            .setOAuthConsumerSecret(this.config.getOauth().getConsumerSecret())
            .setOAuthAccessToken(this.config.getOauth().getAccessToken())
            .setOAuthAccessTokenSecret(this.config.getOauth().getAccessTokenSecret())
            .setIncludeEntitiesEnabled(true)
            .setJSONStoreEnabled(true)
            .setAsyncNumThreads(3)
            .setRestBaseURL(baseUrl)
            .setIncludeMyRetweetEnabled(true)
            .setPrettyDebugEnabled(true);
        return new TwitterFactory(builder.build()).getInstance();
    }

    /**
     * Shuts down the executor, waiting for running tasks to finish. Does
     * nothing if {@link #prepare(Object)} never created an executor.
     */
    public void cleanUp() {
        if (this.executor != null) {
            this.shutdownAndAwaitTermination((ExecutorService)this.executor);
        }
    }

    /**
     * Shuts the pool down in two phases: an orderly shutdown with a 10 second
     * wait, followed by a forced shutdown with another 10 second wait. If the
     * calling thread is interrupted the pool is forced down and the interrupt
     * flag is restored.
     *
     * @param pool the executor to shut down
     */
    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10L, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                if (!pool.awaitTermination(10L, TimeUnit.SECONDS)) {
                    LOGGER.error("Pool did not terminate");
                }
            }
        }
        catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reports whether the provider is still collecting. As a side effect the
     * running flag is cleared once the queue is empty, the executor has
     * terminated and all submitted futures are done.
     *
     * @return the current value of the running flag
     */
    public boolean isRunning() {
        if (this.providerQueue.isEmpty() && this.executor.isTerminated() && Futures.allAsList(this.futures).isDone()) {
            LOGGER.info("Completed");
            this.running.set(false);
            LOGGER.info("Exiting");
        }
        return this.running.get();
    }
}

