001// License: GPL. For details, see LICENSE file.
002package org.openstreetmap.josm.data.cache;
003
004import java.io.IOException;
005import java.net.URL;
006import java.util.Map;
007import java.util.concurrent.ConcurrentHashMap;
008import java.util.concurrent.LinkedBlockingDeque;
009import java.util.concurrent.Semaphore;
010import java.util.concurrent.TimeUnit;
011
012import org.openstreetmap.josm.tools.Logging;
013
014/**
015 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
016 * and it will set a runnable task with semaphore release, when job has finished.
017 * <p>
018 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
019 * guarantee that all threads will be busy, when there is work for them[2]. <br>
020 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
021 *     tasks do not go through the Queue <br>
022 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
023 *     take the first available job and wait for semaphore. It might be the case, that semaphore was released
024 *     for some task further in queue, but this implementation doesn't try to detect such situation
025 *
026 * @author Wiktor Niesiobędzki
027 */
028public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
029    private static final long serialVersionUID = 1L;
030
031    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
032    private final int hostLimit;
033
034    /**
035     * Creates an unbounded queue
036     * @param hostLimit how many parallel calls to host to allow
037     */
038    public HostLimitQueue(int hostLimit) {
039        super(); // create unbounded queue
040        this.hostLimit = hostLimit;
041    }
042
043    /**
044     * Creates bounded queue
045     * @param hostLimit how many parallel calls to host to allow
046     * @param queueLimit how deep the queue should be
047     */
048    public HostLimitQueue(int hostLimit, int queueLimit) {
049        super(queueLimit); // create bounded queue
050        this.hostLimit = hostLimit;
051    }
052
053    private JCSCachedTileLoaderJob<?, ?> findJob() {
054        for (Runnable r : this) {
055            if (r instanceof JCSCachedTileLoaderJob) {
056                JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
057                if (tryAcquireSemaphore(job)) {
058                    if (remove(job)) {
059                        return job;
060                    } else {
061                        // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
062                        // release the semaphore and look for another candidate
063                        releaseSemaphore(job);
064                    }
065                } else {
066                    URL url = null;
067                    try {
068                        url = job.getUrl();
069                    } catch (IOException e) {
070                        Logging.debug(e);
071                    }
072                    Logging.debug("TMS - Skipping job {0} because host limit reached", url);
073                }
074            }
075        }
076        return null;
077    }
078
079    @Override
080    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
081        Runnable job = findJob();
082        if (job != null) {
083            return job;
084        }
085        job = pollFirst(timeout, unit);
086        if (job != null) {
087            try {
088                boolean gotLock = tryAcquireSemaphore(job, timeout, unit);
089                return gotLock ? job : null;
090            } catch (InterruptedException e) {
091                // acquire my got interrupted, first offer back what was taken
092                if (!offer(job)) {
093                    Logging.warn("Unable to offer back " + job);
094                }
095                throw e;
096            }
097        }
098        return job;
099    }
100
101    @Override
102    public Runnable take() throws InterruptedException {
103        Runnable job = findJob();
104        if (job != null) {
105            return job;
106        }
107        job = takeFirst();
108        try {
109            acquireSemaphore(job);
110        } catch (InterruptedException e) {
111            // acquire my got interrupted, first offer back what was taken
112            if (!offer(job)) {
113                Logging.warn("Unable to offer back " + job);
114            }
115            throw e;
116        }
117        return job;
118    }
119
120    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
121        String host;
122        try {
123            host = job.getUrl().getHost();
124        } catch (IOException e) {
125            // do not pass me illegal URL's
126            throw new IllegalArgumentException(e);
127        }
128        Semaphore limit = hostSemaphores.get(host);
129        if (limit == null) {
130            limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit));
131        }
132        return limit;
133    }
134
135    private void acquireSemaphore(Runnable job) throws InterruptedException {
136        if (job instanceof JCSCachedTileLoaderJob) {
137            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
138            getSemaphore(jcsJob).acquire();
139            jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
140        }
141    }
142
143    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
144        boolean ret = true;
145        Semaphore limit = getSemaphore(job);
146        if (limit != null) {
147            ret = limit.tryAcquire();
148            if (ret) {
149                job.setFinishedTask(() -> releaseSemaphore(job));
150            }
151        }
152        return ret;
153    }
154
155    private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException {
156        boolean ret = true;
157        if (job instanceof JCSCachedTileLoaderJob) {
158            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
159            Semaphore limit = getSemaphore(jcsJob);
160            if (limit != null) {
161                ret = limit.tryAcquire(timeout, unit);
162                if (ret) {
163                    jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob));
164                }
165            }
166        }
167        return ret;
168    }
169
170    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
171        Semaphore limit = getSemaphore(job);
172        if (limit != null) {
173            limit.release();
174            if (limit.availablePermits() > hostLimit) {
175                Logging.warn("More permits than it should be");
176            }
177        }
178    }
179}