import com.sun.mail.imap._ import com.sun.mail.util.MailSSLSocketFactory import java.io._ import java.sql.{Array => SqlArray, Date => SqlDate, _} import java.util._ import java.util.concurrent._ import javax.mail._ import javax.mail.event._ import javax.mail.internet._ import org.apache.commons.io.IOUtils import org.slf4j.Logger import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ /* object MyConnectionListener extends ConnectionListener { def closed(e: ConnectionEvent) { println("MyConnectionListener: closed"); } def disconnected(e: ConnectionEvent) { println("MyConnectionListener: disconnected"); } def opened(e: ConnectionEvent) { println("MyConnectionListener: opened"); } } class MyMessageCountListener(f: IMAPFolder, dao: Dao) extends MessageCountListener { def messagesAdded(e: MessageCountEvent) { println("messagesAdded: added count=" + e.getMessages.length) e.getMessages.foreach { message => val uid = f.getUID(message) println("uid=" + uid) deque.push(uid) } } def messagesRemoved(e: MessageCountEvent) { println("messagesRemoved: removed count=" + e.getMessages.length + ", isRemoved=" + e.isRemoved) e.getMessages.foreach { message => println("message: " + message.getFrom) } } } */ /* object MyMessageChangedListener extends MessageChangedListener { def messageChanged(e: MessageChangedEvent) { val t = if(e.getMessageChangeType() == MessageChangedEvent.FLAGS_CHANGED) "FLAGS_CHANGED" else "ENVELOPE_CHANGED" println("messageChanged: " + t) println("message: " + e.getMessage()) } } */ /* object MyFolderListener extends FolderListener { def folderCreated(e: FolderEvent) { println("folder created") } def folderDeleted(e: FolderEvent) { println("folder deleted") } def folderRenamed(e: FolderEvent) { println("folder renamed") } } object MyStoreListener extends StoreListener { def notification(e: StoreEvent) { val t = if(e.getMessageType() == StoreEvent.NOTICE) "NOTICE" else "ALERT" println("store event: " + t) println("message: " + e.getMessage()) } } */ class Dao(c: Connection) extends Closeable { def close() { c.close() } val s = c.prepareStatement("BEGIN") def begin() { s.executeUpdate() } def commit() { c.commit() } def rollback() { c.rollback() } val countMail = c.prepareStatement("SELECT count(uid) FROM mail WHERE uid=?") def countMail(uid: Long): Long = { countMail.setLong(1, uid) val rs = countMail.executeQuery() rs.next() val count = rs.getLong(1) rs.close() count } val insertMail = c.prepareStatement( "INSERT INTO mail(uid, header_keys, header_values) VALUES(?, ?, ?)") def insertMail(uid: Long, keys: Array[String], values: Array[String]) { insertMail.setLong(1, uid) insertMail.setArray(2, c.createArrayOf("varchar", keys.asInstanceOf[Array[Object]])) insertMail.setArray(3, c.createArrayOf("varchar", values.asInstanceOf[Array[Object]])) insertMail.executeUpdate() } val insertMailPart = c.prepareStatement( "INSERT INTO mail_part(mail_uid, index, header_keys, header_values, data) VALUES(?, ?, ?, ?, ?)") def insertMailPart(uid: Long, index: Int, keys: Array[String], values: Array[String], is: InputStream, size: Int) { insertMailPart.setLong(1, uid) insertMailPart.setInt(2, index) insertMailPart.setArray(3, c.createArrayOf("varchar", keys.asInstanceOf[Array[Object]])) insertMailPart.setArray(4, c.createArrayOf("varchar", values.asInstanceOf[Array[Object]])) insertMailPart.setBinaryStream(5, is, size) insertMailPart.executeUpdate() } } class DbThread(factory: () => Service) extends Thread with Closeable { private val queue = new LinkedBlockingQueue[(IMAPFolder, Message)]() private var shouldRun = true def add(folder: IMAPFolder, message: Message) { println("Adding message: " + message) queue.add((folder, message)) /* queue.synchronized { queue.add((folder, message)) queue.notify() } */ } def close() { /* queue.synchronized { shouldRun = false queue.notify() this.interrupt() } */ this.interrupt() } def processQueue(service: Service) { var item = queue.poll() while(item != null) { var (folder, message) = item service.processMessage(folder, message) item = queue.poll() } } override def run() { var service = factory() while(shouldRun) { if(service == null) { service = factory() } try { /* queue.synchronized { queue.wait } */ Option(queue.poll(1, TimeUnit.DAYS)) match { case None => case Some((folder, message)) => println("Processing message: " + message) service.processMessage(folder, message) } } catch { case e => IOUtils.closeQuietly(service) service = null if(shouldRun) e.printStackTrace(); } } println("db thread stopping") } } class Service(dao: Dao) extends Closeable { def close() { IOUtils.closeQuietly(dao) } def headers(part: Part): (Array[String], Array[String]) = { var headers = part.getAllHeaders.asInstanceOf[java.util.Enumeration[Header]].asScala.toList var (names, values) = headers.unzip({ header => println(header.getName + ": " + header.getValue) (header.getName, header.getValue) }) return (names.toArray, values.toArray) } def processMessage(folder: IMAPFolder, message: Message) { val uid = folder.getUID(message) var ok = tx { dao => println(new Date) if(dao.countMail(uid) > 0) { println("Message already imported: uid=" + uid) } else { processMail(uid, message) } } if(ok.isDefined && !message.getFlags().contains(Flags.Flag.DELETED)) { message.setFlag(Flags.Flag.DELETED, true) } } def processMail(uid: Long, message: Message) { println("Importing message, uid=" + uid) var (names, values) = headers(message) dao.insertMail(uid, names, values) /* message.getFrom.foreach { address => if(address.isInstanceOf[InternetAddress]) { val a = address.asInstanceOf[InternetAddress] println("From: " + a.getPersonal + " <" + a.getAddress + ">") } else println("From: " + address) } message.getAllRecipients.foreach { address => println("To: " + address) } println("Subject: " + message.getSubject) */ if(!message.isInstanceOf[MimeMessage]) error("Unprocessable: not a mime message") else { val m = message.asInstanceOf[MimeMessage] /* println("Mime mail") println("Content-Type: " + m.getContentType) */ val content = m.getContent // println("Is multipart? " + m.getContent.isInstanceOf[Multipart]) if(m.getContent.isInstanceOf[Multipart]) { val multipart = m.getContent.asInstanceOf[Multipart] 0.until(multipart.getCount).foreach { i => val body = multipart.getBodyPart(i) var (names, values) = headers(body) /* println("Part #" + i) println("Content-Type: " + body.getContentType) println("Description: " + body.getDescription) println("Content-Disposition: " + body.getDisposition) println("FileName: " + body.getFileName) println("Downloading part #" + i) */ val start = System.currentTimeMillis var bytes = IOUtils.toByteArray(m.getInputStream) val end = System.currentTimeMillis println("Downloaded " + bytes.length + " bytes in " + (end - start) + "ms") dao.insertMailPart(uid, i, names, values, new ByteArrayInputStream(bytes), bytes.length) } } else { // TODO: what to do? } } } def tx[T](f: Dao => T): Option[T] = { dao.begin() try { val t = f(dao) dao.commit() Some(t) } catch { case e => e.printStackTrace dao.rollback() None } } } class ConnectionHandler(f: IMAPFolder, thread: DbThread) { def process() { println("Folder: " + f.getFullName()) f.open(Folder.READ_WRITE) println("Folder has " + f.getMessageCount() + " messages, " + f.getNewMessageCount + " new and " + f.getDeletedMessageCount + " deleted.") // f.addMessageChangedListener(MyMessageChangedListener) f.addMessageCountListener(new MessageCountListener { def messagesAdded(e: MessageCountEvent) { e.getMessages.foreach { message => thread.add(f, message) } } def messagesRemoved(e: MessageCountEvent) {} }) f.getMessages().foreach(m => thread.add(f, m)) do { println("Going idle...") f.idle(true); } while(true) } } class ImapToDb(f: File) { def work() { val p = new Properties() var r: InputStream = null try { r = new FileInputStream(f) p.load(r) } catch { case _ => System.err.println("Could not read configuraion file: " + f) return } finally { IOUtils.closeQuietly(r) } def remove(key: String) = { val value = p.getProperty(key) println(key + ":" + value) if(value != null) { p.remove(key) Some(value) } else None } val params = for { dbUrl: String <- remove("db.url") dbUsername: String <- remove("db.username") dbPassword: String <- remove("db.password") imapHost: String <- remove("imap.host") imapUsername: String <- remove("imap.username") imapPassword: String <- remove("imap.password") } yield (dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword) params match { case None => System.err.println("Required properties:") System.err.println(" db.url, db.username db.password") System.err.println(" imap.host, imap.username imap.password") case Some((dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword)) => for(klass: String <- remove("db.driver") if klass.length > 0) Class.forName(klass) val dbProps = new Properties() println("Db properties: ") p.asScala.foreach { case (key, value) if key.startsWith("db.") => dbProps.put(key.substring(3), value) println(key + ": " + value) } val imapProps = new Properties() println("Imap properties: ") p.asScala.foreach { case (key, value) if key.startsWith("imap.") => imapProps.put(key.substring(5), value) println(key + ": " + value) } val sf = new MailSSLSocketFactory() sf.setTrustAllHosts(true) imapProps.put("mail.imaps.ssl.socketFactory", sf) val session = Session.getDefaultInstance(imapProps, null) def connectToFolder() = { val store = session.getStore("imaps") store.connect(imapHost, imapUsername, imapPassword) // store.addConnectionListener(MyConnectionListener) // store.addStoreListener(MyStoreListener) // store.addFolderListener(MyFolderListener) val folder = store.getFolder("INBOX") // folder.addConnectionListener(MyConnectionListener) // folder.addFolderListener(MyFolderListener) // folder.addMessageCountListener(MyMessageCountListener) // folder.addMessageChangedListener(MyMessageChangedListener) if(!folder.isInstanceOf[IMAPFolder]) { error("folder is not an IMAPFolder") } folder.asInstanceOf[IMAPFolder] } def connectToDb() = { println("Connecting to db...") val c = DriverManager.getConnection(dbUrl, dbProps) c.setAutoCommit(false) new Service(new Dao(c)) } val thread = new DbThread(connectToDb) thread.setDaemon(true) thread.start() while(true) { var folder: IMAPFolder = null try { folder = connectToFolder() new ConnectionHandler(folder, thread).process() } catch { case e => System.out.println("Uncaught exception: " + e.getMessage) } finally { IOUtils.closeQuietly(new Closeable { def close { folder.getStore.close() } } ) Thread.sleep(1000) } } } } } object ImapToDb extends App { new ImapToDb(new File("imap-to-db.properties")).work() }