// Cargo.toml used by these examples:
//
// [dependencies]
// polars = { version = "0.42", features = ["lazy","dtype-struct","dtype-array","polars-io","dtype-datetime","dtype-date","range","temporal","rank","serde","csv","ndarray","parquet","strings","list_eval"] }
// rand = "0.8.5"
// chrono = "0.4.38"
// serde_json = "1.0.124"
// itertools = "0.13"
#![allow(warnings,dead_code, unused,unused_imports, unused_variables, unused_mut)] use aggregations::agglist; use polars::prelude::*; use std::time::instant; use serde_json::*; use chrono::{naivedate}; fn main(){ //create_df_by_series(); //create_df_by_df_macro(); //df_apply(); // 需要把相关函数放在里面即可,这里不一一列示。 //df_to_vec_tuples_by_izip(); //write_read_parquet_files(); //date_to_str_in_column(); //str_to_datetime_date_cast_in_df(); //create_list_in_df_by_apply(); //unnest_struct_in_df(); //as_struct_in_df(); //struct_apply_in_df(); //create_list_in_df(); //structs_in_df(); //df_to_structs_by_zip(); //df_to_structs_by_iter_version_0_4_2(); //create_list_in_df(); eval_in_df(); } fn create_df_by_series(){ println!("------------- create_df_by_series test ---------------- "); let s1 = series::new("from vec", vec![4, 3, 2]); let s2 = series::new("from slice", &[true, false, true]); let s3 = series::new("from array", ["rust", "go", "julia"]); let df = dataframe::new(vec![s1, s2, s3]).unwrap(); println!("{:?}", &df); } fn create_df_by_df_macro(){ println!("------------- create_df_by_macro test ---------------- "); let df1: dataframe = df!("d1" => &[1, 3, 1, 5, 6],"d2" => &[3, 2, 3, 5, 3]).unwrap(); let df2 = df1 .lazy() .select(&[ col("d1").count().alias("total"), col("d1").filter(col("d1").gt(lit(2))).count().alias("d1 > 3"), ]) .collect() .unwrap(); println!("{}", df2); } fn rank(){ println!("------------- rank test ---------------- "); // 注意:toml => feature : rank let mut df = df!( "scores" => ["a", "a", "a", "b", "c", "b"], "class" => [1, 2, 3, 4, 2, 2] ).unwrap(); let df = df .clone().lazy() .with_column(col("class") .rank(rankoptions{method: rankmethod::ordinal, descending: false}, none) .over([col("scores")]) .alias("rank_") ).sort_by_exprs([col("scores"), col("class"), col("rank_")], default::default()) ; println!("{:?}", df.collect().unwrap().head(some(3))); } fn head_tail_sort(){ println!("------------------head_tail_sort test-------------------"); let df = df!( 
"scores" => ["a", "b", "c", "b", "a", "b"], "class" => [1, 3, 1, 1, 2, 3] ).unwrap(); let head = df.head(some(3)); let tail = df.tail(some(3)); // 对value列进行sort,生成新的series,并进行排序 let sort = df.lazy().select([col("class").sort(default::default())]).collect(); println!("df head :{:?}",head); println!("df tail:{:?}",tail); println!("df sort:{:?}",sort); } fn filter_group_by_agg(){ println!("----------filter_group_by_agg test--------------"); use rand::{thread_rng, rng}; let mut arr = [0f64; 5]; thread_rng().fill(&mut arr); let df = df! ( "nrs" => &[some(1), some(2), some(3), none, some(5)], "names" => &[some("foo"), some("ham"), some("spam"), some("eggs"), none], "random" => &arr, "groups" => &["a", "a", "b", "c", "b"], ).unwrap(); let df2 = df.clone().lazy().filter(col("groups").eq(lit("a"))).collect().unwrap(); println!("df2 :{:?}",df2); println!("{}", &df); let out = df .lazy() .group_by([col("groups")]) .agg([ sum("nrs"), // sum nrs by groups col("random").count().alias("count"), // count group members // sum random where name != null col("random") .filter(col("names").is_not_null()) .sum() .name() .suffix("_sum"), col("names").reverse().alias("reversed names"), ]) .collect().unwrap(); println!("{}", out); } fn filter_by_exclude(){ println!("----------filter_by_exclude----------------------"); let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let lst = df["date"].as_list().slice(1,1); println!("s :{:?}",lst); // 下面all() 可以用col(*)替代; let df_filter = df.lazy().select([all().exclude(["code","date"])]).collect().unwrap(); println!("df_filter :{}",df_filter); } fn windows_over(){ println!("------------- windows_over test ---------------- "); 
let df = df!( "key" => ["a", "a", "a", "a", "b", "c"], "value" => [1, 2, 1, 3, 3, 3] ).unwrap(); // over()函数:col("value").min().over([col("key")]),表示:请根据col("key")进行分类,再对分类得到的组求最小值操作; let df = df .clone().lazy() .with_column(col("value") .min() // .max(), .mean() .over([col("key")]) .alias("over_min")) .with_column(col("value").max().over([col("key")]).alias("over_max")); println!("{:?}", df.collect().unwrap().head(some(10))); } //read_csv fn lazy_read_csv(){ println!("------------- lazy_read_csv test ---------------- "); // features => lazy and csv // 请根据自己文件情况进行设置 let filepath = "../my_duckdb/src/test.csv"; // csv数据格式 // 600036.xshg,2079/7/24,3345.9,3357.8,3326.7,3357,33589,69181710.57,1 // 600036.xshg,2079/7/25,3346,3357.9,3326.8,3357.1,33590,69184251.47,1 let polars_lazy_csv_time = instant::now(); let p = lazycsvreader::new(filepath) .with_try_parse_dates(true) //需要增加available on crate feature temporal only. .with_has_header(true) .finish().unwrap(); let df = p.collect().expect("error to dataframe!"); println!("polars lazy 读出csv的行和列数:{:?}",df.shape()); println!("polars lazy 读csv 花时: {:?} 秒!", polars_lazy_csv_time.elapsed().as_secs_f32()); } fn read_csv(){ println!("------------- read_csv test ---------------- "); // features => polars-io use std::fs::file; let csv_time = instant::now(); let filepath = "../my_duckdb/src/test.csv"; // csv数据格式 // 600036.xshg,2079/7/24,3345.9,3357.8,3326.7,3357,33589,69181710.57,1 // 600036.xshg,2079/7/25,3346,3357.9,3326.8,3357.1,33590,69184251.47,1 let file = file::open(filepath) .expect("could not read file"); let df = csvreader::new(file).finish().unwrap(); //println!("df:{:?}",df); println!("读出csv的行和列数:{:?}",df.shape()); println!("读csv 花时: {:?} 秒!",csv_time.elapsed().as_secs_f32()); } fn read_csv2(){ println!("------------- read_csv2 test ---------------- "); // features => polars-io // 具体按自己目录路径下的文件 let filepath = "../my_duckdb/src/test.csv"; //请根据自已文件情况进行设置 // csv数据格式 // 
600036.xshg,2079/7/24,3345.9,3357.8,3326.7,3357,33589,69181710.57,1 // 600036.xshg,2079/7/25,3346,3357.9,3326.8,3357.1,33590,69184251.47,1 let df = csvreadoptions::default() .with_has_header(true) .try_into_reader_with_file_path(some(filepath.into())).unwrap() .finish().unwrap(); println!("read_csv2 => df {:?}",df) } fn parse_date_csv(){ println!("------------- parse_date_csv test ---------------- "); // features => polars-io let filepath = "../my_duckdb/src/test.csv"; // 读出csv,并对csv中date类型进行转换 // csv数据格式 // 600036.xshg,2019/7/24,3345.9,3357.8,3326.7,3357,33589,69181710.57,1 // 600036.xshg,2019/7/25,3346,3357.9,3326.8,3357.1,33590,69184251.47,1 let df = csvreadoptions::default() .map_parse_options(|parse_options| parse_options.with_try_parse_dates(true)) .try_into_reader_with_file_path(some(filepath.into())) .unwrap() .finish() .unwrap(); println!("{}", &df); } fn write_csv_df(){ println!("----------- write_csv_df test -------------------------"); // toml features => csv // features => polars-io let mut df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let mut file = std::fs::file::create("600036sh.csv").unwrap(); csvwriter::new(&mut file).finish(&mut df).unwrap(); } fn iter_dataframe_as_row() { println!("------------- iter_dataframe_as_row test ---------------- "); let starttime = instant::now(); let df: dataframe = df!("d1" => &[1, 3, 1, 5, 6],"d2" => &[3, 2, 3, 5, 3]).unwrap(); let (_row,_col) = df.shape(); for i in 0.._row{ let mut rows = vec::new(); for j in 0.._col{ let value = df[j].get(i).unwrap(); rows.push(value); } } println!("dataframe按行遍历cost time :{:?} seconds!",starttime.elapsed().as_secs_f32()); } fn join_concat(){ 
println!("------------- join_concat test ---------------- "); // 创建表结构,内部有空数据 let df = df! [ // 表头 对应数据 "model" => ["iphone xs", "iphone 12", "iphone 13", "iphone 14", "samsung s11", "samsung s12", "mi a1", "mi a2"], "company" => ["apple", "apple", "apple", "apple", "samsung", "samsung", "xiao mi", "xiao mi"], "sales" => [80, 170, 130, 205, 400, 30, 14, 8], "comment" => [none, none, some("sold out"), some("new arrival"), none, some("sold out"), none, none], ].unwrap(); let df_price = df! [ "model" => ["iphone xs", "iphone 12", "iphone 13", "iphone 14", "samsung s11", "samsung s12", "mi a1", "mi a2"], "price" => [2430, 3550, 5700, 8750, 2315, 3560, 980, 1420], "discount" => [some(0.85), some(0.85), some(0.8), none, some(0.87), none, some(0.66), some(0.8)], ].unwrap(); // 合并 // join()接收5个参数,分别是:要合并的dataframe,左表主键,右表主键,合并方式 let df_join = df.join(&df_price, ["model"], ["model"], joinargs::from(jointype::inner)).unwrap(); println!("{:?}", &df_join); let df_v1 = df!( "a"=> &[1], "b"=> &[3], ).unwrap(); let df_v2 = df!( "a"=> &[2], "b"=> &[4], ).unwrap(); let df_vertical_concat = concat( [df_v1.clone().lazy(), df_v2.clone().lazy()], unionargs::default(), ).unwrap() .collect().unwrap(); println!("{}", &df_vertical_concat); } fn get_slice_scalar_from_df(){ println!("------------- get_slice_scalar_from_df test ---------------- "); let df: dataframe = df!("d1" => &[1, 2, 3, 4, 5],"d2" => &[3, 2, 3, 5, 3]).unwrap(); // slice(1,4): 从第2行开始(包含),各列向下共取4行 let slice = &df.slice(1,4); println!("slice :{:?}",&slice); // 获取第2列第3个值的标量 let scalar = df[1].get(3).unwrap(); println!("saclar :{:?}",scalar); } fn replace_drop_col(){ println!("------------- replace_drop_col test ---------------- "); // toml :features => replace let mut df: dataframe = df!("d1" => &[1, 2, 3, 4, 5],"d2" => &[3, 2, 3, 5, 3]).unwrap(); let new_s1 = series::new("", &[2,3,4,5,6]); // ""为名字不变; // d1列进行替换 let df2 = df.replace("d1", new_s1).unwrap(); // 删除d2列 let df3 = df2.drop_many(&["d2"]); println!("df3:{:?}",df3); 
} fn drop_null_fill_null(){ println!("------------- drop_null_fill_null test ---------------- "); let df: dataframe = df!("d1" => &[none, some(2), some(3), some(4), none],"d2" => &[3, 2, 3, 5, 3]).unwrap(); // 取当前列第一个非空的值填充后面的空值 let df2 = df.fill_null(fillnullstrategy::forward(none)).unwrap(); // forward(option):向后遍历,用遇到的第一个非空值(或给定下标位置的值)填充后面的空值 // backward(option):向前遍历,用遇到的第一个非空值(或给定下标位置的值)填充前面的空值 // mean:用算术平均值填充 // min:用最小值填充 // max: 用最大值填充 // zero:用0填充 // one:用1填充 // maxbound:用数据类型的取值范围的上界填充 // minbound:用数据类型的取值范围的下界填充 println!("fill_null :{:?}", df2); // 删除d1列中的none值 let df3 = df2.drop_nulls(some(&["d1"])).unwrap(); println!("drop_nulls :{:?}",df3); } fn compute_return(){ println!("-----------compute_return test -----------------------"); let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let _df = df .clone() .lazy() .with_columns([(col("close")/col("close").first()-lit(1.0)).alias("ret")]) .collect().unwrap(); println!("_df :{}",_df) } fn standardlize_center(){ println!("------------- standardlize_center test ---------------- "); let df: dataframe = df!("d1" => &[1, 2, 3, 4, 5],"d2" => &[3, 2, 3, 5, 3]).unwrap(); // 进行标准化:对所有的列,每个值除以本列最大值 // cast(): 由int =>float64 let standardization = df.lazy().select([col("*").cast(datatype::float64) / col("*").cast(datatype::float64).max()]); // 对于标准化后的列,进行中心化 let center = standardization .select([col("*") - col("*").mean()]) .collect() .unwrap(); println!("standardlize : {:?}",center); } fn create_list_in_df_by_apply(){ println!("----------creat_list_in_df_by_apply test ------------------------"); let df = df!( "lang" => &["go","rust", "go", "julia","julia","rust","rust"], "users" => &[223,1032, 
222, 42,1222,3213,4445], "year" =>&["2020","2021","2022","2023","2024","2025","2026"] ).unwrap(); println!("df :{}",df); let out = df .clone() .lazy() .group_by([col("lang")]) .agg([ col("users") .apply(|s| { let v = s.i32().unwrap(); let out = v .into_iter() .map(|v| match v { some(v_) => v_ , _ => 0 }) .collect::<vec<i32>>(); ok(some(series::new("_", out))) }, getoutput::default()) .alias("aggr_vec"), ]) //.with_column(col("aggr_sum").list().alias("aggr_sum_first")) .collect() .unwrap(); println!("{}", out); } fn create_struct_in_df_by_apply(){ println!("-----------------create_struct_in_df_by_apply test -------------------------"); // toml features => "dtype-struct" use polars::prelude::*; let df = df!( "keys" => &["a", "a", "b"], "values" => &[10, 7, 1], ).unwrap(); let out = df .clone() .lazy() .with_column(col("values").apply( |s| { let s = s.i32()?; let out_1: vec<option<i32>> = s.into_iter().map(|v| match v { some(v_) => some(v_ * 10), _ => none, }).collect(); let out_2: vec<option<i32>> = s.into_iter().map(|v| match v { some(v_) => some(v_ * 20), _ => none, }).collect(); let out = df! ( "v1" => &out_1, "v2" => &out_2, ).unwrap() .into_struct("vals") .into_series(); ok(some(out)) }, getoutput::default())) .collect() .unwrap(); println!("{}", out); } fn field_value_counts(){ println!("--------------field_value_counts test---------------"); let ratings = df!( "movie"=> &["cars", "it", "et", "cars", "up", "it", "cars", "et", "up", "et"], "theatre"=> &["ne", "me", "il", "nd", "ne", "sd", "ne", "il", "il", "sd"], "avg_rating"=> &[4.5, 4.4, 4.6, 4.3, 4.8, 4.7, 4.7, 4.9, 4.7, 4.6], "count"=> &[30, 27, 26, 29, 31, 28, 28, 26, 33, 26], ).unwrap(); println!("{}", &ratings); let out = ratings .clone() .lazy() .select([col("theatre").value_counts(true, true, "count".to_string(), false)]) .collect().unwrap(); println!("{}", &out); } // 宏 macro_rules! 
structs_to_dataframe { ($input:expr, [$($field:ident),+]) => { { // extract the field values into separate vectors $(let mut $field = vec::new();)* for e in $input.into_iter() { $($field.push(e.$field);)* } df! { $(stringify!($field) => $field,)* } } }; } macro_rules! dataframe_to_structs_todo { ($df:expr, $structname:ident,[$($field:ident),+]) => { { // 把df 对应的fields =>vec<structname>, let mut vec:vec<$structname> = vec::new(); vec } }; } fn df_to_structs_by_macro_todo(){ println!("---------------df_to_structs_by_macro_todo test -------------------"); let df = df!( "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); // 把df =>vec<bar> struct bar { date:naivedate, close:f64, open:f64, high:f64, low:f64, } impl bar { fn bar(date:naivedate, close:f64,open:f64,high:f64,low:f64) -> self{ bar{date,close,open,high,low} } } let bars: vec<bar> = dataframe_to_structs_todo!(df, bar,[date,close,open,high,low]); println!("df:{:?}",df); } fn structs_to_df_by_macro(){ println!(" ---------------- structs_to_df_by_macro test -----------------------"); struct bar { date:naivedate, close:f64, open:f64, high:f64, low:f64, } impl bar { fn new(date:naivedate, close:f64,open:f64,high:f64,low:f64) -> self{ bar{date,close,open,high,low} } } let test_bars:vec<bar> = vec![bar::new(naivedate::from_ymd_opt(2024,1,1).unwrap(),10.1,10.12,10.2,9.99), bar::new(naivedate::from_ymd_opt(2024,1,2).unwrap(),10.2,10.22,10.3,10.1)]; let df = structs_to_dataframe!(test_bars, [date,close,open,high,low]).unwrap(); println!("df:{:?}",df); } // polars: version 0.41.3 =>work; version0.42 => no work! 
// fn df_to_structs_by_iter_version_0_4_1(){ // println!("---------------df_to_structs_by_iter test----------------"); // // toml :features => "dtype-struct" // let now = instant::now(); // #[derive(debug, clone)] // struct bar { // code :string, // date:naivedate, // close:f64, // open:f64, // high:f64, // low:f64, // } // impl bar { // fn new(code:string,date:naivedate, close:f64,open:f64,high:f64,low:f64) -> self{ // bar{code,date,close,open,high,low} // } // } // let df = df!( // "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], // "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), // naivedate::from_ymd_opt(2015, 3, 15).unwrap(), // naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], // "close" => &[1.21,1.22,1.23], // "open" => &[1.22,1.21,1.23], // "high" => &[1.22,1.25,1.24], // "low" => &[1.19, 1.20,1.21], // ).unwrap(); // let mut bars:vec<bar> = vec::new(); // let rows_data = df.into_struct("bars"); // let start_date = naivedate::from_ymd_opt(1970, 1, 2).unwrap(); // for row_data in &rows_data { // let code = row_data.get(0).unwrap(); // let mut new_code = "".to_string(); // if let &anyvalue::string(value) = code{ // new_code = value.to_string(); // } // let mut new_date = naivedate::from_ymd_opt(2000,1,1).unwrap(); // let since_days = start_date.signed_duration_since(naivedate::from_ymd_opt(1,1,1).unwrap()); // let date = row_data.get(1).unwrap(); // if let &anyvalue::date(dt) = date { // let tmp_date = naivedate::from_num_days_from_ce_opt(dt).unwrap(); // new_date = tmp_date.checked_add_signed(since_days).unwrap(); // } // let open =row_data[3].extract::<f64>().unwrap(); // let high = row_data[4].extract::<f64>().unwrap(); // let close =row_data[2].extract::<f64>().unwrap(); // let low = row_data[5].extract::<f64>().unwrap(); // bars.push(bar::new(new_code,new_date,close,open,high,low)); // } // println!("df_to_structs2 => structchunk : cost time :{:?}",now.elapsed().as_secs_f32()); // println!("bars 
:{:?}",bars); // } //polars version >=0.42 fn df_to_structs_by_iter_version_0_4_2(){ println!("---------------df_to_structs_by_iter_version_0_4_2 test----------------"); // toml :features => "dtype-struct" let now = instant::now(); #[derive(debug, clone)] struct bar { code :string, date:naivedate, close:f64, open:f64, high:f64, low:f64, } impl bar { fn new(code:string,date:naivedate, close:f64,open:f64,high:f64,low:f64) -> self{ bar{code,date,close,open,high,low} } } let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let mut bars:vec<bar> = vec::new(); let rows = df.into_struct("bars").into_series(); let start_date = naivedate::from_ymd_opt(1970, 1, 2).unwrap(); for i in 0..rows.len(){ let row_values = &rows.get(i).unwrap(); //println!("i:{} row_values:{}",i,row_values); let values:vec<anyvalue> = row_values._iter_struct_av().map(|v|v).collect(); let code = &values[0]; let mut new_code = "".to_string(); if let &anyvalue::string(value) = &code{ new_code = value.to_string(); } let mut new_date = naivedate::from_ymd_opt(2000,1,1).unwrap(); let since_days = start_date.signed_duration_since(naivedate::from_ymd_opt(1,1,1).unwrap()); let date = &values[1]; if let &anyvalue::date(dt) = date { let tmp_date = naivedate::from_num_days_from_ce_opt(dt).unwrap(); new_date = tmp_date.checked_add_signed(since_days).unwrap(); } let open = values[3].extract::<f64>().unwrap(); let high = values[4].extract::<f64>().unwrap(); let close = values[2].extract::<f64>().unwrap(); let low = values[5].extract::<f64>().unwrap(); //println!("code :{},date:{} open:{} high:{} close:{} low:{}",new_code,date,open,high,close,low); 
bars.push(bar::new(new_code,new_date,close,open,high,low)); } println!("df_to_structs_by_iter_version_0_4_2 : cost time :{:?}",now.elapsed().as_secs_f32()); println!("bars :{:?}",bars); } fn df_to_structs_by_zip(){ println!("-----------df_to_structs_by_zip test --------------------"); // 同样适用df -> struct ,tuple,hashmap 等 let now = instant::now(); #[derive(debug, clone)] struct bar { code :string, date:naivedate, close:f64, open:f64, high:f64, low:f64, } impl bar { fn new(code:string,date:naivedate, close:f64,open:f64,high:f64,low:f64) -> self{ bar{code,date,close,open,high,low} } } let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let bars : vec<bar> = df["code"].str().unwrap().iter() .zip(df["date"].date().unwrap().as_date_iter()) .zip(df["close"].f64().unwrap().iter()) .zip(df["open"].f64().unwrap().iter()) .zip(df["high"].f64().unwrap().iter()) .zip(df["low"].f64().unwrap().iter()) .map(|(((((code,date),close),open),high),low)| bar::new(code.unwrap().to_string(), date.unwrap(), close.unwrap(), open.unwrap(), high.unwrap(), low.unwrap())).collect(); println!("df_to_structs_by_zip => zip : cost time :{:?} seconds!",now.elapsed().as_secs_f32()); println!("bars :{:?}",bars); //izip! from itertools --其它参考--,省各种复杂的括号! //use itertools::izip; //izip!(code, date, close, open,high,low).collect::<vec<_>>() // vec of 4-tuples } fn df_to_vec_tuples_by_izip(){ println!("-------------df_to_tuple_by_izip test---------------"); use itertools::izip; // in my real code this is generated from two joined dfs. 
let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" => &[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let mut dates = df.column("date").unwrap().date().unwrap().as_date_iter(); let mut codes = df.column("code").unwrap().str().unwrap().iter(); let mut closes = df.column("close").unwrap().f64().unwrap().iter(); let mut tuples = vec::new(); for (date, code, close) in izip!(&mut dates, &mut codes, &mut closes) { //println!("{:?} {:?} {:?}", date.unwrap(), code.unwrap(), close.unwrap()); tuples.push((date.unwrap(),code.unwrap(),close.unwrap())); } // 或这种方式 let tuples2 = izip!(&mut dates, &mut codes, &mut closes).collect::<vec<_>>(); println!("tuples :{:?}",tuples); println!("tuples2 :{:?}",tuples2); } fn series_to_vec(){ println!("------------series_to_vec test-----------------------"); let df = df!( "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], ).unwrap(); let vec :vec<option<naivedate>>= df["date"].date().unwrap().as_date_iter().collect(); println!("vec :{:?}",vec) } fn series_to_vec2(){ println!("------------series_to_vec2 test----------------------"); let df = df!("lang" =>&["rust","go","julia"],).unwrap(); let vec:vec<option<&str>> = df["date"].str().unwrap() .into_iter() .map(|s| match s{ some(v_) => some(v_), _ => none, }).collect(); println!("vec:{:?}",vec); } fn structs_in_df(){ println!("-----------structs_in_df test -----------------"); // feature => dtype-struct let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), 
naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap() .into_struct("bars") .into_series(); println!("{}", &df); // how to get series from struct column? let out = df.struct_().unwrap().field_by_name("close").unwrap(); println!("out :{}",out); // how to get struct value in df } fn list_in_df(){ println!("-------------list_in_df test ------------------------------"); let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let lst = df["close"].as_list().get(0).unwrap(); println!("lst :{:?}",lst); } fn serialize_df_to_json(){ println!("--------------- serialize_df_to_json test -----------------------"); // toml features => serde let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); let df_json = serde_json::to_value(&df).unwrap(); println!("df_json {df_json}"); } fn serialize_df_to_binary_todo(){ println!("---------serialize_df_to_binary_todo test -------------"); // toml features => serde let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" 
=> &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); // todo //let df_binary = serde_json::to_value(&df).unwrap(); //println!("df_json {df_binary}"); } fn df_to_ndarray(){ println!("-------------- df_to_ndarray test ------------------------"); // toml features =>ndarray let df = df!( "code" => &["600036.sh".to_string(),"600036.sh".to_string(),"600036.sh".to_string()], "date" =>&[naivedate::from_ymd_opt(2015, 3, 14).unwrap(), naivedate::from_ymd_opt(2015, 3, 15).unwrap(), naivedate::from_ymd_opt(2015, 3, 16).unwrap(),], "close" => &[1.21,1.22,1.23], "open" => &[1.22,1.21,1.23], "high" => &[1.22,1.25,1.24], "low" => &[1.19, 1.20,1.21], ).unwrap(); // ndarray 化: 先去除非f64列 let df_filter = df.lazy().select([all().exclude(["code","date"])]).collect().unwrap(); let ndarray = df_filter.to_ndarray::<float64type>(indexorder::fortran).unwrap(); println!("ndarray :{}",ndarray); } fn df_apply(){ println!("--------------df_apply--------------------"); // df_apply: apply应用于df的一列 // 将其中的"code"列小写改成大写 // mut ! 
let mut df = df!(
        "code" => &["600036.sh".to_string(), "600036.sh".to_string(), "600036.sh".to_string()],
        "date" => &[
            NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 15).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 16).unwrap(),
        ],
        "close" => &[1.21, 1.22, 1.23],
        "open" => &[1.22, 1.21, 1.23],
        "high" => &[1.22, 1.25, 1.24],
        "low" => &[1.19, 1.20, 1.21],
    )
    .unwrap();

    /// Returns a new string series with every non-null value of `code_val` upper-cased.
    ///
    /// Panics if `code_val` is not a string series.
    fn code_to_uppercase(code_val: &Series) -> Series {
        code_val
            .str()
            .unwrap()
            .into_iter()
            .map(|opt_code: Option<&str>| opt_code.map(|code: &str| code.to_uppercase()))
            .collect::<StringChunked>()
            .into_series()
    }

    // Upper-case the "code" column in place. There are two ways to do it.
    // method 1
    // df.apply("code", code_to_uppercase).unwrap();
    // method 2
    df.apply_at_idx(0, code_to_uppercase).unwrap(); // operate on column 0, i.e. the first column
    println!("df {}", df);
}

