Safe, or playing with fire?

Windows service: Generating a set of FileWatcher objects from a list of directories to watch in a config file, have the following requirements:

  • File processing can be time consuming - events must be handled on their own task threads
  • Keep handles to the event handler tasks to wait for completion in an OnStop() event.
  • Track the hashes of uploaded files; don't reprocess if not different
  • Persist the file hashes to allow OnStart() to process files uploaded while the service was down.
  • Never process a file more than once.
  • (Regarding #3, we do get events when there are no changes... most notably because of the duplicate-event issue with FileWatchers)

    To do these things, I have two dictionaries - one for the files uploaded, and one for the tasks themselves. Both objects are static, and I need to lock them when adding/removing/updating files and tasks. Simplified code:

    public sealed class TrackingFileSystemWatcher : FileSystemWatcher {
    
        private static readonly object fileWatcherDictionaryLock = new object();
        private static readonly object runningTaskDictionaryLock = new object();
    
        private readonly Dictionary<int, Task> runningTaskDictionary = new Dictionary<int, Task>(15);
        private readonly Dictionary<string, FileSystemWatcherProperties>  fileWatcherDictionary = new Dictionary<string, FileSystemWatcherProperties>();
    
        //  Wired up elsewhere
        private void OnChanged(object sender, FileSystemEventArgs eventArgs) {
            this.ProcessModifiedDatafeed(eventArgs);
        }
    
        private void ProcessModifiedDatafeed(FileSystemEventArgs eventArgs) {
    
            lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
    
                //  Read the file and generate hash here
    
                //  Properties if the file has been processed before
                //  ContainsNonNullKey is an extension method
                if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath)) {
    
                    try {
                        fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
                    }
                    catch (KeyNotFoundException keyNotFoundException) {}
                    catch (ArgumentNullException argumentNullException) {}
                }
                else {  
                    // Create a new properties object
                }
    
    
                fileProperties.ChangeType = eventArgs.ChangeType;
                fileProperties.FileContentsHash = md5Hash;
                fileProperties.LastEventTimestamp = DateTime.Now;
    
                Task task;
                try {
                    task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
                }
                catch {
                  ..
                }
    
                //  Only lock long enough to add the task to the dictionary
                lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
                     try {
                        this.runningTaskDictionary.Add(task.Id, task);  
                    }
                    catch {
                      ..
                    }    
                }
    
    
                try {
                    task.ContinueWith(t => {
                        try {
                            lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
                                this.runningTaskDictionary.Remove(t.Id);
                            }
    
                            //  Will this lock burn me?
                            lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
                                //  Persist the file watcher properties to
                                //  disk for recovery at OnStart()
                            }
                        }
                        catch {
                          ..
                        }
                    });
    
                    task.Start();
                }
                catch {
                  ..
                }
    
    
            }
    
        }
    
    }
    

    What's the effect of requesting a lock on the FileSystemWatcher collection in the ContinueWith() delegate when the delegate is defined within a lock on the same object? I would expect it to be fine, that even if the task starts, completes, and enters the ContinueWith() before ProcessModifiedDatafeed() releases the lock, the task thread would simply be suspended until the creating thread has released the lock. But I want to make sure I'm not stepping on any delayed execution landmines.

    Looking at the code, I may be able to release the lock sooner, avoiding the issue, but I'm not certain yet... need to review the full code to be sure.


    UPDATE

    To stem the rising "this code is terrible" comments, there are very good reasons why I catch the exceptions I do, and am catching so many of them. This is a Windows service with multi-threaded handlers, and it may not crash. Ever. Which it will do if any of those threads have an unhandled exception.

    Also, those exceptions are written to future bulletproofing. The example I've given in comments below would be adding a factory for the handlers... as the code is written today, there will never be a null task, but if the factory is not implemented correctly, the code could throw an exception. Yes, that should be caught in testing. However, I have junior developers on my team... "May. Not. Crash." (also, it must shut down gracefully if there is an unhandled exception, allowing currently-running threads to complete - which we do with an unhandled exception handler set in main()). We have enterprise-level monitors configured to send alerts when application errors appear on the event log – those exceptions will log and flag us. The approach was a deliberate and discussed decision.

    Each possible exception has each been carefully considered and chosen to fall into one of two categories - those that apply to a single datafeed and will not shut down the service (the majority), and those that indicate clear programming or other errors that fundamentally render the code useless for all datafeeds. For example, we've chosen to shut down the service down if we can't write to the event log, as that's our primary mechanism for indicating datafeeds are not getting processed. The exceptions are caught locally, because the local context is the only place where the decision to continue can be made. Furthermore, allowing exceptions to bubble up to higher levels (1) violates the concept of abstraction, and (2) makes no sense in a worker thread.

    I'm surprised at the number of people who argue against handling exceptions. If I had a dime for every try..catch(Exception){do nothing} I see, you'd get your change in nickels for the rest of eternity. I would argue to the death1 that if a call into the .NET framework or your own code throws an exception, you need to consider the scenario that would cause that exception to occur and explicitly decide how it should be handled. My code catches UnauthorizedExceptions in IO operations, because when I considered how that could happen, I realized that adding a new datafeed directory requires permissions to be granted to the service account (it won't have them by default).

    I appreciate the constructive input... just please don't criticize simplified example code with a broad "this sucks" brush. The code does not suck - it is bulletproof, and necessarily so.


    1 I would only argue a really long time if Jon Skeet disagrees


    No it will not burn you. Even if the ContinueWith is inlined into to the current thread that was running the new Task(() => new DatafeedUploadHandler().. it will get the lock eg no dead lock. The lock statement is using the Monitor class internally, and it is reentrant . eg a thread can aquire a lock multiple times if it already got/owns the lock. Multithreading and Locking (Thread-Safe operations)

    And the other case where the task.ContinueWith starts before the ProcessModifiedDatafeed finished is like you said. The thread that is running the ContinueWith simply would have to wait to get the lock.

    I would really consider to do the task.ContinueWith and the task.Start() outside of the lock if you reviewed it. And it is possible based on your posted code.

    You should also take a look at the ConcurrentDictionary in the System.Collections.Concurrent namespace. It would make the code easier and you dont have to manage the locking yourself. You are doing some kind of compare exchange/update here if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath)) . eg only add if not already in the dictionary. This is one atomic operation. There is no function to do this with a ConcurrentDictionary but there is an AddOrUpdate method. Maybe you can rewrite it by using this method. And based on your code you could safely use the ConcurrentDictionary at least for the runningTaskDictionary

    Oh and TaskCreationOptions.LongRunning is literally creating a new thread for every task which is kind of an expensive operation. The windows internal thread pool is intelligent in new windows versions and is adapting dynamically. It will "see" that you are doing lots of IO stuff and will spawn new threads as needed and practical.

    Greetings


    First, your question: it's not a problem in itself to request lock inside ContinueWith. If you bother you do that inside another lock block - just don't. Your continuation will execute asynchronously, in different time, different thread.

    Now, code itself is questionable. Why do you use many try-catch blocks around statements that almost cannot throw exceptions? For example here:

     try {
         task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
     }
     catch {}
    

    You just create task - I cannot imagine when this can throw. Same story with ContinueWith. Here:

    this.runningTaskDictionary.Add(task.Id, task); 
    

    you can just check if such key already exists. But even that is not necessary because task.Id is unique id for given task instance which you just created. This:

    try {
        fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
    }
    catch (KeyNotFoundException keyNotFoundException) {}
    catch (ArgumentNullException argumentNullException) {}
    

    is even worse. You should not use exceptions lile this - don't catch KeyNotFoundException but use appropriate methods on Dictionary (like TryGetValue).

    So to start with, remove all try catch blocks and either use one for the whole method, or use them on statements that can really throw exceptions and you cannot handle that situation otherwise (and you know what to do with exception thrown).

    Then, your approach to handle filesystem events is not quite scaleable and reliable. Many programs will generate multiple change events in short intervals when they are saving changes to a file (there are also other cases of multiple events for the same file going in sequence). If you just start processing file on every event, this might lead to different kind of troubles. So you might need to throttle events coming for a given file and only start processing after certain delay after last detected change. That might be a bit advanced stuff, though.

    Don't forget to grab a read lock on the file as soon as possible, so that other processes cannot change file while you are working with it (for example, you might calculate md5 of a file, then someone changes file, then you start uploading - now your md5 is invalid). Other approach is to record last write time and when it comes to uploading - grab read lock and check if file was not changed in between.

    What is more important is that there can be a lot of changes at once. Say I copied 1000 files very fast - you do not want to start uploading them all at once with 1000 threads. You need a queue of files to process, and take items from that queue with several threads. This way thousands of events might happen at once and your upload will still work reliably. Right now you create new thread for each change event, where you immediatly start upload (according to method names) - this will fail under serious load of events (and in cases described above).


    I have not fully followed the logic of this code but are you aware that task continuations and calls to Wait/Result can be inlined onto the current thread? This can cause reentrancy.

    This is very dangerous and has burned many.

    Also I don't quite see why you are starting task delayed. This is a code smell. Also why are you wrapping the task creation with try ? This can never throw.

    This clearly is a partial answer. But the code looks very tangled to me. If it's this hard to audit it you probably should write it differently in the first place.

    链接地址: http://www.djcxy.com/p/29446.html

    上一篇: 更好的方式多次调用函数

    下一篇: 安全还是玩火?