DataSourceHandler.java
Upload User: liulanlin
Upload Date: 2017-12-08
Package Size: 1274k
Code Size: 13k
Category:

VOIP program

Development Platform:

Java

  1. /*
  2.  * DataSourceHandler.java
  3.  *
  4.  * Created on March 20, 2003, 10:56 AM
  5.  */
  6. package gov.nist.applet.phone.media.messaging;
  7. import java.util.Enumeration;
  8. import java.util.Vector;
  9. import java.io.ByteArrayOutputStream;
  10. import java.io.IOException;
  11. import javax.media.DataSink;
  12. import javax.media.MediaLocator;
  13. import javax.media.Control;
  14. import javax.media.datasink.DataSinkErrorEvent;
  15. import javax.media.datasink.DataSinkEvent;
  16. import javax.media.datasink.DataSinkListener;
  17. import javax.media.datasink.EndOfStreamEvent;
  18. import javax.media.protocol.SourceTransferHandler;
  19. import javax.media.protocol.PushDataSource;
  20. import javax.media.protocol.PushSourceStream;
  21. import javax.media.protocol.PullDataSource;
  22. import javax.media.protocol.DataSource;
  23. import javax.media.protocol.SourceStream;
  24. import javax.media.IncompatibleSourceException;
  25. /**
  26.  * This Data source Handler allow one to write directly from a DataSource 
  27.  * to an array of byte.
  28.  * This Data Source Handler allow one to fill a buffer with MPEG_AUDIO 
  29.  * or GSM audio data.
  30.  * Example : One can record his voice from a microphone to a buffer in either a
  31.  * MP3 or GSM format. To get the buffer with the recorded voice just call the
  32.  * getRecordBuffer method
  33.  * @author Jean Deruelle <jean.deruelle@nist.gov>
  34.  *
  35.  * <a href="{@docRoot}/uncopyright.html">This code is in the public domain.</a>
  36.  */
  37. public class DataSourceHandler    
  38.     implements DataSink, SourceTransferHandler, Runnable{
  39.     //DataSink listeners
  40. protected Vector listeners = new Vector(1);
  41.     final private static   boolean DEBUG = false;
  42.     //states
  43.     final protected static int NOT_INITIALIZED = 0;
  44.     final protected static int OPENED = 1;
  45.     final protected static int STARTED = 2;
  46.     final protected static int CLOSED = 3;
  47.     //current state
  48.     protected int state = NOT_INITIALIZED;
  49.     protected DataSource source;
  50.     protected SourceStream [] streams;
  51.     protected SourceStream stream;
  52.     protected boolean push;   
  53.     protected Control [] controls;
  54.     
  55.     //protected MediaLocator locator = null;
  56.     protected String contentType = null;
  57.     protected int bytesWritten = 0;
  58.     protected static final int BUFFER_LEN = 128 * 1024;
  59.     protected boolean syncEnabled = false;
  60. long lastSyncTime = -1;
  61. ByteArrayOutputStream baout=null;
  62.     protected byte [] buffer1 = new byte[BUFFER_LEN];
  63.     protected byte [] buffer2 = new byte[BUFFER_LEN];
  64.     protected boolean buffer1Pending = false;
  65.     protected long    buffer1PendingLocation = -1;
  66.     protected int     buffer1Length;
  67.     protected boolean buffer2Pending = false;
  68.     protected long    buffer2PendingLocation = -1;
  69.     protected int     buffer2Length;
  70.     protected long    nextLocation = 0;
  71.     protected Thread  writeThread = null;
  72.     private   Integer bufferLock = new Integer(0);
  73.     private   boolean receivedEOS = false;
  74.     
  75.     public  int WRITE_CHUNK_SIZE = 16384;
  76.     private boolean streamingEnabled = false;
  77.     
  78.     /**
  79.      * Constructs a new DataSourceHandler
  80.      */
  81.     public DataSourceHandler(){
  82.      baout=new ByteArrayOutputStream();
  83.     }
  84.     
  85.     /**
  86.      * Set the data source of this data source handler
  87.      * @exception IncompatibleSourceException - if the data source cannot be 
  88.      * handled, this exception is thrown
  89.      */
  90.     public void setSource(DataSource ds) throws IncompatibleSourceException {
  91. if (!(ds instanceof PushDataSource) &&
  92.     !(ds instanceof PullDataSource)) {
  93.     throw new IncompatibleSourceException("Incompatible datasource");
  94. }
  95. source = ds;
  96. if (source instanceof PushDataSource) {
  97.     push = true;
  98.     try {
  99. ((PushDataSource) source).connect();
  100.     } catch (IOException ioe) {
  101.     }
  102.     streams = ((PushDataSource) source).getStreams();
  103. } else {
  104.     push = false;
  105.     try {
  106. ((PullDataSource) source).connect();
  107.     } catch (IOException ioe) {
  108.     }
  109.     streams = ((PullDataSource) source).getStreams();
  110. }
  111. if (streams == null || streams.length != 1)
  112.     throw new IncompatibleSourceException("DataSource should have 1 stream");
  113. stream = streams[0];
  114. contentType = source.getContentType();
  115. if (push)
  116.     ((PushSourceStream)stream).setTransferHandler(this);
  117.     }
  118.     /**
  119.      * Set the output <code>MediaLocator</code>.
  120.      * This method should only be called once; an error is thrown if
  121.      * the locator has already been set.
  122.      * @param output <code>MediaLocator</code> that describes where 
  123.      *  the output goes.
  124.      */
  125.     public void setOutputLocator(MediaLocator output) {
  126. //locator = output;
  127.     }
  128.     /*public void setEnabled(boolean b) {
  129. streamingEnabled = b;
  130.     }
  131.     public void setSyncEnabled() {
  132. syncEnabled = true;
  133.     }*/
  134.     /**
  135.  * Our DataSink does not need to be opened.
  136.      */
  137. public void open() {
  138.     }
  139. /**
  140.  * @see javax.media.DataSink#getOutputLocator()
  141.  */
  142.     public MediaLocator getOutputLocator() {
  143.      return null;
  144. //return locator;
  145.     }
  146. /**
  147.  * @see javax.media.DataSink#start()
  148.  */
  149.     public void start() throws IOException {
  150. //System.out.println("DATASOURCEHANDLER: start : source: "+source);
  151.     if (source != null)
  152. source.start();
  153.     if (writeThread == null) {
  154. writeThread = new Thread(this);
  155. writeThread.setName("DataSourceHandler Thread");
  156. writeThread.start();
  157.     }
  158.     setState(STARTED);
  159.     }
  160.     /**
  161.      * Stop the data-transfer.
  162.      * If the source has not been connected and started,
  163.      * <CODE>stop</CODE> does nothing.
  164.      */
  165.     public void stop() throws IOException {
  166.      //System.out.println("DATASOURCEHANDLER: stop");
  167. if (state == STARTED) {
  168.     if (source != null)
  169. source.stop();
  170.     setState(OPENED);
  171. }
  172.     }
  173. /**
  174.  * Set the state of this data source handler
  175.  */
  176.     protected void setState(int state) {
  177. synchronized(this) {
  178.     this.state = state;
  179. }
  180.     }   
  181. /**
  182.  * @see javax.media.DataSink#close()
  183.  */
  184.     public final void close() {
  185. synchronized(this) {
  186.     if ( state == CLOSED )
  187. return;
  188.     setState(CLOSED);
  189. }
  190. if (push) {
  191.     for (int i = 0; i < streams.length; i++) {
  192. ((PushSourceStream)streams[i]).setTransferHandler(null);
  193.     }
  194. }
  195. try {
  196.     source.stop();
  197. } catch (IOException e) {
  198.     System.err.println("IOException when stopping source " + e);
  199. }
  200.     //  Disconnect the data source 
  201.     if (source != null)
  202. source.disconnect();    
  203. removeAllListeners();
  204. //writeBufferToFile("d://temp//test.mp3");  
  205.     }
  206. /**
  207.  * Get the current content type for this data source handler
  208.  *
  209.  * @return The current <CODE>ContentDescriptor</CODE> for this data source handler.
  210.  */
  211.     public String getContentType() {
  212. return contentType;
  213.     }
  214. /**
  215.  * Obtain the collection of objects that
  216.  * control the object that implements this interface.
  217.  * <p>
  218.  *
  219.  * No controls are supported.
  220.  * A zero length array is returned.
  221.  *
  222.  * @return A zero length array
  223.  */
  224.     public Object [] getControls() {
  225. if (controls == null) {
  226.     controls = new Control[0];
  227. }
  228. return controls;
  229.     }
  230. /**
  231.  * Obtain the object that implements the specified
  232.  * <code>Class</code> or <code>Interface</code>
  233.  * The full class or interface name must be used.
  234.  * <p>
  235.  *
  236.  * The control is not supported.
  237.  * <code>null</code> is returned.
  238.  *
  239.  * @return <code>null</code>.
  240.  */
  241.     public Object getControl(String controlName) {
  242. return null;
  243.     }
  244. /*
  245.  * @see javax.media.protocol.SourceTransferHandler#transferData(javax.media.protocol.PushSourceStream)
  246.  */
  247.     public synchronized void transferData(PushSourceStream pss) {
  248.      if(state==STARTED){
  249. int totalRead = 0;
  250. int spaceAvailable = BUFFER_LEN;
  251. int bytesRead = 0;
  252. if (buffer1Pending) {
  253.     synchronized (bufferLock) {
  254. while (buffer1Pending) {
  255.     if (DEBUG) System.err.println("Waiting for free buffer");
  256.     try {
  257. bufferLock.wait();
  258.     } catch (InterruptedException ie) {
  259.     }
  260. }
  261.     }
  262.     if (DEBUG) System.err.println("Got free buffer");
  263. }
  264. // System.err.println("In transferData()");
  265. while (spaceAvailable > 0) {
  266.     try {
  267. bytesRead = pss.read(buffer1, totalRead, spaceAvailable);
  268. //System.err.println("bytesRead = " + bytesRead);
  269. if (bytesRead > 16 * 1024 && WRITE_CHUNK_SIZE < 32 * 1024) {
  270.     if (  bytesRead > 64 * 1024 &&
  271.   WRITE_CHUNK_SIZE < 128 * 1024  )
  272. WRITE_CHUNK_SIZE = 128 * 1024;
  273.     else if (  bytesRead > 32 * 1024 &&
  274.        WRITE_CHUNK_SIZE < 64 * 1024  )
  275. WRITE_CHUNK_SIZE = 64 * 1024;
  276.     else if (  WRITE_CHUNK_SIZE < 32 * 1024  )
  277. WRITE_CHUNK_SIZE = 32 * 1024;
  278.     //System.err.println("Increasing buffer to " + WRITE_CHUNK_SIZE);
  279. }
  280.     } catch (IOException ioe) {
  281. // What to do here?
  282.     }
  283.     if (bytesRead <= 0) {
  284. break;
  285.     }
  286.     totalRead += bytesRead;
  287.     spaceAvailable -= bytesRead;
  288. }
  289. if (totalRead > 0) {
  290.     synchronized (bufferLock) {
  291. buffer1Pending = true;
  292. buffer1PendingLocation = nextLocation;
  293. buffer1Length = totalRead;
  294. nextLocation = -1; // assume next write is contiguous unless seeked
  295. // Notify availability to write thread
  296. if (DEBUG) 
  297. System.err.println("Notifying consumer");
  298. bufferLock.notifyAll();
  299.     }
  300. }
  301. // Send EOS if necessary
  302. if (bytesRead == -1) {
  303.     if (DEBUG) System.err.println("Got EOS");
  304.     receivedEOS = true;     
  305. }
  306.      }
  307.     }
  308.     /**
  309.      *  Asynchronous write thread
  310.      */
  311.     public void run() {    
  312. while (!(state == CLOSED)){
  313. if(state==STARTED){ 
  314.     synchronized (bufferLock) {          
  315. // Wait for some data or error
  316. while (!buffer1Pending && !buffer2Pending && 
  317.        state != CLOSED && !receivedEOS) {
  318.      if (DEBUG) 
  319.      System.err.println("Waiting for filled buffer");
  320.   try {
  321. bufferLock.wait(500);
  322.     } catch (InterruptedException ie) {
  323.     }
  324.     if (DEBUG) 
  325.      System.err.println("Consumer notified");
  326. }
  327.     }
  328.     // Something's pending
  329.     if (buffer2Pending) {
  330. if (DEBUG) 
  331. System.err.println("Writing Buffer2");
  332. // write that first
  333. write(buffer2, buffer2PendingLocation, buffer2Length);
  334. if (DEBUG) 
  335. System.err.println("Done writing Buffer2");
  336. buffer2Pending = false;
  337.     }
  338.     synchronized (bufferLock) {
  339. if (buffer1Pending) {
  340.     byte [] tempBuffer = buffer2;
  341.     buffer2 = buffer1;
  342.     buffer2Pending = true;
  343.     buffer2PendingLocation = buffer1PendingLocation;
  344.     buffer2Length = buffer1Length;
  345.     buffer1Pending = false;
  346.     buffer1 = tempBuffer;
  347.     if (DEBUG) System.err.println("Notifying producer");
  348.     bufferLock.notifyAll();
  349. } else {
  350.     if (receivedEOS)
  351. break;
  352. }
  353.     }
  354. }
  355. if (receivedEOS) {
  356.     if (DEBUG) 
  357.      System.err.println("Sending EOS: streamingEnabled is " + streamingEnabled);     
  358.     if (!streamingEnabled) {
  359. sendEndofStreamEvent();
  360.     }
  361. }
  362. }
  363.     }
  364. /**
  365.  * seek a specific location in the dataSource
  366.  * @return the location where the data source is now placed
  367.  */
  368.     public synchronized long seek(long where) {
  369. nextLocation = where;
  370. return where;
  371.     }
  372. /**
  373.  * Write the data in parameter in the data source handler buffer
  374.  * @param buffer - data to write into the data source handler buffer
  375.  * @param location - location to start 
  376.  * @param length - length to write
  377.  */
  378.     private void write(byte [] buffer, long location, int length) {
  379. int offset, toWrite;
  380.     offset = 0;
  381.     while (length > 0) {
  382. toWrite = WRITE_CHUNK_SIZE;
  383. if (length < toWrite)
  384.     toWrite = length;
  385. baout.write(buffer, offset, toWrite);
  386. bytesWritten += toWrite;
  387. length -= toWrite;
  388. offset += toWrite;
  389.     
  390. Thread.yield();
  391.     }
  392.     }
  393.     /**
  394.      * @see javax.media.DataSink#addDataSinkListener(javax.media.datasink.DataSinkListener)
  395.      */   
  396. public void addDataSinkListener(DataSinkListener dsl) {
  397. if (dsl != null)
  398. if (!listeners.contains(dsl))
  399. listeners.addElement(dsl);
  400. }
  401. /**
  402.  * @see javax.media.DataSink#removeDataSinkListener(javax.media.datasink.DataSinkListener)
  403.  */
  404. public void removeDataSinkListener(DataSinkListener dsl) {
  405. if (dsl != null)
  406. listeners.removeElement(dsl);
  407. }
  408. /**
  409.  * Send event to the listeners
  410.  * @param event - the vent to send
  411.  */
  412. protected void sendEvent(DataSinkEvent event) {
  413. if (!listeners.isEmpty()) {
  414. synchronized (listeners) {
  415. Enumeration list = listeners.elements();
  416. while (list.hasMoreElements()) {
  417. DataSinkListener listener = (DataSinkListener)list.nextElement();
  418. listener.dataSinkUpdate(event);
  419. }
  420. }
  421. }
  422. }
  423. /**
  424.  * remove All the listeners from this data source handler
  425.  */
  426. protected void removeAllListeners() {
  427. listeners.removeAllElements();
  428. }
  429. /**
  430.  * Send an end of stream event to the listeners
  431.  */    
  432. protected final void sendEndofStreamEvent() {
  433. sendEvent(new EndOfStreamEvent(this));
  434. }
  435. /**
  436.  * Send a data sink error event to the listeners
  437.  * @param reason - the reason of the error
  438.  */
  439. protected final void sendDataSinkErrorEvent(String reason) {
  440. sendEvent(new DataSinkErrorEvent(this, reason));
  441. }
  442. /**
  443.  * Get the recorded buffer of this data source handler
  444.  * @return byte array containing the data recorded
  445.  */
  446. public byte[] getRecordBuffer(){
  447. byte[] recordedBuffer= baout.toByteArray();
  448. baout.reset();
  449. return recordedBuffer;
  450. }
  451. }