/*
 * Decompiled with CFR 0.152.
 */
package org.kitesdk.data.spi.filesystem;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Flushable;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.PartitionStrategy;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.spi.AbstractDatasetWriter;
import org.kitesdk.data.spi.ClockReady;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.PartitionListener;
import org.kitesdk.data.spi.ReaderWriterState;
import org.kitesdk.data.spi.RollingWriter;
import org.kitesdk.data.spi.StorageKey;
import org.kitesdk.data.spi.filesystem.FileSystemDataset;
import org.kitesdk.data.spi.filesystem.FileSystemView;
import org.kitesdk.data.spi.filesystem.FileSystemWriter;
import org.kitesdk.data.spi.filesystem.PathConversion;
import org.kitesdk.shaded.com.google.common.annotations.VisibleForTesting;
import org.kitesdk.shaded.com.google.common.base.Objects;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.kitesdk.shaded.com.google.common.cache.CacheBuilder;
import org.kitesdk.shaded.com.google.common.cache.CacheLoader;
import org.kitesdk.shaded.com.google.common.cache.LoadingCache;
import org.kitesdk.shaded.com.google.common.cache.RemovalListener;
import org.kitesdk.shaded.com.google.common.cache.RemovalNotification;
import org.kitesdk.shaded.com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class PartitionedDatasetWriter<E, W extends FileSystemWriter<E>>
extends AbstractDatasetWriter<E>
implements RollingWriter {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionedDatasetWriter.class);
    // Intended default (and cap) for the partition-writer cache size.
    private static final int DEFAULT_WRITER_CACHE_SIZE = 10;
    // View being written to; source of the dataset descriptor, accessor and provided values.
    protected FileSystemView<E> view;
    // Maximum size given to the writer cache when it is built in initialize().
    private final int maxWriters;
    // Partition strategy read from the dataset descriptor at construction time.
    private final PartitionStrategy partitionStrategy;
    // Per-partition writers keyed by storage key. Built in initialize(), so unset before then.
    protected LoadingCache<StorageKey, W> cachedWriters;
    // Scratch key refilled on every write() for the cache lookup; a copy is made before a key
    // is stored in the cache.
    private final StorageKey reusedKey;
    // Fills a storage key for an entity (see write()).
    private final EntityAccessor<E> accessor;
    // Provided values obtained from the view, passed to the accessor when computing keys.
    private final Map<String, Object> provided;
    // Lifecycle state: NEW after construction, OPEN after initialize(), CLOSED after close().
    protected ReaderWriterState state;
    // Target file size handed to newly created partition writers; -1 when not configured.
    // Presumably in bytes (the setter's parameter is named targetSizeBytes).
    protected long targetFileSize;
    // Roll interval in milliseconds handed to newly created partition writers.
    protected long rollIntervalMillis;

    /**
     * Creates the partitioned writer implementation that matches the view's storage format.
     *
     * <p>Avro and CSV datasets get the incremental (flushable and syncable) writer. Parquet
     * datasets get it only when {@code kite.parquet.non-durable-writes} is reported as disabled
     * for the descriptor. Every other case gets the non-durable writer.
     *
     * @param view the view to write into
     * @param <E> the entity type
     * @return a new writer for {@code view}; it still has to be initialized
     */
    static <E> PartitionedDatasetWriter<E, ?> newWriter(FileSystemView<E> view) {
        DatasetDescriptor descriptor = view.getDataset().getDescriptor();
        Format format = descriptor.getFormat();

        boolean incremental;
        if (Formats.PARQUET.equals(format)) {
            // The property is only consulted for Parquet datasets.
            incremental = DescriptorUtil.isDisabled("kite.parquet.non-durable-writes", descriptor);
        } else {
            incremental = Formats.AVRO.equals(format) || Formats.CSV.equals(format);
        }

        if (incremental) {
            return new IncrementalPartitionedDatasetWriter<E>(view);
        }
        return new NonDurablePartitionedDatasetWriter<E>(view);
    }

    /**
     * Captures the partitioning and writer settings of {@code view}. The new writer is in the
     * {@code NEW} state and must be initialized before use.
     *
     * <p>The writer cache size defaults to the partition strategy's cardinality when that value
     * lies in {@code [0, DEFAULT_WRITER_CACHE_SIZE]}, and to {@code DEFAULT_WRITER_CACHE_SIZE}
     * otherwise; {@code kite.writer.cache-size} is then looked up with that value as the default.
     *
     * @param view the view to write into
     * @throws IllegalArgumentException if the view's dataset is not partitioned
     */
    private PartitionedDatasetWriter(FileSystemView<E> view) {
        DatasetDescriptor descriptor = view.getDataset().getDescriptor();
        Preconditions.checkArgument(descriptor.isPartitioned(), "Dataset " + view.getDataset() + " is not partitioned");
        this.view = view;
        this.partitionStrategy = descriptor.getPartitionStrategy();

        int cardinality = this.partitionStrategy.getCardinality();
        boolean withinCap = cardinality >= 0 && cardinality <= DEFAULT_WRITER_CACHE_SIZE;
        int defaultMaxWriters = withinCap ? cardinality : DEFAULT_WRITER_CACHE_SIZE;
        this.maxWriters = DescriptorUtil.getInt("kite.writer.cache-size", descriptor, defaultMaxWriters);

        this.state = ReaderWriterState.NEW;
        this.reusedKey = new StorageKey(this.partitionStrategy);
        this.accessor = view.getAccessor();
        this.provided = view.getProvidedValues();

        // The target file size setting is not read for Parquet datasets.
        if (Formats.PARQUET.equals(descriptor.getFormat())) {
            this.targetFileSize = -1L;
        } else {
            this.targetFileSize = DescriptorUtil.getLong("kite.writer.target-file-size", descriptor, -1L);
        }

        // The property is expressed in seconds; the field holds milliseconds.
        long rollIntervalSeconds = DescriptorUtil.getLong("kite.writer.roll-interval-seconds", descriptor, -1L);
        this.rollIntervalMillis = 1000L * rollIntervalSeconds;
    }

    /**
     * Opens this writer: validates the dataset format, builds the bounded cache of partition
     * writers and moves the state from {@code NEW} to {@code OPEN}.
     *
     * <p>Writers removed from the cache are closed by the registered removal listener.
     *
     * @throws IllegalStateException if this writer is not in the {@code NEW} state
     */
    @Override
    public void initialize() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.NEW), "Unable to open a writer from state:%s", this.state);

        DatasetDescriptor descriptor = this.view.getDataset().getDescriptor();
        boolean supported = FileSystemWriter.isSupportedFormat(descriptor);
        ValidationException.check(supported, "Not a supported format: %s", descriptor.getFormat());

        LOG.debug("Opening partitioned dataset writer w/strategy:{}", this.partitionStrategy);

        this.cachedWriters = CacheBuilder.newBuilder()
            .maximumSize(this.maxWriters)
            .removalListener(new DatasetWriterCloser<E>())
            .build(this.createCacheLoader());
        this.state = ReaderWriterState.OPEN;
    }

    /**
     * Supplies the loader that the writer cache built in {@link #initialize()} uses to create
     * the partition writer for a storage key that is not yet cached.
     *
     * @return the cache loader producing this writer's partition writers
     */
    protected abstract CacheLoader<StorageKey, W> createCacheLoader();

    /**
     * Writes {@code entity} through the writer of the partition it belongs to, creating and
     * caching that writer on first use.
     *
     * @param entity the entity to write
     * @throws IllegalStateException if this writer is not open
     * @throws IllegalArgumentException if a new partition writer is needed and the view does not
     *     include the entity, or if the partition writer cannot be created
     */
    @Override
    public void write(E entity) {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to write to a writer in state:%s", this.state);

        // Refill the shared scratch key so a cache hit allocates no key.
        this.accessor.keyFor(entity, this.provided, this.reusedKey);

        W cached = this.cachedWriters.getIfPresent(this.reusedKey);
        if (cached != null) {
            cached.write(entity);
            return;
        }

        // View membership is only checked when a new partition writer has to be created.
        Preconditions.checkArgument(this.view.includes(entity), "View %s does not include entity %s", this.view, entity);

        // The cache keeps the key, so it must not be the scratch instance.
        StorageKey storedKey = StorageKey.copy(this.reusedKey);
        W created;
        try {
            created = this.cachedWriters.getUnchecked(storedKey);
        }
        catch (UncheckedExecutionException ex) {
            throw new IllegalArgumentException("Problem creating view for entity: " + entity, ex.getCause());
        }
        created.write(entity);
    }

    /**
     * Closes every cached partition writer and marks this writer {@code CLOSED}. Calling it on
     * a writer that is not open does nothing.
     *
     * <p>A failure to close one partition writer does not stop the others from being closed:
     * each writer is attempted, the first failure is rethrown once all have been attempted, and
     * later failures are logged. The state becomes {@code CLOSED} even when a close fails, so
     * no further writes are accepted against partially closed partition writers.
     *
     * @throws RuntimeException the first exception thrown by a partition writer's
     *     {@code close()}, if any
     */
    @Override
    public void close() {
        if (!this.state.equals(ReaderWriterState.OPEN)) {
            return;
        }
        LOG.debug("Closing all cached writers for view:{}", this.view);

        RuntimeException firstFailure = null;
        try {
            for (W writer : this.cachedWriters.asMap().values()) {
                LOG.debug("Closing partition writer:{}", writer);
                try {
                    writer.close();
                }
                catch (RuntimeException e) {
                    if (firstFailure == null) {
                        // Remember it, but keep going so the remaining writers are not leaked.
                        firstFailure = e;
                    } else {
                        LOG.warn("Additional failure while closing partition writer:" + writer, e);
                    }
                }
            }
        }
        finally {
            this.state = ReaderWriterState.CLOSED;
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Records the new roll interval and, when this writer is open, forwards it to every cached
     * partition writer that is a {@link RollingWriter}.
     *
     * @param rollIntervalMillis the roll interval in milliseconds
     */
    @Override
    public void setRollIntervalMillis(long rollIntervalMillis) {
        this.rollIntervalMillis = rollIntervalMillis;
        if (ReaderWriterState.OPEN != this.state) {
            return;
        }
        for (W partitionWriter : this.cachedWriters.asMap().values()) {
            if (partitionWriter instanceof RollingWriter) {
                ((RollingWriter) partitionWriter).setRollIntervalMillis(rollIntervalMillis);
            }
        }
    }

    /**
     * Records the new target file size and, when this writer is open, forwards it to every
     * cached partition writer that is a {@link RollingWriter}.
     *
     * @param targetSizeBytes the target file size
     */
    @Override
    public void setTargetFileSize(long targetSizeBytes) {
        this.targetFileSize = targetSizeBytes;
        if (ReaderWriterState.OPEN != this.state) {
            return;
        }
        for (W partitionWriter : this.cachedWriters.asMap().values()) {
            if (partitionWriter instanceof RollingWriter) {
                ((RollingWriter) partitionWriter).setTargetFileSize(targetSizeBytes);
            }
        }
    }

    /**
     * Forwards a clock tick to every cached partition writer that is {@link ClockReady}.
     * Does nothing unless this writer is open.
     */
    @Override
    public void tick() {
        if (ReaderWriterState.OPEN != this.state) {
            return;
        }
        for (W partitionWriter : this.cachedWriters.asMap().values()) {
            if (partitionWriter instanceof ClockReady) {
                ((ClockReady) partitionWriter).tick();
            }
        }
    }

    /**
     * Reports whether this writer is currently in the {@code OPEN} state.
     *
     * @return {@code true} if the state is {@code OPEN}
     */
    @Override
    public boolean isOpen() {
        return this.state.equals(ReaderWriterState.OPEN);
    }

    /**
     * Describes this writer by its partition strategy, cache size limit, view and writer cache.
     *
     * @return a diagnostic string representation
     */
    @Override
    public String toString() {
        Objects.ToStringHelper description = Objects.toStringHelper(this);
        description.add("partitionStrategy", this.partitionStrategy);
        description.add("maxWriters", this.maxWriters);
        description.add("view", this.view);
        description.add("cachedWriters", this.cachedWriters);
        return description.toString();
    }

    /**
     * Partitioned writer whose partition writers are incremental writers, which lets the whole
     * set of cached writers be flushed and synced.
     */
    private static class IncrementalPartitionedDatasetWriter<E>
    extends PartitionedDatasetWriter<E, FileSystemWriter.IncrementalWriter<E>>
    implements Flushable,
    Syncable {
        private IncrementalPartitionedDatasetWriter(FileSystemView<E> view) {
            super(view);
        }

        /**
         * Builds a loader whose configuration accessor reads this writer's rolling settings at
         * the time each partition writer is created.
         */
        @Override
        protected CacheLoader<StorageKey, FileSystemWriter.IncrementalWriter<E>> createCacheLoader() {
            ConfAccessor liveSettings = new ConfAccessor(){

                @Override
                public long getTargetFileSize() {
                    return IncrementalPartitionedDatasetWriter.this.targetFileSize;
                }

                @Override
                public long getRollIntervalMillis() {
                    return IncrementalPartitionedDatasetWriter.this.rollIntervalMillis;
                }
            };
            return new IncrementalDatasetWriterCacheLoader<E>(this.view, liveSettings);
        }

        /**
         * Flushes every cached partition writer.
         *
         * @throws IllegalStateException if this writer is not open
         */
        @Override
        public void flush() {
            Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to flush a writer in state:%s", this.state);
            LOG.debug("Flushing all cached writers for view:{}", this.view);
            for (FileSystemWriter.IncrementalWriter<E> partitionWriter : this.cachedWriters.asMap().values()) {
                LOG.debug("Flushing partition writer:{}", partitionWriter);
                partitionWriter.flush();
            }
        }

        /**
         * Syncs every cached partition writer.
         *
         * @throws IllegalStateException if this writer is not open
         */
        @Override
        public void sync() {
            Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to sync a writer in state:%s", this.state);
            LOG.debug("Syncing all cached writers for view:{}", this.view);
            for (FileSystemWriter.IncrementalWriter<E> partitionWriter : this.cachedWriters.asMap().values()) {
                LOG.debug("Syncing partition writer:{}", partitionWriter);
                partitionWriter.sync();
            }
        }
    }

    /**
     * Partitioned writer whose partition writers are plain file system writers; it adds no
     * flush or sync support.
     */
    private static class NonDurablePartitionedDatasetWriter<E>
    extends PartitionedDatasetWriter<E, FileSystemWriter<E>> {
        private NonDurablePartitionedDatasetWriter(FileSystemView<E> view) {
            super(view);
        }

        /**
         * Builds a loader whose configuration accessor reads this writer's rolling settings at
         * the time each partition writer is created.
         */
        @Override
        protected CacheLoader<StorageKey, FileSystemWriter<E>> createCacheLoader() {
            ConfAccessor liveSettings = new ConfAccessor(){

                @Override
                public long getTargetFileSize() {
                    return NonDurablePartitionedDatasetWriter.this.targetFileSize;
                }

                @Override
                public long getRollIntervalMillis() {
                    return NonDurablePartitionedDatasetWriter.this.rollIntervalMillis;
                }
            };
            return new DatasetWriterCacheLoader<E>(this.view, liveSettings);
        }
    }

    /**
     * Removal listener that closes a partition writer when its entry leaves the writer cache.
     */
    private static class DatasetWriterCloser<E>
    implements RemovalListener<StorageKey, DatasetWriter<E>> {
        private DatasetWriterCloser() {
        }

        /**
         * Logs the removal and closes the removed writer.
         *
         * @param notification describes the removed cache entry
         */
        @Override
        public void onRemoval(RemovalNotification<StorageKey, DatasetWriter<E>> notification) {
            StorageKey partitionKey = notification.getKey();
            DatasetWriter<E> removedWriter = notification.getValue();
            LOG.debug("Closing writer:{} for partition:{}", removedWriter, partitionKey);
            removedWriter.close();
        }
    }

    /**
     * Cache loader that creates and initializes an incremental writer for the partition
     * directory identified by a storage key.
     */
    @VisibleForTesting
    static class IncrementalDatasetWriterCacheLoader<E>
    extends CacheLoader<StorageKey, FileSystemWriter.IncrementalWriter<E>> {
        private final FileSystemView<E> view;
        private final PathConversion convert;
        private final ConfAccessor conf;

        /**
         * @param view the view whose dataset the writers write to
         * @param conf source of the rolling settings, read each time a writer is created
         */
        public IncrementalDatasetWriterCacheLoader(FileSystemView<E> view, ConfAccessor conf) {
            this.view = view;
            this.convert = new PathConversion(view.getDataset().getDescriptor().getSchema());
            this.conf = conf;
        }

        /**
         * Creates the writer for the partition of {@code key}, notifies the dataset's partition
         * listener (if one is set) and then initializes the writer.
         *
         * @param key the storage key of the partition
         * @return the initialized writer, cast to an incremental writer
         * @throws IllegalStateException if the view's dataset is not a {@link FileSystemDataset}
         */
        @Override
        @SuppressWarnings(value={"BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"}, justification="Writer is known to be IncrementalWriter")
        public FileSystemWriter.IncrementalWriter<E> load(StorageKey key) throws Exception {
            Preconditions.checkState(this.view.getDataset() instanceof FileSystemDataset, "FileSystemWriters cannot create writer for " + this.view.getDataset());
            FileSystemDataset dataset = (FileSystemDataset)this.view.getDataset();

            Path partition = this.convert.fromKey(key);
            Path partitionDirectory = new Path(dataset.getDirectory(), partition);
            FileSystemWriter<E> partitionWriter = FileSystemWriter.newWriter(
                dataset.getFileSystem(),
                partitionDirectory,
                this.conf.getRollIntervalMillis(),
                this.conf.getTargetFileSize(),
                dataset.getDescriptor());

            // The listener is notified before the writer is initialized.
            PartitionListener listener = dataset.getPartitionListener();
            if (listener != null) {
                listener.partitionAdded(dataset.getNamespace(), dataset.getName(), partition.toString());
            }

            partitionWriter.initialize();
            return (FileSystemWriter.IncrementalWriter<E>) partitionWriter;
        }
    }

    /**
     * Cache loader that creates and initializes a file system writer for the partition
     * directory identified by a storage key.
     */
    @VisibleForTesting
    static class DatasetWriterCacheLoader<E>
    extends CacheLoader<StorageKey, FileSystemWriter<E>> {
        private final FileSystemView<E> view;
        private final PathConversion convert;
        private final ConfAccessor conf;

        /**
         * @param view the view whose dataset the writers write to
         * @param conf source of the rolling settings, read each time a writer is created
         */
        public DatasetWriterCacheLoader(FileSystemView<E> view, ConfAccessor conf) {
            this.view = view;
            this.convert = new PathConversion(view.getDataset().getDescriptor().getSchema());
            this.conf = conf;
        }

        /**
         * Creates the writer for the partition of {@code key}, notifies the dataset's partition
         * listener (if one is set) and then initializes the writer.
         *
         * @param key the storage key of the partition
         * @return the initialized writer
         * @throws IllegalStateException if the view's dataset is not a {@link FileSystemDataset}
         */
        @Override
        public FileSystemWriter<E> load(StorageKey key) throws Exception {
            Preconditions.checkState(this.view.getDataset() instanceof FileSystemDataset, "FileSystemWriters cannot create writer for " + this.view.getDataset());
            FileSystemDataset dataset = (FileSystemDataset)this.view.getDataset();

            Path partition = this.convert.fromKey(key);
            Path partitionDirectory = new Path(dataset.getDirectory(), partition);
            FileSystemWriter<E> partitionWriter = FileSystemWriter.newWriter(
                dataset.getFileSystem(),
                partitionDirectory,
                this.conf.getRollIntervalMillis(),
                this.conf.getTargetFileSize(),
                dataset.getDescriptor());

            // The listener is notified before the writer is initialized.
            PartitionListener listener = dataset.getPartitionListener();
            if (listener != null) {
                listener.partitionAdded(dataset.getNamespace(), dataset.getName(), partition.toString());
            }

            partitionWriter.initialize();
            return partitionWriter;
        }
    }

    /**
     * Read access to the rolling settings that the cache loaders pass to each partition writer
     * they create.
     */
    public interface ConfAccessor {
        /**
         * @return the target file size to give to a new partition writer
         */
        long getTargetFileSize();

        /**
         * @return the roll interval, in milliseconds, to give to a new partition writer
         */
        long getRollIntervalMillis();
    }
}

