网络编程
位置:首页>> 网络编程>> Python编程>> python多线程方法详解

python多线程方法详解

作者:python_rser  发布时间:2023-10-16 02:46:31 

标签:python,多线程

处理多个数据和多文件时,使用for循环的速度非常慢,此时需要用多线程来加速运行进度,常用的模块为multiprocess和joblib,下面对两种包我常用的方法进行说明。

1、模块安装

pip install multiprocessing
pip install joblib

2、以分块计算NDVI为例

首先导入需要的包

import numpy as np
from osgeo import gdal
import time
from multiprocessing import cpu_count
from multiprocessing import Pool
from joblib import Parallel, delayed

定义GdalUtil类,以读取遥感数据

class GdalUtil:
   def __init__(self):
       pass
   @staticmethod
   def read_file(raster_file, read_band=None):
       """读取栅格数据"""
       # 注册栅格驱动
       gdal.AllRegister()
       gdal.SetConfigOption('gdal_FILENAME_IS_UTF8', 'YES')
       # 打开输入图像
       dataset = gdal.Open(raster_file, gdal.GA_ReadOnly)
       if dataset == None:
           print('打开图像{0} 失败.\n', raster_file)
       # 列
       raster_width = dataset.RasterXSize
       # 行
       raster_height = dataset.RasterYSize
       # 读取数据
       if read_band == None:
           data_array = dataset.ReadAsArray(0, 0, raster_width, raster_height)
       else:
           band = dataset.GetRasterBand(read_band)
           data_array = band.ReadAsArray(0, 0, raster_width, raster_height)
       return data_array

@staticmethod
   def read_block_data(dataset, band_num, cols_read, rows_read, start_col=0, start_row=0):
       band = dataset.GetRasterBand(band_num)
       res_data = band.ReadAsArray(start_col, start_row, cols_read, rows_read)
       return res_data

@staticmethod
   def get_raster_band(raster_path):
       # 注册栅格驱动
       gdal.AllRegister()
       gdal.SetConfigOption('gdal_FILENAME_IS_UTF8', 'YES')
       # 打开输入图像
       dataset = gdal.Open(raster_path, gdal.GA_ReadOnly)
       if dataset == None:
           print('打开图像{0} 失败.\n', raster_path)
       raster_band = dataset.RasterCount
       return raster_band

@staticmethod
   def get_file_size(raster_path):
       """获取栅格仿射变换参数"""
       # 注册栅格驱动
       gdal.AllRegister()
       gdal.SetConfigOption('gdal_FILENAME_IS_UTF8', 'YES')

# 打开输入图像
       dataset = gdal.Open(raster_path, gdal.GA_ReadOnly)
       if dataset == None:
           print('打开图像{0} 失败.\n', raster_path)
       # 列
       raster_width = dataset.RasterXSize
       # 行
       raster_height = dataset.RasterYSize
       return raster_width, raster_height

@staticmethod
   def get_file_geotransform(raster_path):
       """获取栅格仿射变换参数"""
       # 注册栅格驱动
       gdal.AllRegister()
       gdal.SetConfigOption('gdal_FILENAME_IS_UTF8', 'YES')

# 打开输入图像
       dataset = gdal.Open(raster_path, gdal.GA_ReadOnly)
       if dataset == None:
           print('打开图像{0} 失败.\n', raster_path)

# 获取输入图像仿射变换参数
       input_geotransform = dataset.GetGeoTransform()
       return input_geotransform

@staticmethod
   def get_file_proj(raster_path):
       """获取栅格图像空间参考"""
       # 注册栅格驱动
       gdal.AllRegister()
       gdal.SetConfigOption('gdal_FILENAME_IS_UTF8', 'YES')

# 打开输入图像
       dataset = gdal.Open(raster_path, gdal.GA_ReadOnly)
       if dataset == None:
           print('打开图像{0} 失败.\n', raster_path)

# 获取输入图像空间参考
       input_project = dataset.GetProjection()
       return input_project

