/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.twitter.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import com.typesafe.config.ConfigParseOptions;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterFollowingConfiguration;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
import org.apache.streams.twitter.provider.TwitterErrorHandler;
import org.apache.streams.twitter.provider.TwitterProviderUtil;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.User;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

public class TwitterUserInformationProvider
implements StreamsProvider,
Serializable {
    public static final String STREAMS_ID = "TwitterUserInformationProvider";
    private static ObjectMapper MAPPER = new StreamsJacksonMapper((List)Lists.newArrayList((Object[])new String[]{"EEE MMM dd HH:mm:ss Z yyyy"}));
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
    public static final int MAX_NUMBER_WAITING = 1000;
    private TwitterUserInformationConfiguration config;
    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
    protected volatile Queue<StreamsDatum> providerQueue;
    protected Iterator<Long[]> idsBatches;
    protected Iterator<String[]> screenNameBatches;
    protected ListeningExecutorService executor;
    protected DateTime start;
    protected DateTime end;
    protected final AtomicBoolean running = new AtomicBoolean();

    public static void main(String[] args) throws Exception {
        Preconditions.checkArgument((args.length >= 2 ? 1 : 0) != 0);
        String configfile = args[0];
        String outfile = args[1];
        Config reference = ConfigFactory.load();
        File conf_file = new File(configfile);
        assert (conf_file.exists());
        Config testResourceConfig = ConfigFactory.parseFileAnySyntax((File)conf_file, (ConfigParseOptions)ConfigParseOptions.defaults().setAllowMissing(false));
        Config typesafe = testResourceConfig.withFallback((ConfigMergeable)reference).resolve();
        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration((Config)typesafe);
        TwitterUserInformationConfiguration config = (TwitterUserInformationConfiguration)new ComponentConfigurator(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter");
        TwitterUserInformationProvider provider = new TwitterUserInformationProvider(config);
        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
        provider.prepare(config);
        provider.startStream();
        do {
            Uninterruptibles.sleepUninterruptibly((long)streamsConfiguration.getBatchFrequencyMs(), (TimeUnit)TimeUnit.MILLISECONDS);
            for (StreamsDatum datum : provider.readCurrent()) {
                try {
                    String json = MAPPER.writeValueAsString(datum.getDocument());
                    outStream.println(json);
                }
                catch (JsonProcessingException e) {
                    System.err.println(e.getMessage());
                }
            }
        } while (provider.isRunning());
        provider.cleanUp();
        outStream.flush();
    }

    public TwitterUserInformationConfiguration getConfig() {
        return this.config;
    }

    public void setConfig(TwitterUserInformationConfiguration config) {
        this.config = config;
    }

    public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
        return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public TwitterUserInformationProvider() {
        this.config = (TwitterUserInformationConfiguration)new ComponentConfigurator(TwitterUserInformationConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
    }

    public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) {
        this.config = config;
    }

    public Queue<StreamsDatum> getProviderQueue() {
        return this.providerQueue;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void prepare(Object o) {
        if (o instanceof TwitterFollowingConfiguration) {
            this.config = (TwitterUserInformationConfiguration)o;
        }
        Preconditions.checkNotNull((Object)this.config);
        Preconditions.checkNotNull((Object)this.config.getOauth());
        Preconditions.checkNotNull((Object)this.config.getOauth().getConsumerKey());
        Preconditions.checkNotNull((Object)this.config.getOauth().getConsumerSecret());
        Preconditions.checkNotNull((Object)this.config.getOauth().getAccessToken());
        Preconditions.checkNotNull((Object)this.config.getOauth().getAccessTokenSecret());
        Preconditions.checkNotNull(this.config.getInfo());
        try {
            this.lock.writeLock().lock();
            this.providerQueue = this.constructQueue();
        }
        finally {
            this.lock.writeLock().unlock();
        }
        Preconditions.checkNotNull(this.providerQueue);
        ArrayList<String> screenNames = new ArrayList<String>();
        ArrayList<String[]> screenNameBatches = new ArrayList<String[]>();
        ArrayList<Long> ids = new ArrayList<Long>();
        ArrayList<Long[]> idsBatches = new ArrayList<Long[]>();
        for (String s : this.config.getInfo()) {
            if (s == null) continue;
            String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
            try {
                ids.add(Long.parseLong(potentialScreenName));
            }
            catch (Exception e) {
                screenNames.add(potentialScreenName);
            }
            if (ids.size() >= 100) {
                idsBatches.add(ids.toArray(new Long[ids.size()]));
                ids = new ArrayList();
            }
            if (screenNames.size() < 100) continue;
            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
            screenNames = new ArrayList();
        }
        if (ids.size() > 0) {
            idsBatches.add(ids.toArray(new Long[ids.size()]));
        }
        if (screenNames.size() > 0) {
            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
        }
        this.executor = ids.size() + screenNames.size() > 0 ? MoreExecutors.listeningDecorator((ExecutorService)TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size() + screenNames.size())) : MoreExecutors.listeningDecorator((ExecutorService)Executors.newSingleThreadExecutor());
        Preconditions.checkNotNull((Object)this.executor);
        this.idsBatches = idsBatches.iterator();
        this.screenNameBatches = screenNameBatches.iterator();
    }

    public void startStream() {
        Preconditions.checkNotNull((Object)this.executor);
        Preconditions.checkArgument((this.idsBatches.hasNext() || this.screenNameBatches.hasNext() ? 1 : 0) != 0);
        LOGGER.info("{}{} - startStream", this.idsBatches, this.screenNameBatches);
        while (this.idsBatches.hasNext()) {
            this.loadBatch(this.idsBatches.next());
        }
        while (this.screenNameBatches.hasNext()) {
            this.loadBatch(this.screenNameBatches.next());
        }
        this.running.set(true);
        this.executor.shutdown();
    }

    protected void loadBatch(Long[] ids) {
        Twitter client = this.getTwitterClient();
        int keepTrying = 0;
        while (keepTrying < 1) {
            try {
                long[] toQuery = new long[ids.length];
                for (int i = 0; i < ids.length; ++i) {
                    toQuery[i] = ids[i];
                }
                for (User tUser : client.lookupUsers(toQuery)) {
                    String json = DataObjectFactory.getRawJSON((Object)tUser);
                    try {
                        org.apache.streams.twitter.pojo.User user = (org.apache.streams.twitter.pojo.User)MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
                        ComponentUtils.offerUntilSuccess((Object)new StreamsDatum((Object)user), this.providerQueue);
                    }
                    catch (Exception exception) {
                        LOGGER.warn("Failed to read document as User ", (Object)tUser);
                    }
                }
                keepTrying = 10;
            }
            catch (TwitterException twitterException) {
                keepTrying += TwitterErrorHandler.handleTwitterError(client, (Exception)((Object)twitterException));
            }
            catch (Exception e) {
                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
            }
        }
    }

    protected void loadBatch(String[] ids) {
        Twitter client = this.getTwitterClient();
        int keepTrying = 0;
        while (keepTrying < 1) {
            try {
                for (User tUser : client.lookupUsers(ids)) {
                    String json = DataObjectFactory.getRawJSON((Object)tUser);
                    try {
                        org.apache.streams.twitter.pojo.User user = (org.apache.streams.twitter.pojo.User)MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
                        ComponentUtils.offerUntilSuccess((Object)new StreamsDatum((Object)user), this.providerQueue);
                    }
                    catch (Exception exception) {
                        LOGGER.warn("Failed to read document as User ", (Object)tUser);
                    }
                }
                keepTrying = 10;
            }
            catch (TwitterException twitterException) {
                keepTrying += TwitterErrorHandler.handleTwitterError(client, (Exception)((Object)twitterException));
            }
            catch (Exception e) {
                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
            }
        }
    }

    public StreamsResultSet readCurrent() {
        StreamsResultSet result;
        LOGGER.debug("{}{} - readCurrent", this.idsBatches, this.screenNameBatches);
        try {
            this.lock.writeLock().lock();
            result = new StreamsResultSet(this.providerQueue);
            result.setCounter(new DatumStatusCounter());
            this.providerQueue = this.constructQueue();
            LOGGER.debug("{}{} - providing {} docs", new Object[]{this.idsBatches, this.screenNameBatches, result.size()});
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return result;
    }

    protected Queue<StreamsDatum> constructQueue() {
        return new LinkedBlockingQueue<StreamsDatum>();
    }

    public StreamsResultSet readNew(BigInteger sequence) {
        LOGGER.debug("{} readNew", (Object)STREAMS_ID);
        throw new NotImplementedException();
    }

    public StreamsResultSet readRange(DateTime start, DateTime end) {
        LOGGER.debug("{} readRange", (Object)STREAMS_ID);
        this.start = start;
        this.end = end;
        this.readCurrent();
        StreamsResultSet result = (StreamsResultSet)this.providerQueue.iterator();
        return result;
    }

    public boolean isRunning() {
        if (this.providerQueue.isEmpty() && this.executor.isTerminated()) {
            LOGGER.info("{}{} - completed", this.idsBatches, this.screenNameBatches);
            this.running.set(false);
            LOGGER.info("Exiting");
        }
        return this.running.get();
    }

    void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10L, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                if (!pool.awaitTermination(10L, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        }
        catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    protected Twitter getTwitterClient() {
        String baseUrl = TwitterProviderUtil.baseUrl(this.config);
        ConfigurationBuilder builder = new ConfigurationBuilder().setOAuthConsumerKey(this.config.getOauth().getConsumerKey()).setOAuthConsumerSecret(this.config.getOauth().getConsumerSecret()).setOAuthAccessToken(this.config.getOauth().getAccessToken()).setOAuthAccessTokenSecret(this.config.getOauth().getAccessTokenSecret()).setIncludeEntitiesEnabled(true).setJSONStoreEnabled(true).setAsyncNumThreads(3).setRestBaseURL(baseUrl).setIncludeMyRetweetEnabled(Boolean.TRUE.booleanValue()).setPrettyDebugEnabled(Boolean.TRUE.booleanValue());
        return new TwitterFactory(builder.build()).getInstance();
    }

    protected void callback() {
    }

    public void cleanUp() {
        this.shutdownAndAwaitTermination((ExecutorService)this.executor);
    }
}

