001// License: GPL. For details, see LICENSE file. 002package org.openstreetmap.josm.data.cache; 003 004import java.io.IOException; 005import java.net.URL; 006import java.util.Map; 007import java.util.concurrent.ConcurrentHashMap; 008import java.util.concurrent.LinkedBlockingDeque; 009import java.util.concurrent.Semaphore; 010import java.util.concurrent.TimeUnit; 011 012import org.openstreetmap.josm.tools.Logging; 013 014/** 015 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task 016 * and it will set a runnable task with semaphore release, when job has finished. 017 * <p> 018 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't 019 * guarantee that all threads will be busy, when there is work for them[2]. <br> 020 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus 021 * tasks do not go through the Queue <br> 022 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread 023 * take the first available job and wait for semaphore. It might be the case, that semaphore was released 024 * for some task further in queue, but this implementation doesn't try to detect such situation 025 * 026 * @author Wiktor Niesiobędzki 027 */ 028public class HostLimitQueue extends LinkedBlockingDeque<Runnable> { 029 private static final long serialVersionUID = 1L; 030 031 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>(); 032 private final int hostLimit; 033 034 /** 035 * Creates an unbounded queue 036 * @param hostLimit how many parallel calls to host to allow 037 */ 038 public HostLimitQueue(int hostLimit) { 039 super(); // create unbounded queue 040 this.hostLimit = hostLimit; 041 } 042 043 /** 044 * Creates bounded queue 045 * @param hostLimit how many parallel calls to host to allow 046 * @param queueLimit how deep the queue should be 047 */ 048 public HostLimitQueue(int hostLimit, int queueLimit) { 049 super(queueLimit); // create bounded queue 050 this.hostLimit = hostLimit; 051 } 052 053 private JCSCachedTileLoaderJob<?, ?> findJob() { 054 for (Runnable r : this) { 055 if (r instanceof JCSCachedTileLoaderJob) { 056 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r; 057 if (tryAcquireSemaphore(job)) { 058 if (remove(job)) { 059 return job; 060 } else { 061 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did 062 // release the semaphore and look for another candidate 063 releaseSemaphore(job); 064 } 065 } else { 066 URL url = null; 067 try { 068 url = job.getUrl(); 069 } catch (IOException e) { 070 Logging.debug(e); 071 } 072 Logging.debug("TMS - Skipping job {0} because host limit reached", url); 073 } 074 } 075 } 076 return null; 077 } 078 079 @Override 080 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 081 Runnable job = findJob(); 082 if (job != null) { 083 return job; 084 } 085 job = pollFirst(timeout, unit); 086 if (job != null) { 087 try { 088 boolean gotLock = tryAcquireSemaphore(job, timeout, unit); 089 return gotLock ? job : null; 090 } catch (InterruptedException e) { 091 // acquire my got interrupted, first offer back what was taken 092 if (!offer(job)) { 093 Logging.warn("Unable to offer back " + job); 094 } 095 throw e; 096 } 097 } 098 return job; 099 } 100 101 @Override 102 public Runnable take() throws InterruptedException { 103 Runnable job = findJob(); 104 if (job != null) { 105 return job; 106 } 107 job = takeFirst(); 108 try { 109 acquireSemaphore(job); 110 } catch (InterruptedException e) { 111 // acquire my got interrupted, first offer back what was taken 112 if (!offer(job)) { 113 Logging.warn("Unable to offer back " + job); 114 } 115 throw e; 116 } 117 return job; 118 } 119 120 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 121 String host; 122 try { 123 host = job.getUrl().getHost(); 124 } catch (IOException e) { 125 // do not pass me illegal URL's 126 throw new IllegalArgumentException(e); 127 } 128 Semaphore limit = hostSemaphores.get(host); 129 if (limit == null) { 130 limit = hostSemaphores.computeIfAbsent(host, k -> new Semaphore(hostLimit)); 131 } 132 return limit; 133 } 134 135 private void acquireSemaphore(Runnable job) throws InterruptedException { 136 if (job instanceof JCSCachedTileLoaderJob) { 137 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 138 getSemaphore(jcsJob).acquire(); 139 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob)); 140 } 141 } 142 143 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) { 144 boolean ret = true; 145 Semaphore limit = getSemaphore(job); 146 if (limit != null) { 147 ret = limit.tryAcquire(); 148 if (ret) { 149 job.setFinishedTask(() -> releaseSemaphore(job)); 150 } 151 } 152 return ret; 153 } 154 155 private boolean tryAcquireSemaphore(Runnable job, long timeout, TimeUnit unit) throws InterruptedException { 156 boolean ret = true; 157 if (job instanceof JCSCachedTileLoaderJob) { 158 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 159 Semaphore limit = getSemaphore(jcsJob); 160 if (limit != null) { 161 ret = limit.tryAcquire(timeout, unit); 162 if (ret) { 163 jcsJob.setFinishedTask(() -> releaseSemaphore(jcsJob)); 164 } 165 } 166 } 167 return ret; 168 } 169 170 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 171 Semaphore limit = getSemaphore(job); 172 if (limit != null) { 173 limit.release(); 174 if (limit.availablePermits() > hostLimit) { 175 Logging.warn("More permits than it should be"); 176 } 177 } 178 } 179}