@staticmethod
   def write_file(dataset, geotransform, project, output_path, out_format='GTiff', eType=gdal.GDT_Float32):
       """写入栅格"""
       if np.ndim(dataset) == 3:
           out_band, out_rows, out_cols = dataset.shape
       else:
           out_band = 1
           out_rows, out_cols = dataset.shape

# 创建指定输出格式的驱动
       out_driver = gdal.GetDriverByName(out_format)
       if out_driver == None:
           print('格式%s 不支持Creat()方法.\n', out_format)
           return

out_dataset = out_driver.Create(output_path, xsize=out_cols,
                                       ysize=out_rows, bands=out_band,
                                       eType=eType)
       # 设置输出图像的仿射参数
       out_dataset.SetGeoTransform(geotransform)

# 设置输出图像的投影参数
       out_dataset.SetProjection(project)

# 写出数据
       if out_band == 1:
           out_dataset.GetRasterBand(1).WriteArray(dataset)
       else:
           for i in range(out_band):
               out_dataset.GetRasterBand(i + 1).WriteArray(dataset[i])
       del out_dataset

定义计算NDVI的函数

def cal_ndvi(multi):
   '''
   计算高分NDVI
   :param multi:格式为列表,依次包含[遥感文件路径,开始行号,开始列号,待读的行数,待读的列数]
   :return: NDVI数组
   '''
   input_file, start_col, start_row, cols_step, rows_step = multi
   dataset = gdal.Open(input_file, gdal.GA_ReadOnly)
   nir_data = GdalUtil.read_block_data(dataset, 4, cols_step, rows_step, start_col=start_col, start_row=start_row)
   red_data = GdalUtil.read_block_data(dataset, 3, cols_step, rows_step, start_col=start_col, start_row=start_row)
   ndvi = (nir_data - red_data) / (nir_data + red_data)
   ndvi[(ndvi > 1.5) | (ndvi < -1)] = 0
   return ndvi

定义主函数

if __name__ == "__main__":
   input_file = r'D:\originalData\GF1\namucuo2021.tif'
   output_file = r'D:\originalData\GF1\namucuo2021_ndvi.tif'
   method = 'joblib'
   # method = 'multiprocessing'
   # 获取文件主要信息
   raster_cols, raster_rows = GdalUtil.get_file_size(input_file)
   geotransform = GdalUtil.get_file_geotransform(input_file)
   project = GdalUtil.get_file_proj(input_file)
   # 定义分块大小
   rows_block_size = 50
   cols_block_size = 50
   multi = []
   for j in range(0, raster_rows, rows_block_size):
       for i in range(0, raster_cols, cols_block_size):
           if j + rows_block_size < raster_rows:
               rows_step = rows_block_size
           else:
               rows_step = raster_rows - j
           # 数据横向步长
           if i + cols_block_size < raster_cols:
               cols_step = cols_block_size
           else:
               cols_step = raster_cols - i
           temp_multi = [input_file, i, j, cols_step, rows_step]
           multi.append(temp_multi)

t1 = time.time()
   if method == 'multiprocessing':
       # multiprocessing方法
       pool = Pool(processes=cpu_count()-1)
       # 注意map函数中传入的参数应该是可迭代对象,如list;返回值为list
       res = pool.map(cal_ndvi, multi)
       pool.close()
       pool.join()
   else:
       # joblib方法
       res = Parallel(n_jobs=-1)(delayed(cal_ndvi)(input_list) for input_list in multi)

t2 = time.time()
   print("Total time:" + (t2 - t1).__str__())

# 将multiprocessing中的结果提取出来,放回对应的矩阵位置中
   out_data = np.zeros([raster_rows, raster_cols], dtype='float')
   for result, input_multi in zip(res, multi):
       start_col = input_multi[1]
       start_row = input_multi[2]
       cols_step = input_multi[3]
       rows_step = input_multi[4]
       out_data[start_row:start_row + rows_step, start_col:start_col + cols_step] = result

GdalUtil.write_file(out_data, geotransform, project, output_file)

双重for循环时,两层for循环都使用multiprocessing时会报错,这时可以外层for循环使用joblib方法,内层for循环改为multiprocessing方法,不会报错

来源:https://blog.csdn.net/u010562884/article/details/122543534

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com