/// Writes a small frame to `600036sh.parquet`, reads it back both eagerly and
/// lazily, and asserts that both round trips reproduce the original frame.
///
/// Requires the polars `parquet` feature. Panics on any I/O or polars error.
fn write_read_parquet_files() {
    println!("------------ write_read_parquet_files test -------------------------");
    let mut df = df!(
        "code" => &["600036.sh".to_string(), "600036.sh".to_string(), "600036.sh".to_string()],
        "date" => &[
            NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 15).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 16).unwrap(),
        ],
        "close" => &[1.21, 1.22, 1.23],
        "open" => &[1.22, 1.21, 1.23],
        "high" => &[1.22, 1.25, 1.24],
        "low" => &[1.19, 1.20, 1.21],
    )
    .unwrap();

    write_parquet(&mut df);
    let df_ = read_parquet("600036sh.parquet");
    let _df_ = scan_parquet("600036sh.parquet")
        .select([all()])
        .collect()
        .unwrap();
    assert_eq!(df, df_);
    assert_eq!(df, _df_);
    println!("pass write_read parquet test!");

    /// Creates (or truncates) `600036sh.parquet` and writes `df` into it.
    fn write_parquet(df: &mut DataFrame) {
        let mut file = std::fs::File::create("600036sh.parquet").unwrap();
        ParquetWriter::new(&mut file).finish(df).unwrap();
    }

    /// Eagerly reads the parquet file at `filepath` into a `DataFrame`.
    fn read_parquet(filepath: &str) -> DataFrame {
        let mut file = std::fs::File::open(filepath).unwrap();
        ParquetReader::new(&mut file).finish().unwrap()
    }

    /// Builds a `LazyFrame` over the parquet file at `filepath` using default scan arguments.
    fn scan_parquet(filepath: &str) -> LazyFrame {
        let args = ScanArgsParquet::default();
        LazyFrame::scan_parquet(filepath, args).unwrap()
    }
}

