001/*
002 * This library is part of OpenCms -
003 * the Open Source Content Management System
004 *
005 * Copyright (c) Alkacon Software GmbH & Co. KG (http://www.alkacon.com)
006 *
007 * This library is free software; you can redistribute it and/or
008 * modify it under the terms of the GNU Lesser General Public
009 * License as published by the Free Software Foundation; either
010 * version 2.1 of the License, or (at your option) any later version.
011 *
012 * This library is distributed in the hope that it will be useful,
013 * but WITHOUT ANY WARRANTY; without even the implied warranty of
014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015 * Lesser General Public License for more details.
016 *
017 * For further information about Alkacon Software GmbH & Co. KG, please see the
018 * company website: http://www.alkacon.com
019 *
020 * For further information about OpenCms, please see the
021 * project website: http://www.opencms.org
022 *
023 * You should have received a copy of the GNU Lesser General Public
024 * License along with this library; if not, write to the Free Software
025 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
026 *
027 *
028 * This library is based to some extend on code from the
029 * OpenSymphony Quartz project. Original copyright notice:
030 *
031 * Copyright James House (c) 2001-2005
032 *
033 * All rights reserved.
034 *
035 * Redistribution and use in source and binary forms, with or without
036 * modification, are permitted provided that the following conditions are met: 1.
037 * Redistributions of source code must retain the above copyright notice, this
038 * list of conditions and the following disclaimer. 2. Redistributions in
039 * binary form must reproduce the above copyright notice, this list of
040 * conditions and the following disclaimer in the documentation and/or other
041 * materials provided with the distribution.
042 *
043 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND ANY
044 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
045 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
046 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR ANY
047 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
048 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
049 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
050 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
051 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
052 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
053 */
054
055package org.opencms.scheduler;
056
057import org.opencms.main.CmsLog;
058
059import org.apache.commons.logging.Log;
060
061import org.quartz.SchedulerConfigException;
062import org.quartz.spi.ThreadPool;
063
064/**
065 * Simple thread pool used for the Quartz scheduler in OpenCms.<p>
066 *
067 * @since 6.0.0
068 */
069public class CmsSchedulerThreadPool implements ThreadPool {
070
071    /** The log object for this class. */
072    private static final Log LOG = CmsLog.getLog(CmsSchedulerThreadPool.class);
073
074    /** The current thread count. */
075    private int m_currentThreadCount;
076
077    /** The inheri group. */
078    private boolean m_inheritGroup;
079
080    /** The inherit loader. */
081    private boolean m_inheritLoader;
082
083    /** The initial thread count. */
084    private int m_initialThreadCount;
085
086    /** Flag indicating if the system is shutting down. */
087    private boolean m_isShutdown;
088
089    /** Flag indicating whether to create thread deamons. */
090    private boolean m_makeThreadsDaemons;
091
092    /** The max thread count. */
093    private int m_maxThreadCount;
094
095    /** The next runnable. */
096    private Runnable m_nextRunnable;
097
098    /** The next runnable lock. */
099    private Object m_nextRunnableLock;
100
101    /** The thread group. */
102    private ThreadGroup m_threadGroup;
103
104    /** The thread name prefix. */
105    private String m_threadNamePrefix;
106
107    /** The thread priority. */
108    private int m_threadPriority;
109
110    /** The threads. */
111    private CmsSchedulerThread[] m_workers;
112    
113    private String schedulerInstanceName, schedulerInstanceId;
114
115    /**
116     * Create a new <code>CmsSchedulerThreadPool</code> with default values.
117     *
118     * This will create a pool with 0 initial and 10 maximum threads running
119     * in normal priority.<p>
120     *
121     * @see #CmsSchedulerThreadPool(int, int, int)
122     */
123    public CmsSchedulerThreadPool() {
124
125        this(0, 10, Thread.NORM_PRIORITY);
126    }
127
128    /**
129     * Create a new <code>CmsSchedulerThreadPool</code> with the specified number
130     * of threads that have the given priority.
131     *
132     * The OpenCms scheduler thread pool will initially start with provided number of
133     * active scheduler threads.
134     * When a thread is requested by the scheduler, and no "free" threads are available,
135     * a new thread will be added to the pool and used for execution. The pool
136     * will be allowed to grow until it has reached the configured number
137     * of maximum threads.<p>
138     *
139     * @param initialThreadCount the initial number of threads for the pool
140     * @param maxThreadCount maximum number of threads the pool is allowed to grow
141     * @param threadPriority the thread priority for the scheduler threads
142     *
143     * @see java.lang.Thread
144     */
145    public CmsSchedulerThreadPool(int initialThreadCount, int maxThreadCount, int threadPriority) {
146
147        m_inheritGroup = true;
148        m_inheritLoader = true;
149        m_nextRunnableLock = new Object();
150        m_threadNamePrefix = "OpenCms: Scheduler Thread ";
151        m_makeThreadsDaemons = true;
152        m_initialThreadCount = initialThreadCount;
153        m_currentThreadCount = 0;
154        m_maxThreadCount = maxThreadCount;
155        m_threadPriority = threadPriority;
156    }
157
158    /**
159     * @see org.quartz.spi.ThreadPool
160     *
161     * @return if the pool should be blocked for available threads
162     */
163    public int blockForAvailableThreads() {
164
165        // Waiting will be done in runInThread so we always return 1
166        return 1;
167    }
168
169    /**
170     * @see org.quartz.spi.ThreadPool#getPoolSize()
171     */
172    public int getPoolSize() {
173
174        return m_currentThreadCount;
175    }
176
177    /**
178     * Returns the thread priority of the threads in the scheduler pool.<p>
179     *
180     * @return the thread priority of the threads in the scheduler pool
181     */
182    public int getThreadPriority() {
183
184        return m_threadPriority;
185    }
186
187    /**
188     * @see org.quartz.spi.ThreadPool#initialize()
189     */
190    public void initialize() throws SchedulerConfigException {
191
192        if ((m_maxThreadCount <= 0) || (m_maxThreadCount > 200)) {
193            throw new SchedulerConfigException(Messages.get().getBundle().key(Messages.ERR_MAX_THREAD_COUNT_BOUNDS_0));
194        }
195        if ((m_initialThreadCount < 0) || (m_initialThreadCount > m_maxThreadCount)) {
196            throw new SchedulerConfigException(Messages.get().getBundle().key(Messages.ERR_INIT_THREAD_COUNT_BOUNDS_0));
197        }
198        if ((m_threadPriority <= 0) || (m_threadPriority > 9)) {
199            throw new SchedulerConfigException(
200                Messages.get().getBundle().key(Messages.ERR_SCHEDULER_PRIORITY_BOUNDS_0));
201        }
202
203        if (m_inheritGroup) {
204            m_threadGroup = Thread.currentThread().getThreadGroup();
205        } else {
206            // follow the threadGroup tree to the root thread group
207            m_threadGroup = Thread.currentThread().getThreadGroup();
208            ThreadGroup parent = m_threadGroup;
209            while (!parent.getName().equals("main")) {
210                m_threadGroup = parent;
211                parent = m_threadGroup.getParent();
212            }
213            m_threadGroup = new ThreadGroup(parent, this.getClass().getName());
214        }
215
216        if (m_inheritLoader) {
217            LOG.debug(
218                Messages.get().getBundle().key(
219                    Messages.LOG_USING_THREAD_CLASSLOADER_1,
220                    Thread.currentThread().getName()));
221        }
222
223        // create the worker threads and start them
224        m_workers = new CmsSchedulerThread[m_maxThreadCount];
225        for (int i = 0; i < m_initialThreadCount; ++i) {
226            growThreadPool();
227        }
228    }
229
230    /**
231     * Run the given <code>Runnable</code> object in the next available
232     * <code>Thread</code>.<p>
233     *
234     * If while waiting the thread pool is asked to
235     * shut down, the Runnable is executed immediately within a new additional
236     * thread.<p>
237     *
238     * @param runnable the <code>Runnable</code> to run
239     * @return true if the <code>Runnable</code> was run
240     */
241    public boolean runInThread(Runnable runnable) {
242
243        if (runnable == null) {
244            return false;
245        }
246
247        if (m_isShutdown) {
248            LOG.debug(Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_UNAVAILABLE_0));
249            return false;
250        }
251
252        boolean hasNextRunnable;
253        synchronized (m_nextRunnableLock) {
254            // must synchronize here to avoid potential double checked locking
255            hasNextRunnable = (m_nextRunnable != null);
256        }
257
258        if (hasNextRunnable || (m_currentThreadCount == 0)) {
259            // try to grow the thread pool since other runnables are already waiting
260            growThreadPool();
261        }
262
263        synchronized (m_nextRunnableLock) {
264
265            // wait until a worker thread has taken the previous Runnable
266            // or until the thread pool is asked to shutdown
267            while ((m_nextRunnable != null) && !m_isShutdown) {
268                try {
269                    m_nextRunnableLock.wait(1000);
270                } catch (InterruptedException e) {
271                    // can be ignores
272                }
273            }
274
275            // during normal operation, not shutdown, set the nextRunnable
276            // and notify the worker threads waiting (getNextRunnable())
277            if (!m_isShutdown) {
278                m_nextRunnable = runnable;
279                m_nextRunnableLock.notifyAll();
280            }
281        }
282
283        // if the thread pool is going down, execute the Runnable
284        // within a new additional worker thread (no thread from the pool)
285        // note: the synchronized section should be as short (time) as
286        // possible as starting a new thread is not a quick action
287        if (m_isShutdown) {
288            CmsSchedulerThread thread = new CmsSchedulerThread(
289                this,
290                m_threadGroup,
291                m_threadNamePrefix + "(final)",
292                m_threadPriority,
293                false,
294                runnable);
295            thread.start();
296        }
297
298        return true;
299    }
300
301    /**
302     * Terminate any worker threads in this thread group.<p>
303     *
304     * Jobs currently in progress will be allowed to complete.<p>
305     */
306    public void shutdown() {
307
308        shutdown(true);
309    }
310
311    /**
312     * Terminate all threads in this thread group.<p>
313     *
314     * @param waitForJobsToComplete if true,, all current jobs will be allowed to complete
315     */
316    public void shutdown(boolean waitForJobsToComplete) {
317
318        m_isShutdown = true;
319
320        // signal each scheduler thread to shut down
321        for (int i = 0; i < m_currentThreadCount; i++) {
322            if (m_workers[i] != null) {
323                m_workers[i].shutdown();
324            }
325        }
326
327        // give waiting (wait(1000)) worker threads a chance to shut down
328        // active worker threads will shut down after finishing their
329        // current job
330        synchronized (m_nextRunnableLock) {
331            m_nextRunnableLock.notifyAll();
332        }
333
334        if (waitForJobsToComplete) {
335            // wait until all worker threads are shut down
336            int alive = m_currentThreadCount;
337            while (alive > 0) {
338                alive = 0;
339                for (int i = 0; i < m_currentThreadCount; i++) {
340                    if (m_workers[i].isAlive()) {
341                        try {
342                            if (LOG.isDebugEnabled()) {
343                                LOG.debug(
344                                    Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_WAITING_1, Integer.valueOf(i)));
345                            }
346
347                            // note: with waiting infinite - join(0) - the application
348                            // may appear to 'hang'
349                            // waiting for a finite time however requires an additional loop (alive)
350                            alive++;
351                            m_workers[i].join(200);
352                        } catch (InterruptedException e) {
353                            // can be ignored
354                        }
355                    }
356                }
357            }
358
359            int activeCount = m_threadGroup.activeCount();
360            if ((activeCount > 0) && LOG.isInfoEnabled()) {
361                LOG.info(
362                    Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_STILL_ACTIVE_1, Integer.valueOf(activeCount)));
363            }
364            if (LOG.isDebugEnabled()) {
365                LOG.debug(Messages.get().getBundle().key(Messages.LOG_THREAD_POOL_SHUTDOWN_0));
366            }
367        }
368    }
369
370    /**
371     * Dequeue the next pending <code>Runnable</code>.<p>
372     *
373     * @return the next pending <code>Runnable</code>
374     * @throws InterruptedException if something goes wrong
375     */
376    protected Runnable getNextRunnable() throws InterruptedException {
377
378        Runnable toRun = null;
379
380        // Wait for new Runnable (see runInThread()) and notify runInThread()
381        // in case the next Runnable is already waiting.
382        synchronized (m_nextRunnableLock) {
383            if (m_nextRunnable == null) {
384                m_nextRunnableLock.wait(1000);
385            }
386
387            if (m_nextRunnable != null) {
388                toRun = m_nextRunnable;
389                m_nextRunnable = null;
390                m_nextRunnableLock.notifyAll();
391            }
392        }
393
394        return toRun;
395    }
396
397    /**
398     * Grows the thread pool by one new thread if the maximum pool size
399     * has not been reached.<p>
400     */
401    private void growThreadPool() {
402
403        if (m_currentThreadCount < m_maxThreadCount) {
404            // if maximum number is not reached grow the thread pool
405            synchronized (m_nextRunnableLock) {
406                m_workers[m_currentThreadCount] = new CmsSchedulerThread(
407                    this,
408                    m_threadGroup,
409                    m_threadNamePrefix + m_currentThreadCount,
410                    m_threadPriority,
411                    m_makeThreadsDaemons);
412                m_workers[m_currentThreadCount].start();
413                if (m_inheritLoader) {
414                    m_workers[m_currentThreadCount].setContextClassLoader(
415                        Thread.currentThread().getContextClassLoader());
416                }
417                // increas the current size
418                m_currentThreadCount++;
419                // notify the waiting threads
420                m_nextRunnableLock.notifyAll();
421            }
422        }
423    }
424
425    public void setInstanceId(String schedInstId) {
426
427        schedulerInstanceId = schedInstId;
428    }
429
430    public void setInstanceName(String schedName) {
431
432        schedulerInstanceName = schedName;
433    }
434}