001/* 002 * This library is part of OpenCms - 003 * the Open Source Content Management System 004 * 005 * Copyright (c) Alkacon Software GmbH & Co. KG (http://www.alkacon.com) 006 * 007 * This library is free software; you can redistribute it and/or 008 * modify it under the terms of the GNU Lesser General Public 009 * License as published by the Free Software Foundation; either 010 * version 2.1 of the License, or (at your option) any later version. 011 * 012 * This library is distributed in the hope that it will be useful, 013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 015 * Lesser General Public License for more details. 016 * 017 * For further information about Alkacon Software GmbH & Co. KG, please see the 018 * company website: http://www.alkacon.com 019 * 020 * For further information about OpenCms, please see the 021 * project website: http://www.opencms.org 022 * 023 * You should have received a copy of the GNU Lesser General Public 024 * License along with this library; if not, write to the Free Software 025 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 026 * 027 * 028 * This library is based to some extend on code from the 029 * OpenSymphony Quartz project. Original copyright notice: 030 * 031 * Copyright James House (c) 2001-2005 032 * 033 * All rights reserved. 034 * 035 * Redistribution and use in source and binary forms, with or without 036 * modification, are permitted provided that the following conditions are met: 1. 037 * Redistributions of source code must retain the above copyright notice, this 038 * list of conditions and the following disclaimer. 2. Redistributions in 039 * binary form must reproduce the above copyright notice, this list of 040 * conditions and the following disclaimer in the documentation and/or other 041 * materials provided with the distribution. 042 * 043 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND ANY 044 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 045 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 046 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR ANY 047 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 048 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 049 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 050 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 051 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 052 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 053 */ 054 055package org.opencms.scheduler; 056 057import org.opencms.main.CmsLog; 058 059import org.apache.commons.logging.Log; 060 061import org.quartz.SchedulerConfigException; 062import org.quartz.spi.ThreadPool; 063 064/** 065 * Simple thread pool used for the Quartz scheduler in OpenCms.<p> 066 * 067 * @since 6.0.0 068 */ 069public class CmsSchedulerThreadPool implements ThreadPool { 070 071 /** The log object for this class. */ 072 private static final Log LOG = CmsLog.getLog(CmsSchedulerThreadPool.class); 073 074 /** The current thread count. */ 075 private int m_currentThreadCount; 076 077 /** The inheri group. */ 078 private boolean m_inheritGroup; 079 080 /** The inherit loader. */ 081 private boolean m_inheritLoader; 082 083 /** The initial thread count. */ 084 private int m_initialThreadCount; 085 086 /** Flag indicating if the system is shutting down. */ 087 private boolean m_isShutdown; 088 089 /** Flag indicating whether to create thread deamons. */ 090 private boolean m_makeThreadsDaemons; 091 092 /** The max thread count. */ 093 private int m_maxThreadCount; 094 095 /** The next runnable. */ 096 private Runnable m_nextRunnable; 097 098 /** The next runnable lock. */ 099 private Object m_nextRunnableLock; 100 101 /** The thread group. */ 102 private ThreadGroup m_threadGroup; 103 104 /** The thread name prefix. */ 105 private String m_threadNamePrefix; 106 107 /** The thread priority. */ 108 private int m_threadPriority; 109 110 /** The threads. */ 111 private CmsSchedulerThread[] m_workers; 112 113 private String schedulerInstanceName, schedulerInstanceId; 114 115 /** 116 * Create a new <code>CmsSchedulerThreadPool</code> with default values. 117 * 118 * This will create a pool with 0 initial and 10 maximum threads running 119 * in normal priority.<p> 120 * 121 * @see #CmsSchedulerThreadPool(int, int, int) 122 */ 123 public CmsSchedulerThreadPool() { 124 125 this(0, 10, Thread.NORM_PRIORITY); 126 } 127 128 /** 129 * Create a new <code>CmsSchedulerThreadPool</code> with the specified number 130 * of threads that have the given priority. 131 * 132 * The OpenCms scheduler thread pool will initially start with provided number of 133 * active scheduler threads. 134 * When a thread is requested by the scheduler, and no "free" threads are available, 135 * a new thread will be added to the pool and used for execution. The pool 136 * will be allowed to grow until it has reached the configured number 137 * of maximum threads.<p> 138 * 139 * @param initialThreadCount the initial number of threads for the pool 140 * @param maxThreadCount maximum number of threads the pool is allowed to grow 141 * @param threadPriority the thread priority for the scheduler threads 142 * 143 * @see java.lang.Thread 144 */ 145 public CmsSchedulerThreadPool(int initialThreadCount, int maxThreadCount, int threadPriority) { 146 147 m_inheritGroup = true; 148 m_inheritLoader = true; 149 m_nextRunnableLock = new Object(); 150 m_threadNamePrefix = "OpenCms: Scheduler Thread "; 151 m_makeThreadsDaemons = true; 152 m_initialThreadCount = initialThreadCount; 153 m_currentThreadCount = 0; 154 m_maxThreadCount = maxThreadCount; 155 m_threadPriority = threadPriority; 156 } 157 158 /** 159 * @see org.quartz.spi.ThreadPool 160 * 161 * @return if the pool should be blocked for available threads 162 */ 163 public int blockForAvailableThreads() { 164 165 // Waiting will be done in runInThread so we always return 1 166 return 1; 167 } 168 169 /** 170 * @see org.quartz.spi.ThreadPool#getPoolSize() 171 */ 172 public int getPoolSize() { 173 174 return m_currentThreadCount; 175 } 176 177 /** 178 * Returns the thread priority of the threads in the scheduler pool.<p> 179 * 180 * @return the thread priority of the threads in the scheduler pool 181 */ 182 public int getThreadPriority() { 183 184 return m_threadPriority; 185 } 186 187 /** 188 * @see org.quartz.spi.ThreadPool#initialize() 189 */ 190 public void initialize() throws SchedulerConfigException { 191 192 if ((m_maxThreadCount <= 0) || (m_maxThreadCount > 200)) { 193 throw new SchedulerConfigException(Messages.get().getBundle().key(Messages.ERR_MAX_THREAD_COUNT_BOUNDS_0)); 194 } 195 if ((m_initialThreadCount < 0) || (m_initialThreadCount > m_maxThreadCount)) { 196 throw new SchedulerConfigException(Messages.get().getBundle().key(Messages.ERR_INIT_THREAD_COUNT_BOUNDS_0)); 197 } 198 if ((m_threadPriority <= 0) || (m_threadPriority > 9)) { 199 throw new SchedulerConfigException( 200 Messages.get().getBundle().key(Messages.ERR_SCHEDULER_PRIORITY_BOUNDS_0)); 201 } 202 203 if (m_inheritGroup) { 204 m_threadGroup = Thread.currentThread().getThreadGroup(); 205 } else { 206 // follow the threadGroup tree to the root thread group 207 m_threadGroup = Thread.currentThread().getThreadGroup(); 208 ThreadGroup parent = m_threadGroup; 209 while (!parent.getName().equals("main")) { 210 m_threadGroup = parent; 211 parent = m_threadGroup.getParent(); 212 } 213 m_threadGroup = new ThreadGroup(parent, this.getClass().getName()); 214 } 215 216 if (m_inheritLoader) { 217 LOG.debug( 218 Messages.get().getBundle().key( 219 Messages.LOG_USING_THREAD_CLASSLOADER_1, 220 Thread.currentThread().getName())); 221 } 222 223 // create the worker threads and start them 224 m_workers = new CmsSchedulerThread[m_maxThreadCount]; 225 for (int i = 0; i < m_initialThreadCount; ++i) { 226 growThreadPool(); 227 } 228 } 229 230 /** 231 * Run the given <code>Runnable</code> object in the next available 232 * <code>Thread</code>.<p> 233 * 234 * If while waiting the thread pool is asked to 235 * shut down, the Runnable is executed immediately within a new additional 236 * thread.<p> 237 * 238 * @param runnable the <code>Runnable</code> to run 239 * @return true if the <code>Runnable</code> was run 240 */ 241 public boolean runInThread(Runnable runnable) { 242 243 if (runnable == null) { 244 return false; 245 } 246 247 if (m_isShutdown) { 248 LOG.debug(Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_UNAVAILABLE_0)); 249 return false; 250 } 251 252 boolean hasNextRunnable; 253 synchronized (m_nextRunnableLock) { 254 // must synchronize here to avoid potential double checked locking 255 hasNextRunnable = (m_nextRunnable != null); 256 } 257 258 if (hasNextRunnable || (m_currentThreadCount == 0)) { 259 // try to grow the thread pool since other runnables are already waiting 260 growThreadPool(); 261 } 262 263 synchronized (m_nextRunnableLock) { 264 265 // wait until a worker thread has taken the previous Runnable 266 // or until the thread pool is asked to shutdown 267 while ((m_nextRunnable != null) && !m_isShutdown) { 268 try { 269 m_nextRunnableLock.wait(1000); 270 } catch (InterruptedException e) { 271 // can be ignores 272 } 273 } 274 275 // during normal operation, not shutdown, set the nextRunnable 276 // and notify the worker threads waiting (getNextRunnable()) 277 if (!m_isShutdown) { 278 m_nextRunnable = runnable; 279 m_nextRunnableLock.notifyAll(); 280 } 281 } 282 283 // if the thread pool is going down, execute the Runnable 284 // within a new additional worker thread (no thread from the pool) 285 // note: the synchronized section should be as short (time) as 286 // possible as starting a new thread is not a quick action 287 if (m_isShutdown) { 288 CmsSchedulerThread thread = new CmsSchedulerThread( 289 this, 290 m_threadGroup, 291 m_threadNamePrefix + "(final)", 292 m_threadPriority, 293 false, 294 runnable); 295 thread.start(); 296 } 297 298 return true; 299 } 300 301 /** 302 * Terminate any worker threads in this thread group.<p> 303 * 304 * Jobs currently in progress will be allowed to complete.<p> 305 */ 306 public void shutdown() { 307 308 shutdown(true); 309 } 310 311 /** 312 * Terminate all threads in this thread group.<p> 313 * 314 * @param waitForJobsToComplete if true,, all current jobs will be allowed to complete 315 */ 316 public void shutdown(boolean waitForJobsToComplete) { 317 318 m_isShutdown = true; 319 320 // signal each scheduler thread to shut down 321 for (int i = 0; i < m_currentThreadCount; i++) { 322 if (m_workers[i] != null) { 323 m_workers[i].shutdown(); 324 } 325 } 326 327 // give waiting (wait(1000)) worker threads a chance to shut down 328 // active worker threads will shut down after finishing their 329 // current job 330 synchronized (m_nextRunnableLock) { 331 m_nextRunnableLock.notifyAll(); 332 } 333 334 if (waitForJobsToComplete) { 335 // wait until all worker threads are shut down 336 int alive = m_currentThreadCount; 337 while (alive > 0) { 338 alive = 0; 339 for (int i = 0; i < m_currentThreadCount; i++) { 340 if (m_workers[i].isAlive()) { 341 try { 342 if (LOG.isDebugEnabled()) { 343 LOG.debug( 344 Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_WAITING_1, Integer.valueOf(i))); 345 } 346 347 // note: with waiting infinite - join(0) - the application 348 // may appear to 'hang' 349 // waiting for a finite time however requires an additional loop (alive) 350 alive++; 351 m_workers[i].join(200); 352 } catch (InterruptedException e) { 353 // can be ignored 354 } 355 } 356 } 357 } 358 359 int activeCount = m_threadGroup.activeCount(); 360 if ((activeCount > 0) && LOG.isInfoEnabled()) { 361 LOG.info( 362 Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_STILL_ACTIVE_1, Integer.valueOf(activeCount))); 363 } 364 if (LOG.isDebugEnabled()) { 365 LOG.debug(Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_SHUTDOWN_0)); 366 } 367 } 368 } 369 370 /** 371 * Dequeue the next pending <code>Runnable</code>.<p> 372 * 373 * @return the next pending <code>Runnable</code> 374 * @throws InterruptedException if something goes wrong 375 */ 376 protected Runnable getNextRunnable() throws InterruptedException { 377 378 Runnable toRun = null; 379 380 // Wait for new Runnable (see runInThread()) and notify runInThread() 381 // in case the next Runnable is already waiting. 382 synchronized (m_nextRunnableLock) { 383 if (m_nextRunnable == null) { 384 m_nextRunnableLock.wait(1000); 385 } 386 387 if (m_nextRunnable != null) { 388 toRun = m_nextRunnable; 389 m_nextRunnable = null; 390 m_nextRunnableLock.notifyAll(); 391 } 392 } 393 394 return toRun; 395 } 396 397 /** 398 * Grows the thread pool by one new thread if the maximum pool size 399 * has not been reached.<p> 400 */ 401 private void growThreadPool() { 402 403 if (m_currentThreadCount < m_maxThreadCount) { 404 // if maximum number is not reached grow the thread pool 405 synchronized (m_nextRunnableLock) { 406 m_workers[m_currentThreadCount] = new CmsSchedulerThread( 407 this, 408 m_threadGroup, 409 m_threadNamePrefix + m_currentThreadCount, 410 m_threadPriority, 411 m_makeThreadsDaemons); 412 m_workers[m_currentThreadCount].start(); 413 if (m_inheritLoader) { 414 m_workers[m_currentThreadCount].setContextClassLoader( 415 Thread.currentThread().getContextClassLoader()); 416 } 417 // increas the current size 418 m_currentThreadCount++; 419 // notify the waiting threads 420 m_nextRunnableLock.notifyAll(); 421 } 422 } 423 } 424 425 public void setInstanceId(String schedInstId) { 426 427 schedulerInstanceId = schedInstId; 428 } 429 430 public void setInstanceName(String schedName) { 431 432 schedulerInstanceName = schedName; 433 } 434}