Skip to content

File ProcessManager_Mod.F90

File List > core > ProcessManager_Mod.F90

Go to the documentation of this file

module processmanager_mod
   use precision_mod
   use statemanager_mod, only : statemanagertype
   use error_mod, only : cc_success, cc_failure
   use processinterface_mod, only : processinterface, columnprocessinterface
   use processfactory_mod, only : processfactorytype
   use gridmanager_mod, only : gridmanagertype, columniteratortype
   use columninterface_mod, only : columnprocessortype, columnviewtype
   use virtualcolumn_mod, only : virtualcolumntype
   use configmanager_mod, only : configdatatype, runphasetype, processconfigtype

   implicit none
   private

   public :: processmanagertype

   ! 1. Define a wrapper otherwise the polymorphic array allocation fails
   type :: processcontainertype
      ! The 'allocatable' keyword here is the magic sauce
      class(ProcessInterface), allocatable :: item
   end type processcontainertype

   type :: processmanagertype
      private
      class(ProcessContainerType), allocatable, public  :: processes(:)
      integer :: num_processes = 0
      integer :: max_processes = 50
      type(ProcessFactoryType) :: factory
      type(ColumnProcessorType) :: column_processor
      character(len=64), public, allocatable :: required_met_fields(:)
   contains
      procedure :: init => manager_init
      procedure :: add_process => manager_add_process
      procedure :: run_all => manager_run_all
      procedure :: run_process => manager_run_process
      procedure :: run_column_processes => manager_run_column_processes
      procedure :: run_process_on_columns => manager_run_process_on_columns
      procedure :: finalize => manager_finalize
      procedure :: list_processes => manager_list_processes
      procedure :: get_column_processes => manager_get_column_processes
      procedure :: run_phase => manager_run_phase
      procedure :: run_all_phases => manager_run_all_phases
      procedure :: run_all_processes => manager_run_all_processes
      procedure :: set_max_processes => manager_set_max_processes
      procedure :: enable_column_batching => manager_enable_column_batching
      procedure :: register_process => manager_register_process
      procedure, private :: add_met_fields_from_process => manager_add_met_fields_from_process
   end type processmanagertype

