首页 > 解决方案 > Google Composer - 如何在环境中安装 Microsoft SQL Server ODBC 驱动程序

问题描述

我是 GCP 和 Airflow 的新手,正在尝试通过 python 3 的简单 PYODBC 连接运行我的 python 管道。但是,我相信我已经找到了需要在机器上安装的内容 [Microsoft doc] https://docs.microsoft .com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-2017,但我不知道该去哪里GCP 来运行这些命令。我已经钻了几个深洞寻找答案,但不知道如何解决问题

这是我在上传 DAG 时一直看到的错误:

气流错误

这是 PYODBC 连接:

pyodbc.connect('DRIVER={Microsoft SQL Server};SERVER=servername;DATABASE=dbname;UID=username;PWD=password')

当我在环境中打开我的 gcloud shell 并运行 Microsoft 下载时,它会中止,当我下载 SDK 并从本地下载连接到项目时,它会自动中止或无法识别来自 Microsoft 的命令。谁能给出一些关于从哪里开始以及我做错了什么的简单说明?

标签: sql-servergoogle-cloud-platformairflowgoogle-cloud-composer

解决方案


这很简单 !不需要DockerFile、KubernetesPodOperator、LD_LIBRARY_PATH 等,只需一个基本的python 操作符即可

需要考虑的要点

  • GCP Composer Worker 的 Pod 映像是 ubuntu 1604(只需使用命令 os.system('cat /etc/os-release') 运行一个基本的 python 操作符来检查)
  • 它已经在 worker 的 pod 映像上安装了 unixodbc-dev
  • Composer 创建桶并用气流安装它
  • 那么为什么不直接从 pypi 包中安装 pyodbc 并在 pyodbc 连接方法中提供 mssql odbc 驱动程序作为参数

这里 'gs://bucket_created_by_composer' == '/home/airflow/gcs'

gcs bucket created by composer ->
          -> data/
          -> dags/

循序渐进的方法

步骤 1:在任何 ubuntu 实例上安装 pyodbc、mssql odbc 以获取驱动程序文件

考虑一下,让我们在 GCP VM Intance 上使用 ubuntu 1804 映像进行操作

#update the packages
sudo apt update
sudo apt-get update -y
curl https://packages.microsoft.com/keys/microsoft.asc | sudo apt-key add -
curl https://packages.microsoft.com/config/ubuntu/18.04/prod.list | sudo tee /etc/apt/sources.list.d/msprod.list
sudo apt-get update -y
echo Installing mssql-tools and unixODBC developer...
sudo ACCEPT_EULA=Y apt-get install -y mssql-tools unixodbc-dev
sudo apt-get update -y
sudo apt-get install  -y mssql-tools #it includes sql_cmd and bcp (we dont need those)
sudo apt install python3-pip #installing pip3
pip3 install pyodbc 

第二步:获取Driver Files并上传到composer创建的gcs_bucket的data文件夹中

cd /opt/microsoft
#now you can see there is one directory 'msodbcsql17', version may change
#we need to upload this directory to the data folder of gcs_bucket

#for this you may choose which ever approach suits you
#copying the directory to /<home/user> for proper zipping/uploading to gcs
cp -r msodbcsql17 /home/<user> #you may need to use sudo 
#upload this /home/<user>/msodbcsql17 to any gcs_bucket 
gsutil cp -r /home/<user>/msodbcsql17 gs://<your-gcs-bucket>

将此文件夹从 gcs 存储桶下载到本地,并将此文件夹上传到由 composer 创建的 gcs 存储桶的数据文件夹

选择任何方法/方法,主要目的是获取composer创建的gcs存储桶的数据文件夹中的msodbcsql17文件夹

最终结构:

gcs bucket created by composer ->
          -> data/msodbcsql17/
          -> dags/<your_dags.py>

第 3 步:使用此 msodbcsql17 驱动程序进行 pyodbc 连接

示例 DAG:

import os
import time
import datetime
import argparse
import json
from airflow import DAG
import airflow

from airflow.operators import python_operator


default_dag_args = {
    'start_date': airflow.utils.dates.days_ago(0), #
    'provide_context': True
}



dag = DAG(
        'pyodbc_test',
        schedule_interval=None, #change for composer
        default_args=default_dag_args
        )


def check_connection(**kwargs):
    print('hello')
    driver='/home/airflow/gcs/data/msodbcsql17/lib64/libmsodbcsql-17.5.so.2.1'
    #this is the main driver file, the exact location can be found on gcs_bucket/data folder or check the /etc/odbcinst.in file of ubuntu instance in which you installed the pyodbc earlier

    def tconnection(ServerIp,LoginName,Password,mssql_portno):
        """ A method which return connection object"""
        import pyodbc
        pyodbc.pooling = False 
        try:   
            sql_conn = pyodbc.connect("DRIVER={4};SERVER={0},{1};UID={2};PWD={3}".format(ServerIp,mssql_portno,LoginName,Password,driver)) 
        except pyodbc.Error as ex:
            sqlstate = ex.args[1]
            raise


        return sql_conn

    con=tconnection('<your-server-ip>','<your-login-name>','<your-password>','1433')
    #recommendation is to take the password and login from airflow connections
    import pandas as pd
    q='select * from <your-db-name>.<your-schema-name>.<your-table-name>'
    df=pd.read_sql(q,con)
    print(df)

Tcheck_connection= python_operator.PythonOperator(
        task_id='Tcheck_connection',
        python_callable=check_connection,
        dag=dag ) 


#calling the task sequence
Tcheck_connection 

PYPI 包

pyodbc
pandas

最近在 Composer 上测试过


推荐阅读