/// Adds a `date_str` column that holds the `date` column formatted as text.
///
/// Requires the polars `temporal` feature.
fn date_to_str_in_column() {
    println!("---------------date_t0_str test----------------------");
    let df = df!(
        "code" => &["600036.sh".to_string(), "600036.sh".to_string(), "600036.sh".to_string()],
        "date" => &[
            NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 15).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 16).unwrap(),
        ],
        "close" => &[1.21, 1.22, 1.23],
        "open" => &[1.22, 1.21, 1.23],
        "high" => &[1.22, 1.25, 1.24],
        "low" => &[1.19, 1.20, 1.21],
    )
    .unwrap();

    // Add one column: date -> date_str.
    // NOTE(review): in chrono's strftime syntax `%y` is the two-digit year and `%h`
    // the abbreviated month name; confirm this is the intended layout.
    let df = df
        .lazy()
        .with_columns([cols(["date"])
            .dt()
            .to_string("%y-%h-%d")
            .alias("date_str")])
        .collect()
        .unwrap();
    println!("df:{}", df);
}

/// Demonstrates `when(..).then(..).otherwise(..)`: selects `run-time` plus a boolean
/// column `speed_conditional` that is true when `run-time` lies in `[1.0, 1.5]`.
fn when_logicial_in_df() {
    println!("------------------when_condition_in_df test----------------------");
    let df = df!(
        "name" => &["c", "julia", "go", "python", "rust", "c#", "matlab"],
        "run-time" => &[1.0, 1.11, 1.51, 3.987, 1.01, 1.65, 2.11],
    )
    .unwrap();

    // true when the run time is within [1.0, 1.5], false otherwise
    let in_range = col("run-time")
        .lt_eq(1.50)
        .and(col("run-time").gt_eq(1.0));
    let df_conditional = df
        .lazy()
        .select([
            col("run-time"),
            when(in_range)
                .then(lit(true))
                .otherwise(lit(false))
                .alias("speed_conditional"),
        ])
        .collect()
        .unwrap();
    println!("{}", &df_conditional);
}