contains

   subroutine manager_init(this, rc)
      class(ProcessManagerType), intent(inout) :: this
      integer, intent(out) :: rc

      call this%factory%init(rc)
      if (rc /= cc_success) return

      ! Initialize counters - allocatable array will be allocated on first assignment
      this%num_processes = 0

      ! Initialize empty required met fields array
      if (allocated(this%required_met_fields)) deallocate(this%required_met_fields)
      allocate(this%required_met_fields(0))

      ! Initialize column processor with default max columns
      call this%column_processor%init(100, rc)  ! Default max columns
      if (rc /= cc_success) return

      rc = cc_success
   end subroutine manager_init

   subroutine manager_add_process(this, process_name, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      character(len=*), intent(in) :: process_name
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      integer :: i
      class(ProcessInterface), allocatable :: new_process
      class(ProcessContainerType), allocatable :: tmp(:)

      if (this%num_processes >= this%max_processes) then
         rc = cc_failure
         return
      endif

      ! Create the process (scheme is read from configuration)
      call this%factory%create_process(process_name, container, new_process, rc)
      if (rc /= cc_success) return

      ! Initialize the process
      call new_process%init(container, rc)
      if (rc /= cc_success) return

      ! Set timestep for each process to the same value.
      call new_process%set_timestep(container%tstep)

      ! Collect required met fields from this process
      call this%add_met_fields_from_process(new_process, rc)
      if (rc /= cc_success) return

      ! Add to manager
      this%num_processes = this%num_processes + 1

      ! Check bounds first
      if (this%num_processes > this%max_processes) then
         rc = cc_failure
         return
      endif

      ! ! Handle polymorphic array allocation on first use
      ! if (.not. allocated(this%processes)) then
      !    ! For polymorphic arrays, we need to allocate with proper bounds
      !    ! We'll allocate the whole array when adding the first process
      !    block
      !       class(ProcessInterface), allocatable :: temp_array(:)
      !       allocate(temp_array(this%max_processes), source=new_process)
      !       call move_alloc(temp_array, this%processes)
      !    end block
      ! else
      !    ! Subsequent assignments - just copy into the allocated slot
      !    allocate(this%processes(this%num_processes), source=new_process)
      ! endif

      ! Allocate a temporary array of WRAPPERS with the new size
      allocate(tmp(this%num_processes))

      ! Move existing items into the new array
      ! We check if there was previous data to move
      if (allocated(this%processes)) then
         do i = 1, this%num_processes - 1
            ! We can move the internal allocatable item safely!
            call move_alloc(from=this%processes(i)%item, to=tmp(i)%item)
         end do
         ! Optional: Deallocate the old empty shell (Fortran does this auto, but safe to be explicit)
         deallocate(this%processes)
      endif

      ! Move the new process into the last slot
      call move_alloc(from=new_process, to=tmp(this%num_processes)%item)

      ! Move the wrapper array back to the manager
      call move_alloc(from=tmp, to=this%processes)

      rc = cc_success
   end subroutine manager_add_process

   subroutine manager_run_all(this, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      integer :: i, local_rc
      type(GridManagerType), pointer :: grid_mgr

      ! If there are no processes, succeed immediately
      if (this%num_processes == 0) then
         rc = cc_success
         return
      endif

      rc = cc_success

      ! Get grid manager from container
      grid_mgr => container%get_grid_manager()
      if (.not. associated(grid_mgr)) then
         rc = cc_failure
         return
      endif

      do i = 1, this%num_processes
         if (this%processes(i)%item%is_ready()) then
            ! Check if this is a column process
            select type(proc => this%processes(i)%item)
             class is (columnprocessinterface)
               ! Run column-based process
               call this%run_process_on_columns(i, container, local_rc)
             class default
               ! Run traditional 3D process
               call this%processes(i)%item%run(container, local_rc)
            end select

            if (local_rc /= cc_success) then
               rc = local_rc
               return
            endif
         endif
      enddo
   end subroutine manager_run_all

   subroutine manager_run_column_processes(this, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      integer :: i, local_rc, col_i, col_j, col_id
      type(GridManagerType), pointer :: grid_mgr
      type(ColumnIteratorType) :: col_iter
      type(VirtualColumnType) :: virtual_col

      rc = cc_success

      ! Get grid manager from container
      grid_mgr => container%get_grid_manager()
      if (.not. associated(grid_mgr)) then
         rc = cc_failure
         return
      endif

      ! Initialize column iterator using create_column_iterator
      col_iter = grid_mgr%create_column_iterator()

      ! Process each column
      do while (col_iter%has_next())
         call col_iter%next(rc)
         if (rc /= cc_success) return

         ! Get current column indices (i, j)
         call col_iter%get_current_indices(col_i, col_j)

         ! Get current column ID
         call col_iter%get_current_column_id(col_id)

         ! Create and populate virtual column for this (i, j)
         call container%create_virtual_column(col_i, col_j, col_id, virtual_col, rc)
         if (rc /= cc_success) return

         ! Run all column processes on this column
         do i = 1, this%num_processes
            select type(proc => this%processes(i)%item)
             class is (columnprocessinterface)
               if (proc%is_ready()) then
                  call proc%run_column(virtual_col, container, local_rc)
                  if (local_rc /= cc_success) then
                     rc = local_rc
                     return
                  endif
               endif
            end select
         enddo

         ! Apply virtual column changes back to container
         call container%apply_virtual_column(virtual_col, rc)
         if (rc /= cc_success) return
      enddo
      ! Clean up virtual column
      if (virtual_col%is_valid)  call virtual_col%cleanup()

   end subroutine manager_run_column_processes

   subroutine manager_run_process_on_columns(this, process_index, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      integer, intent(in) :: process_index
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      type(GridManagerType), pointer :: grid_mgr
      type(ColumnIteratorType) :: col_iter
      type(VirtualColumnType) :: virtual_col
      integer :: col_i, col_j, col_id

      rc = cc_success

      if (process_index < 1 .or. process_index > this%num_processes) then
         rc = cc_failure
         return
      endif

      ! Get grid manager from container
      grid_mgr => container%get_grid_manager()
      if (.not. associated(grid_mgr)) then
         rc = cc_failure
         return
      endif

      select type(proc => this%processes(process_index)%item)
       class is (columnprocessinterface)
         ! Initialize column iterator using create_column_iterator
         col_iter = grid_mgr%create_column_iterator()

         ! Process each column
         do while (col_iter%has_next())
            call col_iter%next(rc)
            if (rc /= cc_success) return

            ! Get current column indices
            call col_iter%get_current_indices(col_i, col_j)

            ! Get current column ID
            call col_iter%get_current_column_id(col_id)

            call container%create_virtual_column(col_i, col_j, col_id, virtual_col, rc)
            if (rc /= cc_success) return

            call proc%run_column(virtual_col, container, rc)
            if (rc /= cc_success) return

            ! Apply virtual column changes back to container for each column
            call container%apply_virtual_column(virtual_col, rc)
            if (rc /= cc_success) return
         enddo
         ! Clean up virtual column
         if (virtual_col%is_valid)  call virtual_col%cleanup()

      end select
   end subroutine manager_run_process_on_columns

   subroutine manager_run_process(this, process_name, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      character(len=*), intent(in) :: process_name
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      integer :: i

      rc = cc_failure
      do i = 1, this%num_processes
         if (trim(this%processes(i)%item%get_name()) == trim(process_name)) then
            ! Check if this is a column process and run appropriately
            select type(proc => this%processes(i)%item)
             class is (columnprocessinterface)
               call this%run_process_on_columns(i, container, rc)
             class default
               call this%processes(i)%item%run(container, rc)
            end select
            return
         endif
      enddo
   end subroutine manager_run_process

   subroutine manager_get_column_processes(this, column_indices, count)
      class(ProcessManagerType), intent(in) :: this
      integer, intent(out) :: column_indices(:)
      integer, intent(out) :: count

      integer :: i, max_count

      count = 0
      max_count = min(this%num_processes, size(column_indices))

      do i = 1, this%num_processes
         select type(proc => this%processes(i)%item)
          class is (columnprocessinterface)
            count = count + 1
            if (count <= max_count) then
               column_indices(count) = i
            endif
         end select
      enddo
   end subroutine manager_get_column_processes

   subroutine manager_run_phase(this, phase_name, config_data, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      character(len=*), intent(in) :: phase_name
      type(ConfigDataType), intent(in) :: config_data
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      integer :: j, local_rc, phase_idx, process_idx
      type(RunPhaseType) :: current_phase
      type(ProcessConfigType) :: process_config
      logical :: phase_found

      rc = cc_success
      phase_found = .false.

      ! Check if run phases are configured
      if (.not. allocated(config_data%run_phases)) then
         write(*,*) 'WARNING: No run phases configured, skipping phase execution'
         return
      endif

      ! Find the requested phase
      do phase_idx = 1, size(config_data%run_phases)
         if (trim(config_data%run_phases(phase_idx)%name) == trim(phase_name)) then
            current_phase = config_data%run_phases(phase_idx)
            phase_found = .true.
            exit
         endif
      enddo

      if (.not. phase_found) then
         write(*,*) 'WARNING: Phase "', trim(phase_name), '" not found in configuration'
         rc = cc_failure
         return
      endif

      write(*,*) 'INFO: Running phase "', trim(phase_name), '" with ', current_phase%num_processes, ' processes'

      ! Loop through each process in this phase
      do j = 1, current_phase%num_processes
         process_config = current_phase%processes(j)

         ! Check if process is enabled
         if (.not. process_config%enabled) then
            write(*,*) 'INFO: Skipping disabled process: ', trim(process_config%name)
            cycle
         endif

         ! Map process_index to actual process in manager's array
         process_idx = process_config%process_index

         ! Validate process index bounds
         if (process_idx < 1 .or. process_idx > this%num_processes) then
            write(*,*) 'WARNING: Process index ', process_idx, ' out of bounds for process: ', &
               trim(process_config%name)
            cycle
         endif

         write(*,*) 'INFO: Running process: ', trim(process_config%name), ' (index=', process_idx, ')'

         ! Run the process based on its type
         if (this%processes(process_idx)%item%is_ready()) then
            select type(proc => this%processes(process_idx)%item)
             class is (columnprocessinterface)
               !!write(*,*) 'Test phase process', process_idx, trim(proc%name) !debug only
               call this%run_process_on_columns(process_idx, container, local_rc)
             class default
               call this%processes(process_idx)%item%run(container, local_rc)
            end select

            if (local_rc /= cc_success) then
               write(*,*) 'ERROR: Process ', trim(process_config%name), ' failed with code: ', local_rc
               rc = local_rc
               return
            endif
         else
            write(*,*) 'WARNING: Process ', trim(process_config%name), ' is not ready'
         endif
      enddo

      write(*,*) 'INFO: Phase "', trim(phase_name), '" completed successfully'

   end subroutine manager_run_phase

   subroutine manager_run_all_phases(this, config_data, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      type(ConfigDataType), intent(in) :: config_data
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      integer :: phase_idx, local_rc
      character(len=64) :: phase_name

      rc = cc_success

      ! Check if run phases are configured
      if (.not. allocated(config_data%run_phases)) then
         write(*,*) 'WARNING: No run phases configured, skipping all phases execution'
         return
      endif

      if (size(config_data%run_phases) == 0) then
         write(*,*) 'WARNING: Empty run phases array, skipping all phases execution'
         return
      endif

      write(*,*) 'INFO: Running all phases - total phases: ', size(config_data%run_phases)

      ! Loop through all phases and run each one
      do phase_idx = 1, size(config_data%run_phases)
         phase_name = config_data%run_phases(phase_idx)%name

         write(*,*) 'INFO: Starting phase ', phase_idx, ' of ', size(config_data%run_phases), &
            ': "', trim(phase_name), '"'

         ! Run this phase
         call this%run_phase(phase_name, config_data, container, local_rc)
         if (local_rc /= cc_success) then
            write(*,*) 'ERROR: Phase "', trim(phase_name), '" failed with code: ', local_rc
            rc = local_rc
            return
         endif

         write(*,*) 'INFO: Phase "', trim(phase_name), '" completed successfully'
      enddo

      write(*,*) 'INFO: All ', size(config_data%run_phases), ' phases completed successfully'

   end subroutine manager_run_all_phases

   subroutine manager_run_all_processes(this, container, rc)
      class(ProcessManagerType), intent(inout) :: this
      type(StateManagerType), intent(inout) :: container
      integer, intent(out) :: rc

      call this%run_all(container, rc)

   end subroutine manager_run_all_processes

   subroutine manager_set_max_processes(this, max_processes, rc)
      class(ProcessManagerType), intent(inout) :: this
      integer, intent(in) :: max_processes
      integer, intent(out) :: rc

      ! TODO: Reallocate process array if needed
      this%max_processes = max_processes
      rc = cc_success

   end subroutine manager_set_max_processes

   subroutine manager_enable_column_batching(this, enable, rc)
      class(ProcessManagerType), intent(inout) :: this
      logical, intent(in) :: enable
      integer, intent(out) :: rc

      ! TODO: Configure column processor batching mode
      rc = cc_success

   end subroutine manager_enable_column_batching

   subroutine manager_finalize(this, rc)
      class(ProcessManagerType), intent(inout) :: this
      integer, intent(out) :: rc

      integer :: i, local_rc

      rc = cc_success
      do i = 1, this%num_processes
         call this%processes(i)%item%finalize(local_rc)
         if (local_rc /= cc_success) then
            rc = local_rc
         endif
      enddo

      ! Clean up column processor
      call this%column_processor%cleanup()

      if (allocated(this%processes)) deallocate(this%processes)
      if (allocated(this%required_met_fields)) deallocate(this%required_met_fields)
      this%num_processes = 0
   end subroutine manager_finalize

   subroutine manager_list_processes(this, process_names, count)
      class(ProcessManagerType), intent(in) :: this
      character(len=64), intent(out) :: process_names(:)
      integer, intent(out) :: count

      integer :: i, max_count

      max_count = min(this%num_processes, size(process_names))
      do i = 1, max_count
         process_names(i) = this%processes(i)%item%get_name()
      enddo
      count = max_count
   end subroutine manager_list_processes

   subroutine manager_register_process(this, name, category, description, creator, rc)
      use processregistry_mod, only: processcreatorinterface

      class(ProcessManagerType), intent(inout) :: this
      character(len=*), intent(in) :: name, category, description
      procedure(ProcessCreatorInterface) :: creator
      integer, intent(out) :: rc

      call this%factory%register_process(name, category, description, creator, rc)
   end subroutine manager_register_process

   subroutine manager_add_met_fields_from_process(this, process, rc)
      class(ProcessManagerType), intent(inout) :: this
      class(ProcessInterface), intent(in) :: process
      integer, intent(out) :: rc

      character(len=64), allocatable :: new_fields(:), merged_fields(:)
      character(len=64), allocatable :: current_fields(:)
      integer :: i, j, current_size, new_size, merged_size
      logical :: field_exists

      rc = cc_success

      ! Get required fields from the new process
      new_fields = process%get_required_met_fields()
      new_size = size(new_fields)

      ! If no new fields, nothing to do
      if (new_size == 0) return

      ! Get current fields (make a copy for merging)
      if (allocated(this%required_met_fields)) then
         current_size = size(this%required_met_fields)
         allocate(current_fields(current_size))
         current_fields = this%required_met_fields
      else
         current_size = 0
         allocate(current_fields(0))
      endif

      ! Merge fields, avoiding duplicates and filtering out TSTEP
      ! Worst case: all new fields are unique, so allocate maximum possible size
      allocate(merged_fields(current_size + new_size))
      merged_size = 0

      ! Start with current fields, but filter out TSTEP
      do i = 1, current_size
         if (trim(adjustl(current_fields(i))) /= 'TSTEP' .and. &
            trim(adjustl(current_fields(i))) /= 'LON'   .and. &
            trim(adjustl(current_fields(i))) /= 'LAT') then
            merged_size = merged_size + 1
            merged_fields(merged_size) = current_fields(i)
         endif
      end do

      ! Add new fields if they're not already present and not TSTEP
      do i = 1, new_size
         field_exists = .false.

         ! Skip TSTEP field (case insensitive)
         if (trim(adjustl(new_fields(i))) == 'TSTEP' .or. &
            trim(adjustl(new_fields(i))) == 'LON'   .or. &
            trim(adjustl(new_fields(i))) == 'LAT') then
            cycle
         endif

         ! Check if this field already exists (case insensitive)
         do j = 1, merged_size
            if (trim(adjustl(new_fields(i))) == trim(adjustl(merged_fields(j)))) then
               field_exists = .true.
               exit
            endif
         end do

         ! Add if it's a new field
         if (.not. field_exists) then
            merged_size = merged_size + 1
            merged_fields(merged_size) = new_fields(i)
         endif
      end do

      ! Update the manager's required fields list with proper size
      if (allocated(this%required_met_fields)) deallocate(this%required_met_fields)
      allocate(this%required_met_fields(merged_size))
      this%required_met_fields(1:merged_size) = merged_fields(1:merged_size)

      ! Cleanup
      if (allocated(current_fields)) deallocate(current_fields)
      if (allocated(new_fields)) deallocate(new_fields)
      if (allocated(merged_fields)) deallocate(merged_fields)

   end subroutine manager_add_met_fields_from_process

end module processmanager_mod