python瓦片图下载/合并/绘图/标记(一)
思路首先我们合并好的瓦片图上只有像素一个计量单位,如果要化gps点上去的话,就要找到一个全局的参考坐标。找到左上角的点坐标。因为切出来的瓦片像素是我们自定义的,我用的是256*256,同时可以获取到瓦片的实际长度和宽度(就是bbox参数/墨卡托投影),由此我们可以算出单位像素对应的实际长度(单位是米)。通过左上角瓦片图的gps可以算出对应的瓦片图坐标,根据瓦片图坐标既可以算出瓦片的左下角坐标和右上
下载代码 仅供参考.瓦片服务是我们自己搭的服务器,参考geoserver。
import io
import math
import multiprocessing
import time
import urllib.request as ur
from threading import Thread
import traceback
import PIL.Image as pil
from pathlib import Path
from numpy.lib.type_check import imag
import sys
# from PIL import Image
# import Image
import cv2
import numpy as np
from osgeo import gdal, osr
from tools import *
# from tqdm import tqdm, trange
# -----------------------------------------------------------
# ---------------------------------------------------------
class Downloader(Thread):
# multiple threads downloader
def __init__(self, index, count, urls, datas):
# index represents the number of threads
# count represents the total number of threads
# urls represents the list of URLs nedd to be downloaded
# datas represents the list of data need to be returned.
super().__init__()
self.urls = urls
self.datas = datas
self.index = index
self.count = count
def download(self, url):
print("下载图片地址",url,)
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36 Edg/88.0.705.68'}
header = ur.Request(url, headers=HEADERS)
err = 1
while err < 11:
try:
data = ur.urlopen(header, timeout=10).read()
except:
traceback.print_exc()
print("下载错误 重新下载...")
err += 1
else:
img = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
print("最小像素点", np.min(img))
if np.min(img) == 255:
print(f"图片是空白的,开始重新下载第{err}次")
err += 1
else:
return data
raise Exception("网络连接异常.")
def run(self):
for i, url in enumerate(self.urls):
print("下载任务", i, "/", len(self.urls))
tile_x, tile_y, z = url.split("&xyz=")[1].split(",")
# lon, lat = tile2lonlat(tile_x, tile_y, z)
# 如果i除线程总数1取余 不等于 第几个线程 总是0
if i % self.count != self.index:
print("warning!!!跳过该url,记录一下", url)
continue
self.datas[str(tile_x)+"_"+str(tile_y)] = self.download(url)
# ---------------------------------------------------------
# ---------------------------------------------------------
def getExtent(x1, y1, x2, y2, z, source="Google China"):
pos1x, pos1y = wgs_to_tile(x1, y1, z)
pos2x, pos2y = wgs_to_tile(x2, y2, z)
Xframe = pixls_to_mercator(
{"LT": (pos1x, pos1y), "RT": (pos2x, pos1y), "LB": (pos1x, pos2y), "RB": (pos2x, pos2y), "z": z})
for i in ["LT", "LB", "RT", "RB"]:
Xframe[i] = mercator_to_wgs(*Xframe[i])
if source == "Google":
pass
elif source == "ZKLF":
pass
elif source == "Google China":
for i in ["LT", "LB", "RT", "RB"]:
Xframe[i] = gcj_to_wgs(*Xframe[i])
else:
raise Exception("Invalid argument: source.")
return Xframe
def saveTiff(r, g, b, gt, filePath):
fname_out = filePath
driver = gdal.GetDriverByName('GTiff')
# Create a 3-band dataset
dset_output = driver.Create(fname_out, r.shape[1], r.shape[0], 3, gdal.GDT_Byte)
dset_output.SetGeoTransform(gt)
try:
proj = osr.SpatialReference()
proj.ImportFromEPSG(4326)
dset_output.SetSpatialRef(proj)
except:
print("Error: Coordinate system setting failed")
dset_output.GetRasterBand(1).WriteArray(r)
dset_output.GetRasterBand(2).WriteArray(g)
dset_output.GetRasterBand(3).WriteArray(b)
dset_output.FlushCache()
dset_output = None
print("Image Saved tif图片生成成功")
# ---------------------------------------------------------
# ---------------------------------------------------------
MAP_URLS = {
"onwer_server": "http://ip:port/geoserver/ows?service=WMS&version=1.3.0&transparent=true&request=GetMap&layers={style}&width=256&height=256&srs=EPSG%3A3857&format=image/png&bbox={bbox}&xyz={xyz}",
"Google": "http://mts0.googleapis.com/vt?lyrs={style}&x={x}&y={y}&z={z}",
"Google China": "http://mt2.google.cn/vt/lyrs={style}&hl=zh-CN&gl=CN&src=app&x={x}&y={y}&z={z}&s=Galile"}
def tile2lonlat(x, y, z):
"""
@description :切图坐标转lonlat
---------
@param x :瓦片的x轴
@param y :瓦片的y轴
@param z :瓦片的层级
-------
@Returns :[经度, 纬度]
-------
"""
x = float(x)
y = float(y)
n = math.pow(2, int(z))
lon = x / n * 360.0 - 180.0
lat = math.atan(math.sinh(math.pi * (1 - 2 * y / n)))
lat = lat * 180.0 / math.pi
return [lon, lat]
def get_url(source, x, y, z, style): #
if source == 'Google China':
url = MAP_URLS["Google China"].format(x=x, y=y, z=z, style=style)
elif source == 'Google':
url = MAP_URLS["Google"].format(x=x, y=y, z=z, style=style)
elif source == "onwer_server":
min_xy_list = tile2lonlat(x, y + 1, z)
max_xy_list = tile2lonlat(x + 1, y, z)
print("min_xy_list:", min_xy_list)
print("max_xy_list:", max_xy_list)
lng_min, lat_min = min_xy_list[0], min_xy_list[1]
lng_max, lat_max = max_xy_list[0], max_xy_list[1]
# gcj02转wgs84
# lng_min, lat_min = GCJ02_to_WGS84(min_xy_list[0], min_xy_list[1])
lng_min, lat_min = WGS84_to_WebMercator(lng_min, lat_min)
# lng_max, lat_max = GCJ02_to_WGS84(max_xy_list[0], max_xy_list[1])
lng_max, lat_max = WGS84_to_WebMercator(lng_max, lat_max)
bbox = ",".join([str(lng_min), str(lat_min), str(lng_max), str(lat_max)])
xyz = ",".join([str(x), str(y), str(z)])
url = MAP_URLS["ZKLF"].format(bbox=bbox, style=style, xyz=xyz)
else:
raise Exception("Unknown Map Source ! ")
return url
def get_urls(x1, y1, x2, y2, z, source, style):
"""
@description :左上角x1,y1右下角x2,y2
---------
@param :
-------
@Returns :
-------
"""
pos1x, pos1y = wgs_to_tile(x1, y1, z) # 左上角的瓦片图坐标
pos2x, pos2y = wgs_to_tile(x2, y2, z)
print("pos1x, pos1y:", pos1x, pos1y)
print("pos2x, pos2y:", pos2x, pos2y)
lenx = abs(pos2x - pos1x) + 1
leny = abs(pos2y - pos1y) + 1
print("Total tiles number:{x} X {y}".format(x=lenx, y=leny))
print("pos1y, pos1y + leny:", pos1y, pos1y + leny)
print("pos1x, pos1x + lenx:", pos1x, pos1x + lenx)
urls = [get_url(source, i, j, z, style) for j in range(pos1y, pos1y + leny) for i in range(pos1x, pos1x + lenx)]
return urls
# ---------------------------------------------------------
# ---------------------------------------------------------
def merge_tiles(datas, x1, y1, x2, y2, z):
pos1x, pos1y = wgs_to_tile(x1, y1, z)
pos2x, pos2y = wgs_to_tile(x2, y2, z)
lenx = pos2x - pos1x + 1
leny = pos2y - pos1y + 1
outpic = pil.new('RGBA', (lenx * 256, leny * 256))
for i, data in enumerate(datas):
picio = io.BytesIO(data)
small_pic = pil.open(picio)
y, x = i // lenx, i % lenx
outpic.paste(small_pic, (x * 256, y * 256))
print('Tiles merge completed')
return outpic
def download_tiles(urls, multi=1):
url_len = len(urls)
# datas = [None] * url_len
datas = dict()
if multi < 1 or multi > 20 or not isinstance(multi, int):
raise Exception("multi of Downloader shuold be int and between 1 to 20.")
tasks = [Downloader(i, multi, urls, datas) for i in range(multi)]
for i in tasks:
i.start()
for i in tasks:
i.join()
return datas
# ---------------------------------------------------------
# ---------------------------------------------------------
def main(left, top, right, bottom, zoom, filePath, style='s', server="Google China", zone_id=""):
"""
Download images based on spatial extent.
East longitude is positive and west longitude is negative.
North latitude is positive, south latitude is negative.
Parameters
----------
left, top : left-top coordinate, for example (100.361,38.866)
right, bottom : right-bottom coordinate
z : zoom
filePath : File path for storing results, TIFF format
style :
m for map;
s for satellite;
y for satellite with label;
t for terrain;
p for terrain with label;
h for label;
source : Google China (default) or Google
"""
# ---------------------------------------------------------
# Get the urls of all tiles in the extent
urls = get_urls(left, top, right, bottom, zoom, server, style)
print("瓦片图总数:", len(urls))
# Group URLs based on the number of CPU cores to achieve roughly equal amounts of tasks
urls_group = [urls[i:i + math.ceil(len(urls) / 2)] for i in
range(0, len(urls), math.ceil(len(urls) / 2))]
# urls_group = [urls[i:i + math.ceil(len(urls) / multiprocessing.cpu_count())] for i in
# range(0, len(urls), math.ceil(len(urls) / multiprocessing.cpu_count()))]
print(urls_group)
# return False
# Each set of URLs corresponds to a process for downloading tile maps
print('Tiles downloading......瓦片图下载中')
# results = []
pool = multiprocessing.Pool(2)
# pool = multiprocessing.Pool(multiprocessing.cpu_count())
print("cpu_count:", multiprocessing.cpu_count())
print("pool", pool)
results = pool.map(download_tiles, urls_group)
pool.close()
pool.join()
# print("results:", type(results[0]))
image_number = 1
for res in results:
for key in res.keys():
print(f"开始保存第{image_number}张图片")
print("image_name:", key)
x = str(key).split("_")[0]
y = str(key).split("_")[1]
parent_dir = Path(f"D:\\tiles\\{zone_id}\\tiles\\{zoom}\\{x}") # 父级目录
if not parent_dir.exists():
parent_dir.mkdir(exist_ok=True, parents=True)
with open(f"D:\\tiles\\{zone_id}\\tiles\\{zoom}\\{x}\\{y}.png", "wb") as f:
# print(_)
print("sys.getsizeof(res[key]):", sys.getsizeof(res[key]))
f.write(res[key])
image_number += 1
# print(res.get())
# result = [x for j in results for x in j]
print('Tiles download complete 瓦片图 下载成功')
# # break
# Combine downloaded tile maps into one map
# outpic = merge_tiles(result, left, top, right, bottom, zoom)
# outpic = outpic.convert('RGB')
# r, g, b = cv2.split(np.array(outpic))
# # Get the spatial information of the four corners of the merged map and use it for outputting
# extent = getExtent(left, top, right, bottom, zoom, server)
# gt = (extent['LT'][0], (extent['RB'][0] - extent['LT'][0]) / r.shape[1], 0, extent['LT'][1], 0,
# (extent['RB'][1] - extent['LT'][1]) / r.shape[0])
# saveTiff(r, g, b, gt, filePath)
# ---------------------------------------------------------
if __name__ == '__main__':
from sqls import select_zone_info
import json
start_time = time.time()
zone_id = "1547212382202232832"
# main(118.901, 32.066,118.934, 32.109, 18, r'.\Temp\test.tif', server="Google China")
# main(100.361, 38.866, 100.386, 38.839, 18, r'.\Temp\test.tif', server="Google China")
# main(97.376955,37.133309,97.4074156,37.118448, 5, r'.\Temp\test-one.tif', server="onwer_server", style="onwer_server_AREA:onwer_server_super_hd")
# path = [{"lng": 118.909207, "lat": 32.081909}, {"lng": 118.912005, "lat": 32.081703}, {"lng": 118.911776, "lat": 32.081512}, {"lng": 118.909180, "lat": 32.080421}]
left_top_x, left_top_y, right_buttom_x, right_buttom_y = get_zone_gps_max(zone_id)
# print(float(left_top[0]), float(left_top[1]), float(right_buttom[0]), float(right_buttom[1]))
for zoom in range(18, 19):
parent_dir = Path(f"D:\\tiles\\{zone_id}\\tiles\\{zoom}") # 父级目录
if not parent_dir.exists():
parent_dir.mkdir(exist_ok=True, parents=True)
if 1:
main(float(left_top_x),float(left_top_y),float(right_buttom_x),float(right_buttom_y), zoom, r'.\\Temp\\test-one.tif', server="onwer_server", style="onwer_server_AREA:onwer_server_super_hd", zone_id=zone_id)
#main(97.376955,37.133309,97.376955,37.133309, 19, r'.\Temp\test-one.tif', server="Google China")
end_time = time.time()
print('lasted a total of {:.2f} seconds'.format(end_time - start_time))
2.合并瓦片图 谷歌的瓦片图长这样.瓦片图是金字塔类型的,这里就不多做解释了.
合并代码 就是创建一个大的画布,然后把下载好的瓦片图一张张的贴上去,没有难度
import glob
import re
from PIL import Image
files = glob.glob('D:\\tiles\\1547212382202232832\\tiles\\17\\*\\*.png')
# print(re.findall(r"\d+", files[0])[-2:])
# print(tuple(int(i) for i in re.findall(r"\d+", files[0])[-2:]))
files.sort(key=lambda x: tuple(int(i) for i in re.findall(r"\d+", x)[-2:]))
# print(files)
imagefiles = {}
for item in files:
match = re.findall(r'\d+', item)
# print(match[-2])
pre = int(match[-2])
if not imagefiles.get(pre):
imagefiles[pre] = []
imagefiles[pre].append(item)
imagefiles = sorted(zip(imagefiles.keys(), imagefiles.values()))
print(imagefiles)
total_width = len(imagefiles) * 256
total_height = len(imagefiles[0][1]) * 256
new_image = Image.new('RGB', (total_width, total_height))
x_offset = 0
for item in imagefiles:
y_offset = 0
# print(item[1])
images = map(Image.open, item[1])
# print(list(images))
for subitem in images:
new_image.paste(subitem, (x_offset, y_offset))
y_offset += subitem.size[0]
_x = subitem.size[0]
# print(list(images))
x_offset += _x
new_image.save('merge.jpg', quality=90)
3.在合并好的瓦片图上绘制我们的mark点和多边形。
思路:首先我们合并好的瓦片图上只有像素一个计量单位,如果要化gps点上去的话,就要找到一个全局的参考坐标。最好的参考坐标就是左上角第一块瓦片。找到左上角的点坐标。因为切出来的瓦片像素是我们自定义的,我用的是256*256,同时可以获取到瓦片的实际长度和宽度(就是bbox参数/墨卡托投影),由此我们可以算出单位像素对应的实际长度(单位是米)。 接下来我们只需要找到左上角第一块瓦片的左上角的点坐标即可。 通过左上角瓦片图的gps可以算出对应的瓦片图坐标,根据瓦片图坐标既可以算出 瓦片的左下角坐标和右上角坐标,既得左上角坐标。再转成墨卡托投影即可作为全局的参考坐标了。 ps:瓦片的范围用最小外接矩形来算。
def get_zone_gps_max(zone_id):
path_info = select_zone_info(zone_id)
path = json.loads(path_info[0]["path"])
path = [list(WGS84_to_WebMercator(_["lng"], _["lat"])) for _ in path]
print(path)
# left_top, right_buttom = get_maxarea_box(path)
min_x, max_x, min_y, max_y = get_maxarea_box(path)
print("右下→左下→左上→右上:", min_x, max_x, min_y, max_y)
left_top_x, left_top_y = WebMercator_to_WGS84(min_x, max_y)
right_buttom_x, right_buttom_y = WebMercator_to_WGS84(max_x, min_y)
return (float(left_top_x), float(left_top_y), float(right_buttom_x), float(right_buttom_y))
def get_first_tile_poi(zone_id, z=17):
"""
@description :获取第一块瓦片图的坐标
---------
@param :
-------
@Returns :左下角和右上角
-------
"""
left_top_x, left_top_y, right_buttom_x, right_buttom_y = get_zone_gps_max(zone_id)
pos1x, pos1y = wgs_to_tile(left_top_x, left_top_y, z) # 左上角的瓦片图坐标
min_xy_list = tile2lonlat(pos1x, pos1y + 1, z)
max_xy_list = tile2lonlat(pos1x + 1, pos1y, z)
lng_min, lat_min = min_xy_list[0], min_xy_list[1]
lng_max, lat_max = max_xy_list[0], max_xy_list[1]
# gcj02转wgs84
# lng_min, lat_min = GCJ02_to_WGS84(min_xy_list[0], min_xy_list[1])
lng_min, lat_min = WGS84_to_WebMercator(lng_min, lat_min)
# lng_max, lat_max = GCJ02_to_WGS84(max_xy_list[0], max_xy_list[1])
lng_max, lat_max = WGS84_to_WebMercator(lng_max, lat_max)
bbox = [lng_min, lat_min, lng_max, lat_max]
return bbox
有疑问或者错误的地方 可以留言讨论 互相学习

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)