/// Parses string columns into date / datetime columns and computes the number of
/// seconds between the parsed `send` and `order` timestamps.
///
/// Requires the polars `strings` feature; without it `str()` is not usable.
fn str_to_datetime_date_cast_in_df() {
    println!("--------------str_to_datetime_date_cast_in_df test---------------------------");
    let df = df!(
        "custom" => &["tom", "jack", "rose"],
        "login" => &["2024-08-14", "2024-08-12", "2023-08-09"], // first login date
        "order" => &["2024-08-14 10:15:32", "2024-08-14 11:22:32", "2024-08-14 14:12:52"], // order time
        "send" => &["2024-08-15 10:25:38", "2024-08-15 14:28:38", "2024-08-16 09:07:32"], // shipping time
    )
    .unwrap();

    let out = df
        .lazy()
        // string -> Date
        .with_columns([col("login")
            .str()
            .to_date(StrptimeOptions::default())
            .alias("login_dt")])
        // string -> Datetime (microsecond unit, no time zone)
        .with_columns([col("login")
            .str()
            .to_datetime(
                Some(TimeUnit::Microseconds),
                None,
                StrptimeOptions::default(),
                lit("raise"),
            )
            .alias("login_dtime")])
        // string -> Datetime through the generic `strptime`
        .with_columns([
            col("order")
                .str()
                .strptime(
                    DataType::Datetime(TimeUnit::Milliseconds, None),
                    StrptimeOptions::default(),
                    lit("raise"),
                )
                .alias("order_dtime"),
            col("send")
                .str()
                .strptime(
                    DataType::Datetime(TimeUnit::Milliseconds, None),
                    StrptimeOptions::default(),
                    lit("raise"), // raise an error if the parsing fails
                )
                .alias("send_dtime"),
        ])
        // difference of the two parsed timestamps, expressed in whole seconds
        .with_columns([(col("send_dtime") - col("order_dtime"))
            .alias("duration(seconds)")
            .dt()
            .total_seconds()])
        .collect()
        .unwrap();
    println!("out :{}", out);
}

