Saturday, October 26, 2013

Java Synchronization and Concurrency Across Multiple JVMs, Multiple Computers, Part 2

I’m going to add a couple of asides to this blog entry.  One of them is at the end, and details an additional consideration for the synchronization discussed in “Part 1”: reading the shared file, in addition to writing to it.  But before we get started with “Java Synchronization Across JVMs, Part 2”, I want to describe Flag Passing in general.  You will still use this general Flag Passing when sharing files on remote systems that you reach through a transfer protocol such as SFTP (rather than by drive mapping, a network path or a file:// URL).  This is not even close to “real-time” file sharing, and should be considered “scheduled” or “periodic” file sharing.

Our sample scenario is that one system (SOURCE) creates files and a second system (TARGET) processes those files, then deletes them.  When SOURCE creates a file, we will have it also create or update a file named “mark.mark”.  When TARGET connects (periodically) to SOURCE, the first thing TARGET will do is check for a file named “mark.mark” and get the date of the file.  Then TARGET will read and delete all files with modification dates before the date of “mark.mark”.  The date of the file “mark.mark” is the flag we are passing.  This is just one example of Flag Passing, but it helps us get our bearings for the following discussion.
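To make the “mark.mark” idea concrete, here is a minimal sketch of the TARGET side of that scenario.  It assumes the files are visible in a local directory (a stand-in for whatever an SFTP client would list), and the class and method names (MarkerSweep, filesReadyForPickup) are mine, purely for illustration.  It only selects the files; reading and deleting them is left out.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;

class MarkerSweep {
    // Returns the regular files in dir last modified strictly before mark.mark.
    // Returns an empty list when mark.mark does not exist yet.
    static List<Path> filesReadyForPickup(Path dir) throws IOException {
        Path marker = dir.resolve("mark.mark");
        List<Path> ready = new ArrayList<>();
        if (!Files.exists(marker)) {
            return ready;
        }
        // The modification date of mark.mark is the flag being passed
        FileTime flag = Files.getLastModifiedTime(marker);
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (entry.equals(marker) || !Files.isRegularFile(entry)) {
                    continue;
                }
                if (Files.getLastModifiedTime(entry).compareTo(flag) < 0) {
                    ready.add(entry);
                }
            }
        }
        return ready;
    }

    public static void main(String[] args) throws IOException {
        // Directory to sweep is an assumption: first argument, else current dir
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        for (Path p : filesReadyForPickup(dir)) {
            System.out.println("Ready: " + p.getFileName());
        }
    }
}
```

Files stamped at or after the date of “mark.mark” are left for the next visit, which is what keeps TARGET away from anything SOURCE may still be writing.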

When I was in high school in the 1970’s, my dad was stationed overseas in a relatively remote location with the US Air Force.  He called home every week, and it went like this.  He would say “Hello, I love you.  Over.”  And somewhere, somebody switched something so that we could reply.  At the end of our greetings, we said “Over” and dad could start talking again.  If we failed to say “Over”, then we would sit there with silence until we remembered.  It was like a conversation on a Citizens Band radio with a couple major differences: 1) there was a guy in the middle making a change to allow conversation to flow in the opposite direction, and 2) it was a conversation between only two parties (unless the NSA was listening in.)  Let me call this kind of coordination “flag passing”, since I don’t know a more technical term (perhaps semaphores or signaling).

Those conversations with my dad are the analogue, except for the middle-man, of another way to do Java (or any program) synchronization across operating environments.  (Actually, the analogy has several applications, but it also breaks down pretty quickly.  However, I like the recollection, so I’ll leave it in here.)  In this blog discussion, the presence of a file IS the flag that indicates something is available to be processed.

In this discussion, we are going to examine the case where files of the same name are created repeatedly, and a separate process (perhaps in a separate JVM) deals with each file and deletes it when it appears.  Throughout this blog entry we limit the discussion to the case where there is only one “Listener” dealing with the file.  First we will consider the case where the file is created by only one “Talker”.  This first case does not use file locking, and it is inherently faulty; we only present it here in order to examine the problem.  The next case, which is really the base solution, handles one or more “Talkers”.

The OneTalker class writes some output to a file named “message.txt”.  Before the file is written, OneTalker checks to see if the file already exists.  If it exists, then the Listener has not yet read, processed and deleted the file, so we sit and wait (sleep) until the file does not exist.  Then we simply write and close the file.  There is nothing inherently “exclusive” about this process, and therein lies the problem (discussed below).  Note that the one-second sleep is not needed for correctness.  We could write that loop like this: while(outFile.exists()){} but that busy-wait keeps a CPU core spinning, so the sleep is the politer choice.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;

public class OneTalker {
    public static void main(String[] args) {
        // Set true to sleep forever (practically) with file open
        boolean testNoClose = false;
        BufferedOutputStream out = null;
        try {
            // This loop is for demo purposes - 5 iterations
            for( int i = 0; i < 5; i++ ) {
                File outFile = new File( "C:\\dac\\message.txt" );
                // Listener has not picked up previous file
                while( outFile.exists() ) {
                    Thread.sleep( 1000 );
                }
                System.out.println( "New File" );
                out = new BufferedOutputStream(
                    new FileOutputStream( outFile ) );
                // Handle data as byte array - most flexible
                String newString = "message: " + i;
                byte[] outBytes = newString.getBytes();
                out.write( outBytes, 0, outBytes.length );
                out.flush();
                if( testNoClose ) Thread.sleep( 1000000000 );
                out.close();
                // Space out the messages - 10 seconds
                Thread.sleep( 10000 );
            }
        } catch( Exception x ) {
            x.printStackTrace();
        } finally {
            try {
                if( out != null ) out.close();
            } catch( Exception y ) {}
        }
        System.exit(0);
    }
}

In the examples in this blog discussion, I am writing and reading byte arrays.  Unless you are always writing a line of text with a line-end (for example, carriage return / line feed), as in Part 1 of this blog topic (the centralized log file), writing byte arrays is preferred.  [One exception is when you are writing and reading serialized objects.]

For testing purposes, we have a “for” loop that runs through five iterations of the file creation process.  Then to get a little reality “feel”, we sleep for ten seconds between iterations.  Note the Boolean, testNoClose – we will discuss how to use that below.

The Listener code, OneListener, checks to see if the file exists, and if so, processes the file and then deletes it.  Again, there is nothing exclusive in this process, and if the file is currently being written, two problems will occur: 1) the current partial file will be read, and 2) the file will not be deleted.  Normally, with this test code, the time required to write or read the file is so short that it would be difficult to experience the problem, but you should never code for the “best-case scenario”.  Instead, as a secure programmer you need to see the potential problem and code for it (which we will do).

You can test for this problem by setting the boolean, testNoClose in OneTalker to true.  This will create the file and then go into a long sleep without closing the file.  In that state, when you run OneListener, the same (partial) file will be read and processed repeatedly, and you will see that the file does not get deleted.

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;

public class OneListener {
    public static void main(String[] args) {
        BufferedInputStream in = null;
        try {
            // Same number loops as Talker
            for( int i = 0; i < 5; i++ ) {
                File inFile = new File( "C:\\dac\\message.txt" );
                // Wait for file to appear
                while( ! inFile.exists() ) {
                    Thread.sleep( 1000 );
                }
                // This is where problem may occur -
                // file may be being written.
                // In that case, a partial file will be
                // read and delete will fail
                in = new BufferedInputStream(
                    new FileInputStream( inFile ) );
                byte[] inBytes = new byte[2000];
                int readQty;
                while( ( readQty =
                    in.read( inBytes, 0, inBytes.length ) ) > 0 )
                {
                    System.out.write( inBytes, 0, readQty );
                }
                System.out.println();
                in.close();
                inFile.delete();
                if( inFile.exists() )
                    System.out.println( "Delete failed" );
            }
        } catch( Exception x ) {
            x.printStackTrace();
        } finally {
            try {
                if( in != null ) in.close();
            } catch( Exception y ) {}
        }
        System.exit(0);
    }
}

Just for consistency in our demonstration, OneListener has a “for” loop set to consume as many messages as OneTalker will be sending.  OneListener also tests to see if a message file exists.  If no message file exists, OneListener waits a second (but doesn’t have to) and checks again.  When a message file exists, OneListener consumes it then deletes the file.  That is another flag of sorts that indicates to OneTalker that he may send another message.

So let’s fix the problem with OneTalker / OneListener, and expand our scenario.  We expand the Talker code in ManyTalker to lock the file while writing.  Since we are locking the file, we can assure that only one Talker instance will be writing at any one time – so now we can handle multiple parties writing the file.  ManyTalker includes all the same code as OneTalker, with the addition of the FileLock and an extra, labeled RETRY while() loop for handling the FileLock acquisition.  Whenever we cannot acquire the lock on the file, or experience an Exception while trying to acquire the lock, we loop to the RETRY label.  When we acquire the lock, we break out of the RETRY while() loop and write the file as we did in OneTalker.

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.channels.FileLock;

public class ManyTalker {
    public static void main(String[] args) {
        // Set true to sleep forever (practically) with file locked
        boolean testNoRelease = false;
        FileLock fl = null;
        BufferedOutputStream out = null;
        try {
            // This loop is for demo purposes - 5 iterations
            for( int i = 0; i < 5; i++ ) {
                File outFile = new File( "C:\\dac\\message.txt" );
                FileOutputStream outFOS = null;
                RETRY: while( true ) {
                    // Someone else is writing or reading file
                    while( outFile.exists() ) {
                        Thread.sleep( 1000 );
                    }
                    try {
                        // In mean time, someone else might
                        // create file and get lock
                        outFOS = new FileOutputStream( outFile );
                        fl = outFOS.getChannel().tryLock();
                        if( fl == null ) {
                            // Lock is held elsewhere - close our
                            // stream so it does not leak, then retry
                            outFOS.close();
                            continue RETRY;
                        }
                        // Else I got the lock!
                        // Break out of while(true)
                        break;
                    } catch( Exception z ) {
                        z.printStackTrace();
                        continue RETRY;
                    }
                }
                System.out.println( "New File" );
                out = new BufferedOutputStream( outFOS );
                // Handle data as byte array - most flexible
                String newString = "message: " + i;
                byte[] outBytes = newString.getBytes();
                out.write( outBytes, 0, outBytes.length );
                out.flush();
                if( testNoRelease ) Thread.sleep( 1000000000 );
                // Must release lock before closing stream
                fl.release();
                out.close();
                // Space out the messages - 10 seconds
                Thread.sleep( 10000 );
            }
        } catch( Exception x ) {
            x.printStackTrace();
        } finally {
            try {
                if( fl != null ) fl.release();
                if( out != null ) out.close();
            } catch( Exception y ) {}
        }
        System.exit(0);
    }
}

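Note that the FileLock is acquired on the FileChannel of the FileOutputStream.  It is an exclusive lock, the only kind we’ve seen so far, obtained with the FileChannel.tryLock() method.  As in OneTalker, but in a different order, we wrap the FileOutputStream in a BufferedOutputStream for efficiency.  Note that the FileLock must be released before you close the stream!  Closing the stream closes the channel, which releases the lock on its own, and a later call to release() may then throw a ClosedChannelException.  Also be aware that new FileOutputStream( outFile ) truncates the file as soon as it is opened, which happens before the lock is acquired, so the window between the exists() check and the lock is small but not zero.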
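If you are on Java 7 or later, the release-before-close ordering can be left to try-with-resources, since resources are closed in the reverse of the order they were declared.  The following is only a sketch under my own assumptions, not the code used in ManyTalker: it opens the channel with FileChannel.open() instead of a FileOutputStream, and the names LockedWrite and writeLocked are made up for the example.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class LockedWrite {
    // Writes message to target while holding an exclusive lock.
    // Returns false when another process already holds the lock.
    // (A second lock attempt from inside the SAME JVM throws
    // OverlappingFileLockException instead of returning null.)
    static boolean writeLocked(Path target, String message) throws IOException {
        try (FileChannel channel = FileChannel.open(target,
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock lock = channel.tryLock()) {
            if (lock == null) {
                return false;
            }
            // Truncate only after the lock is ours
            channel.truncate(0);
            ByteBuffer buf = ByteBuffer.wrap(
                message.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
            channel.force(true);
            return true;
        } // lock is released first, then the channel is closed
    }
}
```

Opening without truncation and calling truncate(0) only once the lock is held is a design choice of this sketch; it keeps a late-arriving writer from emptying a file that someone else is still writing.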

Finally, we have the ManyListener class, which shares the file lock with ManyTalker.  In this case, the name ManyListener may be deceiving: there should only be one Listener instance.  The reason for that is the way we need to get a lock on the FileInputStream for reading.  A channel that is open only for reading cannot take an exclusive lock, so we pass the start position (0), the length of the region to be locked (Long.MAX_VALUE) and a boolean which indicates we are locking the file in “shared” mode.  This type of lock will keep any of our ManyTalker instances from acquiring an “exclusive” lock, but another ManyListener instance could also acquire a “shared” lock on this same file, so we should only have one ManyListener instance.
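A small probe can show the difference between the two lock requests on a read-only channel.  This is a sketch with hypothetical names (SharedLockProbe, probe, exclusiveRejectedOnReadOnly), and it uses FileChannel.open() rather than a FileInputStream.  Note that the FileChannel documentation says a platform without shared locks converts the request to an exclusive one, which is why the sketch reports what was actually granted.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class SharedLockProbe {
    // Requests a shared lock over the whole file on a read-only channel
    // and reports what was granted.
    static String probe(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
             FileLock lock = channel.tryLock(0L, Long.MAX_VALUE, true)) {
            if (lock == null) {
                return "unavailable"; // another process holds a conflicting lock
            }
            return lock.isShared() ? "shared" : "exclusive";
        }
    }

    // Returns true when an exclusive lock request on a read-only channel
    // is rejected with NonWritableChannelException.
    static boolean exclusiveRejectedOnReadOnly(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            try {
                FileLock lock = channel.tryLock(); // exclusive request
                if (lock != null) {
                    lock.release();
                }
                return false;
            } catch (NonWritableChannelException expected) {
                return true;
            }
        }
    }
}
```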

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.nio.channels.FileLock;

public class ManyListener {
    public static void main(String[] args) {
        // Set true to sleep forever (practically) with file locked
        boolean testNoRelease = false;
        BufferedInputStream in = null;
        FileLock fl = null;
        try {
            // Same number loops as Talker
            for( int i = 0; i < 5; i++ ) {
                File inFile = new File( "C:\\dac\\message.txt" );
                FileInputStream inFIS = null;
                RETRY: while( true ) {
                    // Wait for a Talker to create the file
                    while( ! inFile.exists() ) {
                        Thread.sleep( 1000 );
                    }
                    try {
                        // In mean time, someone else might get lock
                        inFIS = new FileInputStream( inFile );
                        // This is a shared lock,
                        // required for reading FileInputStream.
                        // It will keep Talker from acquiring
                        // an exclusive lock
                        fl = inFIS.getChannel().
                            tryLock( 0, Long.MAX_VALUE, true );
                        if( fl == null ) {
                            // Talker still holds the lock - close our
                            // stream so it does not leak, then retry
                            inFIS.close();
                            continue RETRY;
                        }
                        // Else I got the lock!
                        // Break out of while(true)
                        break;
                    } catch( Exception z ) {
                        z.printStackTrace();
                        // Give up on an unexpected error; the
                        // continue below is never reached
                        System.exit(0);
                        continue RETRY;
                    }
                }
                in = new BufferedInputStream( inFIS );
                byte[] inBytes = new byte[2000];
                int readQty;
                while( ( readQty =
                    in.read( inBytes, 0, inBytes.length ) ) > 0 )
                {
                    System.out.write( inBytes, 0, readQty );
                }
                System.out.println();
                if( testNoRelease ) Thread.sleep( 1000000000 );
                // Must release lock before closing and deleting
                // OK, since Talker won't write to an existing file
                // And there should only be one Listener!
                fl.release();
                in.close();
                inFile.delete();
                if( inFile.exists() )
                    System.out.println( "Delete failed" );
            }
        } catch( Exception x ) {
            x.printStackTrace();
        } finally {
            try {
                if( fl != null ) fl.release();
                if( in != null ) in.close();
            } catch( Exception y ) {}
        }
        System.exit(0);
    }
}

As a challenge, you might look at my use of the separate “lock.lock” file on which we acquire an exclusive FileLock, in Part 1 of this blog discussion, and use that same locking paradigm to implement a true “ManyListener”, that is, one supporting multiple instances of ManyListener.  Note that ManyTalker will have to be similarly modified in order to share the FileLock on “lock.lock”.  There is a simple alternative modification that can be made to ManyListener: use a RandomAccessFile instead of the FileInputStream / BufferedInputStream combination.  When you instantiate the RandomAccessFile, declare the read/write mode (“rw”), because an exclusive lock requires a channel that is open for writing.  Then you can get an exclusive lock (via FileChannel.tryLock()) and read the file.  However, there are some efficiency losses in that solution, since the reads are no longer buffered.  A picture is worth a thousand words, and so is code, so here is the RandomAccessFile solution.

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

public class ManyListener {
    public static void main(String[] args) {
        // Set true to sleep forever (practically) with file locked
        boolean testNoRelease = false;
        RandomAccessFile in = null;
        FileLock fl = null;
        try {
            for( int i = 0; i < 5; i++ ) {
                File inFile = new File( "C:\\dac\\message.txt" );
                RETRY: while( true ) {
                    while( ! inFile.exists() ) {
                        Thread.sleep( 1000 );
                    }
                    try {
                        // Note that to get an exclusive lock,
                        // you must open the file for read/write
                        in = new RandomAccessFile(
                            "C:\\dac\\message.txt", "rw" );
                        fl = in.getChannel().tryLock();
                        if( fl == null ) {
                            // Lock is held elsewhere - close our
                            // file so it does not leak, then retry
                            in.close();
                            continue RETRY;
                        }
                        break;
                    } catch( Exception z ) {
                        z.printStackTrace();
                        // Give up on an unexpected error; the
                        // continue below is never reached
                        System.exit(0);
                        continue RETRY;
                    }
                }
                byte[] inBytes = new byte[2000];
                int readQty;
                while( ( readQty =
                    in.read( inBytes, 0, inBytes.length ) ) > 0 )
                {
                    System.out.write( inBytes, 0, readQty );
                }
                System.out.println();
                if( testNoRelease ) Thread.sleep( 1000000000 );
                fl.release();
                in.close();
                inFile.delete();
                if( inFile.exists() )
                    System.out.println( "Delete failed" );
            }
        } catch( Exception x ) {
            x.printStackTrace();
        } finally {
            try {
                if( fl != null ) fl.release();
                if( in != null ) in.close();
            } catch( Exception y ) {}
        }
        System.exit(0);
    }
}
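For the first challenge mentioned above (sharing an exclusive lock on a separate “lock.lock” file), one possible shape for the Listener side is sketched here.  Treat it as a starting point only: the names GuardedListener and pollOnce are hypothetical, the paths are passed in rather than hard-coded, and it assumes every Talker and Listener agrees to take the same lock before touching the message file.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class GuardedListener {
    // Polls once.  Returns the message text if a message was consumed,
    // or null if no message was waiting or the lock is held elsewhere.
    static String pollOnce(Path lockFile, Path message) throws IOException {
        try (FileChannel lockChannel = FileChannel.open(lockFile,
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock lock = lockChannel.tryLock()) {
            if (lock == null || !Files.exists(message)) {
                return null;
            }
            // We hold the exclusive lock on lock.lock, so no cooperating
            // Talker or Listener touches the message file right now
            String text = new String(
                Files.readAllBytes(message), StandardCharsets.UTF_8);
            Files.delete(message);
            return text;
        }
    }
}
```

Because the lock lives on “lock.lock” and not on the message file, the message file itself can be opened read-only and deleted without any lock of its own, which is what lets several Listener instances take turns.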

For further research, you might try running the pair of ManyTalker and ManyListener with the Boolean “testNoRelease” set to true (in one at a time). You will see how they wait indefinitely for one another to release the lock.

One more challenge, if you care to extend this, is to consider how these ideas can be adapted to a two-way file exchange where each class both Talks and Listens, synchronously (I speak and you listen, then we switch roles) or asynchronously (I speak, and either you speak or listen, and I might speak again before you reply).  I can imagine the code but can’t think of a good application – perhaps “Auto-Chat”.

Now our second “aside” -- this should have been included in the “Part 1” blog of this discussion.  In addition to writing to the centralized log, you may want to read from it and place the output in a report web page.  In this case, you should acquire a FileLock on the same file used for writing to the log, as shown in the reportCentralLog() method, below.  The only concern here is that, the way this is written, you will retain the lock while you read through the entire file, thus keeping others from writing to the log for an extended period.  This is probably not a good idea.  Perhaps you should only read the tail end of the file: for example, get the file length (e.g., 50,000 bytes) and start reading 5,000 bytes before the end.  Note that the offset argument of in.read( inBytes, off, len ) is an index into the byte array, not a position in the file, so to start at byte 45,000 you would first call in.skip( 45000 ) on the stream, or use RandomAccessFile.seek( 45000 ).

// Needs java.io.*, java.nio.channels.FileLock
// and javax.servlet.ServletException
private static String reportCentralLog( String message )
    throws ServletException
{
    StringBuffer rtrnSB = new StringBuffer();
    // Need something external to this JVM to test singularity
    FileOutputStream fos = null;
    FileLock fl = null;
    BufferedReader centralIn = null;
    try {
        fos = new FileOutputStream( "\\\\server\\dir\\lock.lock" );
        fl = fos.getChannel().tryLock();
        // Null when can't get exclusive lock on file
        while (fl == null) {
            try {
                Thread.sleep(1000);
                fl = fos.getChannel().tryLock();
            } catch (Exception v) {}
        }
        // At this point, I have exclusive lock on file!
        centralIn = new BufferedReader(
            new FileReader( "\\\\server\\dir\\central.log" ) );
        // File read (and written) a line at a time
        String inString;
        while( ( inString = centralIn.readLine() ) != null ) {
            // readLine() strips the line-end, so put one back
            rtrnSB.append( inString ).append( '\n' );
        }
    } catch( Exception x ) {
        throw new ServletException( x.toString() );
    } finally {
        try {
            if( centralIn != null ) centralIn.close();
        } catch( Exception y ) {}
        try {
            if( fl != null ) fl.release();
            if( fos != null ) fos.close();
        } catch( Exception y ) {}
    }
    return rtrnSB.toString();
}
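The “read only the tail end” idea from the aside above could look something like the following.  It is a minimal sketch, assuming a RandomAccessFile opened read-only; TailReader and readTail are names I made up for the example, and the locking on “lock.lock” is left out to keep it short.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;

class TailReader {
    // Returns at most maxBytes bytes from the end of the file.
    static byte[] readTail(Path file, int maxBytes) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long length = raf.length();
            // Start maxBytes before the end, or at 0 for a short file
            long start = Math.max(0L, length - maxBytes);
            raf.seek(start);
            byte[] tail = new byte[(int) (length - start)];
            raf.readFully(tail);
            return tail;
        }
    }
}
```

Since the starting position is chosen by byte count, the first line of the returned tail will usually be a partial line; a report would probably want to discard everything up to the first line-end.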
