001package com.github.theholywaffle.teamspeak3; 002 003/* 004 * #%L 005 * TeamSpeak 3 Java API 006 * %% 007 * Copyright (C) 2014 Bert De Geyter 008 * %% 009 * Permission is hereby granted, free of charge, to any person obtaining a copy 010 * of this software and associated documentation files (the "Software"), to deal 011 * in the Software without restriction, including without limitation the rights 012 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 013 * copies of the Software, and to permit persons to whom the Software is 014 * furnished to do so, subject to the following conditions: 015 * 016 * The above copyright notice and this permission notice shall be included in 017 * all copies or substantial portions of the Software. 018 * 019 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 020 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 021 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 022 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 023 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 024 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 025 * THE SOFTWARE. 026 * #L% 027 */ 028 029import com.github.theholywaffle.teamspeak3.api.Callback; 030import com.github.theholywaffle.teamspeak3.commands.Command; 031 032import java.io.IOException; 033import java.util.Collections; 034import java.util.Iterator; 035import java.util.LinkedHashMap; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.ExecutorService; 039import java.util.concurrent.Executors; 040import java.util.logging.Level; 041 042public class SocketReader extends Thread { 043 044 private final TS3Query ts3; 045 private final ExecutorService userThreadPool; 046 private final Map<Command, Callback> callbackMap; 047 048 private String lastEvent; 049 050 public SocketReader(TS3Query ts3) { 051 super("[TeamSpeak-3-Java-API] SocketReader"); 052 this.ts3 = ts3; 053 this.userThreadPool = Executors.newCachedThreadPool(); 054 this.callbackMap = Collections.synchronizedMap(new LinkedHashMap<Command, Callback>()); 055 this.lastEvent = ""; 056 057 try { 058 int i = 0; 059 while (i < 4 || ts3.getIn().ready()) { 060 TS3Query.log.info("< " + ts3.getIn().readLine()); 061 i++; 062 } 063 } catch (final IOException e) { 064 e.printStackTrace(); 065 } 066 } 067 068 @Override 069 public void run() { 070 while (ts3.getSocket() != null && ts3.getSocket().isConnected() 071 && ts3.getIn() != null && !isInterrupted()) { 072 final String line; 073 074 try { 075 // Will block until a full line of text could be read. 076 line = ts3.getIn().readLine(); 077 } catch (IOException io) { 078 if (!isInterrupted()) { 079 io.printStackTrace(); 080 } 081 break; 082 } 083 084 if (line == null) { 085 break; // The underlying socket was closed 086 } else if (line.isEmpty()) { 087 continue; // The server is sending garbage 088 } 089 090 final Command c = ts3.getCommandList().peek(); 091 092 if (line.startsWith("notify")) { 093 TS3Query.log.info("< [event] " + line); 094 095 // Filter out duplicate events for join, quit and channel move events 096 if (isDuplicate(line)) continue; 097 098 userThreadPool.execute(new Runnable() { 099 @Override 100 public void run() { 101 final String arr[] = line.split(" ", 2); 102 ts3.getEventManager().fireEvent(arr[0], arr[1]); 103 } 104 }); 105 } else if (c != null && c.isSent()) { 106 TS3Query.log.info("[" + c.getName() + "] < " + line); 107 if (line.startsWith("error")) { 108 c.feedError(line.substring("error ".length())); 109 if (c.getError().getId() != 0) { 110 TS3Query.log.severe("TS3 command error: " + c.getError()); 111 } 112 c.setAnswered(); 113 ts3.getCommandList().remove(c); 114 answerCallback(c); 115 } else { 116 c.feed(line); 117 } 118 } else { 119 TS3Query.log.warning("[UNHANDLED] < " + line); 120 } 121 } 122 123 userThreadPool.shutdown(); 124 if (!isInterrupted()) { 125 TS3Query.log.warning("SocketReader has stopped!"); 126 } 127 } 128 129 private void answerCallback(Command c) { 130 final Callback callback = callbackMap.get(c); 131 132 // Command had no callback registered 133 if (callback == null) return; 134 135 // To avoid the possibility of clogging the map with callbacks for 136 // inexistent commands, remove all entries before the current one 137 // Typically, this will exit without removing a single entry 138 Set<Command> keySet = callbackMap.keySet(); 139 synchronized (callbackMap) { 140 Iterator<Command> iterator = keySet.iterator(); 141 while (iterator.hasNext() && !c.equals(iterator.next())) { 142 iterator.remove(); 143 } 144 } 145 146 userThreadPool.execute(new Runnable() { 147 @Override 148 public void run() { 149 try { 150 callback.handle(); 151 } catch (Throwable t) { 152 TS3Query.log.log(Level.WARNING, "User callback threw exception", t); 153 } 154 } 155 }); 156 } 157 158 void registerCallback(Command command, Callback callback) { 159 callbackMap.put(command, callback); 160 } 161 162 private boolean isDuplicate(String eventMessage) { 163 if (!(eventMessage.startsWith("notifyclientmoved") 164 || eventMessage.startsWith("notifycliententerview") 165 || eventMessage.startsWith("notifyclientleftview"))) { 166 167 // Event that will never cause duplicates 168 return false; 169 } 170 171 if (eventMessage.equals(lastEvent)) { 172 // Duplicate event! 173 lastEvent = ""; // Let's only ever filter one duplicate 174 return true; 175 } 176 177 lastEvent = eventMessage; 178 return false; 179 } 180}