/// Packs a frame into a struct column named `info`, inserts it as the first column,
/// then renames the struct fields and expands them again with `unnest`.
fn unnest_struct_in_df() {
    println!("--------------- unnest_struct_in_df test---------------------");
    // unnest() expands a struct column of a DataFrame into separate columns.
    // First build a DataFrame that contains a struct column.
    let mut df: DataFrame = df!(
        "company" => &["ailibaba", "baidu"],
        "profit" => &[777277778.0, 86555555.9],
    )
    .unwrap();
    let series = df.clone().into_struct("info").into_series();
    df.insert_column(0, series).unwrap();
    println!("_df :{}", df);

    // unnest() is the inverse of into_struct
    let out = df
        .lazy()
        .with_column(
            col("info")
                .struct_()
                .rename_fields(vec!["co.".to_string(), "pl".to_string()]),
        )
        // expand every field of the struct
        .unnest(["info"])
        .collect()
        .unwrap();
    println!("out :{}", out);
    // _df :shape: (2, 3)
    // ┌───────────────────────────┬──────────┬──────────────┐
    // │ info                      ┆ company  ┆ profit       │
    // │ ---                       ┆ ---      ┆ ---          │
    // │ struct[2]                 ┆ str      ┆ f64          │
    // ╞═══════════════════════════╪══════════╪══════════════╡
    // │ {"ailibaba",7.77277778e8} ┆ ailibaba ┆ 7.77277778e8 │
    // │ {"baidu",8.6556e7}        ┆ baidu    ┆ 8.6556e7     │
    // └───────────────────────────┴──────────┴──────────────┘
    // out :shape: (2, 4)
    // ┌──────────┬──────────────┬──────────┬──────────────┐
    // │ co.      ┆ pl           ┆ company  ┆ profit       │
    // │ ---      ┆ ---          ┆ ---      ┆ ---          │
    // │ str      ┆ f64          ┆ str      ┆ f64          │
    // ╞══════════╪══════════════╪══════════╪══════════════╡
    // │ ailibaba ┆ 7.77277778e8 ┆ ailibaba ┆ 7.77277778e8 │
    // │ baidu    ┆ 8.6556e7     ┆ baidu    ┆ 8.6556e7     │
    // └──────────┴──────────────┴──────────┴──────────────┘
}

