001/*
002 * This library is part of OpenCms -
003 * the Open Source Content Management System
004 *
005 * Copyright (c) Alkacon Software GmbH & Co. KG (http://www.alkacon.com)
006 *
007 * This library is free software; you can redistribute it and/or
008 * modify it under the terms of the GNU Lesser General Public
009 * License as published by the Free Software Foundation; either
010 * version 2.1 of the License, or (at your option) any later version.
011 *
012 * This library is distributed in the hope that it will be useful,
013 * but WITHOUT ANY WARRANTY; without even the implied warranty of
014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015 * Lesser General Public License for more details.
016 *
017 * For further information about Alkacon Software GmbH & Co. KG, please see the
018 * company website: http://www.alkacon.com
019 *
020 * For further information about OpenCms, please see the
021 * project website: http://www.opencms.org
022 *
023 * You should have received a copy of the GNU Lesser General Public
024 * License along with this library; if not, write to the Free Software
025 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
026 */
027
028package org.opencms.search;
029
030import org.opencms.db.CmsPublishedResource;
031import org.opencms.file.CmsResource;
032import org.opencms.i18n.CmsMessageContainer;
033import org.opencms.main.CmsLog;
034import org.opencms.report.CmsLogReport;
035import org.opencms.report.I_CmsReport;
036
037import java.io.IOException;
038
039import org.apache.commons.logging.Log;
040
041/**
042 * Implements the management of indexing threads.<p>
043 *
044 * @since 6.0.0
045 */
046public class CmsIndexingThreadManager {
047
048    /** The log object for this class. */
049    private static final Log LOG = CmsLog.getLog(CmsIndexingThreadManager.class);
050
051    /** Number of threads abandoned. */
052    private int m_abandonedCounter;
053
054    /** The time the last error was written to the log. */
055    private long m_lastLogErrorTime;
056
057    /** The time the last warning was written to the log. */
058    private long m_lastLogWarnTime;
059
060    /** The maximum number of modifications before a commit in the search index is triggered. */
061    private int m_maxModificationsBeforeCommit;
062
063    /** Number of thread returned. */
064    private int m_returnedCounter;
065
066    /** Overall number of threads started. */
067    private int m_startedCounter;
068
069    /** Timeout for abandoning threads. */
070    private long m_timeout;
071
072    /**
073     * Creates and starts a thread manager for indexing threads.<p>
074     *
075     * @param timeout timeout after a thread is abandoned
076     * @param maxModificationsBeforeCommit the maximum number of modifications before a commit in the search index is triggered
077     */
078    public CmsIndexingThreadManager(long timeout, int maxModificationsBeforeCommit) {
079
080        m_timeout = timeout;
081        m_maxModificationsBeforeCommit = maxModificationsBeforeCommit;
082    }
083
084    /**
085     * Creates and starts a new indexing thread for a resource.<p>
086     *
087     * After an indexing thread was started, the manager suspends itself
088     * and waits for an amount of time specified by the <code>timeout</code>
089     * value. If the timeout value is reached, the indexing thread is
090     * aborted by an interrupt signal.<p>
091     *
092     * @param indexer the VFS indexer to create the index thread for
093     * @param writer the index writer that can update the index
094     * @param res the resource
095     */
096    public void createIndexingThread(CmsVfsIndexer indexer, I_CmsIndexWriter writer, CmsResource res) {
097
098        I_CmsReport report = indexer.getReport();
099        m_startedCounter++;
100        CmsIndexingThread thread = new CmsIndexingThread(
101            indexer.getCms(),
102            res,
103            indexer.getIndex(),
104            m_startedCounter,
105            report);
106        thread.setPriority(Thread.MIN_PRIORITY);
107        thread.start();
108        try {
109            thread.join(m_timeout);
110        } catch (InterruptedException e) {
111            // ignore
112        }
113        if (thread.isAlive()) {
114            // the thread has not finished - so it must be marked as an abandoned thread
115            m_abandonedCounter++;
116            thread.interrupt();
117            if (LOG.isWarnEnabled()) {
118                LOG.warn(Messages.get().getBundle().key(Messages.LOG_INDEXING_TIMEOUT_1, res.getRootPath()));
119            }
120            if (report != null) {
121                report.println();
122                report.print(
123                    org.opencms.report.Messages.get().container(org.opencms.report.Messages.RPT_FAILED_0),
124                    I_CmsReport.FORMAT_WARNING);
125                report.println(
126                    Messages.get().container(Messages.RPT_SEARCH_INDEXING_TIMEOUT_1, res.getRootPath()),
127                    I_CmsReport.FORMAT_WARNING);
128            }
129        } else {
130            // the thread finished normally
131            m_returnedCounter++;
132        }
133        I_CmsSearchDocument doc = thread.getResult();
134        if (doc != null) {
135            // write the document to the index
136            indexer.updateResource(writer, res.getRootPath(), doc);
137        } else {
138            indexer.deleteResource(writer, new CmsPublishedResource(res));
139        }
140        if ((m_startedCounter % m_maxModificationsBeforeCommit) == 0) {
141            try {
142                writer.commit();
143            } catch (IOException e) {
144                if (LOG.isWarnEnabled()) {
145                    LOG.warn(
146                        Messages.get().getBundle().key(
147                            Messages.LOG_IO_INDEX_WRITER_COMMIT_2,
148                            indexer.getIndex().getName(),
149                            indexer.getIndex().getPath()),
150                        e);
151                }
152            }
153        }
154    }
155
156    /**
157     * Returns if the indexing manager still have indexing threads.<p>
158     *
159     * @return true if the indexing manager still have indexing threads
160     */
161    public boolean isRunning() {
162
163        if (m_lastLogErrorTime <= 0) {
164            m_lastLogErrorTime = System.currentTimeMillis();
165            m_lastLogWarnTime = m_lastLogErrorTime;
166        } else {
167            long currentTime = System.currentTimeMillis();
168            if ((currentTime - m_lastLogWarnTime) > 30000) {
169                // write warning to log after 30 seconds
170                if (LOG.isWarnEnabled()) {
171                    LOG.warn(
172                        Messages.get().getBundle().key(
173                            Messages.LOG_WAITING_ABANDONED_THREADS_2,
174                            new Integer(m_abandonedCounter),
175                            new Integer((m_startedCounter - m_returnedCounter))));
176                }
177                m_lastLogWarnTime = currentTime;
178            }
179            if ((currentTime - m_lastLogErrorTime) > 600000) {
180                // write error to log after 10 minutes
181                LOG.error(
182                    Messages.get().getBundle().key(
183                        Messages.LOG_WAITING_ABANDONED_THREADS_2,
184                        new Integer(m_abandonedCounter),
185                        new Integer((m_startedCounter - m_returnedCounter))));
186                m_lastLogErrorTime = currentTime;
187            }
188        }
189
190        boolean result = (m_returnedCounter + m_abandonedCounter) < m_startedCounter;
191        if (result && LOG.isInfoEnabled()) {
192            // write a note to the log that all threads have finished
193            LOG.info(Messages.get().getBundle().key(Messages.LOG_THREADS_FINISHED_0));
194        }
195        return result;
196    }
197
198    /**
199     * Writes statistical information to the report.<p>
200     *
201     * The method reports the total number of threads started
202     * (equals to the number of indexed files), the number of returned
203     * threads (equals to the number of successfully indexed files),
204     * and the number of abandoned threads (hanging threads reaching the timeout).
205     *
206     * @param report the report to write the statistics to
207     */
208    public void reportStatistics(I_CmsReport report) {
209
210        if (report != null) {
211            CmsMessageContainer message = Messages.get().container(
212                Messages.RPT_SEARCH_INDEXING_STATS_4,
213                new Object[] {
214                    new Integer(m_startedCounter),
215                    new Integer(m_returnedCounter),
216                    new Integer(m_abandonedCounter),
217                    report.formatRuntime()});
218
219            report.println(message);
220            if (!(report instanceof CmsLogReport) && LOG.isInfoEnabled()) {
221                // only write to the log if report is not already a log report
222                LOG.info(message.key());
223            }
224        }
225    }
226}