.exe中的Python子进程

我正在创建一个python脚本,它将通过网络复制文件和文件夹。 它是跨平台的,所以我使用cx_freeze创建一个.exe文件

我使用了子进程模块的Popen方法

如果我运行.py文件它正在按预期运行,但是当我创建.exe子进程没有在系统中创建

我已经通过了子进程模块的所有文档,但是我没有找到任何解决方案

一切(我使用的Tkinter也可以正常工作)在.exe接受子进程中工作。

任何想法如何在.exe.file中调用子进程?

该文件正在调用另一个.py文件

def start_scheduler_action(self, scheduler_id, scheduler_name, list_index):
       scheduler_detail=db.get_scheduler_detail_using_id(scheduler_id)
        for detail in scheduler_detail:
            source_path=detail[2]
        if not os.path.exists(source_path):
            showerror("Invalid Path","Please select valid path", parent=self.new_frame)
            return

        self.forms.new_scheduler.start_scheduler_button.destroy()

        #Create stop scheduler button
        if getattr(self.forms.new_scheduler, "stop_scheduler_button", None)==None:

            self.forms.new_scheduler.stop_scheduler_button = tk.Button(self.new_frame, text='Stop scheduler', width=10, command=lambda:self.stop_scheduler_action(scheduler_id, scheduler_name, list_index))
            self.forms.new_scheduler.stop_scheduler_button.grid(row=11, column=1, sticky=E, pady=10, padx=1)

        scheduler_id=str(scheduler_id)

        # Get python paths
        if sys.platform == "win32":
            proc = subprocess.Popen(['where', "python"], env=None, stdout=subprocess.PIPE)

        else:
            proc = subprocess.Popen(['which', "python"], env=None,stdout=subprocess.PIPE)

        out, err = proc.communicate()

        if err or not out:
            showerror("", "Python not found", parent=self.new_frame)

        else:

            try:
                paths = out.split(os.pathsep)

                # Create python path
                python_path = (paths[len(paths) - 1]).split('n')[0]

                cmd = os.path.realpath('scheduler.py')
                #cmd='scheduler.py'

                if sys.platform == "win32":
                    python_path=python_path.splitlines()

                else:
                    python_path=python_path

                # Run the scheduler file using scheduler id

                proc = subprocess.Popen([python_path, cmd, scheduler_id], env=None, stdout=subprocess.PIPE)


                message="Started the scheduler : %s" %(scheduler_name)
                showinfo("", message, parent=self.new_frame)

                #Add process id to scheduler table
                process_id=proc.pid
                #showinfo("pid", process_id, parent=self.new_frame)
                def get_process_id(name):
                    child = subprocess.Popen(['pgrep', '-f', name], stdout=subprocess.PIPE, shell=False)
                    response = child.communicate()[0]
                    return [int(pid) for pid in response.split()]

                print(get_process_id(scheduler_name))

                # Add the process id in database
                self.db.add_process_id(scheduler_id, process_id)

                # Add the is_running status in database
                self.db.add_status(scheduler_id)

            except Exception as e:

                showerror("", e)

这个文件被称为:

def scheduler_copy():

    date= strftime("%m-%d-%Y %H %M %S", localtime())
    logFile = scheduler_name + "_"+scheduler_id+"_"+ date+".log"
    #file_obj=open(logFile, 'w')

    # Call __init__ method of xcopy file 
    xcopy=XCopy(connection_ip, username , password, client_name, server_name, domain_name)
    check=xcopy.connect()

    # Cretae a log file for scheduler
    file_obj=open(logFile, 'w')

    if check is False:

        file_obj.write("Problem in connection..Please check connection..!!")
        return

    scheduler_next_run=schedule.next_run()
    scheduler_next_run="Next run at: " +str(scheduler_next_run)

    # If checkbox_value selected copy all the file to new directory
    if checkbox_value==1:
        new_destination_path=xcopy.create_backup_directory(share_folder, destination_path, date)
    else:
        new_destination_path=destination_path

    # Call backup method for coping data from source to destination
    try:
        xcopy.backup(share_folder, source_path, new_destination_path, file_obj, exclude)
        file_obj.write("Scheduler completed successfully..n")

    except Exception as e:

        # Write the error message of the scheduler to log file
        file_obj.write("Scheduler failed to copy all data..nProblem in connection..Please check connection..!!n")
        # #file_obj.write("Error while scheduling")
        # return

    # Write the details of scheduler to log file
    file_obj.write("Total skipped unmodified file:")
    file_obj.write(str(xcopy.skipped_unmodified_count))
    file_obj.write("n")
    file_obj.write("Total skipped file:")
    file_obj.write(str(xcopy.skipped_file))
    file_obj.write("n")
    file_obj.write("Total copied file:")
    file_obj.write(str(xcopy.copy_count))
    file_obj.write("n")
    file_obj.write("Total skipped folder:")
    file_obj.write(str(xcopy.skipped_folder))
    file_obj.write("n")
    # file_obj.write(scheduler_next_run)
    file_obj.close()