/// Builds a struct column `info` with `as_struct`, once from explicitly named
/// columns and once from the `*` wildcard, and asserts both frames are equal.
///
/// Requires the polars `lazy` feature.
fn as_struct_in_df() {
    println!("---------- as_struct_in_df test ----------------------");
    let df: DataFrame = df!(
        "company" => &["ailibaba", "baidu"],
        "profit" => &[777277778.0, 86555555.9],
    )
    .unwrap();

    // as_struct: create the struct column from the listed columns
    let _df = df
        .clone()
        .lazy()
        .with_columns([as_struct(vec![col("company"), col("profit")]).alias("info")])
        .collect()
        .unwrap();
    // same thing, selecting every column through the wildcard
    let df_ = df
        .lazy()
        .with_columns([as_struct(vec![col("*")]).alias("info")])
        .collect()
        .unwrap();
    assert_eq!(_df, df_);
    println!("df :{}", _df);
    // df :shape: (2, 3)
    // ┌──────────┬──────────────┬───────────────────────────┐
    // │ company  ┆ profit       ┆ info                      │
    // │ ---      ┆ ---          ┆ ---                       │
    // │ str      ┆ f64          ┆ struct[2]                 │
    // ╞══════════╪══════════════╪═══════════════════════════╡
    // │ ailibaba ┆ 7.77277778e8 ┆ {"ailibaba",7.77277778e8} │
    // │ baidu    ┆ 8.6556e7     ┆ {"baidu",8.6556e7}        │
    // └──────────┴──────────────┴───────────────────────────┘
}

