001/* 002 * This library is part of OpenCms - 003 * the Open Source Content Management System 004 * 005 * Copyright (c) Alkacon Software GmbH & Co. KG (http://www.alkacon.com) 006 * 007 * This library is free software; you can redistribute it and/or 008 * modify it under the terms of the GNU Lesser General Public 009 * License as published by the Free Software Foundation; either 010 * version 2.1 of the License, or (at your option) any later version. 011 * 012 * This library is distributed in the hope that it will be useful, 013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 015 * Lesser General Public License for more details. 016 * 017 * For further information about Alkacon Software GmbH & Co. KG, please see the 018 * company website: http://www.alkacon.com 019 * 020 * For further information about OpenCms, please see the 021 * project website: http://www.opencms.org 022 * 023 * You should have received a copy of the GNU Lesser General Public 024 * License along with this library; if not, write to the Free Software 025 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 026 */ 027 028package org.opencms.search; 029 030import org.opencms.db.CmsPublishedResource; 031import org.opencms.file.CmsResource; 032import org.opencms.i18n.CmsMessageContainer; 033import org.opencms.main.CmsLog; 034import org.opencms.report.CmsLogReport; 035import org.opencms.report.I_CmsReport; 036 037import java.io.IOException; 038 039import org.apache.commons.logging.Log; 040 041/** 042 * Implements the management of indexing threads.<p> 043 * 044 * @since 6.0.0 045 */ 046public class CmsIndexingThreadManager { 047 048 /** The log object for this class. */ 049 private static final Log LOG = CmsLog.getLog(CmsIndexingThreadManager.class); 050 051 /** Number of threads abandoned. */ 052 private int m_abandonedCounter; 053 054 /** The time the last error was written to the log. */ 055 private long m_lastLogErrorTime; 056 057 /** The time the last warning was written to the log. */ 058 private long m_lastLogWarnTime; 059 060 /** The maximum number of modifications before a commit in the search index is triggered. */ 061 private int m_maxModificationsBeforeCommit; 062 063 /** Number of thread returned. */ 064 private int m_returnedCounter; 065 066 /** Overall number of threads started. */ 067 private int m_startedCounter; 068 069 /** Timeout for abandoning threads. */ 070 private long m_timeout; 071 072 /** 073 * Creates and starts a thread manager for indexing threads.<p> 074 * 075 * @param timeout timeout after a thread is abandoned 076 * @param maxModificationsBeforeCommit the maximum number of modifications before a commit in the search index is triggered 077 */ 078 public CmsIndexingThreadManager(long timeout, int maxModificationsBeforeCommit) { 079 080 m_timeout = timeout; 081 m_maxModificationsBeforeCommit = maxModificationsBeforeCommit; 082 } 083 084 /** 085 * Creates and starts a new indexing thread for a resource.<p> 086 * 087 * After an indexing thread was started, the manager suspends itself 088 * and waits for an amount of time specified by the <code>timeout</code> 089 * value. If the timeout value is reached, the indexing thread is 090 * aborted by an interrupt signal.<p> 091 * 092 * @param indexer the VFS indexer to create the index thread for 093 * @param writer the index writer that can update the index 094 * @param res the resource 095 */ 096 public void createIndexingThread(CmsVfsIndexer indexer, I_CmsIndexWriter writer, CmsResource res) { 097 098 I_CmsReport report = indexer.getReport(); 099 m_startedCounter++; 100 CmsIndexingThread thread = new CmsIndexingThread( 101 indexer.getCms(), 102 res, 103 indexer.getIndex(), 104 m_startedCounter, 105 report); 106 thread.setPriority(Thread.MIN_PRIORITY); 107 thread.start(); 108 try { 109 thread.join(m_timeout); 110 } catch (InterruptedException e) { 111 // ignore 112 } 113 if (thread.isAlive()) { 114 // the thread has not finished - so it must be marked as an abandoned thread 115 m_abandonedCounter++; 116 thread.interrupt(); 117 if (LOG.isWarnEnabled()) { 118 LOG.warn(Messages.get().getBundle().key(Messages.LOG_INDEXING_TIMEOUT_1, res.getRootPath())); 119 } 120 if (report != null) { 121 report.println(); 122 report.print( 123 org.opencms.report.Messages.get().container(org.opencms.report.Messages.RPT_FAILED_0), 124 I_CmsReport.FORMAT_WARNING); 125 report.println( 126 Messages.get().container(Messages.RPT_SEARCH_INDEXING_TIMEOUT_1, res.getRootPath()), 127 I_CmsReport.FORMAT_WARNING); 128 } 129 } else { 130 // the thread finished normally 131 m_returnedCounter++; 132 } 133 I_CmsSearchDocument doc = thread.getResult(); 134 if (doc != null) { 135 // write the document to the index 136 indexer.updateResource(writer, res.getRootPath(), doc); 137 } else { 138 indexer.deleteResource(writer, new CmsPublishedResource(res)); 139 } 140 if ((m_startedCounter % m_maxModificationsBeforeCommit) == 0) { 141 try { 142 writer.commit(); 143 } catch (IOException e) { 144 if (LOG.isWarnEnabled()) { 145 LOG.warn( 146 Messages.get().getBundle().key( 147 Messages.LOG_IO_INDEX_WRITER_COMMIT_2, 148 indexer.getIndex().getName(), 149 indexer.getIndex().getPath()), 150 e); 151 } 152 } 153 } 154 } 155 156 /** 157 * Returns if the indexing manager still have indexing threads.<p> 158 * 159 * @return true if the indexing manager still have indexing threads 160 */ 161 public boolean isRunning() { 162 163 if (m_lastLogErrorTime <= 0) { 164 m_lastLogErrorTime = System.currentTimeMillis(); 165 m_lastLogWarnTime = m_lastLogErrorTime; 166 } else { 167 long currentTime = System.currentTimeMillis(); 168 if ((currentTime - m_lastLogWarnTime) > 30000) { 169 // write warning to log after 30 seconds 170 if (LOG.isWarnEnabled()) { 171 LOG.warn( 172 Messages.get().getBundle().key( 173 Messages.LOG_WAITING_ABANDONED_THREADS_2, 174 Integer.valueOf(m_abandonedCounter), 175 Integer.valueOf((m_startedCounter - m_returnedCounter)))); 176 } 177 m_lastLogWarnTime = currentTime; 178 } 179 if ((currentTime - m_lastLogErrorTime) > 600000) { 180 // write error to log after 10 minutes 181 LOG.error( 182 Messages.get().getBundle().key( 183 Messages.LOG_WAITING_ABANDONED_THREADS_2, 184 Integer.valueOf(m_abandonedCounter), 185 Integer.valueOf((m_startedCounter - m_returnedCounter)))); 186 m_lastLogErrorTime = currentTime; 187 } 188 } 189 190 boolean result = (m_returnedCounter + m_abandonedCounter) < m_startedCounter; 191 if (result && LOG.isInfoEnabled()) { 192 // write a note to the log that all threads have finished 193 LOG.info(Messages.get().getBundle().key(Messages.LOG_THREADS_FINISHED_0)); 194 } 195 return result; 196 } 197 198 /** 199 * Writes statistical information to the report.<p> 200 * 201 * The method reports the total number of threads started 202 * (equals to the number of indexed files), the number of returned 203 * threads (equals to the number of successfully indexed files), 204 * and the number of abandoned threads (hanging threads reaching the timeout). 205 * 206 * @param report the report to write the statistics to 207 */ 208 public void reportStatistics(I_CmsReport report) { 209 210 if (report != null) { 211 CmsMessageContainer message = Messages.get().container( 212 Messages.RPT_SEARCH_INDEXING_STATS_4, 213 new Object[] { 214 Integer.valueOf(m_startedCounter), 215 Integer.valueOf(m_returnedCounter), 216 Integer.valueOf(m_abandonedCounter), 217 report.formatRuntime()}); 218 219 report.println(message); 220 if (!(report instanceof CmsLogReport) && LOG.isInfoEnabled()) { 221 // only write to the log if report is not already a log report 222 LOG.info(message.key()); 223 } 224 } 225 } 226}