Using a third-party/custom library in a Snowpark procedure
Usually, Python libraries required by Snowpark procedure code are installed by Snowflake automatically onto the backend Python engine/container where the Snowpark procedure or UDF runs. This works through Anaconda support in Snowflake.
We need to enable Anaconda manually in advance.
The following example shows how to specify the requested libraries in a Snowpark procedure using the PACKAGES clause:
create or replace table pay (job_name text, annual_wage float, country text);
insert into pay values('Data Engineer', 90000, 'CA'),
('Software Engineer',110000,'CA'),
('Software Engineer',100000, 'US'),
('Data Engineer', 120000, 'US');
create or replace procedure query_wage_from_pay_table (job_name text, country_name text)
returns text
language python
runtime_version=3.8
PACKAGES = ('snowflake-snowpark-python', 'pandas')
HANDLER = 'run'
AS
$$
import pandas as pd
def run(session, job_name, country_name):
    # Query the pay table and return the first matching annual_wage value
    sf_df_results = pd.DataFrame(session.sql(f'''select annual_wage from pay
        where country='{country_name}' and job_name='{job_name}'
        ''').collect()).iloc[0, 0]
    return sf_df_results
$$;
call query_wage_from_pay_table('Data Engineer', 'CA');
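By the way, the procedure can also be invoked from a Snowpark Python client. A minimal sketch, assuming an existing Snowpark session object:
# Minimal sketch: call the procedure from a Snowpark Python client session.
# Assumes `session` is an existing snowflake.snowpark.Session for this account.
result = session.call("query_wage_from_pay_table", "Data Engineer", "CA")
print(result)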
Note, you can pin a specific package version like this:
PACKAGES = ('snowflake-snowpark-python', 'pandas==1.5.3')
To list the Python packages/versions that can be used in a Snowpark procedure:
select * from information_schema.packages where (package_name = 'pandas' and language = 'python');
What if we need a Python library that is not in the Anaconda repos, i.e. a third-party library?
Not from the built-in Python runtime (sys, os, etc.), not from Anaconda (numpy, pandas, etc.)… from a third party.
Use case 1: The library is written in Python
It can be home-grown Python code or a package from PyPI. We can put the library source code file into a Snowflake stage and tell the procedure to load the source file to a designated location in the Python engine container via IMPORTS.
Using the pycountry library as an example:
1> Download the pycountry source zip file from GitHub: https://github.com/flyingcircusio/pycountry
2> Put the zip file into a Snowflake internal stage
c:\tmp>snowsql -a mqb56970.us-east-1 -u fengliplatform
fengliplatform#COMPUTE_WH@(no database).(no schema)>use FENGDB;
fengliplatform#COMPUTE_WH@FENGDB.PUBLIC>put file:///tmp/pycountry-main.zip @feng_stage auto_compress=false;
+--------------------+--------------------+-------------+-------------+--------------------+--------------------+----------+---------+
| source | target | source_size | target_size | source_compression | target_compression | status | message |
|--------------------+--------------------+-------------+-------------+--------------------+--------------------+----------+---------|
| pycountry-main.zip | pycountry-main.zip | 6184051 | 6184064 | NONE | NONE | UPLOADED | |
+--------------------+--------------------+-------------+-------------+--------------------+--------------------+----------+---------+
1 Row(s) produced. Time Elapsed: 6.250s
3> Snowpark procedure code
create or replace table pay (job_name text, annual_wage float, country text);
insert into pay values('Data Engineer', 90000, 'CANADA'),
('Software Engineer',110000,'CANADA'),
('Software Engineer',100000, 'United States'),
('Data Engineer', 120000, 'United States');
create or replace procedure query_country_code (job_name text, wage float)
returns text
language python
runtime_version=3.8
PACKAGES = ('snowflake-snowpark-python', 'pandas')
imports = ('@feng_stage/pycountry-main.zip')
HANDLER = 'run'
AS
$$
import sys
import zipfile
import pandas as pd
# Locate the imported zip in the procedure's import directory, extract it
# to a writable location, then make the extracted sources importable
import_dir = sys._xoptions.get("snowflake_import_directory")
target_dir = "/tmp/pycountry_src_package"
with zipfile.ZipFile(import_dir + "pycountry-main.zip", 'r') as pycountry_zip_file:
    pycountry_zip_file.extractall(target_dir)
sys.path.append(target_dir + '/pycountry-main/src')
import pycountry

def run(session, job_name, annual_wage):
    # Find the country name for the given job/wage, then map it to its ISO code
    sf_df_country = pd.DataFrame(session.sql(f'''select country from pay
        where annual_wage='{annual_wage}' and job_name='{job_name}'
        ''').collect()).iloc[0, 0]
    return str(pycountry.countries.search_fuzzy(sf_df_country)[0].alpha_2)
$$;
call query_country_code('Data Engineer', 90000);
1> The sample code searches the pay table for the country name "CANADA" and calls the pycountry library to get the country code "CA".
2> We need to locate the pycountry source zip file via the Snowflake procedure variable "snowflake_import_directory" and extract it in our procedure code.
3> We also need to add the extracted location to the Python path using sys.path.append so that our import line works.
Use case 2: The library is NOT written in Python
For example, the MySQL connector library. The same applies to other libraries you may be interested in.
We'll have to download a binary build of the MySQL connector and feed it to the Snowpark procedure like we did before.
But which binary build can run on the Snowflake Python engine container runtime?
We know it's a Python 3.8 Linux environment, so we'll install the MySQL connector binary compiled for the "manylinux" platform and hope it works in the Python engine container.
(snowpark) C:\tmp>pip install --platform manylinux2010_x86_64 --implementation cp --python-version 3.8 --only-binary=:all: --upgrade --target mysql_connector_lib mysql-connector-python
Collecting mysql-connector-python
Downloading mysql_connector_python-8.0.32-cp38-cp38-manylinux1_x86_64.whl (23.5 MB)
---------------------------------------- 23.5/23.5 MB 4.4 MB/s eta 0:00:00
Collecting protobuf<=3.20.3,>=3.11.0
Downloading protobuf-3.20.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
---------------------------------------- 1.0/1.0 MB 3.1 MB/s eta 0:00:00
Installing collected packages: protobuf, mysql-connector-python
Successfully installed mysql-connector-python-8.0.32 protobuf-3.20.3
(snowpark) C:\tmp>ls mysql_connector_lib
_mysql_connector.cpython-38-x86_64-linux-gnu.so mysql_connector_python-8.0.32.dist-info
_mysqlxpb.cpython-38-x86_64-linux-gnu.so mysqlx
google protobuf-3.20.3-py3.8-nspkg.pth
mysql protobuf-3.20.3.dist-info
(snowpark) C:\tmp>ls mysql_connector_lib/mysql
__init__.py __pycache__ connector vendor
We installed the binary build of "mysql-connector-python" for the manylinux2010_x86_64 platform and the Python 3.8 runtime into a local directory, mysql_connector_lib.
Want to know more about Python library binary packages/wheel files? Check out another post in which we did something similar for the AWS Lambda runtime.
Now, zip it up as mysql_connector_lib.zip and put it into the Snowflake stage.
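If you prefer doing the zipping in Python, here is a minimal local sketch (run from C:\tmp, with the directory/file names used above), which keeps mysql_connector_lib/ as the top-level folder inside the archive:
# Minimal local sketch: create mysql_connector_lib.zip from the downloaded directory.
import shutil

shutil.make_archive("mysql_connector_lib", "zip",
                    root_dir=".", base_dir="mysql_connector_lib")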
fengliplatform#COMPUTE_WH@FENGDB.PUBLIC>put file:///tmp/mysql_connector_lib.zip @feng_stage auto_compress=false;
+-------------------------+-------------------------+-------------+-------------+--------------------+--------------------+----------+---------+
| source | target | source_size | target_size | source_compression | target_compression | status | message |
|-------------------------+-------------------------+-------------+-------------+--------------------+--------------------+----------+---------|
| mysql_connector_lib.zip | mysql_connector_lib.zip | 24386927 | 24386928 | NONE | NONE | UPLOADED | |
+-------------------------+-------------------------+-------------+-------------+--------------------+--------------------+----------+---------+
1 Row(s) produced. Time Elapsed: 44.197s
Test out the following code…
create or replace procedure test_mysql_connector ()
returns text
language python
runtime_version=3.8
PACKAGES = ('snowflake-snowpark-python', 'pandas')
imports = ('@feng_stage/mysql_connector_lib.zip')
HANDLER = 'run'
AS
$$
import sys
import zipfile
import pandas as pd
# Extract the imported zip to a writable location and make it importable
import_dir = sys._xoptions.get("snowflake_import_directory")
target_dir = "/tmp/mysql_connector"
with zipfile.ZipFile(import_dir + "mysql_connector_lib.zip", 'r') as mysql_connector_zip_file:
    mysql_connector_zip_file.extractall(target_dir)
sys.path.append(target_dir + '/mysql_connector_lib')
import mysql.connector

def run(session):
    return "success"
$$;
call test_mysql_connector();
It returned "success" without complaining about "module not found", great!
So in the next post, we may want to use a Snowpark procedure to read data from MySQL and save it to Snowflake…
But wait, the procedure code runs on the Snowflake Python engine container, which has NO internet connection by design!
That also means that if you want to load data from your database into Snowflake directly, you cannot use a Snowpark procedure for now.
To do so, we are encouraged to use paid ETL tools like Informatica, Fivetran, Rivery, etc., OR export the database to S3 first and ingest it into Snowflake after.
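For the second approach, a rough sketch from a Snowpark Python session might look like this (the stage name, S3 bucket, credentials and file format are assumptions for illustration, not from this post):
# Hypothetical sketch: ingest MySQL data that was already exported to S3 as CSV.
# Stage name, S3 URL, credentials and target table are placeholders.
session.sql("""
    create or replace stage mysql_export_stage
    url = 's3://my-bucket/mysql-export/'
    credentials = (aws_key_id = '...' aws_secret_key = '...')
""").collect()
session.sql("""
    copy into pay
    from @mysql_export_stage
    file_format = (type = csv skip_header = 1)
""").collect()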
Two related previous posts if you’re interested:
- Ingesting Data Into Snowflake (5): Snowflake Partner Connect
- Ingesting data into Snowflake (3): from PostgreSQL Database
Use case 3: Use a platform-independent wheel file
For example, Apache Tika, which parses files for metadata, builds a platform-independent wheel file (tika-2.6.0-py3-none-any.whl) when you do "pip install tika" on a Windows laptop.
(snowpark) C:\Users\fengli\snowflake\mysql-snowflake>pip install tika
Collecting tika
Downloading tika-2.6.0.tar.gz (27 kB)
Preparing metadata (setup.py) ... done
...
Building wheels for collected packages: tika
Building wheel for tika (setup.py) ... done
Created wheel for tika: filename=tika-2.6.0-py3-none-any.whl size=32624 sha256=3712e903e29dc18b0055731d345f1f0d9856cbd057f38ae5417e5862b036fe63
Stored in directory: c:\users\fengli\appdata\local\pip\cache\wheels\02\bd\74\313abcb9271e041e30734880e6813385150dd93627e9659de5
Successfully built tika
Installing collected packages: tika
Successfully installed tika-2.6.0
This wheel file can then be used in the Snowpark Python engine, which runs as a Linux container. Put the wheel file into a Snowflake stage and refer to it in the procedure using IMPORTS.
create or replace procedure tika_sp ()
returns text
language python
runtime_version=3.8
PACKAGES = ('snowflake-snowpark-python', 'requests')
imports = ('@feng_stage/tika-2.6.0-py3-none-any.whl')
HANDLER = 'run'
execute as owner
AS
$$
import sys
# A wheel is a zip archive, so it can be added to sys.path directly
import_dir = sys._xoptions.get("snowflake_import_directory")
sys.path.append(import_dir + '/tika-2.6.0-py3-none-any.whl')
import tika
from tika import parser

def run(session):
    # Parse the wheel file itself just to exercise the library
    parsed = parser.from_file(import_dir + '/tika-2.6.0-py3-none-any.whl')
    return parsed["metadata"]
$$;
call tika_sp();
1> import tika succeeds using this wheel file. That is the point of use case 3.
2> Calling the procedure fails with the errors below. This is related to how Tika works: it fetches the tika-server.jar file from a Maven repo over the internet, and Snowpark procedures/UDFs have no internet access from the backend Python engine container where they run. This is by design in Snowpark for security reasons.
...
File "/home/udf/374400811345//tika-2.6.0-py3-none-any.whl/tika/tika.py", line 825, in checkPortIsOpen
remoteServerIP = socket.gethostbyname(remoteServerHost)
socket.gaierror: [Errno -3] Temporary failure in name resolution
in function TIKA_SP with handler run
Appendix:
1 Multiple ways to specify PACKAGES and IMPORTS for Snowpark procedures/UDFs
1> As in this post, in the DDL from a Snowflake worksheet.
2> In the Snowpark application file at session level
3> In the Snowpark application file at procedure/UDF level
(Check this post for examples of 2> and 3>; a rough sketch is also shown below.)
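A minimal sketch of 2> and 3>, assuming an existing Snowpark session object and the @feng_stage stage used above (the procedure name and body are illustrative only):
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc

# 2> Session level: applies to everything registered from this session
session.add_packages("snowflake-snowpark-python", "pandas")
session.add_import("@feng_stage/pycountry-main.zip")

# 3> Procedure/UDF level: applies only to this stored procedure
@sproc(name="demo_sp", replace=True,
       packages=["snowflake-snowpark-python", "pandas"],
       imports=["@feng_stage/pycountry-main.zip"])
def demo_sp(session: Session) -> str:
    return "ok"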
2 No Java support in the Python engine container for UDFs/procedures
If your Python library has a dependency on Java (Apache Tika again), it cannot be used in a UDF/procedure for now.
Happy Reading!