/// Builds a struct from `lang`, `ratings` and `users`, then uses `apply` on that
/// struct to produce a string column `links-three` of the form `lang-ratings-users`.
///
/// Requires the polars `dtype-struct` feature.
fn struct_apply_in_df() {
    println!("------------ struct_apply_in_df test---------------------");
    let df = df!(
        "lang" => &["julia", "go", "rust", "c", "c++"],
        "ratings" => &["aaaa", "aaa", "aaaaa", "aaaa", "aaa"],
        "users" => &[201, 303, 278, 99, 87],
        "references" => &[5, 6, 9, 4, 1],
    )
    .unwrap();

    // Goal: create a struct column {lang, ratings, users} and run `apply` on it;
    // the expected result is shown in the table at the end of this function.
    let out = df
        .lazy()
        .with_columns([
            // build the struct column
            as_struct(vec![col("lang"), col("ratings"), col("users")])
                // run a custom function over it
                .apply(
                    |s| {
                        // view the incoming series as a struct
                        let fields = s.struct_().unwrap();
                        // pull out the individual field series
                        let s_lang = fields.field_by_name("lang").unwrap();
                        let s_ratings = fields.field_by_name("ratings").unwrap();
                        let s_users = fields.field_by_name("users").unwrap();
                        // downcast each `Series` to its known type
                        let langs = s_lang.str().unwrap();
                        let ratings = s_ratings.str().unwrap();
                        let users = s_users.i32().unwrap();
                        // walk the three columns in lock-step; any null yields a null result
                        let out: StringChunked = langs
                            .into_iter()
                            .zip(ratings)
                            .zip(users)
                            .map(|((opt_lang, opt_rating), opt_user)| {
                                match (opt_lang, opt_rating, opt_user) {
                                    (Some(la), Some(ra), Some(us)) => {
                                        Some(format!("{}-{}-{}", la, ra, us))
                                    }
                                    _ => None,
                                }
                            })
                            .collect();
                        Ok(Some(out.into_series()))
                    },
                    GetOutput::from_type(DataType::String),
                )
                .alias("links-three"),
        ])
        .collect()
        .unwrap();
    println!("{}", out);
    // shape: (5, 5)
    // ┌───────┬─────────┬───────┬────────────┬────────────────┐
    // │ lang  ┆ ratings ┆ users ┆ references ┆ links-three    │
    // │ ---   ┆ ---     ┆ ---   ┆ ---        ┆ ---            │
    // │ str   ┆ str     ┆ i32   ┆ i32        ┆ str            │
    // ╞═══════╪═════════╪═══════╪════════════╪════════════════╡
    // │ julia ┆ aaaa    ┆ 201   ┆ 5          ┆ julia-aaaa-201 │
    // │ go    ┆ aaa     ┆ 303   ┆ 6          ┆ go-aaa-303     │
    // │ rust  ┆ aaaaa   ┆ 278   ┆ 9          ┆ rust-aaaaa-278 │
    // │ c     ┆ aaaa    ┆ 99    ┆ 4          ┆ c-aaaa-99      │
    // │ c++   ┆ aaa     ┆ 87    ┆ 1          ┆ c++-aaa-87     │
    // └───────┴─────────┴───────┴────────────┴────────────────┘
}