源代码中有一些尴尬,但我不会在这方面花费时间。 例如,如果你想找到source_path,最好用break / else使用for循环:

for detail in scheduler_detail:
    source_path = detail[2]
    break  # found
else:
    # not found: raise an exception
    ...

一些忠告:

  • 尝试分离用户界面代码和子处理,避免混淆两者。
  • 使用异常和异常处理程序。
  • 如果你想要便携式代码:避免系统调用(Windows上没有pgrep )。

  • 由于你的应用程序被打包在一个virtualenv中(我假定cx_freeze做这种事情),你无法访问系统范围的Python。 你甚至没有在Windows上。 所以你需要使用打包的Python(无论如何这是最佳实践)。

    如果您想调用像子scheduler.py一样的Python脚本,这意味着您有两个打包的应用程序:您需要为主应用程序 scheduler.py脚本创建一个exe 。 但是,与它沟通并不容易。

    另一个解决方案是使用multiprocessing来产生一个新的Python进程。 由于您不想等待处理结束(可能很长),因此您需要创建守护程序进程。 在multiprocessing模块中解释了这一点。

    基本上:

    import time
    from multiprocessing import Process
    
    def f(name):
        print('hello', name)
    
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.daemon = True
        p.start()
    
        # let it live and die, don't call: `p.join()`
        time.sleep(1)
    

    当然,我们需要适应您的问题。

    以下是我如何做到这一点(为了清晰起见,我删除了与UI相关的代码):

    import scheduler
    
    
    class SchedulerError(Exception):
        pass
    
    
    class YourClass(object):
        def start_scheduler_action(self, scheduler_id, scheduler_name, list_index):
            scheduler_detail = db.get_scheduler_detail_using_id(scheduler_id)
            for detail in scheduler_detail:
                source_path = detail[2]
                break
            else:
                raise SchedulerError("Invalid Path", "Missing source path", parent=self.new_frame)
    
            if not os.path.exists(source_path):
                raise SchedulerError("Invalid Path", "Please select valid path", parent=self.new_frame)
    
            p = Process(target=scheduler.scheduler_copy, args=('source_path',))
            p.daemon = True
            p.start()
    
            self.db.add_process_id(scheduler_id, p.pid)
    

    要检查您的流程是否仍在运行,我建议您使用psutil。 这真是一个伟大的工具!

    你可以像这样定义你的scheduler.py脚本:

    def scheduler_copy(source_path):
        ...
    

    多进程与线程Python

    引用这个答案:https://stackoverflow.com/a/3044626/1513933

    threading模块使用线程, multiprocessing模块使用进程。 不同之处在于线程运行在相同的内存空间中,而进程具有单独的内存。 这使得在多进程进程之间共享对象变得更加困难。 由于线程使用相同的内存,因此必须采取预防措施,或者两个线程同时写入同一内​​存。 这是全球解释器锁的用途。

    这里,多线程多处理的优点是可以杀死(或终止)进程; 你不能杀死一个线程。 你可能需要psutil。


    这不是您正在寻找的确切解决方案,但有两个原因应该首选以下建议。

  • 这些是更pythonic的方式
  • subprocess稍微昂贵
  • 您可以考虑的建议

  • 不要使用子进程获取系统路径。 尝试检查os.getenv('PATH')以获取env变量并尝试查找python是否在路径中。 对于Windows,你必须手动添加Python路径,否则你可以直接检查我猜的Program Files

  • 为了检查进程ID,你可以尝试psutils 。 这里提供了一个很好的答案,我如何在Python中获得进程列表?

  • 从python脚本调用另一个脚本。 这看起来不很酷。 不错,但我不会喜欢这个。

  • 在上面的代码中,行 - if sys.platform == "win32":ifelse条件==中具有相同的值, if在这里不需要条件语句。

  • 你写了相当好的工作代码来告诉你。 保持编码!


    如果你想在exe文件中运行一个子进程,那么你可以使用

    import subprocess
    
    program=('example')
    arguments=('/command')
    subprocess.call([program, arguments])
    
    链接地址: http://www.djcxy.com/p/55219.html

    上一篇: Python subprocess in .exe

    下一篇: Python multithreading best practices