/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.twitter.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import com.typesafe.config.ConfigParseOptions;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterFollowingConfiguration;
import org.apache.streams.twitter.provider.TwitterFollowingProviderTask;
import org.apache.streams.twitter.provider.TwitterUserInformationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Twitter;

public class TwitterFollowingProvider
extends TwitterUserInformationProvider {
    public static final String STREAMS_ID = "TwitterFollowingProvider";
    private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
    private TwitterFollowingConfiguration config;
    List<ListenableFuture<Object>> futures = new ArrayList<ListenableFuture<Object>>();
    public static final int MAX_NUMBER_WAITING = 10000;

    public static void main(String[] args) throws Exception {
        Preconditions.checkArgument((args.length >= 2 ? 1 : 0) != 0);
        String configfile = args[0];
        String outfile = args[1];
        Config reference = ConfigFactory.load();
        File conf_file = new File(configfile);
        assert (conf_file.exists());
        Config testResourceConfig = ConfigFactory.parseFileAnySyntax((File)conf_file, (ConfigParseOptions)ConfigParseOptions.defaults().setAllowMissing(false));
        Config typesafe = testResourceConfig.withFallback((ConfigMergeable)reference).resolve();
        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration((Config)typesafe);
        TwitterFollowingConfiguration config = (TwitterFollowingConfiguration)new ComponentConfigurator(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter");
        TwitterFollowingProvider provider = new TwitterFollowingProvider(config);
        StreamsJacksonMapper mapper = new StreamsJacksonMapper((List)Lists.newArrayList((Object[])new String[]{"EEE MMM dd HH:mm:ss Z yyyy"}));
        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
        provider.prepare(config);
        provider.startStream();
        do {
            Uninterruptibles.sleepUninterruptibly((long)streamsConfiguration.getBatchFrequencyMs(), (TimeUnit)TimeUnit.MILLISECONDS);
            for (StreamsDatum datum : provider.readCurrent()) {
                try {
                    String json = mapper.writeValueAsString(datum.getDocument());
                    outStream.println(json);
                }
                catch (JsonProcessingException e) {
                    System.err.println(e.getMessage());
                }
            }
        } while (provider.isRunning());
        provider.cleanUp();
        outStream.flush();
    }

    @Override
    public TwitterFollowingConfiguration getConfig() {
        return this.config;
    }

    public TwitterFollowingProvider() {
        this.config = (TwitterFollowingConfiguration)new ComponentConfigurator(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter"));
    }

    public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
        super(config);
        this.config = config;
    }

    @Override
    public void prepare(Object o) {
        super.prepare(this.config);
        Preconditions.checkNotNull((Object)this.getConfig().getEndpoint());
        Preconditions.checkArgument((this.getConfig().getEndpoint().equals("friends") || this.getConfig().getEndpoint().equals("followers") ? 1 : 0) != 0);
    }

    @Override
    public void startStream() {
        Preconditions.checkNotNull((Object)this.executor);
        Preconditions.checkArgument((this.idsBatches.hasNext() || this.screenNameBatches.hasNext() ? 1 : 0) != 0);
        LOGGER.info("startStream");
        this.running.set(true);
        while (this.idsBatches.hasNext()) {
            this.submitFollowingThreads((Long[])this.idsBatches.next());
        }
        while (this.screenNameBatches.hasNext()) {
            this.submitFollowingThreads((String[])this.screenNameBatches.next());
        }
        this.executor.shutdown();
    }

    protected void submitFollowingThreads(Long[] ids) {
        Twitter client = this.getTwitterClient();
        for (int i = 0; i < ids.length; ++i) {
            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
            ListenableFuture future = this.executor.submit((Runnable)providerTask);
            this.futures.add((ListenableFuture<Object>)future);
            LOGGER.info("submitted {}", (Object)ids[i]);
        }
    }

    protected void submitFollowingThreads(String[] screenNames) {
        Twitter client = this.getTwitterClient();
        for (int i = 0; i < screenNames.length; ++i) {
            TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
            ListenableFuture future = this.executor.submit((Runnable)providerTask);
            this.futures.add((ListenableFuture<Object>)future);
            LOGGER.info("submitted {}", (Object)screenNames[i]);
        }
    }

    @Override
    public StreamsResultSet readCurrent() {
        StreamsResultSet result;
        LOGGER.info("{}{} - readCurrent", (Object)this.idsBatches, (Object)this.screenNameBatches);
        try {
            this.lock.writeLock().lock();
            result = new StreamsResultSet(this.providerQueue);
            result.setCounter(new DatumStatusCounter());
            this.providerQueue = this.constructQueue();
            LOGGER.debug("{}{} - providing {} docs", new Object[]{this.idsBatches, this.screenNameBatches, result.size()});
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return result;
    }

    @Override
    public boolean isRunning() {
        if (this.providerQueue.isEmpty() && this.executor.isTerminated() && Futures.allAsList(this.futures).isDone()) {
            LOGGER.info("Completed");
            this.running.set(false);
            LOGGER.info("Exiting");
        }
        return this.running.get();
    }
}