/// Builds a frame with a list column from a `Vec<Series>` and reads one element
/// back out of that list column.
fn create_list_in_df() {
    // Original author's note: list elements in polars may be of different types,
    // which corresponds to DataType::Object. (NOTE(review): not demonstrated here.)
    struct Info {
        code: String,
        is_h: bool,
    }
    impl Info {
        pub fn new(code: String, is_h: bool) -> Self {
            Self { code, is_h }
        }
    }
    // Note: a custom type generally cannot be placed in a DataFrame unless the
    // `NamedFrom` trait is implemented for it.

    // How to create list values with `df!`?
    // `data` cannot be a Vec<Vec<f64>>.
    // Note: the two inner "close" series may have different lengths.
    let data = vec![
        Series::new("close", [13.2, 14.2, 10.3]),
        Series::new("close", [13.1, 14.1, 15.1, 16.1, 15.1, 19.8, 20.1]),
    ];
    let code = vec!["600036sh", "600000sh"];
    // `info` cannot be used as a column because `Info` does not implement `NamedFrom`.
    let info = [
        Info::new("600036".to_string(), true),
        Info::new("600000".to_string(), true),
    ];
    // The following does not work:
    // let df = df!("data" => data, "code" => code, "info" => info).unwrap();
    let df = df!("data" => data, "code" => code).unwrap();
    println!("df :{}", df);
    // NOTE(review): the sample output below does not match the literal values above
    // (it looks like it was captured with different data); treat it as illustrative.
    // df :shape: (2, 2)
    // ┌──────────────────────┬──────────┐
    // │ data                 ┆ code     │
    // │ ---                  ┆ ---      │
    // │ list[f64]            ┆ str      │
    // ╞══════════════════════╪══════════╡
    // │ [13.2, 14.2, … 15.2] ┆ 600036sh │
    // │ [13.1, 14.1, … 15.1] ┆ 600000sh │
    // └──────────────────────┴──────────┘

    // How to read a value out of a list column: here the second row (index 1) of
    // "data", first element (index 0) of that row's list.
    let cell = df["data"].get(1).unwrap();
    let value = match &cell {
        AnyValue::List(inner) => {
            let first = inner.get(0).unwrap();
            Some(first.extract::<f64>().unwrap())
        }
        _ => None,
    };
    println!("value:{:?}", value);
}

/// Splits `comments` on spaces and uses `list().eval(..)` to count, per row, how
/// many of the resulting pieces cannot be cast to an integer; the count is stored
/// in a new column `sum`.
///
/// Requires the polars `list_eval` feature.
fn eval_in_df() {
    println!("----------- eval_in_df test ----------------------------");
    let df = df!(
        "code" => &["600036.sh".to_string(), "600036.sh".to_string(), "600036.sh".to_string()],
        "date" => &[
            NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 15).unwrap(),
            NaiveDate::from_ymd_opt(2015, 3, 16).unwrap(),
        ],
        "close" => &[1.21, 1.22, 1.23],
        "open" => &[1.22, 1.21, 1.23],
        "high" => &[1.22, 1.25, 1.24],
        "low" => &[1.19, 1.20, 1.21],
        "comments" => &["666", "very well!", "8888"],
    )
    .unwrap();

    // col(""): stands for each element inside the list
    // eval: runs an expression over the elements of a list (sorting, casting, ...)
    // eval: is normally preceded by `list()`
    let out = df
        .lazy()
        .with_columns([col("comments")
            .str()
            .split(lit(" "))
            .list()
            // the second argument says whether to evaluate in parallel; disabled here
            .eval(col("").cast(DataType::Int64).is_null(), false)
            .list()
            .sum()
            .alias("sum")])
        .collect()
        .unwrap();
    println!("{}", &out);
    // shape: (3, 8)
    // ┌───────────┬────────────┬───────┬──────┬──────┬──────┬────────────┬─────┐
    // │ code      ┆ date       ┆ close ┆ open ┆ high ┆ low  ┆ comments   ┆ sum │
    // │ ---       ┆ ---        ┆ ---   ┆ ---  ┆ ---  ┆ ---  ┆ ---        ┆ --- │
    // │ str       ┆ date       ┆ f64   ┆ f64  ┆ f64  ┆ f64  ┆ str        ┆ u32 │
    // ╞═══════════╪════════════╪═══════╪══════╪══════╪══════╪════════════╪═════╡
    // │ 600036.sh ┆ 2015-03-14 ┆ 1.21  ┆ 1.22 ┆ 1.22 ┆ 1.19 ┆ 666        ┆ 0   │
    // │ 600036.sh ┆ 2015-03-15 ┆ 1.22  ┆ 1.21 ┆ 1.25 ┆ 1.2  ┆ very well! ┆ 2   │
    // │ 600036.sh ┆ 2015-03-16 ┆ 1.23  ┆ 1.23 ┆ 1.24 ┆ 1.21 ┆ 8888       ┆ 0   │
    // └───────────┴────────────┴───────┴──────┴──────┴──────┴────────────┴─────┘
}

// regex

/// Placeholder for an array-column example; not implemented yet.
fn array_in_df() {
    // todo!
}
